hydra/
supervisor.rs

1use std::collections::BTreeSet;
2use std::pin::Pin;
3use std::time::Duration;
4use std::time::Instant;
5
6use serde::Deserialize;
7use serde::Serialize;
8
9use crate::AutoShutdown;
10use crate::CallError;
11use crate::ChildSpec;
12use crate::ChildType;
13use crate::Dest;
14use crate::ExitReason;
15use crate::From;
16use crate::GenServer;
17use crate::Local;
18use crate::Message;
19use crate::Pid;
20use crate::Process;
21use crate::ProcessFlags;
22use crate::Restart;
23use crate::Shutdown;
24use crate::SupervisorOptions;
25use crate::SystemMessage;
26use crate::shutdown_brutal_kill;
27use crate::shutdown_infinity;
28use crate::shutdown_timeout;
29
/// A supervision child.
///
/// Pairs the child specification with the runtime state the [Supervisor] tracks for it.
#[derive(Clone)]
struct SupervisedChild {
    /// The specification this child was registered with.
    spec: ChildSpec,
    /// The pid recorded for the child, or `None` if it has not been started, ignored its start,
    /// or was terminated by the supervisor.
    pid: Option<Pid>,
    /// Set when a restart attempt failed and another attempt has been queued.
    restarting: bool,
}
37
/// A supervisor message.
///
/// Carries both the requests sent to a [Supervisor] process and the replies it sends back.
/// The `TryAgainRestart*` variants are handled as casts; every other request variant is
/// handled as a call and answered with the matching `*Success` / `*Error` variant.
#[doc(hidden)]
#[derive(Serialize, Deserialize)]
pub enum SupervisorMessage {
    // Casts the supervisor sends to itself to retry a failed restart.
    TryAgainRestartPid(Pid),
    TryAgainRestartId(String),
    // Request/reply pair for `Supervisor::count_children`.
    CountChildren,
    CountChildrenSuccess(SupervisorCounts),
    // Request/reply variants for `Supervisor::start_child`.
    StartChild(Local<ChildSpec>),
    StartChildSuccess(Option<Pid>),
    StartChildError(SupervisorError),
    // Request/reply variants for `Supervisor::terminate_child`.
    TerminateChild(String),
    TerminateChildSuccess,
    TerminateChildError(SupervisorError),
    // Request/reply variants for `Supervisor::restart_child`.
    RestartChild(String),
    RestartChildSuccess(Option<Pid>),
    RestartChildError(SupervisorError),
    // Request/reply variants for `Supervisor::delete_child`.
    DeleteChild(String),
    DeleteChildSuccess,
    DeleteChildError(SupervisorError),
    // Request/reply pair for `Supervisor::which_children`.
    WhichChildren,
    WhichChildrenSuccess(Vec<SupervisorChildInfo>),
}
61
/// Errors for [Supervisor] calls.
#[derive(Debug, Serialize, Deserialize)]
pub enum SupervisorError {
    /// A call to the [Supervisor] server has failed.
    CallError(CallError),
    /// A child with the same id already exists and has a running process.
    AlreadyStarted,
    /// A child with the same id already exists but has no running process.
    AlreadyPresent,
    /// The child failed to start; carries the reason returned by its start function.
    StartError(ExitReason),
    /// No child with the given id was found.
    NotFound,
    /// The operation requires a stopped child, but the child is running.
    Running,
    /// The operation requires a stopped child, but the child is being restarted.
    Restarting,
}
80
81/// Information about a child of a [Supervisor].
82#[derive(Debug, Serialize, Deserialize)]
83pub struct SupervisorChildInfo {
84    /// The id as defined in the child specification.
85    id: String,
86    /// The [Pid] of the corrosponding child process if it exists.
87    child: Option<Pid>,
88    /// The type of child as defined in the child specification.
89    child_type: ChildType,
90    /// Whether or not the process is about to be restarted.
91    restarting: bool,
92}
93
/// Contains the counts of all of the supervised children.
///
/// Returned by [Supervisor::count_children].
#[derive(Debug, Serialize, Deserialize)]
pub struct SupervisorCounts {
    /// The total count of children, dead or alive.
    pub specs: usize,
    /// The count of all children that currently have a pid recorded by this supervisor.
    pub active: usize,
    /// The count of all children marked as `supervisor`, dead or alive.
    pub supervisors: usize,
    /// The count of all children not marked as `supervisor` (i.e. workers), dead or alive.
    pub workers: usize,
}
106
/// The supervision strategy to use for each child.
///
/// Selects which children are restarted when one of them terminates and must be restarted.
#[derive(Debug, Clone, Copy)]
pub enum SupervisionStrategy {
    /// If a child process terminates, only that process is restarted.
    OneForOne,
    /// If a child process terminates, all other child processes are terminated and then all
    /// child processes are restarted.
    OneForAll,
    /// If a child process terminates, the terminated child process and the rest of the children
    /// started after it are terminated and restarted.
    RestForOne,
}
117
/// A supervisor is a process which supervises other processes, which we refer to as child processes.
/// Supervisors are used to build a hierarchical process structure called a supervision tree.
/// Supervision trees provide fault-tolerance and encapsulate how our applications start and shutdown.
#[derive(Clone)]
pub struct Supervisor {
    /// The supervised children, in the order they were added (and are started).
    children: Vec<SupervisedChild>,
    /// The ids of every entry in `children`, used to enforce unique child ids.
    identifiers: BTreeSet<String>,
    /// Timestamps of recent restarts, pruned to the `max_duration` window on each new restart.
    restarts: Vec<Instant>,
    /// Which children are restarted when one of them terminates.
    strategy: SupervisionStrategy,
    /// What to do when a significant child exits.
    auto_shutdown: AutoShutdown,
    /// Number of restarts tolerated within `max_duration` before the supervisor gives up.
    max_restarts: usize,
    /// The time frame in which `max_restarts` applies.
    max_duration: Duration,
}
131
132impl Supervisor {
133    /// Constructs a new instance of [Supervisor] with no children.
134    pub const fn new() -> Self {
135        Self {
136            children: Vec::new(),
137            identifiers: BTreeSet::new(),
138            restarts: Vec::new(),
139            strategy: SupervisionStrategy::OneForOne,
140            auto_shutdown: AutoShutdown::Never,
141            max_restarts: 3,
142            max_duration: Duration::from_secs(5),
143        }
144    }
145
146    /// Constructs a new instance of [Supervisor] with the given children.
147    pub fn with_children<T: IntoIterator<Item = ChildSpec>>(children: T) -> Self {
148        let mut result = Self::new();
149
150        for child in children {
151            result = result.add_child(child);
152        }
153
154        result
155    }
156
157    /// Adds a child to this [Supervisor].
158    pub fn add_child(mut self, child: ChildSpec) -> Self {
159        if self.identifiers.contains(&child.id) {
160            panic!("Child id was not unique!");
161        }
162
163        self.identifiers.insert(child.id.clone());
164
165        self.children.push(SupervisedChild {
166            spec: child,
167            pid: None,
168            restarting: false,
169        });
170
171        self
172    }
173
174    /// Builds a child specification for this [Supervisor] process.
175    pub fn child_spec(self, options: SupervisorOptions) -> ChildSpec {
176        ChildSpec::new("Supervisor")
177            .start(move || self.clone().start_link(options.clone()))
178            .child_type(ChildType::Supervisor)
179    }
180
181    /// Sets the supervision strategy for the [Supervisor].
182    pub const fn strategy(mut self, strategy: SupervisionStrategy) -> Self {
183        self.strategy = strategy;
184        self
185    }
186
187    /// Sets the behavior to use when a significant process exits.
188    pub const fn auto_shutdown(mut self, auto_shutdown: AutoShutdown) -> Self {
189        self.auto_shutdown = auto_shutdown;
190        self
191    }
192
193    /// Sets the maximum number of restarts allowed in a time frame.
194    ///
195    /// Defaults to 3.
196    pub const fn max_restarts(mut self, max_restarts: usize) -> Self {
197        self.max_restarts = max_restarts;
198        self
199    }
200
201    /// Sets the time frame in which `max_restarts` applies.
202    ///
203    /// Defaults to 5s.
204    pub const fn max_duration(mut self, max_duration: Duration) -> Self {
205        self.max_duration = max_duration;
206        self
207    }
208
209    /// Creates a supervisor process not apart of a supervision tree.
210    ///
211    /// This will not return until all of the child processes have been started.
212    pub async fn start(self, options: SupervisorOptions) -> Result<Pid, ExitReason> {
213        GenServer::start(self, options.into()).await
214    }
215
216    /// Creates a supervisor process as part of a supervision tree.
217    ///
218    /// For example, this function ensures that the supervisor is linked to the calling process (its supervisor).
219    ///
220    /// This will not return until all of the child processes have been started.
221    pub async fn start_link(self, options: SupervisorOptions) -> Result<Pid, ExitReason> {
222        GenServer::start_link(self, options.into()).await
223    }
224
225    /// Returns [SupervisorCounts] containing the counts for each of the different child specifications.
226    pub async fn count_children<T: Into<Dest>>(
227        supervisor: T,
228    ) -> Result<SupervisorCounts, SupervisorError> {
229        use SupervisorMessage::*;
230
231        match Supervisor::call(supervisor, CountChildren, None).await? {
232            CountChildrenSuccess(counts) => Ok(counts),
233            _ => unreachable!(),
234        }
235    }
236
237    /// Adds the child specification to the [Supervisor] and starts that child.
238    pub async fn start_child<T: Into<Dest>>(
239        supervisor: T,
240        child: ChildSpec,
241    ) -> Result<Option<Pid>, SupervisorError> {
242        use SupervisorMessage::*;
243
244        match Supervisor::call(supervisor, StartChild(Local::new(child)), None).await? {
245            StartChildSuccess(pid) => Ok(pid),
246            StartChildError(error) => Err(error),
247            _ => unreachable!(),
248        }
249    }
250
251    /// Terminates the given child identified by `child_id`.
252    ///
253    /// The process is terminated, if there's one. The child specification is kept unless the child is temporary.
254    ///
255    /// A non-temporary child process may later be restarted by the [Supervisor].
256    ///
257    /// The child process can also be restarted explicitly by calling `restart_child`. Use `delete_child` to remove the child specification.
258    pub async fn terminate_child<T: Into<Dest>, I: Into<String>>(
259        supervisor: T,
260        child_id: I,
261    ) -> Result<(), SupervisorError> {
262        use SupervisorMessage::*;
263
264        match Supervisor::call(supervisor, TerminateChild(child_id.into()), None).await? {
265            TerminateChildSuccess => Ok(()),
266            TerminateChildError(error) => Err(error),
267            _ => unreachable!(),
268        }
269    }
270
271    /// Restarts a child identified by `child_id`.
272    ///
273    /// The child specification must exist and the corresponding child process must not be running.
274    ///
275    /// Note that for temporary children, the child specification is automatically deleted when the child terminates,
276    /// and thus it is not possible to restart such children.
277    pub async fn restart_child<T: Into<Dest>, I: Into<String>>(
278        supervisor: T,
279        child_id: I,
280    ) -> Result<Option<Pid>, SupervisorError> {
281        use SupervisorMessage::*;
282
283        match Supervisor::call(supervisor, RestartChild(child_id.into()), None).await? {
284            RestartChildSuccess(pid) => Ok(pid),
285            RestartChildError(error) => Err(error),
286            _ => unreachable!(),
287        }
288    }
289
290    /// Deletes the child specification identified by `child_id`.
291    ///
292    /// The corrosponding child process must not be running, use `terminate_child` to terminate it if it's running.
293    pub async fn delete_child<T: Into<Dest>, I: Into<String>>(
294        supervisor: T,
295        child_id: I,
296    ) -> Result<(), SupervisorError> {
297        use SupervisorMessage::*;
298
299        match Supervisor::call(supervisor, DeleteChild(child_id.into()), None).await? {
300            DeleteChildSuccess => Ok(()),
301            DeleteChildError(error) => Err(error),
302            _ => unreachable!(),
303        }
304    }
305
306    /// Returns a list with information about all children of the given [Supervisor].
307    pub async fn which_children<T: Into<Dest>>(
308        supervisor: T,
309    ) -> Result<Vec<SupervisorChildInfo>, SupervisorError> {
310        use SupervisorMessage::*;
311
312        match Supervisor::call(supervisor, WhichChildren, None).await? {
313            WhichChildrenSuccess(info) => Ok(info),
314            _ => unreachable!(),
315        }
316    }
317
318    /// Starts all of the children.
319    async fn start_children(&mut self) -> Result<(), ExitReason> {
320        let mut remove: Vec<usize> = Vec::new();
321
322        for index in 0..self.children.len() {
323            match self.start_child_by_index(index).await {
324                Ok(pid) => {
325                    let child = &mut self.children[index];
326
327                    child.pid = pid;
328                    child.restarting = false;
329
330                    if child.is_temporary() && pid.is_none() {
331                        remove.push(index);
332                    }
333                }
334                Err(reason) => {
335                    #[cfg(feature = "tracing")]
336                    tracing::error!(reason = ?reason, child_id = ?self.children[index].spec.id, "Start error");
337
338                    #[cfg(not(feature = "tracing"))]
339                    let _ = reason;
340
341                    return Err(ExitReason::from("failed_to_start_child"));
342                }
343            }
344        }
345
346        for index in remove.into_iter().rev() {
347            self.remove_child(index);
348        }
349
350        Ok(())
351    }
352
353    /// Deletes a child by the id if it exists.
354    async fn delete_child_by_id(&mut self, child_id: String) -> Result<(), SupervisorError> {
355        let index = self
356            .children
357            .iter()
358            .position(|child| child.spec.id == child_id);
359
360        let Some(index) = index else {
361            return Err(SupervisorError::NotFound);
362        };
363
364        let child = &self.children[index];
365
366        if child.restarting {
367            return Err(SupervisorError::Restarting);
368        } else if child.pid.is_some() {
369            return Err(SupervisorError::Running);
370        }
371
372        let child = self.children.remove(index);
373
374        self.identifiers.remove(&child.spec.id);
375
376        Ok(())
377    }
378
379    /// Terminates a child by the id if it exists.
380    async fn terminate_child_by_id(&mut self, child_id: String) -> Result<(), SupervisorError> {
381        let index = self
382            .children
383            .iter()
384            .position(|child| child.spec.id == child_id);
385
386        if let Some(index) = index {
387            self.terminate_child_by_index(index).await;
388            Ok(())
389        } else {
390            Err(SupervisorError::NotFound)
391        }
392    }
393
394    /// Restarts a child by the id if it's not already started or pending.
395    async fn restart_child_by_id(
396        &mut self,
397        child_id: String,
398    ) -> Result<Option<Pid>, SupervisorError> {
399        let index = self
400            .children
401            .iter()
402            .position(|child| child.spec.id == child_id);
403
404        let Some(index) = index else {
405            return Err(SupervisorError::NotFound);
406        };
407
408        let child = &mut self.children[index];
409
410        if child.restarting {
411            return Err(SupervisorError::Restarting);
412        } else if child.pid.is_some() {
413            return Err(SupervisorError::Running);
414        }
415
416        match self.start_child_by_index(index).await {
417            Ok(pid) => {
418                let child = &mut self.children[index];
419
420                child.pid = pid;
421                child.restarting = false;
422
423                Ok(pid)
424            }
425            Err(reason) => Err(SupervisorError::StartError(reason)),
426        }
427    }
428
429    /// Terminates all of the children.
430    async fn terminate_children(&mut self) {
431        let mut remove: Vec<usize> = Vec::new();
432
433        for (index, child) in self.children.iter_mut().enumerate().rev() {
434            if child.is_temporary() {
435                remove.push(index);
436            }
437
438            let Some(pid) = child.pid.take() else {
439                continue;
440            };
441
442            if let Err(reason) = shutdown(pid, child.shutdown()).await {
443                #[cfg(feature = "tracing")]
444                tracing::error!(reason = ?reason, child_pid = ?pid, "Shutdown error");
445
446                #[cfg(not(feature = "tracing"))]
447                let _ = reason;
448            }
449        }
450
451        for index in remove {
452            self.remove_child(index);
453        }
454    }
455
456    /// Terminates a single child.
457    async fn terminate_child_by_index(&mut self, index: usize) {
458        let child = &mut self.children[index];
459
460        let Some(pid) = child.pid.take() else {
461            return;
462        };
463
464        child.restarting = false;
465
466        let _ = shutdown(pid, child.shutdown()).await;
467    }
468
469    /// Checks all of the children for correct specification and then starts them.
470    async fn init_children(&mut self) -> Result<(), ExitReason> {
471        if let Err(reason) = self.start_children().await {
472            self.terminate_children().await;
473
474            return Err(reason);
475        }
476
477        Ok(())
478    }
479
    /// Restarts a child that exited for the given `reason`.
    ///
    /// Exits from pids that do not belong to a supervised child are ignored.
    ///
    /// Returns `Err` with the `shutdown` exit reason when the supervisor itself must stop:
    /// either the restart intensity was exceeded or automatic shutdown was triggered.
    async fn restart_exited_child(
        &mut self,
        pid: Pid,
        reason: ExitReason,
    ) -> Result<(), ExitReason> {
        // Not one of ours (or already terminated/forgotten by the supervisor): nothing to do.
        let Some(index) = self.find_child(pid) else {
            return Ok(());
        };

        let child = &mut self.children[index];

        // Permanent children are always restarted.
        if child.is_permanent() {
            #[cfg(feature = "tracing")]
            tracing::error!(reason = ?reason, child_id = ?child.spec.id, child_pid = ?child.pid, "Child terminated");

            // Every restart counts towards the restart intensity; exceeding it stops the supervisor.
            if self.add_restart() {
                return Err(ExitReason::from("shutdown"));
            }

            self.restart(index).await;

            return Ok(());
        }

        // If it's not permanent, check if it's a normal reason.
        // NOTE(review): this removes the specification for transient children as well as
        // temporary ones — confirm that is intended.
        if reason.is_normal() || reason == "shutdown" {
            let child = self.remove_child(index);

            if self.check_auto_shutdown(child) {
                return Err(ExitReason::from("shutdown"));
            } else {
                return Ok(());
            }
        }

        // Not a normal reason, check if transient.
        if child.is_transient() {
            #[cfg(feature = "tracing")]
            tracing::error!(reason = ?reason, child_id = ?child.spec.id, child_pid = ?child.pid, "Child terminated");

            // Abnormal exits of transient children count towards the restart intensity.
            if self.add_restart() {
                return Err(ExitReason::from("shutdown"));
            }

            self.restart(index).await;

            return Ok(());
        }

        // Not transient, check if temporary and clean up.
        if child.is_temporary() {
            #[cfg(feature = "tracing")]
            tracing::error!(reason = ?reason, child_id = ?child.spec.id, child_pid = ?child.pid, "Child terminated");

            // Temporary children are never restarted; only their specification is dropped.
            let child = self.remove_child(index);

            if self.check_auto_shutdown(child) {
                return Err(ExitReason::from("shutdown"));
            }
        }

        Ok(())
    }
545
546    /// Restarts one or more children starting with the given `index` based on the current strategy.
547    async fn restart(&mut self, index: usize) {
548        use SupervisorMessage::*;
549
550        match self.strategy {
551            SupervisionStrategy::OneForOne => {
552                match self.start_child_by_index(index).await {
553                    Ok(pid) => {
554                        let child = &mut self.children[index];
555
556                        child.pid = pid;
557                        child.restarting = false;
558                    }
559                    Err(reason) => {
560                        let id = self.children[index].id();
561
562                        #[cfg(feature = "tracing")]
563                        tracing::error!(reason = ?reason, child_id = ?id, child_pid = ?self.children[index].pid, "Start error");
564
565                        #[cfg(not(feature = "tracing"))]
566                        let _ = reason;
567
568                        self.children[index].restarting = true;
569
570                        Supervisor::cast(Process::current(), TryAgainRestartId(id));
571                    }
572                };
573            }
574            SupervisionStrategy::RestForOne => {
575                if let Some((index, reason)) = self.restart_multiple_children(index, false).await {
576                    let id = self.children[index].id();
577
578                    #[cfg(feature = "tracing")]
579                    tracing::error!(reason = ?reason, child_id = ?id, child_pid = ?self.children[index].pid, "Start error");
580
581                    #[cfg(not(feature = "tracing"))]
582                    let _ = reason;
583
584                    self.children[index].restarting = true;
585
586                    Supervisor::cast(Process::current(), TryAgainRestartId(id));
587                }
588            }
589            SupervisionStrategy::OneForAll => {
590                if let Some((index, reason)) = self.restart_multiple_children(index, true).await {
591                    let id = self.children[index].id();
592
593                    #[cfg(feature = "tracing")]
594                    tracing::error!(reason = ?reason, child_id = ?id, child_pid = ?self.children[index].pid, "Start error");
595
596                    #[cfg(not(feature = "tracing"))]
597                    let _ = reason;
598
599                    self.children[index].restarting = true;
600
601                    Supervisor::cast(Process::current(), TryAgainRestartId(id));
602                }
603            }
604        }
605    }
606
607    /// Restarts multiple children, returning if one of them fails.
608    async fn restart_multiple_children(
609        &mut self,
610        index: usize,
611        all: bool,
612    ) -> Option<(usize, ExitReason)> {
613        let mut indices = Vec::new();
614
615        let range = if all {
616            0..self.children.len()
617        } else {
618            index..self.children.len()
619        };
620
621        for tindex in range {
622            indices.push(tindex);
623
624            if index == tindex {
625                continue;
626            }
627
628            self.terminate_child_by_index(tindex).await;
629        }
630
631        for sindex in indices {
632            match self.start_child_by_index(sindex).await {
633                Ok(pid) => {
634                    let child = &mut self.children[sindex];
635
636                    child.pid = pid;
637                    child.restarting = false;
638                }
639                Err(reason) => {
640                    return Some((sindex, reason));
641                }
642            }
643        }
644
645        None
646    }
647
648    /// Tries to restart the given child again, returns if an error occured.
649    async fn try_again_restart(&mut self, index: usize) -> Result<(), ExitReason> {
650        if self.add_restart() {
651            return Err(ExitReason::from("shutdown"));
652        }
653
654        if !self.children[index].restarting {
655            return Ok(());
656        }
657
658        self.restart(index).await;
659
660        Ok(())
661    }
662
663    /// Starts the given child by it's index and returns what the result was.
664    async fn start_child_by_index(&mut self, index: usize) -> Result<Option<Pid>, ExitReason> {
665        let child = &mut self.children[index];
666        let start_child = Pin::from(child.spec.start.as_ref().unwrap()()).await;
667
668        match start_child {
669            Ok(pid) => {
670                #[cfg(feature = "tracing")]
671                tracing::info!(child_id = ?child.spec.id, child_pid = ?pid, "Started child");
672
673                Ok(Some(pid))
674            }
675            Err(reason) => {
676                if reason.is_ignore() {
677                    #[cfg(feature = "tracing")]
678                    tracing::info!(child_id = ?child.spec.id, child_pid = ?None::<Pid>, "Started child");
679
680                    Ok(None)
681                } else {
682                    Err(reason)
683                }
684            }
685        }
686    }
687
688    /// Adds the new child spec to the children if it's unique and starts it.
689    async fn start_new_child(&mut self, spec: ChildSpec) -> Result<Option<Pid>, SupervisorError> {
690        if self.identifiers.contains(&spec.id) {
691            let child = self
692                .children
693                .iter()
694                .find(|child| child.spec.id == spec.id)
695                .unwrap();
696
697            if child.pid.is_some() {
698                return Err(SupervisorError::AlreadyStarted);
699            } else {
700                return Err(SupervisorError::AlreadyPresent);
701            }
702        }
703
704        self.identifiers.insert(spec.id.clone());
705        self.children.push(SupervisedChild {
706            spec,
707            pid: None,
708            restarting: false,
709        });
710
711        match self.start_child_by_index(self.children.len() - 1).await {
712            Ok(pid) => {
713                let index = self.children.len() - 1;
714                let child = &mut self.children[index];
715
716                child.pid = pid;
717                child.restarting = false;
718
719                if child.is_temporary() && pid.is_none() {
720                    self.children.remove(index);
721                }
722
723                Ok(pid)
724            }
725            Err(reason) => Err(SupervisorError::StartError(reason)),
726        }
727    }
728
729    /// Checks whether or not we should automatically shutdown the supervisor. Returns `true` if so.
730    fn check_auto_shutdown(&mut self, child: SupervisedChild) -> bool {
731        if matches!(self.auto_shutdown, AutoShutdown::Never) {
732            return false;
733        }
734
735        if !child.spec.significant {
736            return false;
737        }
738
739        if matches!(self.auto_shutdown, AutoShutdown::AnySignificant) {
740            return true;
741        }
742
743        self.children.iter().any(|child| {
744            if child.pid.is_none() {
745                return false;
746            }
747
748            child.spec.significant
749        })
750    }
751
752    /// Adds another restart to the backlog and returns `true` if we've exceeded our quota of restarts.
753    fn add_restart(&mut self) -> bool {
754        let now = Instant::now();
755        let threshold = now - self.max_duration;
756
757        self.restarts.retain(|restart| *restart >= threshold);
758        self.restarts.push(now);
759
760        if self.restarts.len() > self.max_restarts {
761            #[cfg(feature = "tracing")]
762            tracing::error!(restarts = ?self.restarts.len(), threshold = ?self.max_duration, "Reached max restart intensity");
763
764            return true;
765        }
766
767        false
768    }
769
770    /// Gets information on all of the children.
771    fn which_children_info(&mut self) -> Vec<SupervisorChildInfo> {
772        let mut result = Vec::with_capacity(self.children.len());
773
774        for child in &self.children {
775            result.push(SupervisorChildInfo {
776                id: child.spec.id.clone(),
777                child: child.pid,
778                child_type: child.spec.child_type,
779                restarting: child.restarting,
780            });
781        }
782
783        result
784    }
785
786    /// Counts all of the supervised children.
787    fn count_all_children(&mut self) -> SupervisorCounts {
788        let mut counts = SupervisorCounts {
789            specs: 0,
790            active: 0,
791            supervisors: 0,
792            workers: 0,
793        };
794
795        for child in &self.children {
796            counts.specs += 1;
797
798            if child.pid.is_some() {
799                counts.active += 1;
800            }
801
802            if matches!(child.spec.child_type, ChildType::Supervisor) {
803                counts.supervisors += 1;
804            } else {
805                counts.workers += 1;
806            }
807        }
808
809        counts
810    }
811
812    /// Removes a child from the supervisor.
813    fn remove_child(&mut self, index: usize) -> SupervisedChild {
814        let child = self.children.remove(index);
815
816        self.identifiers.remove(&child.spec.id);
817
818        child
819    }
820
821    /// Finds a child by the given `pid`.
822    fn find_child(&mut self, pid: Pid) -> Option<usize> {
823        self.children
824            .iter()
825            .position(|child| child.pid.is_some_and(|cpid| cpid == pid))
826    }
827
828    /// Finds a child by the given `id`.
829    fn find_child_id(&mut self, id: &str) -> Option<usize> {
830        self.children.iter().position(|child| child.spec.id == id)
831    }
832}
833
834impl SupervisedChild {
835    /// Returns `true` if the child is a permanent process.
836    pub const fn is_permanent(&self) -> bool {
837        matches!(self.spec.restart, Restart::Permanent)
838    }
839
840    /// Returns `true` if the child is a transient process.
841    pub const fn is_transient(&self) -> bool {
842        matches!(self.spec.restart, Restart::Transient)
843    }
844
845    /// Returns `true` if the child is a temporary process.
846    pub const fn is_temporary(&self) -> bool {
847        matches!(self.spec.restart, Restart::Temporary)
848    }
849
850    /// Returns the unique id of the child.
851    pub fn id(&self) -> String {
852        self.spec.id.clone()
853    }
854
855    /// Returns how the child should be terminated.
856    pub const fn shutdown(&self) -> Shutdown {
857        match self.spec.shutdown {
858            None => match self.spec.child_type {
859                ChildType::Worker => Shutdown::Duration(Duration::from_secs(5)),
860                ChildType::Supervisor => Shutdown::Infinity,
861            },
862            Some(shutdown) => shutdown,
863        }
864    }
865}
866
867impl Default for Supervisor {
868    fn default() -> Self {
869        Self::new()
870    }
871}
872
873impl GenServer for Supervisor {
874    type Message = SupervisorMessage;
875
876    async fn init(&mut self) -> Result<(), ExitReason> {
877        Process::set_flags(ProcessFlags::TRAP_EXIT);
878
879        self.init_children().await
880    }
881
882    async fn terminate(&mut self, _reason: ExitReason) {
883        self.terminate_children().await;
884    }
885
886    async fn handle_cast(&mut self, message: Self::Message) -> Result<(), ExitReason> {
887        use SupervisorMessage::*;
888
889        match message {
890            TryAgainRestartPid(pid) => {
891                if let Some(index) = self.find_child(pid) {
892                    return self.try_again_restart(index).await;
893                }
894            }
895            TryAgainRestartId(id) => {
896                if let Some(index) = self.find_child_id(&id) {
897                    return self.try_again_restart(index).await;
898                }
899            }
900            _ => unreachable!(),
901        }
902
903        Ok(())
904    }
905
906    async fn handle_call(
907        &mut self,
908        message: Self::Message,
909        _from: From,
910    ) -> Result<Option<Self::Message>, ExitReason> {
911        use SupervisorMessage::*;
912
913        match message {
914            CountChildren => {
915                let counts = self.count_all_children();
916
917                Ok(Some(CountChildrenSuccess(counts)))
918            }
919            StartChild(spec) => match self.start_new_child(spec.into_inner()).await {
920                Ok(pid) => Ok(Some(StartChildSuccess(pid))),
921                Err(error) => Ok(Some(StartChildError(error))),
922            },
923            TerminateChild(child_id) => match self.terminate_child_by_id(child_id).await {
924                Ok(()) => Ok(Some(TerminateChildSuccess)),
925                Err(error) => Ok(Some(TerminateChildError(error))),
926            },
927            RestartChild(child_id) => match self.restart_child_by_id(child_id).await {
928                Ok(pid) => Ok(Some(RestartChildSuccess(pid))),
929                Err(error) => Ok(Some(RestartChildError(error))),
930            },
931            DeleteChild(child_id) => match self.delete_child_by_id(child_id).await {
932                Ok(()) => Ok(Some(DeleteChildSuccess)),
933                Err(error) => Ok(Some(DeleteChildError(error))),
934            },
935            WhichChildren => {
936                let children = self.which_children_info();
937
938                Ok(Some(WhichChildrenSuccess(children)))
939            }
940            _ => unreachable!(),
941        }
942    }
943
944    async fn handle_info(&mut self, info: Message<Self::Message>) -> Result<(), ExitReason> {
945        match info {
946            Message::System(SystemMessage::Exit(pid, reason)) => {
947                self.restart_exited_child(pid, reason).await
948            }
949            _ => Ok(()),
950        }
951    }
952}
953
954impl std::convert::From<CallError> for SupervisorError {
955    fn from(value: CallError) -> Self {
956        Self::CallError(value)
957    }
958}
959
960/// Terminates the given `pid` using the given `shutdown` method.
961async fn shutdown(pid: Pid, shutdown: Shutdown) -> Result<(), ExitReason> {
962    let monitor = Process::monitor(pid);
963
964    match shutdown {
965        Shutdown::BrutalKill => shutdown_brutal_kill(pid, monitor).await,
966        Shutdown::Duration(timeout) => shutdown_timeout(pid, monitor, timeout).await,
967        Shutdown::Infinity => shutdown_infinity(pid, monitor).await,
968    }
969}