agner_sup/common/
start_child.rs

1use std::sync::Arc;
2
3use tokio::sync::oneshot;
4
5use agner_actors::system_error::SysSpawnError;
6use agner_actors::{Actor, ActorID, Exit, SpawnOpts, System};
7use agner_utils::future_timeout_ext::FutureTimeoutExt;
8use agner_utils::result_err_flatten::ResultErrFlattenIn;
9use agner_utils::std_error_pp::StdErrorPP;
10
11use crate::common::{stop_child, InitType, ShutdownSequence, WithAck};
12
13#[derive(Debug, Clone, thiserror::Error)]
14pub enum StartChildError {
15    #[error("System failed to spawn child")]
16    SysSpawnError(#[source] Arc<SysSpawnError>),
17
18    #[error("Init-ack failure")]
19    InitAckFailure(#[source] Exit),
20
21    #[error("Timeout")]
22    Timeout(#[source] Arc<tokio::time::error::Elapsed>),
23
24    #[error("oneshot-rx failure")]
25    OneshotRx(#[source] oneshot::error::RecvError),
26}
27
28/// Start a child in accordance with the supervision design principles.
29#[tracing::instrument(skip_all, fields(
30    sup = display(sup_id),
31    behaviour = std::any::type_name::<B>(),
32    init_type = debug(init_type),
33))]
34pub async fn start_child<B, A, M>(
35    system: System,
36    sup_id: ActorID,
37    behaviour: B,
38    args: A,
39    init_type: InitType,
40) -> Result<ActorID, StartChildError>
41where
42    B: for<'a> Actor<'a, A, M>,
43    B: Send + 'static,
44    A: Send + 'static,
45    M: Send + Unpin + 'static,
46{
47    tracing::trace!("[start_child] starting child");
48
49    let child_id = match init_type {
50        InitType::NoAck => do_start_child_no_ack(&system, sup_id, behaviour, args).await?,
51        InitType::WithAck(with_ack) =>
52            do_start_child_init_ack(&system, sup_id, behaviour, args, with_ack).await?,
53    };
54
55    system.put_data(child_id, crate::common::ParentActor(sup_id)).await;
56
57    Ok(child_id)
58}
59
60async fn do_start_child_no_ack<B, A, M>(
61    system: &System,
62    sup_id: ActorID,
63    behaviour: B,
64    args: A,
65) -> Result<ActorID, StartChildError>
66where
67    B: for<'a> Actor<'a, A, M>,
68    B: Send + 'static,
69    A: Send + 'static,
70    M: Send + Unpin + 'static,
71{
72    let spawn_opts = SpawnOpts::new().with_link(sup_id);
73    let child_id = system.spawn(behaviour, args, spawn_opts).await?;
74    tracing::trace!("[start_child_no_ack] started [child_id: {}]", child_id);
75
76    Ok(child_id)
77}
78async fn do_start_child_init_ack<B, A, M>(
79    system: &System,
80    sup_id: ActorID,
81    behaviour: B,
82    args: A,
83    with_ack: WithAck,
84) -> Result<ActorID, StartChildError>
85where
86    B: for<'a> Actor<'a, A, M>,
87    B: Send + 'static,
88    A: Send + 'static,
89    M: Send + Unpin + 'static,
90{
91    let (init_ack_tx, init_ack_rx) = agner_init_ack::new_channel();
92    let spawn_opts = SpawnOpts::new().with_data(init_ack_tx);
93    let intermediary_id = system.spawn(behaviour, args, spawn_opts).await?;
94
95    let init_ack_result = init_ack_rx
96        .timeout(with_ack.init_timeout)
97        .await
98        .map_err(|elapsed| StartChildError::Timeout(Arc::new(elapsed)))
99        .map(|id_result| id_result.map_err(StartChildError::InitAckFailure))
100        .err_flatten_in();
101
102    match init_ack_result {
103        Ok(child_id) => {
104            system.link(sup_id, child_id).await;
105
106            tracing::trace!("[start_child_init_ack] init-ack success [child_id: {}]", child_id,);
107
108            Ok(child_id)
109        },
110        Err(reason) => {
111            tracing::warn!("[start_child_init_ack] canceling init [error: {}]", reason.pp());
112
113            if let Err(cancel_error) = stop_child::stop_child(
114                system.to_owned(),
115                intermediary_id,
116                ShutdownSequence::empty()
117                    .add(
118                        Exit::shutdown_with_source(Arc::new(reason.to_owned())),
119                        with_ack.stop_timeout,
120                    )
121                    .add(Exit::kill(), with_ack.stop_timeout),
122            )
123            .await
124            {
125                tracing::error!("[start_child_init_ack] failed to terminate intermediary [intermediary_id: {}, reason: {}]", intermediary_id, cancel_error.pp());
126            }
127
128            Err(reason)
129        },
130    }
131}
132
133impl From<SysSpawnError> for StartChildError {
134    fn from(e: SysSpawnError) -> Self {
135        Self::SysSpawnError(Arc::new(e))
136    }
137}
138impl From<tokio::time::error::Elapsed> for StartChildError {
139    fn from(e: tokio::time::error::Elapsed) -> Self {
140        Self::Timeout(Arc::new(e))
141    }
142}
143impl From<oneshot::error::RecvError> for StartChildError {
144    fn from(e: oneshot::error::RecvError) -> Self {
145        Self::OneshotRx(e)
146    }
147}