mm1_sup/
uniform.rs

1use std::collections::HashSet;
2use std::time::Duration;
3
4use mm1_address::address::Address;
5use mm1_common::errors::error_kind::HasErrorKind;
6use mm1_common::errors::error_of::ErrorOf;
7use mm1_common::log::{debug, warn};
8use mm1_core::context::{
9    Fork, ForkErrorKind, InitDone, Linking, Quit, Recv, RecvErrorKind, Start, Stop, Tell, Watching,
10};
11use mm1_core::envelope::dispatch;
12use mm1_proto::message;
13use mm1_proto_sup::uniform::{self as unisup};
14use mm1_proto_system::{
15    StartErrorKind, StopErrorKind, System, {self as system},
16};
17
18use crate::common::child_spec::{ChildSpec, ChildTimeouts, InitType};
19use crate::common::factory::ActorFactory;
20
21#[derive(Debug, thiserror::Error)]
22#[message]
23pub enum UniformSupFailure {
24    #[error("recv error: {}", _0)]
25    Recv(RecvErrorKind),
26
27    #[error("fork error: {}", _0)]
28    Fork(ForkErrorKind),
29}
30
31pub struct UniformSup<F, D = Duration> {
32    pub child_spec: ChildSpec<F, D>,
33}
34
35impl<F, D> UniformSup<F, D> {
36    pub fn new(child_spec: ChildSpec<F, D>) -> Self {
37        Self { child_spec }
38    }
39}
40
41pub async fn uniform_sup<Sys, Ctx, F>(
42    ctx: &mut Ctx,
43    sup_spec: UniformSup<F, Duration>,
44) -> Result<(), UniformSupFailure>
45where
46    Sys: System + Default,
47
48    Ctx: Fork + Recv + Tell + Quit,
49    Ctx: InitDone<Sys>,
50    Ctx: Linking<Sys>,
51    Ctx: Watching<Sys>,
52    Ctx: Start<Sys>,
53    Ctx: Stop<Sys>,
54    F: ActorFactory<Runnable = Sys::Runnable>,
55    F::Args: Send,
56{
57    let UniformSup { child_spec } = sup_spec;
58    let ChildSpec {
59        factory,
60        child_type,
61        init_type,
62        timeouts,
63    } = child_spec;
64    let ChildTimeouts {
65        start_timeout,
66        stop_timeout,
67    } = timeouts;
68
69    let _ = child_type;
70    let _ = timeouts;
71
72    ctx.set_trap_exit(true).await;
73    ctx.init_done(ctx.address()).await;
74
75    let sup_address = ctx.address();
76    let mut started_children: HashSet<Address> = Default::default();
77    let mut stopping_children: HashSet<Address> = Default::default();
78
79    loop {
80        dispatch!(match ctx.recv().await.map_err(UniformSupFailure::recv)? {
81            unisup::StartRequest::<F::Args> { reply_to, args } => {
82                debug!("start request [reply_to: {}]", reply_to);
83
84                let runnable = factory.produce(args);
85                ctx.fork()
86                    .await
87                    .map_err(UniformSupFailure::fork)?
88                    .run(move |mut ctx| {
89                        async move {
90                            let result = do_start_child(
91                                &mut ctx,
92                                sup_address,
93                                init_type,
94                                start_timeout,
95                                runnable,
96                            )
97                            .await;
98                            let _ = ctx.tell(reply_to, result).await;
99                        }
100                    })
101                    .await;
102            },
103            ChildStarted(child) => {
104                ctx.link(child).await;
105                assert!(started_children.insert(child));
106            },
107
108            unisup::StopRequest { reply_to, child } => {
109                debug!("stop request [reply_to: {}; child: {}]", reply_to, child);
110
111                if stopping_children.insert(child) {
112                    ctx.fork()
113                        .await
114                        .map_err(UniformSupFailure::fork)?
115                        .run(move |mut ctx| {
116                            async move {
117                                let result =
118                                    do_stop_child(&mut ctx, sup_address, stop_timeout, child).await;
119                                let _ = ctx.tell(reply_to, result).await;
120                            }
121                        })
122                        .await;
123                } else {
124                    let _ = ctx
125                        .tell(
126                            reply_to,
127                            unisup::StopResponse::Err(ErrorOf::new(
128                                StopErrorKind::NotFound,
129                                "not found",
130                            )),
131                        )
132                        .await;
133                }
134            },
135
136            system::Exited { peer, normal_exit } =>
137                match (
138                    started_children.remove(&peer),
139                    stopping_children.remove(&peer),
140                    normal_exit,
141                ) {
142                    (false, true, _) => unreachable!(),
143                    (true, true, normal_exit) =>
144                        debug!(
145                            "a stopping child terminated [child: {}; normal_exit: {}]",
146                            peer, normal_exit
147                        ),
148                    (true, false, normal_exit) =>
149                        warn!(
150                            "a child unexpectedly terminated [child: {}; normal_exit: {}]",
151                            peer, normal_exit
152                        ),
153                    (false, false, true) => (),
154                    (false, false, false) => {
155                        // TODO: reap all the children before giving up
156                        debug!(
157                            "unknown linked process terminated. Exitting. [offender: {}]",
158                            peer
159                        );
160                        ctx.quit_err(UnknownPeerExited(peer)).await;
161                    },
162                },
163
164            any @ _ => {
165                warn!("unexpected message: {:?}", any)
166            },
167        })
168    }
169}
170
171async fn do_start_child<Sys, Ctx>(
172    ctx: &mut Ctx,
173    sup_address: Address,
174    init_type: InitType,
175    start_timeout: Duration,
176    runnable: Sys::Runnable,
177) -> unisup::StartResponse
178where
179    Sys: System + Default,
180    Ctx: Recv + Tell,
181    Ctx: Start<Sys>,
182{
183    debug!(
184        "starting child [init_type: {:?}, start_timeout: {:?}]",
185        init_type, start_timeout
186    );
187
188    let result = match init_type {
189        InitType::NoAck => {
190            ctx.spawn(runnable, true)
191                .await
192                .map_err(|e| e.map_kind(StartErrorKind::Spawn))
193        },
194        InitType::WithAck => ctx.start(runnable, true, start_timeout).await,
195    };
196    match result {
197        Err(reason) => {
198            warn!("error [reason: {}]", reason);
199            Err(reason)
200        },
201        Ok(child) => {
202            debug!("child [address: {}]", child);
203            let _ = ctx.tell(sup_address, ChildStarted(child)).await;
204            Ok(child)
205        },
206    }
207}
208
209async fn do_stop_child<Sys, Ctx>(
210    ctx: &mut Ctx,
211    _sup_address: Address,
212    stop_timeout: Duration,
213    child_address: Address,
214) -> unisup::StopResponse
215where
216    Sys: System + Default,
217    Ctx: Recv + Fork,
218    Ctx: Stop<Sys> + Watching<Sys>,
219{
220    debug!(
221        "stopping child [child_address: {}, stop_timeout: {:?}]",
222        child_address, stop_timeout
223    );
224
225    ctx.shutdown(child_address, stop_timeout)
226        .await
227        .map_err(|e| e.map_kind(|_| StopErrorKind::InternalError))
228}
229
230#[derive(Debug)]
231#[message]
232struct ChildStarted(Address);
233
234#[derive(Debug, thiserror::Error)]
235#[error("unknown peer failure: {}", _0)]
236struct UnknownPeerExited(Address);
237
238impl UniformSupFailure {
239    fn fork(e: impl HasErrorKind<ForkErrorKind> + Send) -> Self {
240        Self::Fork(e.kind())
241    }
242
243    fn recv(e: impl HasErrorKind<RecvErrorKind> + Send) -> Self {
244        Self::Recv(e.kind())
245    }
246}