Skip to main content

mm1_sup/mixed/strategy/
one_for_one.rs

1use std::collections::HashSet;
2use std::fmt;
3
4use either::Either;
5use mm1_address::address::Address;
6use mm1_common::errors::chain::StdErrorDisplayChainExt;
7use mm1_common::log;
8
9use crate::common::restart_intensity::{RestartIntensity, RestartStats};
10use crate::mixed::ChildType;
11use crate::mixed::decider::{Action, Decider};
12use crate::mixed::strategy::{DeciderError, OneForOne, RestartStrategy};
13
14pub struct OneForOneDecider<K> {
15    restart_intensity: RestartIntensity,
16    restart_stats:     RestartStats,
17    states:            Vec<(K, State)>,
18    orphans:           HashSet<Address>,
19    status:            SupStatus,
20}
21
22#[derive(Debug)]
23struct State {
24    child_type: ChildType,
25    status:     Status,
26    target:     Target,
27}
28
29#[derive(Debug, Clone, Copy)]
30enum Status {
31    Starting,
32    Running { address: Address },
33    Terminating { address: Address },
34    Stopped,
35}
36
/// Desired state of a child.
#[derive(Debug, Clone, Copy)]
enum Target {
    /// The child should be running (the target every newly added child gets).
    Running,
    /// The child should be stopped but stay registered.
    Stopped,
    /// The child should be stopped and then dropped from the child list.
    Removed,
}
43
/// Lifecycle stage of the supervisor as a whole.
enum SupStatus {
    /// Initial stage; lasts until every child has been observed running.
    Starting,
    /// Initialisation has been announced; children are kept at their targets.
    Running,
    /// Children are being stopped; `normal_exit` is what the final quit
    /// action will report.
    Stopping { normal_exit: bool },
    /// The quit action has been issued; nothing is left to do.
    Stopped,
}
50
51impl<K> OneForOne<K> {
52    pub fn new(restart_intensity: RestartIntensity) -> Self {
53        Self {
54            restart_intensity,
55            _pd: Default::default(),
56        }
57    }
58}
59
60impl<K> RestartStrategy<K> for OneForOne<K>
61where
62    Self: Clone,
63    OneForOneDecider<K>: Decider<Key = K>,
64{
65    type Decider = OneForOneDecider<K>;
66
67    fn decider(&self) -> Self::Decider {
68        let restart_intensity = self.restart_intensity;
69        let restart_stats = restart_intensity.new_stats();
70        OneForOneDecider {
71            restart_intensity,
72            restart_stats,
73            states: Default::default(),
74            orphans: Default::default(),
75            status: SupStatus::Starting,
76        }
77    }
78}
79
80impl<K> Decider for OneForOneDecider<K>
81where
82    K: fmt::Display + Eq,
83{
84    type Error = DeciderError;
85    type Key = K;
86
87    fn address(&self, key: &Self::Key) -> Result<Option<Address>, Self::Error> {
88        self.states
89            .iter()
90            .find_map(|(k, s)| {
91                (key == k).then_some(match s.status {
92                    Status::Running { address } | Status::Terminating { address } => Some(address),
93                    _ => None,
94                })
95            })
96            .ok_or(DeciderError::KeyNotFound)
97    }
98
99    fn add(&mut self, key: Self::Key, child_type: ChildType) -> Result<(), Self::Error> {
100        if self.states.iter().any(|(k, _)| *k == key) {
101            return Err(DeciderError::DuplicateKey)
102        }
103        self.states.push((
104            key,
105            State {
106                child_type,
107                status: Status::Stopped,
108                target: Target::Running,
109            },
110        ));
111        Ok(())
112    }
113
114    fn rm(&mut self, key: &Self::Key) -> Result<(), Self::Error> {
115        if let Some(state) = self
116            .states
117            .iter_mut()
118            .find_map(|(k, s)| (k == key).then_some(s))
119        {
120            state.target = Target::Removed;
121        }
122        Ok(())
123    }
124
125    fn started(&mut self, key: &Self::Key, reported_address: Address, _at: tokio::time::Instant) {
126        assert!(!self.states.iter().any(|(_, s)|
127                matches!(s.status,
128                    Status::Running { address } | Status::Terminating { address } if address == reported_address
129                )
130            )
131        );
132
133        let Some(state) = self
134            .states
135            .iter_mut()
136            .find_map(|(k, s)| (k == key).then_some(s))
137        else {
138            log::warn!(
139                key = %key, addr = %reported_address,
140                "reported start, key not found"
141            );
142            return
143        };
144        match state.status {
145            Status::Running { .. } | Status::Terminating { .. } | Status::Stopped => {
146                self.orphans.insert(reported_address);
147            },
148            Status::Starting => {
149                state.status = Status::Running {
150                    address: reported_address,
151                };
152            },
153        }
154    }
155
156    fn exited(&mut self, reported_addr: Address, normal_exit: bool, at: tokio::time::Instant) {
157        let Some(state) = self.states.iter_mut().find_map(|(_, s)| {
158            matches!(s.status,
159                    Status::Running { address } |
160                    Status::Terminating { address }
161                    if address == reported_addr)
162            .then_some(s)
163        }) else {
164            log::info!(
165                by_addr = %reported_addr, normal_exit = %normal_exit,
166                "termination requested"
167            );
168            self.status = SupStatus::Stopping { normal_exit: true };
169            return;
170        };
171
172        let is_acceptable = match state.child_type {
173            ChildType::Permanent => false,
174            ChildType::Transient => normal_exit,
175            ChildType::Temporary => true,
176        };
177        match (state.status, is_acceptable) {
178            (Status::Terminating { address }, _) => {
179                assert_eq!(address, reported_addr);
180                // Deliberate stop: the target was already set to Stopped/Removed.
181                state.status = Status::Stopped;
182            },
183            (Status::Running { address }, true) => {
184                assert_eq!(address, reported_addr);
185                // Acceptable exit (a Temporary child, or a Transient child
186                // exiting normally): do not restart — leave the child stopped.
187                state.status = Status::Stopped;
188                state.target = Target::Stopped;
189            },
190            (Status::Running { address }, false) => {
191                assert_eq!(address, reported_addr);
192                if let Err(reason) = self
193                    .restart_intensity
194                    .report_exit(&mut self.restart_stats, at)
195                {
196                    log::info!(reason = %reason.as_display_chain(), "restart intensity exceeded; giving up");
197                    self.status = SupStatus::Stopping { normal_exit: false };
198                }
199                state.status = Status::Stopped;
200            },
201            _ => unreachable!("how could this state be selected?"),
202        }
203    }
204
205    fn failed(&mut self, key: &Self::Key, _at: tokio::time::Instant) {
206        if let Some(state) = self
207            .states
208            .iter_mut()
209            .find_map(|(k, s)| (k == key).then_some(s))
210        {
211            match state.status {
212                Status::Starting => {
213                    state.status = Status::Stopped;
214                },
215                _ => {
216                    state.target = Target::Stopped;
217                },
218            }
219            self.status = SupStatus::Stopping { normal_exit: false }
220        }
221    }
222
223    fn quit(&mut self, normal_exit: bool) {
224        self.status = match self.status {
225            SupStatus::Stopped => SupStatus::Stopped,
226            SupStatus::Stopping {
227                normal_exit: existing_normal_exit,
228            } => {
229                SupStatus::Stopping {
230                    normal_exit: normal_exit && existing_normal_exit,
231                }
232            },
233            SupStatus::Starting | SupStatus::Running => SupStatus::Stopping { normal_exit },
234        };
235    }
236
237    fn next_action(
238        &mut self,
239        _at: tokio::time::Instant,
240    ) -> Result<Option<Action<'_, Self::Key>>, Self::Error> {
241        self.states.retain(|(_k, state)| {
242            !matches!(
243                (state.status, state.target),
244                (Status::Stopped, Target::Removed)
245            )
246        });
247
248        if let Some(address) = self.orphans.iter().next().copied() {
249            self.orphans.remove(&address);
250            return Ok(Some(Action::Stop {
251                address,
252                child_id: None,
253            }))
254        }
255
256        let states_iter_mut = match self.status {
257            SupStatus::Starting | SupStatus::Running => {
258                Either::Left(Either::Left(self.states.iter_mut()))
259            },
260            SupStatus::Stopping { .. } => Either::Left(Either::Right(self.states.iter_mut().rev())),
261            SupStatus::Stopped => Either::Right(std::iter::empty()),
262        };
263        let mut all_children_started = true;
264        let mut all_children_stopped = true;
265        for &mut (ref key, ref mut state) in states_iter_mut {
266            let status = state.status;
267            let target = match self.status {
268                SupStatus::Starting | SupStatus::Running => state.target,
269                SupStatus::Stopping { .. } => Target::Stopped,
270                SupStatus::Stopped => unreachable!("wouldn't iterate over children"),
271            };
272
273            all_children_stopped = all_children_stopped && matches!(status, Status::Stopped);
274            all_children_started = all_children_started && matches!(status, Status::Running { .. });
275
276            log::debug!(
277                key = %key, status = ?status, target = ?target,
278                "considering"
279            );
280
281            match (status, target) {
282                // Nothing we can do
283                (Status::Starting, Target::Running | Target::Stopped | Target::Removed) => continue,
284                (
285                    Status::Terminating { .. },
286                    Target::Running | Target::Stopped | Target::Removed,
287                ) => continue,
288
289                // Nothing we should do
290                (Status::Running { .. }, Target::Running) => continue,
291                (Status::Stopped, Target::Stopped) => continue,
292
293                // Should not happen
294                (Status::Stopped, Target::Removed) => unreachable!("filtered out above"),
295
296                // Do something!
297                (Status::Stopped, Target::Running) => {
298                    state.status = Status::Starting;
299                    return Ok(Some(Action::Start { child_id: key }))
300                },
301                (Status::Running { address }, Target::Stopped | Target::Removed) => {
302                    state.status = Status::Terminating { address };
303                    return Ok(Some(Action::Stop {
304                        address,
305                        child_id: Some(key),
306                    }))
307                },
308            }
309        }
310
311        match self.status {
312            SupStatus::Running | SupStatus::Stopped => Ok(None),
313
314            SupStatus::Starting => {
315                log::info!(all_children_started = %all_children_started, "starting");
316                if all_children_started {
317                    self.status = SupStatus::Running;
318                    Ok(Some(Action::InitDone))
319                } else {
320                    Ok(None)
321                }
322            },
323
324            SupStatus::Stopping { normal_exit } => {
325                log::info!(
326                    normal = %normal_exit, all_children_stopped = %all_children_stopped,
327                    "stopping"
328                );
329                if all_children_stopped {
330                    self.status = SupStatus::Stopped;
331                    Ok(Some(Action::Quit { normal_exit }))
332                } else {
333                    Ok(None)
334                }
335            },
336        }
337    }
338}
339
340impl<K> Clone for OneForOne<K> {
341    fn clone(&self) -> Self {
342        Self {
343            restart_intensity: self.restart_intensity,
344            _pd:               Default::default(),
345        }
346    }
347}
348
349#[cfg(test)]
350mod tests;