// horust/horust/supervisor/service_handler.rs
1use std::time::Instant;
2
3use nix::unistd::Pid;
4
5use crate::horust::Event;
6use crate::horust::formats::{
7    FailureStrategy, HealthinessStatus, RestartStrategy, Service, ServiceName, ServiceStatus,
8};
9use crate::horust::supervisor::repo::Repo;
10
11use super::{LifecycleStatus, ShuttingDown};
12
/// Supervisor-side wrapper around a [`Service`]: couples the immutable service
/// definition with the mutable runtime state the FSM tracks for it.
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub(crate) struct ServiceHandler {
    // The service definition loaded from configuration; never mutated here.
    service: Service,
    /// Status of this service.
    pub(super) status: ServiceStatus,
    /// Process ID of this service, if any (None until the process is spawned)
    pub(super) pid: Option<Pid>,
    /// How many times in a row we failed to start this service
    pub(super) restart_attempts: u32,
    /// Amount of healthiness checks failed, applies only if the service is running.
    /// `None` until the first healthcheck result has been recorded.
    pub(super) healthiness_checks_failed: Option<i32>,
    /// Instant representing at which time we received a shutdown request. Will be used for comparing Service.termination.wait
    pub(super) shutting_down_start: Option<Instant>,
}
27
28impl From<Service> for ServiceHandler {
29    fn from(service: Service) -> Self {
30        ServiceHandler {
31            service,
32            ..Default::default()
33        }
34    }
35}
36
37impl ServiceHandler {
38    fn is_alive_state(&self) -> bool {
39        const ALIVE_STATES: [ServiceStatus; 3] = [
40            ServiceStatus::Running,
41            ServiceStatus::Started,
42            ServiceStatus::Starting,
43        ];
44        ALIVE_STATES.contains(&self.status)
45    }
46
47    pub fn start_after(&self) -> &Vec<String> {
48        self.service.start_after.as_ref()
49    }
50
51    pub(crate) fn is_early_state(&self) -> bool {
52        const EARLY_STATES: [ServiceStatus; 3] = [
53            ServiceStatus::Initial,
54            ServiceStatus::Starting,
55            ServiceStatus::Started,
56        ];
57        EARLY_STATES.contains(&self.status)
58    }
59    pub fn service(&self) -> &Service {
60        &self.service
61    }
62
63    pub fn name(&self) -> &ServiceName {
64        &self.service.name
65    }
66
67    pub fn pid(&self) -> Option<Pid> {
68        self.pid
69    }
70
71    pub fn next(&self, repo: &Repo, status: LifecycleStatus) -> Vec<Event> {
72        next(self, repo, status)
73    }
74    pub fn change_status(&self, new_status: ServiceStatus) -> (ServiceHandler, ServiceStatus) {
75        handle_status_change(self, new_status)
76    }
77
78    /// Restart attempts are over if the attempts field is zero or we already retried enough times.
79    pub fn restart_attempts_are_over(&self) -> bool {
80        self.service.restart.attempts == 0 || self.restart_attempts > self.service.restart.attempts
81    }
82    pub fn add_healthcheck_event(&mut self, check: HealthinessStatus) {
83        let previous_hc = self.healthiness_checks_failed.unwrap_or(0);
84        let new_hc =
85            i32::from(self.is_alive_state() && !matches!(check, HealthinessStatus::Healthy));
86        self.healthiness_checks_failed = Some(previous_hc + new_hc);
87    }
88
89    pub fn is_finished_failed(&self) -> bool {
90        matches!(self.status, ServiceStatus::FinishedFailed)
91    }
92
93    pub fn is_in_killing(&self) -> bool {
94        matches!(self.status, ServiceStatus::InKilling)
95    }
96
97    /// Returns true if the last few events of the healthchecker were Unhealthy events.
98    pub fn has_some_failed_healthchecks(&self) -> bool {
99        // If health status message didn't reach the service handler on time, it has failed too fast
100        // So we consider it as unhealthy (thus the `1` for the unwrap).
101        self.healthiness_checks_failed.unwrap_or(1) > 0
102    }
103
104    pub fn is_initial(&self) -> bool {
105        ServiceStatus::Initial == self.status
106    }
107
108    pub fn is_running(&self) -> bool {
109        ServiceStatus::Running == self.status
110    }
111
112    pub fn is_finished(&self) -> bool {
113        ServiceStatus::Finished == self.status
114    }
115
116    pub fn shutting_down_started(&mut self) {
117        self.shutting_down_start = Some(Instant::now());
118    }
119}
120
121/// Generates events that, if applied, will make service_handler FSM progress
122pub(crate) fn next(
123    service_handler: &ServiceHandler,
124    repo: &Repo,
125    lifecycle_status: LifecycleStatus,
126) -> Vec<Event> {
127    match lifecycle_status {
128        LifecycleStatus::Running => next_events(repo, service_handler),
129        LifecycleStatus::ShuttingDown(shutting_down) => {
130            next_events_shutting_down(service_handler, shutting_down)
131        }
132    }
133}
134
/// Generate the events needed for moving forward the FSM for the service handler
/// If the system is shutting down, it will call next_shutting_down.
fn next_events(repo: &Repo, service_handler: &ServiceHandler) -> Vec<Event> {
    // Shorthand constructors for status-update events targeting this service.
    let ev_status =
        |status: ServiceStatus| Event::new_status_update(service_handler.name(), status);
    let vev_status = |status: ServiceStatus| vec![ev_status(status)];

    match service_handler.status {
        // Dependencies satisfied (per repo): ask the supervisor to spawn it.
        ServiceStatus::Initial if repo.is_service_runnable(service_handler) => {
            vec![Event::Run(service_handler.name().clone())]
        }
        // if enough time has passed, this will be considered running
        ServiceStatus::Started if !service_handler.has_some_failed_healthchecks() => {
            vev_status(ServiceStatus::Running)
        }
        // This will kill the service after 3 failed healthchecks in a row.
        // Maybe this should be parametrized
        // NOTE: `unwrap_or(-1)` means "no healthcheck data yet" can never exceed
        // max_failed, so a service without results is not killed spuriously.
        ServiceStatus::Running
            if service_handler.healthiness_checks_failed.unwrap_or(-1)
                > service_handler.service.healthiness.max_failed =>
        {
            vec![
                ev_status(ServiceStatus::InKilling),
                Event::Kill(service_handler.name().clone()),
            ]
        }
        // Clean exit: the restart strategy decides restart vs. finish.
        ServiceStatus::Success => vec![handle_restart_strategy(service_handler, false)],
        ServiceStatus::Failed => {
            // Apply the failed service's failure strategy (shutdown / kill-dependents / ignore)...
            let mut failure_evs = handle_failed_service(
                repo.get_dependents(service_handler.name()),
                service_handler.service(),
            );
            // ...and terminate every service configured to die if this one fails.
            let other_services_termination = repo
                .get_die_if_failed(service_handler.name())
                .into_iter()
                .flat_map(|sh_name| {
                    vec![
                        Event::new_status_update(sh_name, ServiceStatus::InKilling),
                        Event::Kill(sh_name.clone()),
                    ]
                });

            // Finally, let the restart strategy decide this service's own fate.
            let service_ev = handle_restart_strategy(service_handler, true);

            failure_evs.push(service_ev);
            failure_evs.extend(other_services_termination);
            failure_evs
        }
        // The kill grace period expired: escalate to a force kill and mark failed.
        ServiceStatus::InKilling if should_force_kill(service_handler, None) => vec![
            Event::new_force_kill(service_handler.name()),
            Event::new_status_changed(service_handler.name(), ServiceStatus::Failed),
        ],

        _ => vec![],
    }
}
191
192/// This next function assumes that the system is shutting down.
193/// It will make progress in the direction of shutting everything down.
194fn next_events_shutting_down(
195    service_handler: &ServiceHandler,
196    shutting_down: ShuttingDown,
197) -> Vec<Event> {
198    let ev_status =
199        |status: ServiceStatus| Event::new_status_update(service_handler.name(), status);
200    let vev_status = |status: ServiceStatus| vec![ev_status(status)];
201
202    // Handle the new state separately if we're shutting down.
203    match &service_handler.status {
204        ServiceStatus::Running | ServiceStatus::Started => vec![
205            ev_status(ServiceStatus::InKilling),
206            Event::Kill(service_handler.name().clone()),
207        ],
208        ServiceStatus::Success | ServiceStatus::Initial => vev_status(ServiceStatus::Finished),
209        ServiceStatus::Failed => vev_status(ServiceStatus::FinishedFailed),
210        ServiceStatus::InKilling if should_force_kill(service_handler, shutting_down) => {
211            vec![Event::new_force_kill(service_handler.name())]
212        }
213        _ => vec![],
214    }
215}
216
217/// Handles the service handler's status change
218fn handle_status_change(
219    service_handler: &ServiceHandler,
220    next_status: ServiceStatus,
221) -> (ServiceHandler, ServiceStatus) {
222    use ServiceStatus::*;
223
224    let mut new_service_handler = service_handler.clone();
225    if next_status == service_handler.status {
226        return (new_service_handler, next_status);
227    }
228
229    // Static lookup table of valid transitions
230    // A -> [B,C] means that transition to A is allowed only if the service is in state B or C.
231    const ALLOWED_TRANSITIONS: &[(ServiceStatus, &[ServiceStatus])] = &[
232        (Initial, &[Success, Failed]),
233        (Starting, &[Initial]),
234        (Started, &[Starting]),
235        (InKilling, &[Initial, Running, Starting, Started]),
236        (Running, &[Started]),
237        (FinishedFailed, &[Starting, Started, Failed, InKilling]),
238        (Success, &[Starting, Started, Running, InKilling]),
239        (Failed, &[Starting, Started, Running, InKilling]),
240        (Finished, &[Success, Initial]),
241    ];
242
243    let allowed = ALLOWED_TRANSITIONS
244        .iter()
245        .find(|(status, _)| *status == next_status)
246        .map(|(_, allowed_from)| allowed_from);
247
248    let valid = allowed.is_some_and(|allowed_from| allowed_from.contains(&service_handler.status));
249
250    if valid {
251        match next_status {
252            Started => {
253                new_service_handler.status = Started;
254                new_service_handler.restart_attempts = 0;
255            }
256            InKilling if service_handler.status == Initial => {
257                // Nothing to do here, the service was never started.
258                debug!(
259                    " service: {},  status: {}, new status: {}",
260                    service_handler.name(),
261                    service_handler.status,
262                    next_status
263                );
264                new_service_handler.status = Success;
265            }
266            new_status => {
267                new_service_handler.status = new_status;
268            }
269        }
270    } else {
271        error!(
272            "Tried to make an illegal transition: (current) {} ⇾ {} (received) for service: {}",
273            service_handler.status,
274            next_status,
275            service_handler.name()
276        );
277    }
278    let new_status = new_service_handler.status.clone();
279    (new_service_handler, new_status)
280}
281
282/// Produces events based on the Restart Strategy of the service.
283fn handle_restart_strategy(service_handler: &ServiceHandler, is_failed: bool) -> Event {
284    let new_status = match service_handler.service.restart.strategy {
285        RestartStrategy::Never if is_failed => {
286            debug!(
287                "restart attempts: {}, are over: {}, max: {}",
288                service_handler.restart_attempts,
289                service_handler.restart_attempts_are_over(),
290                service_handler.service.restart.attempts
291            );
292            if service_handler.restart_attempts_are_over() {
293                ServiceStatus::FinishedFailed
294            } else {
295                ServiceStatus::Initial
296            }
297        }
298        RestartStrategy::OnFailure if is_failed => ServiceStatus::Initial,
299        RestartStrategy::Never | RestartStrategy::OnFailure => ServiceStatus::Finished,
300        RestartStrategy::Always => ServiceStatus::Initial,
301    };
302    debug!("Restart strategy applied, ev: {:?}", new_status);
303    Event::new_status_update(service_handler.name(), new_status)
304}
305
306/// This is applied to both failed and FinishedFailed services.
307fn handle_failed_service(deps: Vec<ServiceName>, failed_sh: &Service) -> Vec<Event> {
308    match failed_sh.failure.strategy {
309        FailureStrategy::Shutdown => vec![Event::ShuttingDownInitiated(ShuttingDown::Gracefully)],
310        FailureStrategy::KillDependents => {
311            debug!("Failed service has kill-dependents strategy, going to mark them all..");
312            deps.iter()
313                .flat_map(|sh| {
314                    vec![
315                        Event::new_status_update(sh, ServiceStatus::InKilling),
316                        Event::Kill(sh.clone()),
317                    ]
318                })
319                .collect()
320        }
321        FailureStrategy::Ignore => vec![],
322    }
323}
324
325/// Check if we've waited enough for the service to exit
326fn should_force_kill(
327    service_handler: &ServiceHandler,
328    shutting_down: impl Into<Option<ShuttingDown>>,
329) -> bool {
330    if service_handler.pid.is_none() {
331        // Since it was in the started state, it doesn't have a pid yet.
332        // Let's give it the time to start and exit.
333        return false;
334    }
335    if let Some(ShuttingDown::Forcefully) = shutting_down.into() {
336        debug!("{}, should force kill.", service_handler.name());
337        return true;
338    }
339    if let Some(shutting_down_elapsed_secs) = service_handler.shutting_down_start {
340        let shutting_down_elapsed_secs = shutting_down_elapsed_secs.elapsed().as_secs();
341        debug!(
342            "{}, should not force kill. Elapsed: {}, termination wait: {}",
343            service_handler.name(),
344            shutting_down_elapsed_secs,
345            service_handler.service().termination.wait.clone().as_secs()
346        );
347        shutting_down_elapsed_secs > service_handler.service().termination.wait.clone().as_secs()
348    } else {
349        // this might happen, because InKilling state is emitted before the Kill event.
350        // So maybe the supervisor has received only the InKilling state change, but hasn't sent the
351        // signal yet. So it should be fine.
352        debug!("There is no shutting down elapsed secs.");
353        false
354    }
355}
356
#[cfg(test)]
mod test {
    use std::ops::Sub;
    use std::str::FromStr;
    use std::time::Duration;

