mm1_sup/mixed/
sup_actor.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::hash::Hash;
4
5use mm1_ask::Reply;
6use mm1_common::errors::error_kind::HasErrorKind;
7use mm1_common::errors::error_of::ErrorOf;
8use mm1_common::log;
9use mm1_core::context::{ForkErrorKind, RecvErrorKind};
10use mm1_core::envelope::dispatch;
11use mm1_proto::Message;
12use mm1_proto_ask::Request;
13use mm1_proto_sup::mixed as m_sup;
14use mm1_proto_system::Exited;
15
16use crate::mixed::decider::{Action, Decider};
17use crate::mixed::strategy::RestartStrategy;
18use crate::mixed::{
19    ChildType, ErasedActorFactory, MixedSup, MixedSupContext, spec_builder, sup_child,
20};
21type ChildSpec<F> = crate::common::child_spec::ChildSpec<F, ChildType>;
22
23pub async fn mixed_sup<Runnable, Ctx, RS, CS, K>(
24    ctx: &mut Ctx,
25    sup_spec: MixedSup<RS, CS>,
26) -> Result<(), MixedSupError>
27where
28    Runnable: Send,
29    Ctx: MixedSupContext<Runnable>,
30    Ctx: Reply,
31    CS: spec_builder::CollectInto<K, Runnable>,
32    RS: RestartStrategy<K>,
33    K: fmt::Display,
34    K: Clone + Hash + Eq,
35    K: Message,
36    ChildSpec<ErasedActorFactory<Runnable>>: Send + Sync + 'static,
37{
38    ctx.set_trap_exit(true).await;
39
40    let MixedSup {
41        restart_strategy,
42        children,
43    } = sup_spec;
44    let sup_addr = ctx.address();
45    let mut decider = restart_strategy.decider();
46    let children = do_init_children(&mut decider, children)?;
47
48    loop {
49        if let Some(action) = decider
50            .next_action(ctx.now())
51            .map_err(MixedSupError::decider)?
52        {
53            log::debug!("processing decider action: {}", action);
54            match action {
55                Action::Noop => (),
56                Action::InitDone => {
57                    ctx.init_done(ctx.address()).await;
58                },
59                Action::Quit { normal_exit } => {
60                    if normal_exit {
61                        ctx.quit_ok().await;
62                    } else {
63                        ctx.quit_err(MixedSupError::Escalated).await;
64                    }
65                },
66                Action::Start { child_id } => {
67                    let child_id = child_id.clone();
68                    let child_spec = children
69                        .get(&child_id)
70                        .expect("child_id provided by the decider")
71                        .clone()
72                        .map_launcher(|f| f.produce(()));
73
74                    let forked = ctx
75                        .fork()
76                        .await
77                        .map_err(|e| e.kind())
78                        .map_err(MixedSupError::Fork)?;
79                    forked
80                        .run(move |mut ctx| async move { sup_child::run(&mut ctx, sup_addr, child_id, child_spec).await })
81                        .await;
82                },
83
84                Action::Stop { address, child_id } => {
85                    let stop_timeout = child_id
86                        .map(|id| {
87                            children
88                                .get(id)
89                                .expect("child_id provided by the decider")
90                                .stop_timeout
91                        })
92                        .unwrap_or_default();
93                    let forked = ctx
94                        .fork()
95                        .await
96                        .map_err(|e| e.kind())
97                        .map_err(MixedSupError::Fork)?;
98                    forked
99                        .run(move |mut ctx| {
100                            async move {
101                                sup_child::shutdown(&mut ctx, sup_addr, address, stop_timeout).await
102                            }
103                        })
104                        .await;
105                },
106            }
107        }
108
109        let received = ctx.recv().await.map_err(MixedSupError::recv)?;
110
111        dispatch!(match received {
112            sup_child::Started::<K> { child_id, address } => {
113                log::debug!("[{}] started as {}. Linking...", child_id, address);
114                ctx.link(address).await;
115                decider.started(&child_id, address, ctx.now())
116            },
117            sup_child::StartFailed::<K> { child_id } => {
118                log::warn!("failed to start [{}]. Initiating shutdown...", child_id);
119                decider.failed(&child_id, ctx.now());
120            },
121            sup_child::StopFailed { address, reason } => {
122                log::warn!(
123                    "failed to stop {}: {}. Initiating shutdown...",
124                    address,
125                    reason
126                );
127                decider.quit(false);
128            },
129
130            Request::<_> {
131                header,
132                payload: m_sup::GetChildRequest::<K> { child_id },
133            } => {
134                let reply_with: m_sup::GetChildResponse = match decider.address(&child_id) {
135                    Err(reason) => {
136                        Err(ErrorOf::new(
137                            m_sup::GetChildErrorKind::UnknownChild,
138                            reason.to_string(),
139                        ))
140                    },
141                    Ok(address_opt) => Ok(address_opt),
142                };
143                ctx.reply(header, reply_with).await.ok();
144            },
145
146            Exited { peer, normal_exit } => {
147                log::debug!("{} exited", peer);
148                decider.exited(peer, normal_exit, ctx.now());
149            },
150
151            unexpected @ _ => log::warn!("unexpected message: {:?}", unexpected),
152        });
153    }
154}
155
156#[derive(Debug, thiserror::Error)]
157pub enum MixedSupError {
158    #[error("escalated supervisor failure")]
159    Escalated,
160
161    #[error("decider: {}", _0)]
162    Decider(String),
163
164    #[error("recv: {}", _0)]
165    Recv(RecvErrorKind),
166
167    #[error("fork: {}", _0)]
168    Fork(ForkErrorKind),
169}
170
171impl MixedSupError {
172    pub fn decider(reason: impl fmt::Display) -> Self {
173        Self::Decider(reason.to_string())
174    }
175
176    pub fn recv(reason: impl HasErrorKind<RecvErrorKind>) -> Self {
177        Self::Recv(reason.kind())
178    }
179}
180
181fn do_init_children<CS, D, R, K>(
182    decider: &mut D,
183    children: CS,
184) -> Result<HashMap<K, ChildSpec<ErasedActorFactory<R>>>, MixedSupError>
185where
186    CS: spec_builder::CollectInto<K, R>,
187    D: Decider<Key = K>,
188    K: Clone + Hash + Eq,
189{
190    let mut flat = vec![];
191    children.collect_into(&mut flat);
192
193    for (k, spec) in flat.iter() {
194        decider
195            .add(k.clone(), spec.child_type)
196            .map_err(MixedSupError::decider)?;
197    }
198
199    let children_map = flat.into_iter().collect();
200
201    Ok(children_map)
202}