mm1_sup/
uniform.rs

1use std::collections::HashSet;
2use std::time::Duration;
3
4use mm1_address::address::Address;
5use mm1_ask::Reply;
6use mm1_common::errors::error_kind::HasErrorKind;
7use mm1_common::errors::error_of::ErrorOf;
8use mm1_common::log::{debug, warn};
9use mm1_core::context::{
10    Fork, ForkErrorKind, InitDone, Linking, Messaging, Quit, RecvErrorKind, Start, Stop, Tell,
11    Watching,
12};
13use mm1_core::envelope::dispatch;
14use mm1_proto::message;
15use mm1_proto_ask::Request;
16use mm1_proto_sup::uniform::{self as unisup};
17use mm1_proto_system::{
18    StartErrorKind, StopErrorKind, {self as system},
19};
20
21use crate::common::child_spec::{ChildSpec, InitType};
22use crate::common::factory::ActorFactory;
23
24pub trait UniformSupContext<Runnable>:
25    Fork + InitDone + Linking + Messaging + Quit + Reply + Start<Runnable> + Stop + Watching
26{
27}
28
29#[derive(Debug, thiserror::Error)]
30#[message(base_path = ::mm1_proto)]
31pub enum UniformSupFailure {
32    #[error("recv error: {}", _0)]
33    Recv(RecvErrorKind),
34
35    #[error("fork error: {}", _0)]
36    Fork(ForkErrorKind),
37}
38
39pub struct UniformSup<F> {
40    pub child_spec: ChildSpec<F, ()>,
41}
42
43impl<F> UniformSup<F> {
44    pub fn new(child_spec: ChildSpec<F, ()>) -> Self {
45        Self { child_spec }
46    }
47}
48
49pub async fn uniform_sup<R, Ctx, F>(
50    ctx: &mut Ctx,
51    sup_spec: UniformSup<F>,
52) -> Result<(), UniformSupFailure>
53where
54    R: Send + 'static,
55    Ctx: UniformSupContext<R>,
56    F: ActorFactory<Runnable = R>,
57    F::Args: Send,
58{
59    let UniformSup { child_spec } = sup_spec;
60    let ChildSpec {
61        launcher: factory,
62        child_type: (),
63        init_type,
64        stop_timeout,
65    } = child_spec;
66
67    ctx.set_trap_exit(true).await;
68    ctx.init_done(ctx.address()).await;
69
70    let sup_address = ctx.address();
71    let mut started_children: HashSet<Address> = Default::default();
72    let mut stopping_children: HashSet<Address> = Default::default();
73
74    loop {
75        dispatch!(match ctx.recv().await.map_err(UniformSupFailure::recv)? {
76            Request::<_> {
77                header: reply_to,
78                payload: unisup::StartRequest::<F::Args> { args },
79            } => {
80                debug!("start request [reply_to: {}]", reply_to);
81
82                let runnable = factory.produce(args);
83                ctx.fork()
84                    .await
85                    .map_err(UniformSupFailure::fork)?
86                    .run(move |mut ctx| {
87                        async move {
88                            let result =
89                                do_start_child(&mut ctx, sup_address, init_type, runnable).await;
90                            ctx.reply(reply_to, result).await.ok();
91                        }
92                    })
93                    .await;
94            },
95            ChildStarted(child) => {
96                ctx.link(child).await;
97                assert!(started_children.insert(child));
98            },
99
100            Request::<_> {
101                header: reply_to,
102                payload: unisup::StopRequest { child },
103            } => {
104                debug!("stop request [reply_to: {}; child: {}]", reply_to, child);
105
106                if stopping_children.insert(child) {
107                    ctx.fork()
108                        .await
109                        .map_err(UniformSupFailure::fork)?
110                        .run(move |mut ctx| {
111                            async move {
112                                let result =
113                                    do_stop_child(&mut ctx, sup_address, stop_timeout, child).await;
114                                ctx.reply(reply_to, result).await.ok();
115                            }
116                        })
117                        .await;
118                } else {
119                    ctx.reply(
120                        reply_to,
121                        unisup::StopResponse::Err(ErrorOf::new(
122                            StopErrorKind::NotFound,
123                            "not found",
124                        )),
125                    )
126                    .await
127                    .ok();
128                }
129            },
130
131            system::Exited { peer, normal_exit } =>
132                match (
133                    started_children.remove(&peer),
134                    stopping_children.remove(&peer),
135                    normal_exit,
136                ) {
137                    (false, true, _) => unreachable!(),
138                    (true, true, normal_exit) =>
139                        debug!(
140                            "a stopping child terminated [child: {}; normal_exit: {}]",
141                            peer, normal_exit
142                        ),
143                    (true, false, normal_exit) =>
144                        warn!(
145                            "a child unexpectedly terminated [child: {}; normal_exit: {}]",
146                            peer, normal_exit
147                        ),
148                    (false, false, true) => (),
149                    (false, false, false) => {
150                        // TODO: reap all the children before giving up
151                        debug!(
152                            "unknown linked process terminated. Exitting. [offender: {}]",
153                            peer
154                        );
155                        ctx.quit_err(UnknownPeerExited(peer)).await;
156                    },
157                },
158
159            any @ _ => {
160                warn!("unexpected message: {:?}", any)
161            },
162        })
163    }
164}
165
166async fn do_start_child<Runnable, Ctx>(
167    ctx: &mut Ctx,
168    sup_address: Address,
169    init_type: InitType,
170    runnable: Runnable,
171) -> unisup::StartResponse
172where
173    Ctx: Messaging + Start<Runnable>,
174{
175    debug!("starting child [init_type: {:?}]", init_type,);
176
177    let result = match init_type {
178        InitType::NoAck => {
179            ctx.spawn(runnable, true)
180                .await
181                .map_err(|e| e.map_kind(StartErrorKind::Spawn))
182        },
183        InitType::WithAck { start_timeout } => ctx.start(runnable, true, start_timeout).await,
184    };
185    match result {
186        Err(reason) => {
187            warn!("error [reason: {}]", reason);
188            Err(reason)
189        },
190        Ok(child) => {
191            debug!("child [address: {}]", child);
192            let _ = ctx.tell(sup_address, ChildStarted(child)).await;
193            Ok(child)
194        },
195    }
196}
197
198async fn do_stop_child<Ctx>(
199    ctx: &mut Ctx,
200    _sup_address: Address,
201    stop_timeout: Duration,
202    child_address: Address,
203) -> unisup::StopResponse
204where
205    Ctx: Fork + Stop + Watching + Messaging,
206{
207    debug!(
208        "stopping child [child_address: {}, stop_timeout: {:?}]",
209        child_address, stop_timeout
210    );
211
212    ctx.shutdown(child_address, stop_timeout)
213        .await
214        .map_err(|e| e.map_kind(|_| StopErrorKind::InternalError))
215}
216
217#[derive(Debug)]
218#[message(base_path = ::mm1_proto)]
219struct ChildStarted(Address);
220
221#[derive(Debug, thiserror::Error)]
222#[error("unknown peer failure: {}", _0)]
223struct UnknownPeerExited(Address);
224
225impl UniformSupFailure {
226    fn fork(e: impl HasErrorKind<ForkErrorKind> + Send) -> Self {
227        Self::Fork(e.kind())
228    }
229
230    fn recv(e: impl HasErrorKind<RecvErrorKind> + Send) -> Self {
231        Self::Recv(e.kind())
232    }
233}
234
235impl<F> Clone for UniformSup<F>
236where
237    ChildSpec<F, ()>: Clone,
238{
239    fn clone(&self) -> Self {
240        Self {
241            child_spec: self.child_spec.clone(),
242        }
243    }
244}
245
246impl<Ctx, Runnable> UniformSupContext<Runnable> for Ctx where
247    Ctx: Fork + InitDone + Linking + Messaging + Quit + Reply + Start<Runnable> + Stop + Watching
248{
249}