1use std::collections::HashSet;
2use std::fmt;
3
4use either::Either;
5use mm1_address::address::Address;
6use mm1_common::errors::chain::StdErrorDisplayChainExt;
7use mm1_common::log;
8
9use crate::common::restart_intensity::{RestartIntensity, RestartStats};
10use crate::mixed::ChildType;
11use crate::mixed::decider::{Action, Decider};
12use crate::mixed::strategy::{DeciderError, OneForOne, RestartStrategy};
13
14pub struct OneForOneDecider<K> {
15 restart_intensity: RestartIntensity,
16 restart_stats: RestartStats,
17 states: Vec<(K, State)>,
18 orphans: HashSet<Address>,
19 status: SupStatus,
20}
21
22#[derive(Debug)]
23struct State {
24 child_type: ChildType,
25 status: Status,
26 target: Target,
27}
28
29#[derive(Debug, Clone, Copy)]
30enum Status {
31 Starting,
32 Running { address: Address },
33 Terminating { address: Address },
34 Stopped,
35}
36
/// Desired end state of a single child, which `next_action` drives the child towards.
#[derive(Clone, Copy, Debug)]
enum Target {
    /// Keep the child running (restart it when it is stopped).
    Running,
    /// Keep the child stopped.
    Stopped,
    /// Stop the child, then forget it entirely.
    Removed,
}
43
/// Lifecycle phase of the supervisor as a whole.
///
/// Derives the same traits as the per-child `Status`/`Target` enums, so that it can be
/// logged with `?` and copied out of `self` freely.
#[derive(Debug, Clone, Copy)]
enum SupStatus {
    /// Children are being brought up; `InitDone` has not been emitted yet.
    Starting,
    /// Initialisation finished; children are kept at their targets.
    Running,
    /// Every child is being stopped; `normal_exit` is what the final `Quit` will report.
    Stopping { normal_exit: bool },
    /// `Quit` has been emitted; children are no longer considered.
    Stopped,
}
50
51impl<K> OneForOne<K> {
52 pub fn new(restart_intensity: RestartIntensity) -> Self {
53 Self {
54 restart_intensity,
55 _pd: Default::default(),
56 }
57 }
58}
59
60impl<K> RestartStrategy<K> for OneForOne<K>
61where
62 Self: Clone,
63 OneForOneDecider<K>: Decider<Key = K>,
64{
65 type Decider = OneForOneDecider<K>;
66
67 fn decider(&self) -> Self::Decider {
68 let restart_intensity = self.restart_intensity;
69 let restart_stats = restart_intensity.new_stats();
70 OneForOneDecider {
71 restart_intensity,
72 restart_stats,
73 states: Default::default(),
74 orphans: Default::default(),
75 status: SupStatus::Starting,
76 }
77 }
78}
79
80impl<K> Decider for OneForOneDecider<K>
81where
82 K: fmt::Display + Eq,
83{
84 type Error = DeciderError;
85 type Key = K;
86
87 fn address(&self, key: &Self::Key) -> Result<Option<Address>, Self::Error> {
88 self.states
89 .iter()
90 .find_map(|(k, s)| {
91 (key == k).then_some(match s.status {
92 Status::Running { address } | Status::Terminating { address } => Some(address),
93 _ => None,
94 })
95 })
96 .ok_or(DeciderError::KeyNotFound)
97 }
98
99 fn add(&mut self, key: Self::Key, child_type: ChildType) -> Result<(), Self::Error> {
100 if self.states.iter().any(|(k, _)| *k == key) {
101 return Err(DeciderError::DuplicateKey)
102 }
103 self.states.push((
104 key,
105 State {
106 child_type,
107 status: Status::Stopped,
108 target: Target::Running,
109 },
110 ));
111 Ok(())
112 }
113
114 fn rm(&mut self, key: &Self::Key) -> Result<(), Self::Error> {
115 if let Some(state) = self
116 .states
117 .iter_mut()
118 .find_map(|(k, s)| (k == key).then_some(s))
119 {
120 state.target = Target::Removed;
121 }
122 Ok(())
123 }
124
125 fn started(&mut self, key: &Self::Key, reported_address: Address, _at: tokio::time::Instant) {
126 assert!(!self.states.iter().any(|(_, s)|
127 matches!(s.status,
128 Status::Running { address } | Status::Terminating { address } if address == reported_address
129 )
130 )
131 );
132
133 let Some(state) = self
134 .states
135 .iter_mut()
136 .find_map(|(k, s)| (k == key).then_some(s))
137 else {
138 log::warn!(
139 key = %key, addr = %reported_address,
140 "reported start, key not found"
141 );
142 return
143 };
144 match state.status {
145 Status::Running { .. } | Status::Terminating { .. } | Status::Stopped => {
146 self.orphans.insert(reported_address);
147 },
148 Status::Starting => {
149 state.status = Status::Running {
150 address: reported_address,
151 };
152 },
153 }
154 }
155
156 fn exited(&mut self, reported_addr: Address, normal_exit: bool, at: tokio::time::Instant) {
157 let Some(state) = self.states.iter_mut().find_map(|(_, s)| {
158 matches!(s.status,
159 Status::Running { address } |
160 Status::Terminating { address }
161 if address == reported_addr)
162 .then_some(s)
163 }) else {
164 log::info!(
165 by_addr = %reported_addr, normal_exit = %normal_exit,
166 "termination requested"
167 );
168 self.status = SupStatus::Stopping { normal_exit: true };
169 return;
170 };
171
172 let is_acceptable = match state.child_type {
173 ChildType::Permanent => false,
174 ChildType::Transient => normal_exit,
175 ChildType::Temporary => true,
176 };
177 match (state.status, is_acceptable) {
178 (Status::Terminating { address }, _) => {
179 assert_eq!(address, reported_addr);
180 state.status = Status::Stopped;
182 },
183 (Status::Running { address }, true) => {
184 assert_eq!(address, reported_addr);
185 state.status = Status::Stopped;
188 state.target = Target::Stopped;
189 },
190 (Status::Running { address }, false) => {
191 assert_eq!(address, reported_addr);
192 if let Err(reason) = self
193 .restart_intensity
194 .report_exit(&mut self.restart_stats, at)
195 {
196 log::info!(reason = %reason.as_display_chain(), "restart intensity exceeded; giving up");
197 self.status = SupStatus::Stopping { normal_exit: false };
198 }
199 state.status = Status::Stopped;
200 },
201 _ => unreachable!("how could this state be selected?"),
202 }
203 }
204
205 fn failed(&mut self, key: &Self::Key, _at: tokio::time::Instant) {
206 if let Some(state) = self
207 .states
208 .iter_mut()
209 .find_map(|(k, s)| (k == key).then_some(s))
210 {
211 match state.status {
212 Status::Starting => {
213 state.status = Status::Stopped;
214 },
215 _ => {
216 state.target = Target::Stopped;
217 },
218 }
219 self.status = SupStatus::Stopping { normal_exit: false }
220 }
221 }
222
223 fn quit(&mut self, normal_exit: bool) {
224 self.status = match self.status {
225 SupStatus::Stopped => SupStatus::Stopped,
226 SupStatus::Stopping {
227 normal_exit: existing_normal_exit,
228 } => {
229 SupStatus::Stopping {
230 normal_exit: normal_exit && existing_normal_exit,
231 }
232 },
233 SupStatus::Starting | SupStatus::Running => SupStatus::Stopping { normal_exit },
234 };
235 }
236
237 fn next_action(
238 &mut self,
239 _at: tokio::time::Instant,
240 ) -> Result<Option<Action<'_, Self::Key>>, Self::Error> {
241 self.states.retain(|(_k, state)| {
242 !matches!(
243 (state.status, state.target),
244 (Status::Stopped, Target::Removed)
245 )
246 });
247
248 if let Some(address) = self.orphans.iter().next().copied() {
249 self.orphans.remove(&address);
250 return Ok(Some(Action::Stop {
251 address,
252 child_id: None,
253 }))
254 }
255
256 let states_iter_mut = match self.status {
257 SupStatus::Starting | SupStatus::Running => {
258 Either::Left(Either::Left(self.states.iter_mut()))
259 },
260 SupStatus::Stopping { .. } => Either::Left(Either::Right(self.states.iter_mut().rev())),
261 SupStatus::Stopped => Either::Right(std::iter::empty()),
262 };
263 let mut all_children_started = true;
264 let mut all_children_stopped = true;
265 for &mut (ref key, ref mut state) in states_iter_mut {
266 let status = state.status;
267 let target = match self.status {
268 SupStatus::Starting | SupStatus::Running => state.target,
269 SupStatus::Stopping { .. } => Target::Stopped,
270 SupStatus::Stopped => unreachable!("wouldn't iterate over children"),
271 };
272
273 all_children_stopped = all_children_stopped && matches!(status, Status::Stopped);
274 all_children_started = all_children_started && matches!(status, Status::Running { .. });
275
276 log::debug!(
277 key = %key, status = ?status, target = ?target,
278 "considering"
279 );
280
281 match (status, target) {
282 (Status::Starting, Target::Running | Target::Stopped | Target::Removed) => continue,
284 (
285 Status::Terminating { .. },
286 Target::Running | Target::Stopped | Target::Removed,
287 ) => continue,
288
289 (Status::Running { .. }, Target::Running) => continue,
291 (Status::Stopped, Target::Stopped) => continue,
292
293 (Status::Stopped, Target::Removed) => unreachable!("filtered out above"),
295
296 (Status::Stopped, Target::Running) => {
298 state.status = Status::Starting;
299 return Ok(Some(Action::Start { child_id: key }))
300 },
301 (Status::Running { address }, Target::Stopped | Target::Removed) => {
302 state.status = Status::Terminating { address };
303 return Ok(Some(Action::Stop {
304 address,
305 child_id: Some(key),
306 }))
307 },
308 }
309 }
310
311 match self.status {
312 SupStatus::Running | SupStatus::Stopped => Ok(None),
313
314 SupStatus::Starting => {
315 log::info!(all_children_started = %all_children_started, "starting");
316 if all_children_started {
317 self.status = SupStatus::Running;
318 Ok(Some(Action::InitDone))
319 } else {
320 Ok(None)
321 }
322 },
323
324 SupStatus::Stopping { normal_exit } => {
325 log::info!(
326 normal = %normal_exit, all_children_stopped = %all_children_stopped,
327 "stopping"
328 );
329 if all_children_stopped {
330 self.status = SupStatus::Stopped;
331 Ok(Some(Action::Quit { normal_exit }))
332 } else {
333 Ok(None)
334 }
335 },
336 }
337 }
338}
339
340impl<K> Clone for OneForOne<K> {
341 fn clone(&self) -> Self {
342 Self {
343 restart_intensity: self.restart_intensity,
344 _pd: Default::default(),
345 }
346 }
347}
348
349#[cfg(test)]
350mod tests;