1use std::sync::Arc;
2
3use tokio::sync::oneshot;
4
5use agner_actors::system_error::SysSpawnError;
6use agner_actors::{Actor, ActorID, Exit, SpawnOpts, System};
7use agner_utils::future_timeout_ext::FutureTimeoutExt;
8use agner_utils::result_err_flatten::ResultErrFlattenIn;
9use agner_utils::std_error_pp::StdErrorPP;
10
11use crate::common::{stop_child, InitType, ShutdownSequence, WithAck};
12
13#[derive(Debug, Clone, thiserror::Error)]
14pub enum StartChildError {
15 #[error("System failed to spawn child")]
16 SysSpawnError(#[source] Arc<SysSpawnError>),
17
18 #[error("Init-ack failure")]
19 InitAckFailure(#[source] Exit),
20
21 #[error("Timeout")]
22 Timeout(#[source] Arc<tokio::time::error::Elapsed>),
23
24 #[error("oneshot-rx failure")]
25 OneshotRx(#[source] oneshot::error::RecvError),
26}
27
28#[tracing::instrument(skip_all, fields(
30 sup = display(sup_id),
31 behaviour = std::any::type_name::<B>(),
32 init_type = debug(init_type),
33))]
34pub async fn start_child<B, A, M>(
35 system: System,
36 sup_id: ActorID,
37 behaviour: B,
38 args: A,
39 init_type: InitType,
40) -> Result<ActorID, StartChildError>
41where
42 B: for<'a> Actor<'a, A, M>,
43 B: Send + 'static,
44 A: Send + 'static,
45 M: Send + Unpin + 'static,
46{
47 tracing::trace!("[start_child] starting child");
48
49 let child_id = match init_type {
50 InitType::NoAck => do_start_child_no_ack(&system, sup_id, behaviour, args).await?,
51 InitType::WithAck(with_ack) =>
52 do_start_child_init_ack(&system, sup_id, behaviour, args, with_ack).await?,
53 };
54
55 system.put_data(child_id, crate::common::ParentActor(sup_id)).await;
56
57 Ok(child_id)
58}
59
60async fn do_start_child_no_ack<B, A, M>(
61 system: &System,
62 sup_id: ActorID,
63 behaviour: B,
64 args: A,
65) -> Result<ActorID, StartChildError>
66where
67 B: for<'a> Actor<'a, A, M>,
68 B: Send + 'static,
69 A: Send + 'static,
70 M: Send + Unpin + 'static,
71{
72 let spawn_opts = SpawnOpts::new().with_link(sup_id);
73 let child_id = system.spawn(behaviour, args, spawn_opts).await?;
74 tracing::trace!("[start_child_no_ack] started [child_id: {}]", child_id);
75
76 Ok(child_id)
77}
78async fn do_start_child_init_ack<B, A, M>(
79 system: &System,
80 sup_id: ActorID,
81 behaviour: B,
82 args: A,
83 with_ack: WithAck,
84) -> Result<ActorID, StartChildError>
85where
86 B: for<'a> Actor<'a, A, M>,
87 B: Send + 'static,
88 A: Send + 'static,
89 M: Send + Unpin + 'static,
90{
91 let (init_ack_tx, init_ack_rx) = agner_init_ack::new_channel();
92 let spawn_opts = SpawnOpts::new().with_data(init_ack_tx);
93 let intermediary_id = system.spawn(behaviour, args, spawn_opts).await?;
94
95 let init_ack_result = init_ack_rx
96 .timeout(with_ack.init_timeout)
97 .await
98 .map_err(|elapsed| StartChildError::Timeout(Arc::new(elapsed)))
99 .map(|id_result| id_result.map_err(StartChildError::InitAckFailure))
100 .err_flatten_in();
101
102 match init_ack_result {
103 Ok(child_id) => {
104 system.link(sup_id, child_id).await;
105
106 tracing::trace!("[start_child_init_ack] init-ack success [child_id: {}]", child_id,);
107
108 Ok(child_id)
109 },
110 Err(reason) => {
111 tracing::warn!("[start_child_init_ack] canceling init [error: {}]", reason.pp());
112
113 if let Err(cancel_error) = stop_child::stop_child(
114 system.to_owned(),
115 intermediary_id,
116 ShutdownSequence::empty()
117 .add(
118 Exit::shutdown_with_source(Arc::new(reason.to_owned())),
119 with_ack.stop_timeout,
120 )
121 .add(Exit::kill(), with_ack.stop_timeout),
122 )
123 .await
124 {
125 tracing::error!("[start_child_init_ack] failed to terminate intermediary [intermediary_id: {}, reason: {}]", intermediary_id, cancel_error.pp());
126 }
127
128 Err(reason)
129 },
130 }
131}
132
133impl From<SysSpawnError> for StartChildError {
134 fn from(e: SysSpawnError) -> Self {
135 Self::SysSpawnError(Arc::new(e))
136 }
137}
138impl From<tokio::time::error::Elapsed> for StartChildError {
139 fn from(e: tokio::time::error::Elapsed) -> Self {
140 Self::Timeout(Arc::new(e))
141 }
142}
143impl From<oneshot::error::RecvError> for StartChildError {
144 fn from(e: oneshot::error::RecvError) -> Self {
145 Self::OneshotRx(e)
146 }
147}