1use std::collections::HashSet;
2use std::time::Duration;
3
4use mm1_address::address::Address;
5use mm1_ask::Reply;
6use mm1_common::errors::error_kind::HasErrorKind;
7use mm1_common::errors::error_of::ErrorOf;
8use mm1_common::log::{debug, warn};
9use mm1_core::context::{
10 Fork, ForkErrorKind, InitDone, Linking, Messaging, Quit, RecvErrorKind, Start, Stop, Tell,
11 Watching,
12};
13use mm1_core::envelope::dispatch;
14use mm1_proto::message;
15use mm1_proto_ask::Request;
16use mm1_proto_sup::uniform::{self as unisup};
17use mm1_proto_system::{
18 StartErrorKind, StopErrorKind, {self as system},
19};
20
21use crate::common::child_spec::{ChildSpec, InitType};
22use crate::common::factory::ActorFactory;
23
24pub trait UniformSupContext<Runnable>:
25 Fork + InitDone + Linking + Messaging + Quit + Reply + Start<Runnable> + Stop + Watching
26{
27}
28
29#[derive(Debug, thiserror::Error)]
30#[message(base_path = ::mm1_proto)]
31pub enum UniformSupFailure {
32 #[error("recv error: {}", _0)]
33 Recv(RecvErrorKind),
34
35 #[error("fork error: {}", _0)]
36 Fork(ForkErrorKind),
37}
38
39pub struct UniformSup<F> {
40 pub child_spec: ChildSpec<F, ()>,
41}
42
43impl<F> UniformSup<F> {
44 pub fn new(child_spec: ChildSpec<F, ()>) -> Self {
45 Self { child_spec }
46 }
47}
48
49pub async fn uniform_sup<R, Ctx, F>(
50 ctx: &mut Ctx,
51 sup_spec: UniformSup<F>,
52) -> Result<(), UniformSupFailure>
53where
54 R: Send + 'static,
55 Ctx: UniformSupContext<R>,
56 F: ActorFactory<Runnable = R>,
57 F::Args: Send,
58{
59 let UniformSup { child_spec } = sup_spec;
60 let ChildSpec {
61 launcher: factory,
62 child_type: (),
63 init_type,
64 stop_timeout,
65 } = child_spec;
66
67 ctx.set_trap_exit(true).await;
68 ctx.init_done(ctx.address()).await;
69
70 let sup_address = ctx.address();
71 let mut started_children: HashSet<Address> = Default::default();
72 let mut stopping_children: HashSet<Address> = Default::default();
73
74 loop {
75 dispatch!(match ctx.recv().await.map_err(UniformSupFailure::recv)? {
76 Request::<_> {
77 header: reply_to,
78 payload: unisup::StartRequest::<F::Args> { args },
79 } => {
80 debug!("start request [reply_to: {}]", reply_to);
81
82 let runnable = factory.produce(args);
83 ctx.fork()
84 .await
85 .map_err(UniformSupFailure::fork)?
86 .run(move |mut ctx| {
87 async move {
88 let result =
89 do_start_child(&mut ctx, sup_address, init_type, runnable).await;
90 ctx.reply(reply_to, result).await.ok();
91 }
92 })
93 .await;
94 },
95 ChildStarted(child) => {
96 ctx.link(child).await;
97 assert!(started_children.insert(child));
98 },
99
100 Request::<_> {
101 header: reply_to,
102 payload: unisup::StopRequest { child },
103 } => {
104 debug!("stop request [reply_to: {}; child: {}]", reply_to, child);
105
106 if stopping_children.insert(child) {
107 ctx.fork()
108 .await
109 .map_err(UniformSupFailure::fork)?
110 .run(move |mut ctx| {
111 async move {
112 let result =
113 do_stop_child(&mut ctx, sup_address, stop_timeout, child).await;
114 ctx.reply(reply_to, result).await.ok();
115 }
116 })
117 .await;
118 } else {
119 ctx.reply(
120 reply_to,
121 unisup::StopResponse::Err(ErrorOf::new(
122 StopErrorKind::NotFound,
123 "not found",
124 )),
125 )
126 .await
127 .ok();
128 }
129 },
130
131 system::Exited { peer, normal_exit } =>
132 match (
133 started_children.remove(&peer),
134 stopping_children.remove(&peer),
135 normal_exit,
136 ) {
137 (false, true, _) => unreachable!(),
138 (true, true, normal_exit) =>
139 debug!(
140 "a stopping child terminated [child: {}; normal_exit: {}]",
141 peer, normal_exit
142 ),
143 (true, false, normal_exit) =>
144 warn!(
145 "a child unexpectedly terminated [child: {}; normal_exit: {}]",
146 peer, normal_exit
147 ),
148 (false, false, true) => (),
149 (false, false, false) => {
150 debug!(
152 "unknown linked process terminated. Exitting. [offender: {}]",
153 peer
154 );
155 ctx.quit_err(UnknownPeerExited(peer)).await;
156 },
157 },
158
159 any @ _ => {
160 warn!("unexpected message: {:?}", any)
161 },
162 })
163 }
164}
165
166async fn do_start_child<Runnable, Ctx>(
167 ctx: &mut Ctx,
168 sup_address: Address,
169 init_type: InitType,
170 runnable: Runnable,
171) -> unisup::StartResponse
172where
173 Ctx: Messaging + Start<Runnable>,
174{
175 debug!("starting child [init_type: {:?}]", init_type,);
176
177 let result = match init_type {
178 InitType::NoAck => {
179 ctx.spawn(runnable, true)
180 .await
181 .map_err(|e| e.map_kind(StartErrorKind::Spawn))
182 },
183 InitType::WithAck { start_timeout } => ctx.start(runnable, true, start_timeout).await,
184 };
185 match result {
186 Err(reason) => {
187 warn!("error [reason: {}]", reason);
188 Err(reason)
189 },
190 Ok(child) => {
191 debug!("child [address: {}]", child);
192 let _ = ctx.tell(sup_address, ChildStarted(child)).await;
193 Ok(child)
194 },
195 }
196}
197
198async fn do_stop_child<Ctx>(
199 ctx: &mut Ctx,
200 _sup_address: Address,
201 stop_timeout: Duration,
202 child_address: Address,
203) -> unisup::StopResponse
204where
205 Ctx: Fork + Stop + Watching + Messaging,
206{
207 debug!(
208 "stopping child [child_address: {}, stop_timeout: {:?}]",
209 child_address, stop_timeout
210 );
211
212 ctx.shutdown(child_address, stop_timeout)
213 .await
214 .map_err(|e| e.map_kind(|_| StopErrorKind::InternalError))
215}
216
217#[derive(Debug)]
218#[message(base_path = ::mm1_proto)]
219struct ChildStarted(Address);
220
221#[derive(Debug, thiserror::Error)]
222#[error("unknown peer failure: {}", _0)]
223struct UnknownPeerExited(Address);
224
225impl UniformSupFailure {
226 fn fork(e: impl HasErrorKind<ForkErrorKind> + Send) -> Self {
227 Self::Fork(e.kind())
228 }
229
230 fn recv(e: impl HasErrorKind<RecvErrorKind> + Send) -> Self {
231 Self::Recv(e.kind())
232 }
233}
234
235impl<F> Clone for UniformSup<F>
236where
237 ChildSpec<F, ()>: Clone,
238{
239 fn clone(&self) -> Self {
240 Self {
241 child_spec: self.child_spec.clone(),
242 }
243 }
244}
245
246impl<Ctx, Runnable> UniformSupContext<Runnable> for Ctx where
247 Ctx: Fork + InitDone + Linking + Messaging + Quit + Reply + Start<Runnable> + Stop + Watching
248{
249}