    use nix::unistd::Pid;

    use crate::horust::Event;
    use crate::horust::formats::{FailureStrategy, Service, ServiceStatus, ShuttingDown};
    use crate::horust::supervisor::service_handler::{
        ServiceHandler, handle_failed_service, handle_restart_strategy, should_force_kill,
    };

    /// Drives `handle_restart_strategy` through every (is_failed, strategy)
    /// combination and checks the resulting status-update event.
    #[test]
    fn test_handle_restart_strategy() {
        let new_status = |status| Event::new_status_update("servicename", status);
        // (has_failed, configured strategy, expected event)
        let matrix = vec![
            (false, "always", new_status(ServiceStatus::Initial)),
            (true, "always", new_status(ServiceStatus::Initial)),
            (true, "on-failure", new_status(ServiceStatus::Initial)),
            (false, "on-failure", new_status(ServiceStatus::Finished)),
            (true, "never", new_status(ServiceStatus::FinishedFailed)),
            (false, "never", new_status(ServiceStatus::Finished)),
        ];
        matrix
            .into_iter()
            .for_each(|(has_failed, strategy, expected)| {
                // Build a minimal service definition with only the restart strategy set.
                let service = format!(
                    r#"name="servicename"
command = "Not relevant"
[restart]
strategy = "{}"
"#,
                    strategy
                );
                let service: Service = Service::from_str(service.as_str()).unwrap();
                let sh = service.into();
                let received = handle_restart_strategy(&sh, has_failed);
                assert_eq!(received, expected);
            });
    }

