1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use std::sync::Arc;

use tokio::sync::oneshot;

use agner_actors::system_error::SysSpawnError;
use agner_actors::{Actor, ActorID, Exit, SpawnOpts, System};
use agner_utils::future_timeout_ext::FutureTimeoutExt;
use agner_utils::result_err_flatten::ResultErrFlattenIn;
use agner_utils::std_error_pp::StdErrorPP;

use crate::common::{stop_child, InitType, ShutdownSequence, WithAck};

/// Reasons why starting a child on behalf of a supervisor can fail.
///
/// The enum derives `Clone`; two of the payloads are held behind an [`Arc`]
/// (presumably because the wrapped error types are not `Clone` themselves —
/// TODO confirm).
#[derive(Debug, Clone, thiserror::Error)]
pub enum StartChildError {
    /// The actor system returned an error from its `spawn` call.
    #[error("System failed to spawn child")]
    SysSpawnError(#[source] Arc<SysSpawnError>),

    /// The init-ack channel resolved with an [`Exit`] instead of a child id.
    #[error("Init-ack failure")]
    InitAckFailure(#[source] Exit),

    /// Waiting for the init-ack took longer than the configured init timeout.
    #[error("Timeout")]
    Timeout(#[source] Arc<tokio::time::error::Elapsed>),

    /// A oneshot receiver reported an error.
    ///
    /// NOTE(review): nothing next to this definition constructs the variant
    /// directly; it is only reachable through the `From<RecvError>`
    /// conversion — confirm which caller relies on it.
    #[error("oneshot-rx failure")]
    OneshotRx(#[source] oneshot::error::RecvError),
}

/// Start a child in accordance with the supervision design principles.
///
/// How the child is brought up depends on `init_type`:
///
/// * [`InitType::NoAck`] — the child is spawned and the call returns as soon
///   as the system has produced its id.
/// * [`InitType::WithAck`] — the child is spawned and the call additionally
///   waits for the init-ack before reporting success.
///
/// After a successful start, `ParentActor(sup_id)` is stored in the system
/// as data attached to the returned child id.
///
/// # Errors
///
/// Returns the [`StartChildError`] produced by the selected start routine;
/// in that case nothing is stored in the system for the child.
#[tracing::instrument(skip_all, fields(
    sup = display(sup_id),
    behaviour = std::any::type_name::<B>(),
    init_type = debug(init_type),
))]
pub async fn start_child<B, A, M>(
    system: System,
    sup_id: ActorID,
    behaviour: B,
    args: A,
    init_type: InitType,
) -> Result<ActorID, StartChildError>
where
    B: for<'a> Actor<'a, A, M>,
    B: Send + 'static,
    A: Send + 'static,
    M: Send + Unpin + 'static,
{
    tracing::trace!("[start_child] starting child");

    // Run the start routine selected by `init_type`; the error (if any) is
    // propagated once, right after the match.
    let spawn_result: Result<ActorID, StartChildError> = match init_type {
        InitType::WithAck(with_ack) =>
            do_start_child_init_ack(&system, sup_id, behaviour, args, with_ack).await,
        InitType::NoAck => do_start_child_no_ack(&system, sup_id, behaviour, args).await,
    };
    let child_id = spawn_result?;

    // Remember which supervisor owns this child.
    let parent = crate::common::ParentActor(sup_id);
    system.put_data(child_id, parent).await;

    Ok(child_id)
}

/// Spawns the child without waiting for any init-ack.
///
/// The child is spawned with spawn options that carry a link to `sup_id`,
/// and its id is returned as soon as the system reports the spawn as done.
///
/// # Errors
///
/// A failure of `System::spawn` is converted into [`StartChildError`] and
/// returned.
async fn do_start_child_no_ack<B, A, M>(
    system: &System,
    sup_id: ActorID,
    behaviour: B,
    args: A,
) -> Result<ActorID, StartChildError>
where
    B: for<'a> Actor<'a, A, M>,
    B: Send + 'static,
    A: Send + 'static,
    M: Send + Unpin + 'static,
{
    let child_id = system
        .spawn(behaviour, args, SpawnOpts::new().with_link(sup_id))
        .await?;

    tracing::trace!("[start_child_no_ack] started [child_id: {}]", child_id);

    Ok(child_id)
}
async fn do_start_child_init_ack<B, A, M>(
    system: &System,
    sup_id: ActorID,
    behaviour: B,
    args: A,
    with_ack: WithAck,
) -> Result<ActorID, StartChildError>
where
    B: for<'a> Actor<'a, A, M>,
    B: Send + 'static,
    A: Send + 'static,
    M: Send + Unpin + 'static,
{
    let (init_ack_tx, init_ack_rx) = agner_init_ack::new_channel();
    let spawn_opts = SpawnOpts::new().with_data(init_ack_tx);
    let intermediary_id = system.spawn(behaviour, args, spawn_opts).await?;

    let init_ack_result = init_ack_rx
        .timeout(with_ack.init_timeout)
        .await
        .map_err(|elapsed| StartChildError::Timeout(Arc::new(elapsed)))
        .map(|id_result| id_result.map_err(StartChildError::InitAckFailure))
        .err_flatten_in();

    match init_ack_result {
        Ok(child_id) => {
            system.link(sup_id, child_id).await;

            tracing::trace!("[start_child_init_ack] init-ack success [child_id: {}]", child_id,);

            Ok(child_id)
        },
        Err(reason) => {
            tracing::warn!("[start_child_init_ack] canceling init [error: {}]", reason.pp());

            if let Err(cancel_error) = stop_child::stop_child(
                system.to_owned(),
                intermediary_id,
                ShutdownSequence::empty()
                    .add(
                        Exit::shutdown_with_source(Arc::new(reason.to_owned())),
                        with_ack.stop_timeout,
                    )
                    .add(Exit::kill(), with_ack.stop_timeout),
            )
            .await
            {
                tracing::error!("[start_child_init_ack] failed to terminate intermediary [intermediary_id: {}, reason: {}]", intermediary_id, cancel_error.pp());
            }

            Err(reason)
        },
    }
}

impl From<SysSpawnError> for StartChildError {
    fn from(e: SysSpawnError) -> Self {
        Self::SysSpawnError(Arc::new(e))
    }
}
impl From<tokio::time::error::Elapsed> for StartChildError {
    fn from(e: tokio::time::error::Elapsed) -> Self {
        Self::Timeout(Arc::new(e))
    }
}
impl From<oneshot::error::RecvError> for StartChildError {
    fn from(e: oneshot::error::RecvError) -> Self {
        Self::OneshotRx(e)
    }
}