mm1_sup/mixed/strategy/all_for_one.rs

1use std::collections::HashSet;
2use std::fmt;
3
4use either::Either;
5use mm1_address::address::Address;
6use mm1_common::log;
7
8use crate::common::restart_intensity::{RestartIntensity, RestartStats};
9use crate::mixed::ChildType;
10use crate::mixed::decider::{Action, Decider};
11use crate::mixed::strategy::{AllForOne, DeciderError, RestartStrategy};
12
13pub struct AllForOneDecider<K> {
14    restart_intensity: RestartIntensity,
15    restart_stats:     RestartStats,
16    status:            SupStatus,
17    states:            Vec<(K, State)>,
18    orphans:           HashSet<Address>,
19}
20
21impl<K> AllForOne<K> {
22    pub fn new(restart_intensity: RestartIntensity) -> Self {
23        Self {
24            restart_intensity,
25            _pd: Default::default(),
26        }
27    }
28}
29
30impl<K> RestartStrategy<K> for AllForOne<K>
31where
32    AllForOneDecider<K>: Decider<Key = K>,
33{
34    type Decider = AllForOneDecider<K>;
35
36    fn decider(&self) -> Self::Decider {
37        let restart_intensity = self.restart_intensity;
38        let restart_stats = restart_intensity.new_stats();
39        AllForOneDecider {
40            restart_intensity,
41            restart_stats,
42            status: SupStatus::Starting,
43            states: Default::default(),
44            orphans: Default::default(),
45        }
46    }
47}
48
49impl<K> Clone for AllForOne<K> {
50    fn clone(&self) -> Self {
51        Self {
52            restart_intensity: self.restart_intensity,
53            _pd:               Default::default(),
54        }
55    }
56}
57
/// Supervisor-level phase of the all-for-one decider.
///
/// Derives the same traits as the per-child `Target`/`Status` enums (plus
/// equality) so it can be copied, compared and printed in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SupStatus {
    /// Children are being started in insertion order; once all of them are
    /// running the decider emits `InitDone` and moves to `Running`.
    Starting,
    /// Each child is driven towards its own target.
    Running,
    /// Every child is being stopped in reverse order; once all are stopped the
    /// decider returns to `Running`, which starts them again.
    Restarting,
    /// Every child is being stopped in reverse order; once all are stopped the
    /// decider emits `Quit { normal_exit }` and moves to `Stopped`.
    Stopping { normal_exit: bool },
    /// Terminal phase: children are no longer iterated (orphans, if any, are
    /// still stopped).
    Stopped,
}
65
66struct State {
67    child_type: ChildType,
68    target:     Target,
69    status:     Status,
70}
71
/// Desired status of a child.
#[derive(Clone, Copy, Debug)]
enum Target {
    /// The child should be running.
    Running,
    /// The child should be stopped.
    Stopped,
}
77
78#[derive(Debug, Clone, Copy)]
79enum Status {
80    Starting,
81    Running { address: Address },
82    Terminating { address: Address },
83    Stopped,
84}
85
86impl<K> Decider for AllForOneDecider<K>
87where
88    K: fmt::Display + Eq,
89{
90    type Error = DeciderError;
91    type Key = K;
92
93    fn address(&self, key: &Self::Key) -> Result<Option<Address>, Self::Error> {
94        self.states
95            .iter()
96            .find_map(|(k, s)| {
97                (key == k).then_some(match s.status {
98                    Status::Running { address } | Status::Terminating { address } => Some(address),
99                    _ => None,
100                })
101            })
102            .ok_or(DeciderError::KeyNotFound)
103    }
104
105    fn add(&mut self, key: Self::Key, child_type: ChildType) -> Result<(), Self::Error> {
106        if self.states.iter().any(|(k, _)| *k == key) {
107            return Err(DeciderError::DuplicateKey)
108        }
109        self.states.push((
110            key,
111            State {
112                child_type,
113                target: Target::Running,
114                status: Status::Stopped,
115            },
116        ));
117        Ok(())
118    }
119
120    fn rm(&mut self, _key: &Self::Key) -> Result<(), Self::Error> {
121        Err(DeciderError::Unsupported)
122    }
123
124    fn started(&mut self, key: &Self::Key, reported_address: Address, _at: tokio::time::Instant) {
125        assert!(!self.states.iter().any(|(_, s)|
126                matches!(s.status,
127                    Status::Running { address } | Status::Terminating { address } if address == reported_address
128                )
129            )
130        );
131
132        let Some(state) = self
133            .states
134            .iter_mut()
135            .find_map(|(k, s)| (k == key).then_some(s))
136        else {
137            log::warn!(
138                "reported start, key not found [key: {}; addr: {}]",
139                key,
140                reported_address
141            );
142            return
143        };
144
145        match state.status {
146            Status::Running { .. } | Status::Terminating { .. } | Status::Stopped => {
147                self.orphans.insert(reported_address);
148            },
149            Status::Starting => {
150                state.status = Status::Running {
151                    address: reported_address,
152                };
153            },
154        }
155    }
156
157    fn exited(&mut self, reported_addr: Address, normal_exit: bool, at: tokio::time::Instant) {
158        let Some(state) = self.states.iter_mut().find_map(|(_, s)| {
159            matches!(s.status,
160                    Status::Running { address } |
161                    Status::Terminating { address }
162                    if address == reported_addr)
163            .then_some(s)
164        }) else {
165            log::info!(
166                "termination requested [by-addr: {}, normal-exit: {}]",
167                reported_addr,
168                normal_exit
169            );
170            self.status = SupStatus::Stopping { normal_exit: true };
171            return;
172        };
173
174        let is_acceptable = match state.child_type {
175            ChildType::Permanent => false,
176            ChildType::Transient => normal_exit,
177            ChildType::Temporary => true,
178        };
179        match (state.status, is_acceptable) {
180            (Status::Terminating { address }, _) | (Status::Running { address }, true) => {
181                assert_eq!(address, reported_addr);
182                state.status = Status::Stopped;
183            },
184            (Status::Running { address }, false) => {
185                assert_eq!(address, reported_addr);
186                if let Err(reason) = self
187                    .restart_intensity
188                    .report_exit(&mut self.restart_stats, at)
189                {
190                    log::info!("restart intensity exceeded; giving up. Reason: {}", reason);
191                    self.status = SupStatus::Stopping { normal_exit: false };
192                } else {
193                    self.status = SupStatus::Restarting;
194                }
195                state.status = Status::Stopped;
196            },
197            _ => unreachable!("how could this state be selected?"),
198        }
199    }
200
201    fn failed(&mut self, key: &Self::Key, _at: tokio::time::Instant) {
202        if let Some(state) = self
203            .states
204            .iter_mut()
205            .find_map(|(k, s)| (k == key).then_some(s))
206        {
207            match state.status {
208                Status::Starting => {
209                    state.status = Status::Stopped;
210                },
211                _ => {
212                    state.target = Target::Stopped;
213                },
214            }
215            self.status = SupStatus::Stopping { normal_exit: false }
216        }
217    }
218
219    fn quit(&mut self, normal_exit: bool) {
220        self.status = match self.status {
221            SupStatus::Stopped => SupStatus::Stopped,
222            SupStatus::Stopping {
223                normal_exit: existing_normal_exit,
224            } => {
225                SupStatus::Stopping {
226                    normal_exit: normal_exit && existing_normal_exit,
227                }
228            },
229            SupStatus::Starting | SupStatus::Restarting | SupStatus::Running => {
230                SupStatus::Stopping { normal_exit }
231            },
232        };
233    }
234
235    fn next_action(
236        &mut self,
237        _at: tokio::time::Instant,
238    ) -> Result<Option<Action<'_, Self::Key>>, Self::Error> {
239        if let Some(address) = self.orphans.iter().next().copied() {
240            self.orphans.remove(&address);
241            return Ok(Some(Action::Stop {
242                address,
243                child_id: None,
244            }))
245        }
246
247        let states_iter_mut = match self.status {
248            SupStatus::Starting | SupStatus::Running => {
249                Either::Left(Either::Left(self.states.iter_mut()))
250            },
251            SupStatus::Stopping { .. } | SupStatus::Restarting => {
252                Either::Left(Either::Right(self.states.iter_mut().rev()))
253            },
254            SupStatus::Stopped => Either::Right(std::iter::empty()),
255        };
256
257        let mut all_children_stopped = true;
258        let mut all_children_started = true;
259        for (key, state) in states_iter_mut {
260            let status = state.status;
261            let target = match self.status {
262                SupStatus::Starting | SupStatus::Running => state.target,
263                SupStatus::Stopping { .. } | SupStatus::Restarting => Target::Stopped,
264                SupStatus::Stopped => unreachable!("wouldn't iterate over children"),
265            };
266
267            all_children_stopped = all_children_stopped && matches!(status, Status::Stopped);
268            all_children_started = all_children_started && matches!(status, Status::Running { .. });
269
270            log::debug!(
271                "considering {}; status: {:?}, target: {:?}",
272                key,
273                status,
274                target
275            );
276
277            match (status, target) {
278                // Nothing we can do
279                (Status::Starting, Target::Running | Target::Stopped) => continue,
280                (Status::Terminating { .. }, Target::Running | Target::Stopped) => continue,
281
282                // Nothing we should do
283                (Status::Running { .. }, Target::Running) => continue,
284                (Status::Stopped, Target::Stopped) => continue,
285
286                // Do something!
287                (Status::Stopped, Target::Running) => {
288                    state.status = Status::Starting;
289                    return Ok(Some(Action::Start { child_id: key }))
290                },
291                (Status::Running { address }, Target::Stopped) => {
292                    state.status = Status::Terminating { address };
293                    return Ok(Some(Action::Stop {
294                        address,
295                        child_id: Some(key),
296                    }))
297                },
298            }
299        }
300
301        match self.status {
302            SupStatus::Running | SupStatus::Stopped => Ok(None),
303
304            SupStatus::Starting => {
305                log::info!("starting [all-children-started: {}]", all_children_started);
306                if all_children_started {
307                    self.status = SupStatus::Running;
308                    Ok(Some(Action::InitDone))
309                } else {
310                    Ok(None)
311                }
312            },
313
314            SupStatus::Restarting => {
315                log::info!(
316                    "restarting [all-children-stopped: {}]",
317                    all_children_stopped
318                );
319                if all_children_stopped {
320                    self.status = SupStatus::Running;
321                    Ok(Some(Action::Noop))
322                } else {
323                    Ok(None)
324                }
325            },
326            SupStatus::Stopping { normal_exit } => {
327                log::info!(
328                    "stopping [normal: {}; all-children-stopped: {}]",
329                    normal_exit,
330                    all_children_stopped
331                );
332                if all_children_stopped {
333                    self.status = SupStatus::Stopped;
334                    Ok(Some(Action::Quit { normal_exit }))
335                } else {
336                    Ok(None)
337                }
338            },
339        }
340    }
341}
342
// Unit tests for this decider: compiled only under `cfg(test)`, loaded from the
// out-of-line `tests` submodule file.
#[cfg(test)] mod tests;