1use std::collections::BTreeSet;
2use std::pin::Pin;
3use std::time::Duration;
4use std::time::Instant;
5
6use serde::Deserialize;
7use serde::Serialize;
8
9use crate::AutoShutdown;
10use crate::CallError;
11use crate::ChildSpec;
12use crate::ChildType;
13use crate::Dest;
14use crate::ExitReason;
15use crate::From;
16use crate::GenServer;
17use crate::Local;
18use crate::Message;
19use crate::Pid;
20use crate::Process;
21use crate::ProcessFlags;
22use crate::Restart;
23use crate::Shutdown;
24use crate::SupervisorOptions;
25use crate::SystemMessage;
26use crate::shutdown_brutal_kill;
27use crate::shutdown_infinity;
28use crate::shutdown_timeout;
29
30#[derive(Clone)]
32struct SupervisedChild {
33 spec: ChildSpec,
34 pid: Option<Pid>,
35 restarting: bool,
36}
37
38#[doc(hidden)]
40#[derive(Serialize, Deserialize)]
41pub enum SupervisorMessage {
42 TryAgainRestartPid(Pid),
43 TryAgainRestartId(String),
44 CountChildren,
45 CountChildrenSuccess(SupervisorCounts),
46 StartChild(Local<ChildSpec>),
47 StartChildSuccess(Option<Pid>),
48 StartChildError(SupervisorError),
49 TerminateChild(String),
50 TerminateChildSuccess,
51 TerminateChildError(SupervisorError),
52 RestartChild(String),
53 RestartChildSuccess(Option<Pid>),
54 RestartChildError(SupervisorError),
55 DeleteChild(String),
56 DeleteChildSuccess,
57 DeleteChildError(SupervisorError),
58 WhichChildren,
59 WhichChildrenSuccess(Vec<SupervisorChildInfo>),
60}
61
62#[derive(Debug, Serialize, Deserialize)]
64pub enum SupervisorError {
65 CallError(CallError),
67 AlreadyStarted,
69 AlreadyPresent,
71 StartError(ExitReason),
73 NotFound,
75 Running,
77 Restarting,
79}
80
81#[derive(Debug, Serialize, Deserialize)]
83pub struct SupervisorChildInfo {
84 id: String,
86 child: Option<Pid>,
88 child_type: ChildType,
90 restarting: bool,
92}
93
94#[derive(Debug, Serialize, Deserialize)]
96pub struct SupervisorCounts {
97 pub specs: usize,
99 pub active: usize,
101 pub supervisors: usize,
103 pub workers: usize,
105}
106
107#[derive(Debug, Clone, Copy)]
109pub enum SupervisionStrategy {
110 OneForOne,
112 OneForAll,
114 RestForOne,
116}
117
118#[derive(Clone)]
122pub struct Supervisor {
123 children: Vec<SupervisedChild>,
124 identifiers: BTreeSet<String>,
125 restarts: Vec<Instant>,
126 strategy: SupervisionStrategy,
127 auto_shutdown: AutoShutdown,
128 max_restarts: usize,
129 max_duration: Duration,
130}
131
132impl Supervisor {
133 pub const fn new() -> Self {
135 Self {
136 children: Vec::new(),
137 identifiers: BTreeSet::new(),
138 restarts: Vec::new(),
139 strategy: SupervisionStrategy::OneForOne,
140 auto_shutdown: AutoShutdown::Never,
141 max_restarts: 3,
142 max_duration: Duration::from_secs(5),
143 }
144 }
145
146 pub fn with_children<T: IntoIterator<Item = ChildSpec>>(children: T) -> Self {
148 let mut result = Self::new();
149
150 for child in children {
151 result = result.add_child(child);
152 }
153
154 result
155 }
156
157 pub fn add_child(mut self, child: ChildSpec) -> Self {
159 if self.identifiers.contains(&child.id) {
160 panic!("Child id was not unique!");
161 }
162
163 self.identifiers.insert(child.id.clone());
164
165 self.children.push(SupervisedChild {
166 spec: child,
167 pid: None,
168 restarting: false,
169 });
170
171 self
172 }
173
174 pub fn child_spec(self, options: SupervisorOptions) -> ChildSpec {
176 ChildSpec::new("Supervisor")
177 .start(move || self.clone().start_link(options.clone()))
178 .child_type(ChildType::Supervisor)
179 }
180
181 pub const fn strategy(mut self, strategy: SupervisionStrategy) -> Self {
183 self.strategy = strategy;
184 self
185 }
186
187 pub const fn auto_shutdown(mut self, auto_shutdown: AutoShutdown) -> Self {
189 self.auto_shutdown = auto_shutdown;
190 self
191 }
192
193 pub const fn max_restarts(mut self, max_restarts: usize) -> Self {
197 self.max_restarts = max_restarts;
198 self
199 }
200
201 pub const fn max_duration(mut self, max_duration: Duration) -> Self {
205 self.max_duration = max_duration;
206 self
207 }
208
209 pub async fn start(self, options: SupervisorOptions) -> Result<Pid, ExitReason> {
213 GenServer::start(self, options.into()).await
214 }
215
216 pub async fn start_link(self, options: SupervisorOptions) -> Result<Pid, ExitReason> {
222 GenServer::start_link(self, options.into()).await
223 }
224
225 pub async fn count_children<T: Into<Dest>>(
227 supervisor: T,
228 ) -> Result<SupervisorCounts, SupervisorError> {
229 use SupervisorMessage::*;
230
231 match Supervisor::call(supervisor, CountChildren, None).await? {
232 CountChildrenSuccess(counts) => Ok(counts),
233 _ => unreachable!(),
234 }
235 }
236
237 pub async fn start_child<T: Into<Dest>>(
239 supervisor: T,
240 child: ChildSpec,
241 ) -> Result<Option<Pid>, SupervisorError> {
242 use SupervisorMessage::*;
243
244 match Supervisor::call(supervisor, StartChild(Local::new(child)), None).await? {
245 StartChildSuccess(pid) => Ok(pid),
246 StartChildError(error) => Err(error),
247 _ => unreachable!(),
248 }
249 }
250
251 pub async fn terminate_child<T: Into<Dest>, I: Into<String>>(
259 supervisor: T,
260 child_id: I,
261 ) -> Result<(), SupervisorError> {
262 use SupervisorMessage::*;
263
264 match Supervisor::call(supervisor, TerminateChild(child_id.into()), None).await? {
265 TerminateChildSuccess => Ok(()),
266 TerminateChildError(error) => Err(error),
267 _ => unreachable!(),
268 }
269 }
270
271 pub async fn restart_child<T: Into<Dest>, I: Into<String>>(
278 supervisor: T,
279 child_id: I,
280 ) -> Result<Option<Pid>, SupervisorError> {
281 use SupervisorMessage::*;
282
283 match Supervisor::call(supervisor, RestartChild(child_id.into()), None).await? {
284 RestartChildSuccess(pid) => Ok(pid),
285 RestartChildError(error) => Err(error),
286 _ => unreachable!(),
287 }
288 }
289
290 pub async fn delete_child<T: Into<Dest>, I: Into<String>>(
294 supervisor: T,
295 child_id: I,
296 ) -> Result<(), SupervisorError> {
297 use SupervisorMessage::*;
298
299 match Supervisor::call(supervisor, DeleteChild(child_id.into()), None).await? {
300 DeleteChildSuccess => Ok(()),
301 DeleteChildError(error) => Err(error),
302 _ => unreachable!(),
303 }
304 }
305
306 pub async fn which_children<T: Into<Dest>>(
308 supervisor: T,
309 ) -> Result<Vec<SupervisorChildInfo>, SupervisorError> {
310 use SupervisorMessage::*;
311
312 match Supervisor::call(supervisor, WhichChildren, None).await? {
313 WhichChildrenSuccess(info) => Ok(info),
314 _ => unreachable!(),
315 }
316 }
317
318 async fn start_children(&mut self) -> Result<(), ExitReason> {
320 let mut remove: Vec<usize> = Vec::new();
321
322 for index in 0..self.children.len() {
323 match self.start_child_by_index(index).await {
324 Ok(pid) => {
325 let child = &mut self.children[index];
326
327 child.pid = pid;
328 child.restarting = false;
329
330 if child.is_temporary() && pid.is_none() {
331 remove.push(index);
332 }
333 }
334 Err(reason) => {
335 #[cfg(feature = "tracing")]
336 tracing::error!(reason = ?reason, child_id = ?self.children[index].spec.id, "Start error");
337
338 #[cfg(not(feature = "tracing"))]
339 let _ = reason;
340
341 return Err(ExitReason::from("failed_to_start_child"));
342 }
343 }
344 }
345
346 for index in remove.into_iter().rev() {
347 self.remove_child(index);
348 }
349
350 Ok(())
351 }
352
353 async fn delete_child_by_id(&mut self, child_id: String) -> Result<(), SupervisorError> {
355 let index = self
356 .children
357 .iter()
358 .position(|child| child.spec.id == child_id);
359
360 let Some(index) = index else {
361 return Err(SupervisorError::NotFound);
362 };
363
364 let child = &self.children[index];
365
366 if child.restarting {
367 return Err(SupervisorError::Restarting);
368 } else if child.pid.is_some() {
369 return Err(SupervisorError::Running);
370 }
371
372 let child = self.children.remove(index);
373
374 self.identifiers.remove(&child.spec.id);
375
376 Ok(())
377 }
378
379 async fn terminate_child_by_id(&mut self, child_id: String) -> Result<(), SupervisorError> {
381 let index = self
382 .children
383 .iter()
384 .position(|child| child.spec.id == child_id);
385
386 if let Some(index) = index {
387 self.terminate_child_by_index(index).await;
388 Ok(())
389 } else {
390 Err(SupervisorError::NotFound)
391 }
392 }
393
394 async fn restart_child_by_id(
396 &mut self,
397 child_id: String,
398 ) -> Result<Option<Pid>, SupervisorError> {
399 let index = self
400 .children
401 .iter()
402 .position(|child| child.spec.id == child_id);
403
404 let Some(index) = index else {
405 return Err(SupervisorError::NotFound);
406 };
407
408 let child = &mut self.children[index];
409
410 if child.restarting {
411 return Err(SupervisorError::Restarting);
412 } else if child.pid.is_some() {
413 return Err(SupervisorError::Running);
414 }
415
416 match self.start_child_by_index(index).await {
417 Ok(pid) => {
418 let child = &mut self.children[index];
419
420 child.pid = pid;
421 child.restarting = false;
422
423 Ok(pid)
424 }
425 Err(reason) => Err(SupervisorError::StartError(reason)),
426 }
427 }
428
429 async fn terminate_children(&mut self) {
431 let mut remove: Vec<usize> = Vec::new();
432
433 for (index, child) in self.children.iter_mut().enumerate().rev() {
434 if child.is_temporary() {
435 remove.push(index);
436 }
437
438 let Some(pid) = child.pid.take() else {
439 continue;
440 };
441
442 if let Err(reason) = shutdown(pid, child.shutdown()).await {
443 #[cfg(feature = "tracing")]
444 tracing::error!(reason = ?reason, child_pid = ?pid, "Shutdown error");
445
446 #[cfg(not(feature = "tracing"))]
447 let _ = reason;
448 }
449 }
450
451 for index in remove {
452 self.remove_child(index);
453 }
454 }
455
456 async fn terminate_child_by_index(&mut self, index: usize) {
458 let child = &mut self.children[index];
459
460 let Some(pid) = child.pid.take() else {
461 return;
462 };
463
464 child.restarting = false;
465
466 let _ = shutdown(pid, child.shutdown()).await;
467 }
468
469 async fn init_children(&mut self) -> Result<(), ExitReason> {
471 if let Err(reason) = self.start_children().await {
472 self.terminate_children().await;
473
474 return Err(reason);
475 }
476
477 Ok(())
478 }
479
480 async fn restart_exited_child(
482 &mut self,
483 pid: Pid,
484 reason: ExitReason,
485 ) -> Result<(), ExitReason> {
486 let Some(index) = self.find_child(pid) else {
487 return Ok(());
488 };
489
490 let child = &mut self.children[index];
491
492 if child.is_permanent() {
494 #[cfg(feature = "tracing")]
495 tracing::error!(reason = ?reason, child_id = ?child.spec.id, child_pid = ?child.pid, "Child terminated");
496
497 if self.add_restart() {
498 return Err(ExitReason::from("shutdown"));
499 }
500
501 self.restart(index).await;
502
503 return Ok(());
504 }
505
506 if reason.is_normal() || reason == "shutdown" {
508 let child = self.remove_child(index);
509
510 if self.check_auto_shutdown(child) {
511 return Err(ExitReason::from("shutdown"));
512 } else {
513 return Ok(());
514 }
515 }
516
517 if child.is_transient() {
519 #[cfg(feature = "tracing")]
520 tracing::error!(reason = ?reason, child_id = ?child.spec.id, child_pid = ?child.pid, "Child terminated");
521
522 if self.add_restart() {
523 return Err(ExitReason::from("shutdown"));
524 }
525
526 self.restart(index).await;
527
528 return Ok(());
529 }
530
531 if child.is_temporary() {
533 #[cfg(feature = "tracing")]
534 tracing::error!(reason = ?reason, child_id = ?child.spec.id, child_pid = ?child.pid, "Child terminated");
535
536 let child = self.remove_child(index);
537
538 if self.check_auto_shutdown(child) {
539 return Err(ExitReason::from("shutdown"));
540 }
541 }
542
543 Ok(())
544 }
545
546 async fn restart(&mut self, index: usize) {
548 use SupervisorMessage::*;
549
550 match self.strategy {
551 SupervisionStrategy::OneForOne => {
552 match self.start_child_by_index(index).await {
553 Ok(pid) => {
554 let child = &mut self.children[index];
555
556 child.pid = pid;
557 child.restarting = false;
558 }
559 Err(reason) => {
560 let id = self.children[index].id();
561
562 #[cfg(feature = "tracing")]
563 tracing::error!(reason = ?reason, child_id = ?id, child_pid = ?self.children[index].pid, "Start error");
564
565 #[cfg(not(feature = "tracing"))]
566 let _ = reason;
567
568 self.children[index].restarting = true;
569
570 Supervisor::cast(Process::current(), TryAgainRestartId(id));
571 }
572 };
573 }
574 SupervisionStrategy::RestForOne => {
575 if let Some((index, reason)) = self.restart_multiple_children(index, false).await {
576 let id = self.children[index].id();
577
578 #[cfg(feature = "tracing")]
579 tracing::error!(reason = ?reason, child_id = ?id, child_pid = ?self.children[index].pid, "Start error");
580
581 #[cfg(not(feature = "tracing"))]
582 let _ = reason;
583
584 self.children[index].restarting = true;
585
586 Supervisor::cast(Process::current(), TryAgainRestartId(id));
587 }
588 }
589 SupervisionStrategy::OneForAll => {
590 if let Some((index, reason)) = self.restart_multiple_children(index, true).await {
591 let id = self.children[index].id();
592
593 #[cfg(feature = "tracing")]
594 tracing::error!(reason = ?reason, child_id = ?id, child_pid = ?self.children[index].pid, "Start error");
595
596 #[cfg(not(feature = "tracing"))]
597 let _ = reason;
598
599 self.children[index].restarting = true;
600
601 Supervisor::cast(Process::current(), TryAgainRestartId(id));
602 }
603 }
604 }
605 }
606
607 async fn restart_multiple_children(
609 &mut self,
610 index: usize,
611 all: bool,
612 ) -> Option<(usize, ExitReason)> {
613 let mut indices = Vec::new();
614
615 let range = if all {
616 0..self.children.len()
617 } else {
618 index..self.children.len()
619 };
620
621 for tindex in range {
622 indices.push(tindex);
623
624 if index == tindex {
625 continue;
626 }
627
628 self.terminate_child_by_index(tindex).await;
629 }
630
631 for sindex in indices {
632 match self.start_child_by_index(sindex).await {
633 Ok(pid) => {
634 let child = &mut self.children[sindex];
635
636 child.pid = pid;
637 child.restarting = false;
638 }
639 Err(reason) => {
640 return Some((sindex, reason));
641 }
642 }
643 }
644
645 None
646 }
647
648 async fn try_again_restart(&mut self, index: usize) -> Result<(), ExitReason> {
650 if self.add_restart() {
651 return Err(ExitReason::from("shutdown"));
652 }
653
654 if !self.children[index].restarting {
655 return Ok(());
656 }
657
658 self.restart(index).await;
659
660 Ok(())
661 }
662
663 async fn start_child_by_index(&mut self, index: usize) -> Result<Option<Pid>, ExitReason> {
665 let child = &mut self.children[index];
666 let start_child = Pin::from(child.spec.start.as_ref().unwrap()()).await;
667
668 match start_child {
669 Ok(pid) => {
670 #[cfg(feature = "tracing")]
671 tracing::info!(child_id = ?child.spec.id, child_pid = ?pid, "Started child");
672
673 Ok(Some(pid))
674 }
675 Err(reason) => {
676 if reason.is_ignore() {
677 #[cfg(feature = "tracing")]
678 tracing::info!(child_id = ?child.spec.id, child_pid = ?None::<Pid>, "Started child");
679
680 Ok(None)
681 } else {
682 Err(reason)
683 }
684 }
685 }
686 }
687
688 async fn start_new_child(&mut self, spec: ChildSpec) -> Result<Option<Pid>, SupervisorError> {
690 if self.identifiers.contains(&spec.id) {
691 let child = self
692 .children
693 .iter()
694 .find(|child| child.spec.id == spec.id)
695 .unwrap();
696
697 if child.pid.is_some() {
698 return Err(SupervisorError::AlreadyStarted);
699 } else {
700 return Err(SupervisorError::AlreadyPresent);
701 }
702 }
703
704 self.identifiers.insert(spec.id.clone());
705 self.children.push(SupervisedChild {
706 spec,
707 pid: None,
708 restarting: false,
709 });
710
711 match self.start_child_by_index(self.children.len() - 1).await {
712 Ok(pid) => {
713 let index = self.children.len() - 1;
714 let child = &mut self.children[index];
715
716 child.pid = pid;
717 child.restarting = false;
718
719 if child.is_temporary() && pid.is_none() {
720 self.children.remove(index);
721 }
722
723 Ok(pid)
724 }
725 Err(reason) => Err(SupervisorError::StartError(reason)),
726 }
727 }
728
729 fn check_auto_shutdown(&mut self, child: SupervisedChild) -> bool {
731 if matches!(self.auto_shutdown, AutoShutdown::Never) {
732 return false;
733 }
734
735 if !child.spec.significant {
736 return false;
737 }
738
739 if matches!(self.auto_shutdown, AutoShutdown::AnySignificant) {
740 return true;
741 }
742
743 self.children.iter().any(|child| {
744 if child.pid.is_none() {
745 return false;
746 }
747
748 child.spec.significant
749 })
750 }
751
752 fn add_restart(&mut self) -> bool {
754 let now = Instant::now();
755 let threshold = now - self.max_duration;
756
757 self.restarts.retain(|restart| *restart >= threshold);
758 self.restarts.push(now);
759
760 if self.restarts.len() > self.max_restarts {
761 #[cfg(feature = "tracing")]
762 tracing::error!(restarts = ?self.restarts.len(), threshold = ?self.max_duration, "Reached max restart intensity");
763
764 return true;
765 }
766
767 false
768 }
769
770 fn which_children_info(&mut self) -> Vec<SupervisorChildInfo> {
772 let mut result = Vec::with_capacity(self.children.len());
773
774 for child in &self.children {
775 result.push(SupervisorChildInfo {
776 id: child.spec.id.clone(),
777 child: child.pid,
778 child_type: child.spec.child_type,
779 restarting: child.restarting,
780 });
781 }
782
783 result
784 }
785
786 fn count_all_children(&mut self) -> SupervisorCounts {
788 let mut counts = SupervisorCounts {
789 specs: 0,
790 active: 0,
791 supervisors: 0,
792 workers: 0,
793 };
794
795 for child in &self.children {
796 counts.specs += 1;
797
798 if child.pid.is_some() {
799 counts.active += 1;
800 }
801
802 if matches!(child.spec.child_type, ChildType::Supervisor) {
803 counts.supervisors += 1;
804 } else {
805 counts.workers += 1;
806 }
807 }
808
809 counts
810 }
811
812 fn remove_child(&mut self, index: usize) -> SupervisedChild {
814 let child = self.children.remove(index);
815
816 self.identifiers.remove(&child.spec.id);
817
818 child
819 }
820
821 fn find_child(&mut self, pid: Pid) -> Option<usize> {
823 self.children
824 .iter()
825 .position(|child| child.pid.is_some_and(|cpid| cpid == pid))
826 }
827
828 fn find_child_id(&mut self, id: &str) -> Option<usize> {
830 self.children.iter().position(|child| child.spec.id == id)
831 }
832}
833
834impl SupervisedChild {
835 pub const fn is_permanent(&self) -> bool {
837 matches!(self.spec.restart, Restart::Permanent)
838 }
839
840 pub const fn is_transient(&self) -> bool {
842 matches!(self.spec.restart, Restart::Transient)
843 }
844
845 pub const fn is_temporary(&self) -> bool {
847 matches!(self.spec.restart, Restart::Temporary)
848 }
849
850 pub fn id(&self) -> String {
852 self.spec.id.clone()
853 }
854
855 pub const fn shutdown(&self) -> Shutdown {
857 match self.spec.shutdown {
858 None => match self.spec.child_type {
859 ChildType::Worker => Shutdown::Duration(Duration::from_secs(5)),
860 ChildType::Supervisor => Shutdown::Infinity,
861 },
862 Some(shutdown) => shutdown,
863 }
864 }
865}
866
867impl Default for Supervisor {
868 fn default() -> Self {
869 Self::new()
870 }
871}
872
873impl GenServer for Supervisor {
874 type Message = SupervisorMessage;
875
876 async fn init(&mut self) -> Result<(), ExitReason> {
877 Process::set_flags(ProcessFlags::TRAP_EXIT);
878
879 self.init_children().await
880 }
881
882 async fn terminate(&mut self, _reason: ExitReason) {
883 self.terminate_children().await;
884 }
885
886 async fn handle_cast(&mut self, message: Self::Message) -> Result<(), ExitReason> {
887 use SupervisorMessage::*;
888
889 match message {
890 TryAgainRestartPid(pid) => {
891 if let Some(index) = self.find_child(pid) {
892 return self.try_again_restart(index).await;
893 }
894 }
895 TryAgainRestartId(id) => {
896 if let Some(index) = self.find_child_id(&id) {
897 return self.try_again_restart(index).await;
898 }
899 }
900 _ => unreachable!(),
901 }
902
903 Ok(())
904 }
905
906 async fn handle_call(
907 &mut self,
908 message: Self::Message,
909 _from: From,
910 ) -> Result<Option<Self::Message>, ExitReason> {
911 use SupervisorMessage::*;
912
913 match message {
914 CountChildren => {
915 let counts = self.count_all_children();
916
917 Ok(Some(CountChildrenSuccess(counts)))
918 }
919 StartChild(spec) => match self.start_new_child(spec.into_inner()).await {
920 Ok(pid) => Ok(Some(StartChildSuccess(pid))),
921 Err(error) => Ok(Some(StartChildError(error))),
922 },
923 TerminateChild(child_id) => match self.terminate_child_by_id(child_id).await {
924 Ok(()) => Ok(Some(TerminateChildSuccess)),
925 Err(error) => Ok(Some(TerminateChildError(error))),
926 },
927 RestartChild(child_id) => match self.restart_child_by_id(child_id).await {
928 Ok(pid) => Ok(Some(RestartChildSuccess(pid))),
929 Err(error) => Ok(Some(RestartChildError(error))),
930 },
931 DeleteChild(child_id) => match self.delete_child_by_id(child_id).await {
932 Ok(()) => Ok(Some(DeleteChildSuccess)),
933 Err(error) => Ok(Some(DeleteChildError(error))),
934 },
935 WhichChildren => {
936 let children = self.which_children_info();
937
938 Ok(Some(WhichChildrenSuccess(children)))
939 }
940 _ => unreachable!(),
941 }
942 }
943
944 async fn handle_info(&mut self, info: Message<Self::Message>) -> Result<(), ExitReason> {
945 match info {
946 Message::System(SystemMessage::Exit(pid, reason)) => {
947 self.restart_exited_child(pid, reason).await
948 }
949 _ => Ok(()),
950 }
951 }
952}
953
954impl std::convert::From<CallError> for SupervisorError {
955 fn from(value: CallError) -> Self {
956 Self::CallError(value)
957 }
958}
959
960async fn shutdown(pid: Pid, shutdown: Shutdown) -> Result<(), ExitReason> {
962 let monitor = Process::monitor(pid);
963
964 match shutdown {
965 Shutdown::BrutalKill => shutdown_brutal_kill(pid, monitor).await,
966 Shutdown::Duration(timeout) => shutdown_timeout(pid, monitor, timeout).await,
967 Shutdown::Infinity => shutdown_infinity(pid, monitor).await,
968 }
969}