1use std::collections::HashMap;
2use std::fmt;
3use std::hash::Hash;
4
5use mm1_ask::Reply;
6use mm1_common::errors::error_kind::HasErrorKind;
7use mm1_common::errors::error_of::ErrorOf;
8use mm1_common::log;
9use mm1_core::context::{ForkErrorKind, RecvErrorKind};
10use mm1_core::envelope::dispatch;
11use mm1_proto::Message;
12use mm1_proto_ask::Request;
13use mm1_proto_sup::mixed as m_sup;
14use mm1_proto_system::Exited;
15
16use crate::mixed::decider::{Action, Decider};
17use crate::mixed::strategy::RestartStrategy;
18use crate::mixed::{
19 ChildType, ErasedActorFactory, MixedSup, MixedSupContext, spec_builder, sup_child,
20};
/// Shorthand for the shared `crate::common::child_spec::ChildSpec` with its
/// second type parameter fixed to [`ChildType`]; `F` stays generic.
type ChildSpec<F> = crate::common::child_spec::ChildSpec<F, ChildType>;
22
23pub async fn mixed_sup<Runnable, Ctx, RS, CS, K>(
24 ctx: &mut Ctx,
25 sup_spec: MixedSup<RS, CS>,
26) -> Result<(), MixedSupError>
27where
28 Runnable: Send,
29 Ctx: MixedSupContext<Runnable>,
30 Ctx: Reply,
31 CS: spec_builder::CollectInto<K, Runnable>,
32 RS: RestartStrategy<K>,
33 K: fmt::Display,
34 K: Clone + Hash + Eq,
35 K: Message,
36 ChildSpec<ErasedActorFactory<Runnable>>: Send + Sync + 'static,
37{
38 ctx.set_trap_exit(true).await;
39
40 let MixedSup {
41 restart_strategy,
42 children,
43 } = sup_spec;
44 let sup_addr = ctx.address();
45 let mut decider = restart_strategy.decider();
46 let children = do_init_children(&mut decider, children)?;
47
48 loop {
49 if let Some(action) = decider
50 .next_action(ctx.now())
51 .map_err(MixedSupError::decider)?
52 {
53 log::debug!("processing decider action: {}", action);
54 match action {
55 Action::Noop => (),
56 Action::InitDone => {
57 ctx.init_done(ctx.address()).await;
58 },
59 Action::Quit { normal_exit } => {
60 if normal_exit {
61 ctx.quit_ok().await;
62 } else {
63 ctx.quit_err(MixedSupError::Escalated).await;
64 }
65 },
66 Action::Start { child_id } => {
67 let child_id = child_id.clone();
68 let child_spec = children
69 .get(&child_id)
70 .expect("child_id provided by the decider")
71 .clone()
72 .map_launcher(|f| f.produce(()));
73
74 let forked = ctx
75 .fork()
76 .await
77 .map_err(|e| e.kind())
78 .map_err(MixedSupError::Fork)?;
79 forked
80 .run(move |mut ctx| async move { sup_child::run(&mut ctx, sup_addr, child_id, child_spec).await })
81 .await;
82 },
83
84 Action::Stop { address, child_id } => {
85 let stop_timeout = child_id
86 .map(|id| {
87 children
88 .get(id)
89 .expect("child_id provided by the decider")
90 .stop_timeout
91 })
92 .unwrap_or_default();
93 let forked = ctx
94 .fork()
95 .await
96 .map_err(|e| e.kind())
97 .map_err(MixedSupError::Fork)?;
98 forked
99 .run(move |mut ctx| {
100 async move {
101 sup_child::shutdown(&mut ctx, sup_addr, address, stop_timeout).await
102 }
103 })
104 .await;
105 },
106 }
107 }
108
109 let received = ctx.recv().await.map_err(MixedSupError::recv)?;
110
111 dispatch!(match received {
112 sup_child::Started::<K> { child_id, address } => {
113 log::debug!("[{}] started as {}. Linking...", child_id, address);
114 ctx.link(address).await;
115 decider.started(&child_id, address, ctx.now())
116 },
117 sup_child::StartFailed::<K> { child_id } => {
118 log::warn!("failed to start [{}]. Initiating shutdown...", child_id);
119 decider.failed(&child_id, ctx.now());
120 },
121 sup_child::StopFailed { address, reason } => {
122 log::warn!(
123 "failed to stop {}: {}. Initiating shutdown...",
124 address,
125 reason
126 );
127 decider.quit(false);
128 },
129
130 Request::<_> {
131 header,
132 payload: m_sup::GetChildRequest::<K> { child_id },
133 } => {
134 let reply_with: m_sup::GetChildResponse = match decider.address(&child_id) {
135 Err(reason) => {
136 Err(ErrorOf::new(
137 m_sup::GetChildErrorKind::UnknownChild,
138 reason.to_string(),
139 ))
140 },
141 Ok(address_opt) => Ok(address_opt),
142 };
143 ctx.reply(header, reply_with).await.ok();
144 },
145
146 Exited { peer, normal_exit } => {
147 log::debug!("{} exited", peer);
148 decider.exited(peer, normal_exit, ctx.now());
149 },
150
151 unexpected @ _ => log::warn!("unexpected message: {:?}", unexpected),
152 });
153 }
154}
155
/// Failures reported by the mixed supervisor ([`mixed_sup`]).
#[derive(Debug, thiserror::Error)]
pub enum MixedSupError {
    /// Passed to `quit_err` when the decider requests a quit that is not a
    /// normal exit.
    #[error("escalated supervisor failure")]
    Escalated,

    /// The decider returned an error; only its `Display` rendering is kept
    /// (see [`MixedSupError::decider`]).
    #[error("decider: {}", _0)]
    Decider(String),

    /// Receiving the next message failed; holds the error's kind
    /// (see [`MixedSupError::recv`]).
    #[error("recv: {}", _0)]
    Recv(RecvErrorKind),

    /// Forking the supervisor context failed; holds the error's kind.
    #[error("fork: {}", _0)]
    Fork(ForkErrorKind),
}
170
171impl MixedSupError {
172 pub fn decider(reason: impl fmt::Display) -> Self {
173 Self::Decider(reason.to_string())
174 }
175
176 pub fn recv(reason: impl HasErrorKind<RecvErrorKind>) -> Self {
177 Self::Recv(reason.kind())
178 }
179}
180
181fn do_init_children<CS, D, R, K>(
182 decider: &mut D,
183 children: CS,
184) -> Result<HashMap<K, ChildSpec<ErasedActorFactory<R>>>, MixedSupError>
185where
186 CS: spec_builder::CollectInto<K, R>,
187 D: Decider<Key = K>,
188 K: Clone + Hash + Eq,
189{
190 let mut flat = vec![];
191 children.collect_into(&mut flat);
192
193 for (k, spec) in flat.iter() {
194 decider
195 .add(k.clone(), spec.child_type)
196 .map_err(MixedSupError::decider)?;
197 }
198
199 let children_map = flat.into_iter().collect();
200
201 Ok(children_map)
202}