1use std::collections::HashSet;
2use std::fmt;
3
4use either::Either;
5use mm1_address::address::Address;
6use mm1_common::errors::chain::StdErrorDisplayChainExt;
7use mm1_common::log;
8
9use crate::common::restart_intensity::{RestartIntensity, RestartStats};
10use crate::mixed::ChildType;
11use crate::mixed::decider::{Action, Decider};
12use crate::mixed::strategy::{DeciderError, OneForOne, RestartStrategy};
13
14pub struct OneForOneDecider<K> {
15 restart_intensity: RestartIntensity,
16 restart_stats: RestartStats,
17 states: Vec<(K, State)>,
18 orphans: HashSet<Address>,
19 status: SupStatus,
20}
21
22#[derive(Debug)]
23struct State {
24 child_type: ChildType,
25 status: Status,
26 target: Target,
27}
28
29#[derive(Debug, Clone, Copy)]
30enum Status {
31 Starting,
32 Running { address: Address },
33 Terminating { address: Address },
34 Stopped,
35}
36
/// Desired end state of a single child.
#[derive(Debug, Clone, Copy)]
enum Target {
    /// The child should be running (the default for a newly added child).
    Running,
    /// The child should be stopped but its entry kept.
    Stopped,
    /// The child should be stopped and its entry dropped once it is stopped.
    Removed,
}
43
/// Lifecycle status of the supervisor as a whole.
enum SupStatus {
    /// Initial status; children are being brought up for the first time.
    Starting,
    /// Initialisation has completed (`InitDone` was emitted).
    Running,
    /// Children are being stopped; `normal_exit` records whether the shutdown
    /// is considered normal.
    Stopping { normal_exit: bool },
    /// Every child is stopped and `Quit` was emitted.
    Stopped,
}
50
51impl<K> OneForOne<K> {
52 pub fn new(restart_intensity: RestartIntensity) -> Self {
53 Self {
54 restart_intensity,
55 _pd: Default::default(),
56 }
57 }
58}
59
60impl<K> RestartStrategy<K> for OneForOne<K>
61where
62 Self: Clone,
63 OneForOneDecider<K>: Decider<Key = K>,
64{
65 type Decider = OneForOneDecider<K>;
66
67 fn decider(&self) -> Self::Decider {
68 let restart_intensity = self.restart_intensity;
69 let restart_stats = restart_intensity.new_stats();
70 OneForOneDecider {
71 restart_intensity,
72 restart_stats,
73 states: Default::default(),
74 orphans: Default::default(),
75 status: SupStatus::Starting,
76 }
77 }
78}
79
80impl<K> Decider for OneForOneDecider<K>
81where
82 K: fmt::Display + Eq,
83{
84 type Error = DeciderError;
85 type Key = K;
86
87 fn address(&self, key: &Self::Key) -> Result<Option<Address>, Self::Error> {
88 self.states
89 .iter()
90 .find_map(|(k, s)| {
91 (key == k).then_some(match s.status {
92 Status::Running { address } | Status::Terminating { address } => Some(address),
93 _ => None,
94 })
95 })
96 .ok_or(DeciderError::KeyNotFound)
97 }
98
99 fn add(&mut self, key: Self::Key, child_type: ChildType) -> Result<(), Self::Error> {
100 if self.states.iter().any(|(k, _)| *k == key) {
101 return Err(DeciderError::DuplicateKey)
102 }
103 self.states.push((
104 key,
105 State {
106 child_type,
107 status: Status::Stopped,
108 target: Target::Running,
109 },
110 ));
111 Ok(())
112 }
113
114 fn rm(&mut self, key: &Self::Key) -> Result<(), Self::Error> {
115 if let Some(state) = self
116 .states
117 .iter_mut()
118 .find_map(|(k, s)| (k == key).then_some(s))
119 {
120 state.target = Target::Removed;
121 }
122 Ok(())
123 }
124
125 fn started(&mut self, key: &Self::Key, reported_address: Address, _at: tokio::time::Instant) {
126 assert!(!self.states.iter().any(|(_, s)|
127 matches!(s.status,
128 Status::Running { address } | Status::Terminating { address } if address == reported_address
129 )
130 )
131 );
132
133 let Some(state) = self
134 .states
135 .iter_mut()
136 .find_map(|(k, s)| (k == key).then_some(s))
137 else {
138 log::warn!(
139 key = %key, addr = %reported_address,
140 "reported start, key not found"
141 );
142 return
143 };
144 match state.status {
145 Status::Running { .. } | Status::Terminating { .. } | Status::Stopped => {
146 self.orphans.insert(reported_address);
147 },
148 Status::Starting => {
149 state.status = Status::Running {
150 address: reported_address,
151 };
152 },
153 }
154 }
155
156 fn exited(&mut self, reported_addr: Address, normal_exit: bool, at: tokio::time::Instant) {
157 let Some(state) = self.states.iter_mut().find_map(|(_, s)| {
158 matches!(s.status,
159 Status::Running { address } |
160 Status::Terminating { address }
161 if address == reported_addr)
162 .then_some(s)
163 }) else {
164 log::info!(
165 by_addr = %reported_addr, normal_exit = %normal_exit,
166 "termination requested"
167 );
168 self.status = SupStatus::Stopping { normal_exit: true };
169 return;
170 };
171
172 let is_acceptable = match state.child_type {
173 ChildType::Permanent => false,
174 ChildType::Transient => normal_exit,
175 ChildType::Temporary => true,
176 };
177 match (state.status, is_acceptable) {
178 (Status::Terminating { address }, _) | (Status::Running { address }, true) => {
179 assert_eq!(address, reported_addr);
180 state.status = Status::Stopped;
181 },
182 (Status::Running { address }, false) => {
183 assert_eq!(address, reported_addr);
184 if let Err(reason) = self
185 .restart_intensity
186 .report_exit(&mut self.restart_stats, at)
187 {
188 log::info!(reason = %reason.as_display_chain(), "restart intensity exceeded; giving up");
189 self.status = SupStatus::Stopping { normal_exit: false };
190 }
191 state.status = Status::Stopped;
192 },
193 _ => unreachable!("how could this state be selected?"),
194 }
195 }
196
197 fn failed(&mut self, key: &Self::Key, _at: tokio::time::Instant) {
198 if let Some(state) = self
199 .states
200 .iter_mut()
201 .find_map(|(k, s)| (k == key).then_some(s))
202 {
203 match state.status {
204 Status::Starting => {
205 state.status = Status::Stopped;
206 },
207 _ => {
208 state.target = Target::Stopped;
209 },
210 }
211 self.status = SupStatus::Stopping { normal_exit: false }
212 }
213 }
214
215 fn quit(&mut self, normal_exit: bool) {
216 self.status = match self.status {
217 SupStatus::Stopped => SupStatus::Stopped,
218 SupStatus::Stopping {
219 normal_exit: existing_normal_exit,
220 } => {
221 SupStatus::Stopping {
222 normal_exit: normal_exit && existing_normal_exit,
223 }
224 },
225 SupStatus::Starting | SupStatus::Running => SupStatus::Stopping { normal_exit },
226 };
227 }
228
229 fn next_action(
230 &mut self,
231 _at: tokio::time::Instant,
232 ) -> Result<Option<Action<'_, Self::Key>>, Self::Error> {
233 self.states.retain(|(_k, state)| {
234 !matches!(
235 (state.status, state.target),
236 (Status::Stopped, Target::Removed)
237 )
238 });
239
240 if let Some(address) = self.orphans.iter().next().copied() {
241 self.orphans.remove(&address);
242 return Ok(Some(Action::Stop {
243 address,
244 child_id: None,
245 }))
246 }
247
248 let states_iter_mut = match self.status {
249 SupStatus::Starting | SupStatus::Running => {
250 Either::Left(Either::Left(self.states.iter_mut()))
251 },
252 SupStatus::Stopping { .. } => Either::Left(Either::Right(self.states.iter_mut().rev())),
253 SupStatus::Stopped => Either::Right(std::iter::empty()),
254 };
255 let mut all_children_started = true;
256 let mut all_children_stopped = true;
257 for &mut (ref key, ref mut state) in states_iter_mut {
258 let status = state.status;
259 let target = match self.status {
260 SupStatus::Starting | SupStatus::Running => state.target,
261 SupStatus::Stopping { .. } => Target::Stopped,
262 SupStatus::Stopped => unreachable!("wouldn't iterate over children"),
263 };
264
265 all_children_stopped = all_children_stopped && matches!(status, Status::Stopped);
266 all_children_started = all_children_started && matches!(status, Status::Running { .. });
267
268 log::debug!(
269 key = %key, status = ?status, target = ?target,
270 "considering"
271 );
272
273 match (status, target) {
274 (Status::Starting, Target::Running | Target::Stopped | Target::Removed) => continue,
276 (
277 Status::Terminating { .. },
278 Target::Running | Target::Stopped | Target::Removed,
279 ) => continue,
280
281 (Status::Running { .. }, Target::Running) => continue,
283 (Status::Stopped, Target::Stopped) => continue,
284
285 (Status::Stopped, Target::Removed) => unreachable!("filtered out above"),
287
288 (Status::Stopped, Target::Running) => {
290 state.status = Status::Starting;
291 return Ok(Some(Action::Start { child_id: key }))
292 },
293 (Status::Running { address }, Target::Stopped | Target::Removed) => {
294 state.status = Status::Terminating { address };
295 return Ok(Some(Action::Stop {
296 address,
297 child_id: Some(key),
298 }))
299 },
300 }
301 }
302
303 match self.status {
304 SupStatus::Running | SupStatus::Stopped => Ok(None),
305
306 SupStatus::Starting => {
307 log::info!(all_children_started = %all_children_started, "starting");
308 if all_children_started {
309 self.status = SupStatus::Running;
310 Ok(Some(Action::InitDone))
311 } else {
312 Ok(None)
313 }
314 },
315
316 SupStatus::Stopping { normal_exit } => {
317 log::info!(
318 normal = %normal_exit, all_children_stopped = %all_children_stopped,
319 "stopping"
320 );
321 if all_children_stopped {
322 self.status = SupStatus::Stopped;
323 Ok(Some(Action::Quit { normal_exit }))
324 } else {
325 Ok(None)
326 }
327 },
328 }
329 }
330}
331
332impl<K> Clone for OneForOne<K> {
333 fn clone(&self) -> Self {
334 Self {
335 restart_intensity: self.restart_intensity,
336 _pd: Default::default(),
337 }
338 }
339}
340
341#[cfg(test)]
342mod tests;