Skip to main content

mm1_sup/mixed/strategy/
one_for_one.rs

1use std::collections::HashSet;
2use std::fmt;
3
4use either::Either;
5use mm1_address::address::Address;
6use mm1_common::errors::chain::StdErrorDisplayChainExt;
7use mm1_common::log;
8
9use crate::common::restart_intensity::{RestartIntensity, RestartStats};
10use crate::mixed::ChildType;
11use crate::mixed::decider::{Action, Decider};
12use crate::mixed::strategy::{DeciderError, OneForOne, RestartStrategy};
13
/// Decision state for a one-for-one supervisor over children keyed by `K`.
///
/// The decider performs no I/O itself: it is told about events (`started`,
/// `exited`, `failed`, `quit`, ...) and answers `next_action` with the next
/// step that should be carried out.
pub struct OneForOneDecider<K> {
    /// Restart limit; consulted through `report_exit` whenever a running child
    /// exits in a way its `ChildType` does not accept.
    restart_intensity: RestartIntensity,
    /// Mutable bookkeeping handed to `restart_intensity.report_exit`.
    restart_stats:     RestartStats,
    /// Known children in insertion order. `next_action` walks this list
    /// front-to-back while starting/running and back-to-front while stopping.
    states:            Vec<(K, State)>,
    /// Addresses reported by `started` that could not be adopted because the
    /// child was not in `Status::Starting`; each one is answered with a
    /// `Stop` action.
    orphans:           HashSet<Address>,
    /// Lifecycle phase of the supervisor as a whole.
    status:            SupStatus,
}
21
/// Per-child record: what the child is, where it currently is, and where it
/// should end up.
#[derive(Debug)]
struct State {
    /// Decides which exits are acceptable (see `exited`).
    child_type: ChildType,
    /// What the child is doing right now.
    status:     Status,
    /// What `next_action` should drive the child towards.
    target:     Target,
}
28
/// Observed state of a single child.
#[derive(Debug, Clone, Copy)]
enum Status {
    /// A `Start` action has been issued; no start report has arrived yet.
    Starting,
    /// The child reported a successful start at `address`.
    Running { address: Address },
    /// A `Stop` action has been issued for `address`; its exit is pending.
    Terminating { address: Address },
    /// Not running: freshly added, exited, or failed to start.
    Stopped,
}
36
/// Desired state of a single child.
#[derive(Debug, Clone, Copy)]
enum Target {
    /// The child should be running (the initial target set by `add`).
    Running,
    /// The child should be stopped but stay in the child list.
    Stopped,
    /// The child should be stopped and then dropped from the child list
    /// (set by `rm`).
    Removed,
}
43
/// Lifecycle phase of the supervisor itself.
///
/// Derives `Debug`, `Clone` and `Copy` like the sibling `Status` and `Target`
/// enums, so the phase can be logged and passed around by value.
#[derive(Debug, Clone, Copy)]
enum SupStatus {
    /// Initial phase: children are still being brought up.
    Starting,
    /// Every child has been seen running and `InitDone` has been emitted.
    Running,
    /// Children are being stopped; `normal_exit` is what the final `Quit`
    /// action will report.
    Stopping { normal_exit: bool },
    /// Terminal phase: `Quit` has been emitted.
    Stopped,
}
50
51impl<K> OneForOne<K> {
52    pub fn new(restart_intensity: RestartIntensity) -> Self {
53        Self {
54            restart_intensity,
55            _pd: Default::default(),
56        }
57    }
58}
59
60impl<K> RestartStrategy<K> for OneForOne<K>
61where
62    Self: Clone,
63    OneForOneDecider<K>: Decider<Key = K>,
64{
65    type Decider = OneForOneDecider<K>;
66
67    fn decider(&self) -> Self::Decider {
68        let restart_intensity = self.restart_intensity;
69        let restart_stats = restart_intensity.new_stats();
70        OneForOneDecider {
71            restart_intensity,
72            restart_stats,
73            states: Default::default(),
74            orphans: Default::default(),
75            status: SupStatus::Starting,
76        }
77    }
78}
79
80impl<K> Decider for OneForOneDecider<K>
81where
82    K: fmt::Display + Eq,
83{
84    type Error = DeciderError;
85    type Key = K;
86
87    fn address(&self, key: &Self::Key) -> Result<Option<Address>, Self::Error> {
88        self.states
89            .iter()
90            .find_map(|(k, s)| {
91                (key == k).then_some(match s.status {
92                    Status::Running { address } | Status::Terminating { address } => Some(address),
93                    _ => None,
94                })
95            })
96            .ok_or(DeciderError::KeyNotFound)
97    }
98
99    fn add(&mut self, key: Self::Key, child_type: ChildType) -> Result<(), Self::Error> {
100        if self.states.iter().any(|(k, _)| *k == key) {
101            return Err(DeciderError::DuplicateKey)
102        }
103        self.states.push((
104            key,
105            State {
106                child_type,
107                status: Status::Stopped,
108                target: Target::Running,
109            },
110        ));
111        Ok(())
112    }
113
114    fn rm(&mut self, key: &Self::Key) -> Result<(), Self::Error> {
115        if let Some(state) = self
116            .states
117            .iter_mut()
118            .find_map(|(k, s)| (k == key).then_some(s))
119        {
120            state.target = Target::Removed;
121        }
122        Ok(())
123    }
124
125    fn started(&mut self, key: &Self::Key, reported_address: Address, _at: tokio::time::Instant) {
126        assert!(!self.states.iter().any(|(_, s)|
127                matches!(s.status,
128                    Status::Running { address } | Status::Terminating { address } if address == reported_address
129                )
130            )
131        );
132
133        let Some(state) = self
134            .states
135            .iter_mut()
136            .find_map(|(k, s)| (k == key).then_some(s))
137        else {
138            log::warn!(
139                key = %key, addr = %reported_address,
140                "reported start, key not found"
141            );
142            return
143        };
144        match state.status {
145            Status::Running { .. } | Status::Terminating { .. } | Status::Stopped => {
146                self.orphans.insert(reported_address);
147            },
148            Status::Starting => {
149                state.status = Status::Running {
150                    address: reported_address,
151                };
152            },
153        }
154    }
155
156    fn exited(&mut self, reported_addr: Address, normal_exit: bool, at: tokio::time::Instant) {
157        let Some(state) = self.states.iter_mut().find_map(|(_, s)| {
158            matches!(s.status,
159                    Status::Running { address } |
160                    Status::Terminating { address }
161                    if address == reported_addr)
162            .then_some(s)
163        }) else {
164            log::info!(
165                by_addr = %reported_addr, normal_exit = %normal_exit,
166                "termination requested"
167            );
168            self.status = SupStatus::Stopping { normal_exit: true };
169            return;
170        };
171
172        let is_acceptable = match state.child_type {
173            ChildType::Permanent => false,
174            ChildType::Transient => normal_exit,
175            ChildType::Temporary => true,
176        };
177        match (state.status, is_acceptable) {
178            (Status::Terminating { address }, _) | (Status::Running { address }, true) => {
179                assert_eq!(address, reported_addr);
180                state.status = Status::Stopped;
181            },
182            (Status::Running { address }, false) => {
183                assert_eq!(address, reported_addr);
184                if let Err(reason) = self
185                    .restart_intensity
186                    .report_exit(&mut self.restart_stats, at)
187                {
188                    log::info!(reason = %reason.as_display_chain(), "restart intensity exceeded; giving up");
189                    self.status = SupStatus::Stopping { normal_exit: false };
190                }
191                state.status = Status::Stopped;
192            },
193            _ => unreachable!("how could this state be selected?"),
194        }
195    }
196
197    fn failed(&mut self, key: &Self::Key, _at: tokio::time::Instant) {
198        if let Some(state) = self
199            .states
200            .iter_mut()
201            .find_map(|(k, s)| (k == key).then_some(s))
202        {
203            match state.status {
204                Status::Starting => {
205                    state.status = Status::Stopped;
206                },
207                _ => {
208                    state.target = Target::Stopped;
209                },
210            }
211            self.status = SupStatus::Stopping { normal_exit: false }
212        }
213    }
214
215    fn quit(&mut self, normal_exit: bool) {
216        self.status = match self.status {
217            SupStatus::Stopped => SupStatus::Stopped,
218            SupStatus::Stopping {
219                normal_exit: existing_normal_exit,
220            } => {
221                SupStatus::Stopping {
222                    normal_exit: normal_exit && existing_normal_exit,
223                }
224            },
225            SupStatus::Starting | SupStatus::Running => SupStatus::Stopping { normal_exit },
226        };
227    }
228
    /// Computes the next step the supervisor should take, at most one per call.
    ///
    /// In order of priority:
    /// 1. children that are stopped and marked for removal are dropped;
    /// 2. a pending orphan address is answered with `Action::Stop`;
    /// 3. the first child whose status disagrees with its target yields
    ///    `Action::Start` or `Action::Stop` (its status is updated to
    ///    `Starting` / `Terminating` right away);
    /// 4. otherwise a supervisor phase change may yield `Action::InitDone`
    ///    or `Action::Quit`.
    ///
    /// Returns `Ok(None)` when there is nothing to do right now. This
    /// implementation never returns `Err`.
    fn next_action(
        &mut self,
        _at: tokio::time::Instant,
    ) -> Result<Option<Action<'_, Self::Key>>, Self::Error> {
        // Drop children whose removal has completed.
        self.states.retain(|(_k, state)| {
            !matches!(
                (state.status, state.target),
                (Status::Stopped, Target::Removed)
            )
        });

        // Orphans are dealt with before any regular child, one per call.
        if let Some(address) = self.orphans.iter().next().copied() {
            self.orphans.remove(&address);
            return Ok(Some(Action::Stop {
                address,
                child_id: None,
            }))
        }

        // Children are visited in insertion order while starting/running and
        // in reverse order while stopping; a stopped supervisor visits none.
        let states_iter_mut = match self.status {
            SupStatus::Starting | SupStatus::Running => {
                Either::Left(Either::Left(self.states.iter_mut()))
            },
            SupStatus::Stopping { .. } => Either::Left(Either::Right(self.states.iter_mut().rev())),
            SupStatus::Stopped => Either::Right(std::iter::empty()),
        };
        // Both flags are only meaningful if the loop below runs to completion,
        // i.e. no child needed an action.
        let mut all_children_started = true;
        let mut all_children_stopped = true;
        for &mut (ref key, ref mut state) in states_iter_mut {
            let status = state.status;
            // While the supervisor is stopping, every child is driven towards
            // `Stopped` regardless of its own target.
            let target = match self.status {
                SupStatus::Starting | SupStatus::Running => state.target,
                SupStatus::Stopping { .. } => Target::Stopped,
                SupStatus::Stopped => unreachable!("wouldn't iterate over children"),
            };

            all_children_stopped = all_children_stopped && matches!(status, Status::Stopped);
            all_children_started = all_children_started && matches!(status, Status::Running { .. });

            log::debug!(
                key = %key, status = ?status, target = ?target,
                "considering"
            );

            match (status, target) {
                // A start or stop is already in flight: wait for its outcome.
                (Status::Starting, Target::Running | Target::Stopped | Target::Removed) => continue,
                (
                    Status::Terminating { .. },
                    Target::Running | Target::Stopped | Target::Removed,
                ) => continue,

                // Status already matches the target.
                (Status::Running { .. }, Target::Running) => continue,
                (Status::Stopped, Target::Stopped) => continue,

                // Such entries were removed by the `retain` at the top.
                (Status::Stopped, Target::Removed) => unreachable!("filtered out above"),

                // Status and target disagree: issue exactly one action and return.
                (Status::Stopped, Target::Running) => {
                    state.status = Status::Starting;
                    return Ok(Some(Action::Start { child_id: key }))
                },
                (Status::Running { address }, Target::Stopped | Target::Removed) => {
                    state.status = Status::Terminating { address };
                    return Ok(Some(Action::Stop {
                        address,
                        child_id: Some(key),
                    }))
                },
            }
        }

        // No child needed an action; see whether the supervisor itself can
        // move to its next phase.
        match self.status {
            SupStatus::Running | SupStatus::Stopped => Ok(None),

            SupStatus::Starting => {
                log::info!(all_children_started = %all_children_started, "starting");
                if all_children_started {
                    self.status = SupStatus::Running;
                    Ok(Some(Action::InitDone))
                } else {
                    Ok(None)
                }
            },

            SupStatus::Stopping { normal_exit } => {
                log::info!(
                    normal = %normal_exit, all_children_stopped = %all_children_stopped,
                    "stopping"
                );
                if all_children_stopped {
                    self.status = SupStatus::Stopped;
                    Ok(Some(Action::Quit { normal_exit }))
                } else {
                    Ok(None)
                }
            },
        }
    }
330}
331
332impl<K> Clone for OneForOne<K> {
333    fn clone(&self) -> Self {
334        Self {
335            restart_intensity: self.restart_intensity,
336            _pd:               Default::default(),
337        }
338    }
339}
340
341#[cfg(test)]
342mod tests;