    /// Exercises `should_force_kill` across the pid / shutdown-start / elapsed
    /// combinations: a force kill requires a pid AND an elapsed time past `wait`.
    #[test]
    fn test_should_force_kill() {
        let service = r#"command="notrelevant"
[termination]
wait = "10s"
"#;
        let service: Service = toml::from_str(service).unwrap();
        let mut sh: ServiceHandler = service.into();
        // No pid, no shutdown start: never force kill.
        assert!(!should_force_kill(&sh, None));
        sh.shutting_down_started();
        sh.status = ServiceStatus::InKilling;
        // Shutdown just started: still within the wait window.
        assert!(!should_force_kill(&sh, None));
        let old_start = sh.shutting_down_start;
        // Backdate the shutdown start past the 10s wait.
        let past_wait = Some(sh.shutting_down_start.unwrap().sub(Duration::from_secs(20)));
        sh.shutting_down_start = past_wait;
        // Wait exceeded but no pid yet: still no force kill.
        assert!(!should_force_kill(&sh, None));
        sh.pid = Some(Pid::this());
        sh.shutting_down_start = old_start;
        // Pid present but within the wait window: no force kill.
        assert!(!should_force_kill(&sh, None));
        sh.shutting_down_start = past_wait;
        // Pid present and wait exceeded: force kill.
        assert!(should_force_kill(&sh, None));
    }

    /// Checks the events produced for each failure strategy:
    /// ignore -> none, kill-dependents -> InKilling+Kill per dep, shutdown -> graceful shutdown.
    #[test]
    fn test_handle_failed_service() {
        let mut service = Service::from_name("b");
        let evs = handle_failed_service(vec!["a".into()], &service);
        assert!(evs.is_empty());

        service.failure.strategy = FailureStrategy::KillDependents;
        let evs = handle_failed_service(vec!["a".into()], &service);
        let exp = vec![
            Event::new_status_update("a", ServiceStatus::InKilling),
            Event::Kill("a".into()),
        ];
        assert_eq!(evs, exp);

        service.failure.strategy = FailureStrategy::Shutdown;
        let evs = handle_failed_service(vec!["a".into()], &service);
        let exp = vec![Event::ShuttingDownInitiated(ShuttingDown::Gracefully)];
        assert_eq!(evs, exp);
    }
}