1use std::collections::HashSet;
2use std::time::Duration;
3
4use mm1_address::address::Address;
5use mm1_common::errors::error_kind::HasErrorKind;
6use mm1_common::errors::error_of::ErrorOf;
7use mm1_common::log::{debug, warn};
8use mm1_core::context::{
9 Fork, ForkErrorKind, InitDone, Linking, Quit, Recv, RecvErrorKind, Start, Stop, Tell, Watching,
10};
11use mm1_core::envelope::dispatch;
12use mm1_proto::message;
13use mm1_proto_sup::uniform::{self as unisup};
14use mm1_proto_system::{
15 StartErrorKind, StopErrorKind, System, {self as system},
16};
17
18use crate::common::child_spec::{ChildSpec, ChildTimeouts, InitType};
19use crate::common::factory::ActorFactory;
20
21#[derive(Debug, thiserror::Error)]
22#[message]
23pub enum UniformSupFailure {
24 #[error("recv error: {}", _0)]
25 Recv(RecvErrorKind),
26
27 #[error("fork error: {}", _0)]
28 Fork(ForkErrorKind),
29}
30
31pub struct UniformSup<F, D = Duration> {
32 pub child_spec: ChildSpec<F, D>,
33}
34
35impl<F, D> UniformSup<F, D> {
36 pub fn new(child_spec: ChildSpec<F, D>) -> Self {
37 Self { child_spec }
38 }
39}
40
41pub async fn uniform_sup<Sys, Ctx, F>(
42 ctx: &mut Ctx,
43 sup_spec: UniformSup<F, Duration>,
44) -> Result<(), UniformSupFailure>
45where
46 Sys: System + Default,
47
48 Ctx: Fork + Recv + Tell + Quit,
49 Ctx: InitDone<Sys>,
50 Ctx: Linking<Sys>,
51 Ctx: Watching<Sys>,
52 Ctx: Start<Sys>,
53 Ctx: Stop<Sys>,
54 F: ActorFactory<Runnable = Sys::Runnable>,
55 F::Args: Send,
56{
57 let UniformSup { child_spec } = sup_spec;
58 let ChildSpec {
59 factory,
60 child_type,
61 init_type,
62 timeouts,
63 } = child_spec;
64 let ChildTimeouts {
65 start_timeout,
66 stop_timeout,
67 } = timeouts;
68
69 let _ = child_type;
70 let _ = timeouts;
71
72 ctx.set_trap_exit(true).await;
73 ctx.init_done(ctx.address()).await;
74
75 let sup_address = ctx.address();
76 let mut started_children: HashSet<Address> = Default::default();
77 let mut stopping_children: HashSet<Address> = Default::default();
78
79 loop {
80 dispatch!(match ctx.recv().await.map_err(UniformSupFailure::recv)? {
81 unisup::StartRequest::<F::Args> { reply_to, args } => {
82 debug!("start request [reply_to: {}]", reply_to);
83
84 let runnable = factory.produce(args);
85 ctx.fork()
86 .await
87 .map_err(UniformSupFailure::fork)?
88 .run(move |mut ctx| {
89 async move {
90 let result = do_start_child(
91 &mut ctx,
92 sup_address,
93 init_type,
94 start_timeout,
95 runnable,
96 )
97 .await;
98 let _ = ctx.tell(reply_to, result).await;
99 }
100 })
101 .await;
102 },
103 ChildStarted(child) => {
104 ctx.link(child).await;
105 assert!(started_children.insert(child));
106 },
107
108 unisup::StopRequest { reply_to, child } => {
109 debug!("stop request [reply_to: {}; child: {}]", reply_to, child);
110
111 if stopping_children.insert(child) {
112 ctx.fork()
113 .await
114 .map_err(UniformSupFailure::fork)?
115 .run(move |mut ctx| {
116 async move {
117 let result =
118 do_stop_child(&mut ctx, sup_address, stop_timeout, child).await;
119 let _ = ctx.tell(reply_to, result).await;
120 }
121 })
122 .await;
123 } else {
124 let _ = ctx
125 .tell(
126 reply_to,
127 unisup::StopResponse::Err(ErrorOf::new(
128 StopErrorKind::NotFound,
129 "not found",
130 )),
131 )
132 .await;
133 }
134 },
135
136 system::Exited { peer, normal_exit } =>
137 match (
138 started_children.remove(&peer),
139 stopping_children.remove(&peer),
140 normal_exit,
141 ) {
142 (false, true, _) => unreachable!(),
143 (true, true, normal_exit) =>
144 debug!(
145 "a stopping child terminated [child: {}; normal_exit: {}]",
146 peer, normal_exit
147 ),
148 (true, false, normal_exit) =>
149 warn!(
150 "a child unexpectedly terminated [child: {}; normal_exit: {}]",
151 peer, normal_exit
152 ),
153 (false, false, true) => (),
154 (false, false, false) => {
155 debug!(
157 "unknown linked process terminated. Exitting. [offender: {}]",
158 peer
159 );
160 ctx.quit_err(UnknownPeerExited(peer)).await;
161 },
162 },
163
164 any @ _ => {
165 warn!("unexpected message: {:?}", any)
166 },
167 })
168 }
169}
170
171async fn do_start_child<Sys, Ctx>(
172 ctx: &mut Ctx,
173 sup_address: Address,
174 init_type: InitType,
175 start_timeout: Duration,
176 runnable: Sys::Runnable,
177) -> unisup::StartResponse
178where
179 Sys: System + Default,
180 Ctx: Recv + Tell,
181 Ctx: Start<Sys>,
182{
183 debug!(
184 "starting child [init_type: {:?}, start_timeout: {:?}]",
185 init_type, start_timeout
186 );
187
188 let result = match init_type {
189 InitType::NoAck => {
190 ctx.spawn(runnable, true)
191 .await
192 .map_err(|e| e.map_kind(StartErrorKind::Spawn))
193 },
194 InitType::WithAck => ctx.start(runnable, true, start_timeout).await,
195 };
196 match result {
197 Err(reason) => {
198 warn!("error [reason: {}]", reason);
199 Err(reason)
200 },
201 Ok(child) => {
202 debug!("child [address: {}]", child);
203 let _ = ctx.tell(sup_address, ChildStarted(child)).await;
204 Ok(child)
205 },
206 }
207}
208
209async fn do_stop_child<Sys, Ctx>(
210 ctx: &mut Ctx,
211 _sup_address: Address,
212 stop_timeout: Duration,
213 child_address: Address,
214) -> unisup::StopResponse
215where
216 Sys: System + Default,
217 Ctx: Recv + Fork,
218 Ctx: Stop<Sys> + Watching<Sys>,
219{
220 debug!(
221 "stopping child [child_address: {}, stop_timeout: {:?}]",
222 child_address, stop_timeout
223 );
224
225 ctx.shutdown(child_address, stop_timeout)
226 .await
227 .map_err(|e| e.map_kind(|_| StopErrorKind::InternalError))
228}
229
230#[derive(Debug)]
231#[message]
232struct ChildStarted(Address);
233
234#[derive(Debug, thiserror::Error)]
235#[error("unknown peer failure: {}", _0)]
236struct UnknownPeerExited(Address);
237
238impl UniformSupFailure {
239 fn fork(e: impl HasErrorKind<ForkErrorKind> + Send) -> Self {
240 Self::Fork(e.kind())
241 }
242
243 fn recv(e: impl HasErrorKind<RecvErrorKind> + Send) -> Self {
244 Self::Recv(e.kind())
245 }
246}