1use std::collections::HashSet;
2use std::fmt;
3
4use either::Either;
5use mm1_address::address::Address;
6use mm1_common::log;
7
8use crate::common::restart_intensity::{RestartIntensity, RestartStats};
9use crate::mixed::ChildType;
10use crate::mixed::decider::{Action, Decider};
11use crate::mixed::strategy::{AllForOne, DeciderError, RestartStrategy};
12
13pub struct AllForOneDecider<K> {
14 restart_intensity: RestartIntensity,
15 restart_stats: RestartStats,
16 status: SupStatus,
17 states: Vec<(K, State)>,
18 orphans: HashSet<Address>,
19}
20
21impl<K> AllForOne<K> {
22 pub fn new(restart_intensity: RestartIntensity) -> Self {
23 Self {
24 restart_intensity,
25 _pd: Default::default(),
26 }
27 }
28}
29
30impl<K> RestartStrategy<K> for AllForOne<K>
31where
32 AllForOneDecider<K>: Decider<Key = K>,
33{
34 type Decider = AllForOneDecider<K>;
35
36 fn decider(&self) -> Self::Decider {
37 let restart_intensity = self.restart_intensity;
38 let restart_stats = restart_intensity.new_stats();
39 AllForOneDecider {
40 restart_intensity,
41 restart_stats,
42 status: SupStatus::Starting,
43 states: Default::default(),
44 orphans: Default::default(),
45 }
46 }
47}
48
49impl<K> Clone for AllForOne<K> {
50 fn clone(&self) -> Self {
51 Self {
52 restart_intensity: self.restart_intensity,
53 _pd: Default::default(),
54 }
55 }
56}
57
/// Lifecycle phase of the supervisor as a whole.
enum SupStatus {
    /// Children are being started for the first time; `InitDone` has not been emitted.
    Starting,
    /// Normal operation: each child is driven towards its own target.
    Running,
    /// A child exited unacceptably; all children are being stopped before they are
    /// started again.
    Restarting,
    /// Shutting down: all children are being stopped, then `Quit { normal_exit }` is
    /// emitted.
    Stopping { normal_exit: bool },
    /// Shutdown is complete; `next_action` no longer visits the children.
    Stopped,
}
65
66struct State {
67 child_type: ChildType,
68 target: Target,
69 status: Status,
70}
71
/// The state a child should be driven towards.
#[derive(Clone, Copy, Debug)]
enum Target {
    /// The child should be running.
    Running,
    /// The child should be (or stay) down.
    Stopped,
}
77
78#[derive(Debug, Clone, Copy)]
79enum Status {
80 Starting,
81 Running { address: Address },
82 Terminating { address: Address },
83 Stopped,
84}
85
86impl<K> Decider for AllForOneDecider<K>
87where
88 K: fmt::Display + Eq,
89{
90 type Error = DeciderError;
91 type Key = K;
92
93 fn address(&self, key: &Self::Key) -> Result<Option<Address>, Self::Error> {
94 self.states
95 .iter()
96 .find_map(|(k, s)| {
97 (key == k).then_some(match s.status {
98 Status::Running { address } | Status::Terminating { address } => Some(address),
99 _ => None,
100 })
101 })
102 .ok_or(DeciderError::KeyNotFound)
103 }
104
105 fn add(&mut self, key: Self::Key, child_type: ChildType) -> Result<(), Self::Error> {
106 if self.states.iter().any(|(k, _)| *k == key) {
107 return Err(DeciderError::DuplicateKey)
108 }
109 self.states.push((
110 key,
111 State {
112 child_type,
113 target: Target::Running,
114 status: Status::Stopped,
115 },
116 ));
117 Ok(())
118 }
119
120 fn rm(&mut self, _key: &Self::Key) -> Result<(), Self::Error> {
121 Err(DeciderError::Unsupported)
122 }
123
124 fn started(&mut self, key: &Self::Key, reported_address: Address, _at: tokio::time::Instant) {
125 assert!(!self.states.iter().any(|(_, s)|
126 matches!(s.status,
127 Status::Running { address } | Status::Terminating { address } if address == reported_address
128 )
129 )
130 );
131
132 let Some(state) = self
133 .states
134 .iter_mut()
135 .find_map(|(k, s)| (k == key).then_some(s))
136 else {
137 log::warn!(
138 "reported start, key not found [key: {}; addr: {}]",
139 key,
140 reported_address
141 );
142 return
143 };
144
145 match state.status {
146 Status::Running { .. } | Status::Terminating { .. } | Status::Stopped => {
147 self.orphans.insert(reported_address);
148 },
149 Status::Starting => {
150 state.status = Status::Running {
151 address: reported_address,
152 };
153 },
154 }
155 }
156
157 fn exited(&mut self, reported_addr: Address, normal_exit: bool, at: tokio::time::Instant) {
158 let Some(state) = self.states.iter_mut().find_map(|(_, s)| {
159 matches!(s.status,
160 Status::Running { address } |
161 Status::Terminating { address }
162 if address == reported_addr)
163 .then_some(s)
164 }) else {
165 log::info!(
166 "termination requested [by-addr: {}, normal-exit: {}]",
167 reported_addr,
168 normal_exit
169 );
170 self.status = SupStatus::Stopping { normal_exit: true };
171 return;
172 };
173
174 let is_acceptable = match state.child_type {
175 ChildType::Permanent => false,
176 ChildType::Transient => normal_exit,
177 ChildType::Temporary => true,
178 };
179 match (state.status, is_acceptable) {
180 (Status::Terminating { address }, _) | (Status::Running { address }, true) => {
181 assert_eq!(address, reported_addr);
182 state.status = Status::Stopped;
183 },
184 (Status::Running { address }, false) => {
185 assert_eq!(address, reported_addr);
186 if let Err(reason) = self
187 .restart_intensity
188 .report_exit(&mut self.restart_stats, at)
189 {
190 log::info!("restart intensity exceeded; giving up. Reason: {}", reason);
191 self.status = SupStatus::Stopping { normal_exit: false };
192 } else {
193 self.status = SupStatus::Restarting;
194 }
195 state.status = Status::Stopped;
196 },
197 _ => unreachable!("how could this state be selected?"),
198 }
199 }
200
201 fn failed(&mut self, key: &Self::Key, _at: tokio::time::Instant) {
202 if let Some(state) = self
203 .states
204 .iter_mut()
205 .find_map(|(k, s)| (k == key).then_some(s))
206 {
207 match state.status {
208 Status::Starting => {
209 state.status = Status::Stopped;
210 },
211 _ => {
212 state.target = Target::Stopped;
213 },
214 }
215 self.status = SupStatus::Stopping { normal_exit: false }
216 }
217 }
218
219 fn quit(&mut self, normal_exit: bool) {
220 self.status = match self.status {
221 SupStatus::Stopped => SupStatus::Stopped,
222 SupStatus::Stopping {
223 normal_exit: existing_normal_exit,
224 } => {
225 SupStatus::Stopping {
226 normal_exit: normal_exit && existing_normal_exit,
227 }
228 },
229 SupStatus::Starting | SupStatus::Restarting | SupStatus::Running => {
230 SupStatus::Stopping { normal_exit }
231 },
232 };
233 }
234
235 fn next_action(
236 &mut self,
237 _at: tokio::time::Instant,
238 ) -> Result<Option<Action<'_, Self::Key>>, Self::Error> {
239 if let Some(address) = self.orphans.iter().next().copied() {
240 self.orphans.remove(&address);
241 return Ok(Some(Action::Stop {
242 address,
243 child_id: None,
244 }))
245 }
246
247 let states_iter_mut = match self.status {
248 SupStatus::Starting | SupStatus::Running => {
249 Either::Left(Either::Left(self.states.iter_mut()))
250 },
251 SupStatus::Stopping { .. } | SupStatus::Restarting => {
252 Either::Left(Either::Right(self.states.iter_mut().rev()))
253 },
254 SupStatus::Stopped => Either::Right(std::iter::empty()),
255 };
256
257 let mut all_children_stopped = true;
258 let mut all_children_started = true;
259 for (key, state) in states_iter_mut {
260 let status = state.status;
261 let target = match self.status {
262 SupStatus::Starting | SupStatus::Running => state.target,
263 SupStatus::Stopping { .. } | SupStatus::Restarting => Target::Stopped,
264 SupStatus::Stopped => unreachable!("wouldn't iterate over children"),
265 };
266
267 all_children_stopped = all_children_stopped && matches!(status, Status::Stopped);
268 all_children_started = all_children_started && matches!(status, Status::Running { .. });
269
270 log::debug!(
271 "considering {}; status: {:?}, target: {:?}",
272 key,
273 status,
274 target
275 );
276
277 match (status, target) {
278 (Status::Starting, Target::Running | Target::Stopped) => continue,
280 (Status::Terminating { .. }, Target::Running | Target::Stopped) => continue,
281
282 (Status::Running { .. }, Target::Running) => continue,
284 (Status::Stopped, Target::Stopped) => continue,
285
286 (Status::Stopped, Target::Running) => {
288 state.status = Status::Starting;
289 return Ok(Some(Action::Start { child_id: key }))
290 },
291 (Status::Running { address }, Target::Stopped) => {
292 state.status = Status::Terminating { address };
293 return Ok(Some(Action::Stop {
294 address,
295 child_id: Some(key),
296 }))
297 },
298 }
299 }
300
301 match self.status {
302 SupStatus::Running | SupStatus::Stopped => Ok(None),
303
304 SupStatus::Starting => {
305 log::info!("starting [all-children-started: {}]", all_children_started);
306 if all_children_started {
307 self.status = SupStatus::Running;
308 Ok(Some(Action::InitDone))
309 } else {
310 Ok(None)
311 }
312 },
313
314 SupStatus::Restarting => {
315 log::info!(
316 "restarting [all-children-stopped: {}]",
317 all_children_stopped
318 );
319 if all_children_stopped {
320 self.status = SupStatus::Running;
321 Ok(Some(Action::Noop))
322 } else {
323 Ok(None)
324 }
325 },
326 SupStatus::Stopping { normal_exit } => {
327 log::info!(
328 "stopping [normal: {}; all-children-stopped: {}]",
329 normal_exit,
330 all_children_stopped
331 );
332 if all_children_stopped {
333 self.status = SupStatus::Stopped;
334 Ok(Some(Action::Quit { normal_exit }))
335 } else {
336 Ok(None)
337 }
338 },
339 }
340 }
341}
342
343#[cfg(test)]
344mod tests;