1pub mod adapters;
52pub mod backoff;
53pub mod lifecycle;
54pub mod service;
55
56pub use adapters::{ApiModuleAdapter, ModuleAdapter};
58pub use backoff::{BackoffAction, BackoffConfig, BackoffState};
59pub use lifecycle::{
60 ServiceLifecycle, ServiceLifecycleSnapshot, ServicePhase, TerminationReason, TransitionError,
61};
62pub use service::{JanusService, RestartPolicy};
63
64use std::collections::HashMap;
65use std::sync::Arc;
66use std::sync::atomic::{AtomicU64, Ordering};
67use std::time::Duration;
68
69use tokio::sync::RwLock;
70use tokio_util::sync::CancellationToken;
71use tokio_util::task::TaskTracker;
72
73#[derive(Debug, Clone)]
79pub struct SupervisorConfig {
80 pub default_backoff: BackoffConfig,
84
85 pub shutdown_timeout: Duration,
88
89 pub install_signal_handler: bool,
93}
94
95impl Default for SupervisorConfig {
96 fn default() -> Self {
97 Self {
98 default_backoff: BackoffConfig::default(),
99 shutdown_timeout: Duration::from_secs(30),
100 install_signal_handler: true,
101 }
102 }
103}
104
105impl SupervisorConfig {
106 pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self {
108 self.shutdown_timeout = timeout;
109 self
110 }
111
112 pub fn with_default_backoff(mut self, backoff: BackoffConfig) -> Self {
114 self.default_backoff = backoff;
115 self
116 }
117
118 pub fn without_signal_handler(mut self) -> Self {
120 self.install_signal_handler = false;
121 self
122 }
123}
124
125#[derive(Debug, Default)]
136pub struct SupervisorMetrics {
137 pub restarts_total: AtomicU64,
139
140 pub active_services: AtomicU64,
142
143 pub spawned_total: AtomicU64,
145
146 pub terminated_total: AtomicU64,
148
149 pub circuit_breaker_trips: AtomicU64,
151}
152
153impl SupervisorMetrics {
154 fn new() -> Self {
155 Self::default()
156 }
157
158 fn record_spawn(&self) {
159 self.spawned_total.fetch_add(1, Ordering::Relaxed);
160 let new_active = self.active_services.fetch_add(1, Ordering::Relaxed) + 1;
161
162 let prom = crate::metrics::metrics();
169 prom.supervisor_spawned_total.inc();
170 prom.supervisor_active_services.set(new_active as f64);
171 }
172
173 fn record_restart(&self) {
174 self.restarts_total.fetch_add(1, Ordering::Relaxed);
175 crate::metrics::metrics().supervisor_restarts_total.inc();
176 }
177
178 fn record_termination(&self) {
179 self.terminated_total.fetch_add(1, Ordering::Relaxed);
180 let prev = self
183 .active_services
184 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
185 Some(v.saturating_sub(1))
186 })
187 .unwrap_or(0);
188 let new_active = prev.saturating_sub(1);
189
190 let prom = crate::metrics::metrics();
194 prom.supervisor_terminated_total.inc();
195 prom.supervisor_active_services.set(new_active as f64);
196 }
197
198 fn record_termination_with_uptime(&self, service_name: &str, uptime_secs: f64) {
199 self.record_termination();
200 crate::metrics::metrics()
201 .supervisor_uptime_seconds
202 .with_label_values(&[service_name])
203 .observe(uptime_secs);
204 }
205
206 fn record_circuit_breaker_trip(&self) {
207 self.circuit_breaker_trips.fetch_add(1, Ordering::Relaxed);
208 crate::metrics::metrics()
209 .supervisor_circuit_breaker_trips
210 .inc();
211 }
212
213 pub fn snapshot(&self) -> MetricsSnapshot {
215 MetricsSnapshot {
216 restarts_total: self.restarts_total.load(Ordering::Relaxed),
217 active_services: self.active_services.load(Ordering::Relaxed),
218 spawned_total: self.spawned_total.load(Ordering::Relaxed),
219 terminated_total: self.terminated_total.load(Ordering::Relaxed),
220 circuit_breaker_trips: self.circuit_breaker_trips.load(Ordering::Relaxed),
221 }
222 }
223}
224
225#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
227pub struct MetricsSnapshot {
228 pub restarts_total: u64,
229 pub active_services: u64,
230 pub spawned_total: u64,
231 pub terminated_total: u64,
232 pub circuit_breaker_trips: u64,
233}
234
235#[derive(Debug, Clone, Default)]
241pub struct SpawnOptions {
242 pub backoff: Option<BackoffConfig>,
244}
245
246impl SpawnOptions {
247 pub fn with_backoff(backoff: BackoffConfig) -> Self {
249 Self {
250 backoff: Some(backoff),
251 }
252 }
253}
254
/// Supervises a set of [`JanusService`] tasks: runs each one in its own
/// tracked task, restarts it according to its restart policy and backoff,
/// and coordinates shutdown through a shared cancellation token.
pub struct JanusSupervisor {
    /// Settings supplied at construction time.
    config: SupervisorConfig,
    /// Tracks every spawned service loop so `wait_for_drain` can await them.
    tracker: TaskTracker,
    /// Root token; each service loop receives a child of it.
    cancel_token: CancellationToken,
    /// Counters shared with every service loop.
    metrics: Arc<SupervisorMetrics>,
    /// Latest lifecycle state per service, keyed by service name.
    lifecycles: Arc<RwLock<HashMap<String, ServiceLifecycle>>>,
}
278
279impl JanusSupervisor {
280 pub fn new(config: SupervisorConfig) -> Self {
282 Self {
283 config,
284 tracker: TaskTracker::new(),
285 cancel_token: CancellationToken::new(),
286 metrics: Arc::new(SupervisorMetrics::new()),
287 lifecycles: Arc::new(RwLock::new(HashMap::new())),
288 }
289 }
290
291 pub fn with_defaults() -> Self {
293 Self::new(SupervisorConfig::default())
294 }
295
    /// Returns the supervisor's root cancellation token.
    ///
    /// Cancelling it (or calling `trigger_shutdown`) is what
    /// `is_shutting_down` reports.
    pub fn cancel_token(&self) -> &CancellationToken {
        &self.cancel_token
    }
302
    /// Returns the shared metrics handle; clone the `Arc` to keep it beyond
    /// the borrow of the supervisor.
    pub fn metrics(&self) -> &Arc<SupervisorMetrics> {
        &self.metrics
    }
307
308 pub async fn lifecycle_snapshots(&self) -> Vec<ServiceLifecycleSnapshot> {
310 let lifecycles = self.lifecycles.read().await;
311 lifecycles
312 .values()
313 .map(ServiceLifecycleSnapshot::from)
314 .collect()
315 }
316
317 pub async fn service_lifecycle(&self, name: &str) -> Option<ServiceLifecycleSnapshot> {
319 let lifecycles = self.lifecycles.read().await;
320 lifecycles.get(name).map(ServiceLifecycleSnapshot::from)
321 }
322
323 pub async fn service_count(&self) -> usize {
325 self.lifecycles.read().await.len()
326 }
327
    /// Requests shutdown by cancelling the supervisor's root cancellation
    /// token, after logging an info event.
    ///
    /// Service loops hold child tokens of the root token, so they observe the
    /// cancellation as well.
    #[tracing::instrument(skip(self))]
    pub fn trigger_shutdown(&self) {
        tracing::info!("Supervisor: shutdown triggered");
        self.cancel_token.cancel();
    }
339
    /// Returns `true` once the root cancellation token has been cancelled.
    pub fn is_shutting_down(&self) -> bool {
        self.cancel_token.is_cancelled()
    }
344
345 pub fn spawn_service(&self, service: Box<dyn JanusService>) {
352 self.spawn_service_with_options(service, SpawnOptions::default());
353 }
354
355 #[tracing::instrument(skip(self, service, options), fields(service = %service.name(), policy = %service.restart_policy()))]
357 pub fn spawn_service_with_options(
358 &self,
359 service: Box<dyn JanusService>,
360 options: SpawnOptions,
361 ) {
362 let service_name = service.name().to_string();
363 let restart_policy = service.restart_policy();
364 let backoff_config = options
365 .backoff
366 .unwrap_or_else(|| self.config.default_backoff.clone());
367
368 let cancel = self.cancel_token.child_token();
369 let metrics = self.metrics.clone();
370 let lifecycles = self.lifecycles.clone();
371
372 metrics.record_spawn();
373
374 self.tracker.spawn(Self::service_loop(
375 service,
376 service_name,
377 restart_policy,
378 backoff_config,
379 cancel,
380 metrics,
381 lifecycles,
382 ));
383 }
384
385 #[tracing::instrument(
393 skip_all,
394 fields(service = %service_name, policy = %restart_policy)
395 )]
396 async fn service_loop(
397 service: Box<dyn JanusService>,
398 service_name: String,
399 restart_policy: RestartPolicy,
400 backoff_config: BackoffConfig,
401 cancel: CancellationToken,
402 metrics: Arc<SupervisorMetrics>,
403 lifecycles: Arc<RwLock<HashMap<String, ServiceLifecycle>>>,
404 ) {
405 let mut backoff = BackoffState::new(backoff_config);
406 let mut lifecycle = ServiceLifecycle::new(&service_name);
407
408 {
410 let mut lc_map = lifecycles.write().await;
411 lc_map.insert(service_name.clone(), lifecycle.clone());
412 }
413
414 loop {
415 if cancel.is_cancelled() {
417 tracing::info!(service = %service_name, "cancellation detected, not starting service");
418 let _ = lifecycle.transition_to_stopping();
419 let _ = lifecycle.transition_to_terminated(TerminationReason::Cancelled);
420 Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
421 let uptime = lifecycle.cumulative_running_time().as_secs_f64();
422 metrics.record_termination_with_uptime(&service_name, uptime);
423 return;
424 }
425
426 if lifecycle.phase() == ServicePhase::Starting {
428 let _ = lifecycle.transition_to_running();
429 } else if lifecycle.phase() == ServicePhase::BackingOff {
430 let _ = lifecycle.transition_to_restarting();
432 let _ = lifecycle.transition_to_running();
433 metrics.record_restart();
434 }
435
436 backoff.record_start();
437 Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
438
439 tracing::info!(
440 service = %service_name,
441 attempt = lifecycle.start_count(),
442 "running service"
443 );
444
445 let result = service.run(cancel.clone()).await;
453
454 if cancel.is_cancelled() {
457 tracing::info!(service = %service_name, "service exited after cancellation");
458 let _ = lifecycle.transition_to_stopping();
459 let _ = lifecycle.transition_to_terminated(TerminationReason::Cancelled);
460 Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
461 let uptime = lifecycle.cumulative_running_time().as_secs_f64();
462 metrics.record_termination_with_uptime(&service_name, uptime);
463 return;
464 }
465
466 match result {
468 Ok(()) => {
469 tracing::info!(service = %service_name, "service exited cleanly");
470 backoff.maybe_reset_on_cooldown();
471
472 match restart_policy {
473 RestartPolicy::Always => {
474 backoff.reset();
479
480 tracing::info!(
482 service = %service_name,
483 "restart_policy=always, will restart after backoff"
484 );
485 let delay = Duration::from_millis(100);
488 let _ = lifecycle
489 .transition_to_backing_off("clean exit, policy=always", delay);
490 Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
491
492 tokio::select! {
493 _ = cancel.cancelled() => {
494 let _ = lifecycle.transition_to_stopping();
495 let _ = lifecycle.transition_to_terminated(TerminationReason::Cancelled);
496 Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
497 let uptime = lifecycle.cumulative_running_time().as_secs_f64();
498 metrics.record_termination_with_uptime(&service_name, uptime);
499 return;
500 }
501 _ = tokio::time::sleep(delay) => {}
502 }
503 continue;
504 }
505 RestartPolicy::OnFailure | RestartPolicy::Never => {
506 let _ =
508 lifecycle.transition_to_terminated(TerminationReason::Completed);
509 Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
510 let uptime = lifecycle.cumulative_running_time().as_secs_f64();
511 metrics.record_termination_with_uptime(&service_name, uptime);
512 return;
513 }
514 }
515 }
516
517 Err(err) => {
518 let error_msg = format!("{err:#}");
519 tracing::error!(
520 service = %service_name,
521 error = %error_msg,
522 "service failed"
523 );
524
525 backoff.maybe_reset_on_cooldown();
526
527 match restart_policy {
528 RestartPolicy::Never => {
529 tracing::warn!(
530 service = %service_name,
531 "restart_policy=never, service will not be restarted"
532 );
533 let _ = lifecycle.transition_to_terminated(
534 TerminationReason::Unrecoverable(error_msg),
535 );
536 Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
537 let uptime = lifecycle.cumulative_running_time().as_secs_f64();
538 metrics.record_termination_with_uptime(&service_name, uptime);
539 return;
540 }
541
542 RestartPolicy::OnFailure | RestartPolicy::Always => {
543 match backoff.next_backoff() {
545 BackoffAction::Retry(delay) => {
546 tracing::info!(
547 service = %service_name,
548 delay_ms = delay.as_millis() as u64,
549 attempt = backoff.attempt(),
550 "scheduling restart after backoff"
551 );
552
553 let _ = lifecycle.transition_to_backing_off(&error_msg, delay);
554 Self::update_lifecycle(&lifecycles, &service_name, &lifecycle)
555 .await;
556
557 tokio::select! {
559 _ = cancel.cancelled() => {
560 tracing::info!(
561 service = %service_name,
562 "cancellation during backoff"
563 );
564 let _ = lifecycle.transition_to_stopping();
565 let _ = lifecycle.transition_to_terminated(
566 TerminationReason::Cancelled,
567 );
568 Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
569 let uptime = lifecycle.cumulative_running_time().as_secs_f64();
570 metrics.record_termination_with_uptime(&service_name, uptime);
571 return;
572 }
573 _ = tokio::time::sleep(delay) => {
574 }
576 }
577 }
578
579 BackoffAction::CircuitOpen {
580 failures,
581 max_retries,
582 } => {
583 tracing::error!(
584 service = %service_name,
585 failures = failures,
586 max_retries = max_retries,
587 "CIRCUIT BREAKER OPEN — too many failures, giving up"
588 );
589 metrics.record_circuit_breaker_trip();
590
591 let _ = lifecycle.transition_to_terminated(
592 TerminationReason::CircuitBreakerOpen {
593 failures,
594 max_retries,
595 },
596 );
597 Self::update_lifecycle(&lifecycles, &service_name, &lifecycle)
598 .await;
599 let uptime = lifecycle.cumulative_running_time().as_secs_f64();
600 metrics.record_termination_with_uptime(&service_name, uptime);
601 return;
602 }
603 }
604 }
605 }
606 }
607 }
608 }
609 }
610
611 async fn update_lifecycle(
613 lifecycles: &Arc<RwLock<HashMap<String, ServiceLifecycle>>>,
614 name: &str,
615 lifecycle: &ServiceLifecycle,
616 ) {
617 let mut lc_map = lifecycles.write().await;
618 lc_map.insert(name.to_string(), lifecycle.clone());
619 }
620
621 #[tracing::instrument(skip(self), fields(timeout_secs = self.config.shutdown_timeout.as_secs()))]
628 pub async fn wait_for_drain(&self) {
629 self.tracker.close();
630
631 tracing::info!(
632 timeout_secs = self.config.shutdown_timeout.as_secs(),
633 "waiting for all services to drain"
634 );
635
636 match tokio::time::timeout(self.config.shutdown_timeout, self.tracker.wait()).await {
637 Ok(()) => {
638 tracing::info!("all services drained successfully");
639 }
640 Err(_) => {
641 tracing::warn!(
642 timeout_secs = self.config.shutdown_timeout.as_secs(),
643 "shutdown timeout exceeded, some services may not have exited cleanly"
644 );
645 }
646 }
647 }
648
649 #[tracing::instrument(skip(self), fields(signal_handler = self.config.install_signal_handler))]
657 pub async fn run_until_shutdown(&self) -> anyhow::Result<()> {
658 if self.config.install_signal_handler {
659 self.wait_for_signal_and_shutdown().await?;
660 } else {
661 self.cancel_token.cancelled().await;
663 tracing::info!("external shutdown signal received");
664 }
665
666 self.wait_for_drain().await;
667
668 let snap = self.metrics.snapshot();
670 tracing::info!(
671 restarts = snap.restarts_total,
672 spawned = snap.spawned_total,
673 terminated = snap.terminated_total,
674 circuit_trips = snap.circuit_breaker_trips,
675 "supervisor shutdown complete"
676 );
677
678 Ok(())
679 }
680
681 async fn wait_for_signal_and_shutdown(&self) -> anyhow::Result<()> {
683 #[cfg(unix)]
684 {
685 use tokio::signal::unix::{SignalKind, signal};
686
687 let mut sigterm = signal(SignalKind::terminate())?;
688 let mut sigint = signal(SignalKind::interrupt())?;
689
690 tokio::select! {
691 _ = sigterm.recv() => {
692 tracing::info!("received SIGTERM");
693 }
694 _ = sigint.recv() => {
695 tracing::info!("received SIGINT");
696 }
697 _ = self.cancel_token.cancelled() => {
698 tracing::info!("shutdown triggered programmatically");
699 return Ok(());
700 }
701 }
702 }
703
704 #[cfg(not(unix))]
705 {
706 tokio::select! {
707 result = tokio::signal::ctrl_c() => {
708 result?;
709 tracing::info!("received Ctrl+C");
710 }
711 _ = self.cancel_token.cancelled() => {
712 tracing::info!("shutdown triggered programmatically");
713 return Ok(());
714 }
715 }
716 }
717
718 self.cancel_token.cancel();
719 Ok(())
720 }
721}
722
723#[cfg(test)]
728mod tests {
729 use super::*;
730
731 struct CountingService {
736 name: String,
737 policy: RestartPolicy,
738 run_count: Arc<AtomicU64>,
739 }
740
741 impl CountingService {
742 fn new(name: &str, policy: RestartPolicy) -> (Self, Arc<AtomicU64>) {
743 let count = Arc::new(AtomicU64::new(0));
744 (
745 Self {
746 name: name.to_string(),
747 policy,
748 run_count: count.clone(),
749 },
750 count,
751 )
752 }
753 }
754
755 #[async_trait::async_trait]
756 impl JanusService for CountingService {
757 fn name(&self) -> &str {
758 &self.name
759 }
760
761 fn restart_policy(&self) -> RestartPolicy {
762 self.policy
763 }
764
765 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
766 self.run_count.fetch_add(1, Ordering::SeqCst);
767 cancel.cancelled().await;
768 Ok(())
769 }
770 }
771
772 struct FailNTimes {
774 name: String,
775 fail_count: u32,
776 current: Arc<AtomicU64>,
777 }
778
779 impl FailNTimes {
780 fn new(name: &str, fail_count: u32) -> (Self, Arc<AtomicU64>) {
781 let current = Arc::new(AtomicU64::new(0));
782 (
783 Self {
784 name: name.to_string(),
785 fail_count,
786 current: current.clone(),
787 },
788 current,
789 )
790 }
791 }
792
793 #[async_trait::async_trait]
794 impl JanusService for FailNTimes {
795 fn name(&self) -> &str {
796 &self.name
797 }
798
799 fn restart_policy(&self) -> RestartPolicy {
800 RestartPolicy::OnFailure
801 }
802
803 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
804 let attempt = self.current.fetch_add(1, Ordering::SeqCst) as u32;
805 if attempt < self.fail_count {
806 tokio::time::sleep(Duration::from_millis(1)).await;
808 anyhow::bail!("simulated failure #{}", attempt + 1);
809 }
810 cancel.cancelled().await;
812 Ok(())
813 }
814 }
815
816 struct OneShotService {
818 name: String,
819 ran: Arc<AtomicU64>,
820 }
821
822 impl OneShotService {
823 fn new(name: &str) -> (Self, Arc<AtomicU64>) {
824 let ran = Arc::new(AtomicU64::new(0));
825 (
826 Self {
827 name: name.to_string(),
828 ran: ran.clone(),
829 },
830 ran,
831 )
832 }
833 }
834
835 #[async_trait::async_trait]
836 impl JanusService for OneShotService {
837 fn name(&self) -> &str {
838 &self.name
839 }
840
841 fn restart_policy(&self) -> RestartPolicy {
842 RestartPolicy::Never
843 }
844
845 async fn run(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
846 self.ran.fetch_add(1, Ordering::SeqCst);
847 Ok(())
848 }
849 }
850
851 struct AlwaysFailService {
853 name: String,
854 attempts: Arc<AtomicU64>,
855 }
856
857 impl AlwaysFailService {
858 fn new(name: &str) -> (Self, Arc<AtomicU64>) {
859 let attempts = Arc::new(AtomicU64::new(0));
860 (
861 Self {
862 name: name.to_string(),
863 attempts: attempts.clone(),
864 },
865 attempts,
866 )
867 }
868 }
869
870 #[async_trait::async_trait]
871 impl JanusService for AlwaysFailService {
872 fn name(&self) -> &str {
873 &self.name
874 }
875
876 fn restart_policy(&self) -> RestartPolicy {
877 RestartPolicy::OnFailure
878 }
879
880 async fn run(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
881 self.attempts.fetch_add(1, Ordering::SeqCst);
882 tokio::time::sleep(Duration::from_millis(1)).await;
883 anyhow::bail!("permanent failure");
884 }
885 }
886
887 #[tokio::test]
890 async fn test_supervisor_creation() {
891 let sup = JanusSupervisor::with_defaults();
892 assert!(!sup.is_shutting_down());
893 assert_eq!(sup.service_count().await, 0);
894 }
895
896 #[tokio::test]
897 async fn test_spawn_and_cancel_single_service() {
898 let config = SupervisorConfig::default().without_signal_handler();
899 let sup = JanusSupervisor::new(config);
900
901 let (svc, count) = CountingService::new("test-svc", RestartPolicy::OnFailure);
902 sup.spawn_service(Box::new(svc));
903
904 tokio::time::sleep(Duration::from_millis(50)).await;
906
907 assert_eq!(count.load(Ordering::SeqCst), 1);
908 assert_eq!(sup.metrics().active_services.load(Ordering::Relaxed), 1);
909
910 sup.trigger_shutdown();
912 sup.wait_for_drain().await;
913
914 assert_eq!(count.load(Ordering::SeqCst), 1);
915 let snap = sup.metrics().snapshot();
916 assert_eq!(snap.spawned_total, 1);
917 assert_eq!(snap.terminated_total, 1);
918 assert_eq!(snap.active_services, 0);
919 }
920
921 #[tokio::test]
922 async fn test_spawn_multiple_services() {
923 let config = SupervisorConfig::default().without_signal_handler();
924 let sup = JanusSupervisor::new(config);
925
926 let (svc1, count1) = CountingService::new("svc-1", RestartPolicy::OnFailure);
927 let (svc2, count2) = CountingService::new("svc-2", RestartPolicy::OnFailure);
928 let (svc3, count3) = CountingService::new("svc-3", RestartPolicy::OnFailure);
929
930 sup.spawn_service(Box::new(svc1));
931 sup.spawn_service(Box::new(svc2));
932 sup.spawn_service(Box::new(svc3));
933
934 tokio::time::sleep(Duration::from_millis(50)).await;
935
936 assert_eq!(count1.load(Ordering::SeqCst), 1);
937 assert_eq!(count2.load(Ordering::SeqCst), 1);
938 assert_eq!(count3.load(Ordering::SeqCst), 1);
939
940 sup.trigger_shutdown();
941 sup.wait_for_drain().await;
942
943 let snap = sup.metrics().snapshot();
944 assert_eq!(snap.spawned_total, 3);
945 assert_eq!(snap.terminated_total, 3);
946 }
947
948 #[tokio::test]
949 async fn test_service_restart_on_failure() {
950 let config = SupervisorConfig::default()
951 .without_signal_handler()
952 .with_default_backoff(
953 BackoffConfig::new(Duration::from_millis(10), Duration::from_millis(50))
954 .without_circuit_breaker(),
955 );
956
957 let sup = JanusSupervisor::new(config);
958
959 let (svc, attempts) = FailNTimes::new("fail-3", 3);
961 sup.spawn_service(Box::new(svc));
962
963 tokio::time::sleep(Duration::from_millis(500)).await;
965
966 assert!(
968 attempts.load(Ordering::SeqCst) >= 4,
969 "expected >= 4 attempts, got {}",
970 attempts.load(Ordering::SeqCst)
971 );
972
973 sup.trigger_shutdown();
974 sup.wait_for_drain().await;
975
976 let snap = sup.metrics().snapshot();
977 assert!(snap.restarts_total >= 3);
978 }
979
980 #[tokio::test]
981 async fn test_one_shot_service_no_restart() {
982 let config = SupervisorConfig::default().without_signal_handler();
983 let sup = JanusSupervisor::new(config);
984
985 let (svc, ran) = OneShotService::new("one-shot");
986 sup.spawn_service(Box::new(svc));
987
988 tokio::time::sleep(Duration::from_millis(100)).await;
990
991 assert_eq!(ran.load(Ordering::SeqCst), 1);
993
994 let snap = sup.metrics().snapshot();
996 assert_eq!(snap.terminated_total, 1);
997 assert_eq!(snap.restarts_total, 0);
998
999 sup.trigger_shutdown();
1000 sup.wait_for_drain().await;
1001 }
1002
1003 #[tokio::test]
1004 async fn test_restart_policy_never_on_failure() {
1005 let config = SupervisorConfig::default().without_signal_handler();
1006 let sup = JanusSupervisor::new(config);
1007
1008 struct FailOnce {
1010 ran: Arc<AtomicU64>,
1011 }
1012
1013 #[async_trait::async_trait]
1014 impl JanusService for FailOnce {
1015 fn name(&self) -> &str {
1016 "fail-once-never"
1017 }
1018 fn restart_policy(&self) -> RestartPolicy {
1019 RestartPolicy::Never
1020 }
1021 async fn run(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
1022 self.ran.fetch_add(1, Ordering::SeqCst);
1023 anyhow::bail!("intentional failure");
1024 }
1025 }
1026
1027 let ran = Arc::new(AtomicU64::new(0));
1028 let svc = FailOnce { ran: ran.clone() };
1029 sup.spawn_service(Box::new(svc));
1030
1031 tokio::time::sleep(Duration::from_millis(100)).await;
1032
1033 assert_eq!(ran.load(Ordering::SeqCst), 1);
1035
1036 let snap = sup.metrics().snapshot();
1037 assert_eq!(snap.terminated_total, 1);
1038 assert_eq!(snap.restarts_total, 0);
1039
1040 sup.trigger_shutdown();
1041 sup.wait_for_drain().await;
1042 }
1043
1044 #[tokio::test]
1045 async fn test_circuit_breaker_trips() {
1046 let config = SupervisorConfig::default()
1047 .without_signal_handler()
1048 .with_default_backoff(
1049 BackoffConfig::new(Duration::from_millis(5), Duration::from_millis(20))
1050 .with_circuit_breaker(3, Duration::from_secs(60)),
1051 );
1052
1053 let sup = JanusSupervisor::new(config);
1054
1055 let (svc, attempts) = AlwaysFailService::new("always-fail");
1056 sup.spawn_service(Box::new(svc));
1057
1058 tokio::time::sleep(Duration::from_millis(500)).await;
1060
1061 let att = attempts.load(Ordering::SeqCst);
1063 assert!(att >= 3, "expected at least 3 attempts, got {}", att);
1064
1065 let snap = sup.metrics().snapshot();
1066 assert_eq!(snap.circuit_breaker_trips, 1);
1067 assert_eq!(snap.terminated_total, 1);
1068
1069 sup.trigger_shutdown();
1070 sup.wait_for_drain().await;
1071 }
1072
1073 #[tokio::test]
1074 async fn test_lifecycle_snapshots() {
1075 let config = SupervisorConfig::default().without_signal_handler();
1076 let sup = JanusSupervisor::new(config);
1077
1078 let (svc, _) = CountingService::new("lifecycle-test", RestartPolicy::OnFailure);
1079 sup.spawn_service(Box::new(svc));
1080
1081 tokio::time::sleep(Duration::from_millis(50)).await;
1082
1083 let snapshots = sup.lifecycle_snapshots().await;
1084 assert_eq!(snapshots.len(), 1);
1085
1086 let snap = &snapshots[0];
1087 assert_eq!(snap.service_name, "lifecycle-test");
1088 assert_eq!(snap.phase, ServicePhase::Running);
1089 assert_eq!(snap.start_count, 1);
1090 assert_eq!(snap.total_failures, 0);
1091
1092 sup.trigger_shutdown();
1093 sup.wait_for_drain().await;
1094
1095 let snapshots = sup.lifecycle_snapshots().await;
1097 let snap = &snapshots[0];
1098 assert_eq!(snap.phase, ServicePhase::Terminated);
1099 }
1100
1101 #[tokio::test]
1102 async fn test_service_lifecycle_by_name() {
1103 let config = SupervisorConfig::default().without_signal_handler();
1104 let sup = JanusSupervisor::new(config);
1105
1106 let (svc, _) = CountingService::new("named-svc", RestartPolicy::OnFailure);
1107 sup.spawn_service(Box::new(svc));
1108
1109 tokio::time::sleep(Duration::from_millis(50)).await;
1110
1111 let snap = sup.service_lifecycle("named-svc").await;
1112 assert!(snap.is_some());
1113 assert_eq!(snap.unwrap().service_name, "named-svc");
1114
1115 let missing = sup.service_lifecycle("nonexistent").await;
1116 assert!(missing.is_none());
1117
1118 sup.trigger_shutdown();
1119 sup.wait_for_drain().await;
1120 }
1121
1122 #[tokio::test]
1123 async fn test_metrics_snapshot() {
1124 let sup = JanusSupervisor::with_defaults();
1125 let snap = sup.metrics().snapshot();
1126
1127 assert_eq!(snap.restarts_total, 0);
1128 assert_eq!(snap.active_services, 0);
1129 assert_eq!(snap.spawned_total, 0);
1130 assert_eq!(snap.terminated_total, 0);
1131 assert_eq!(snap.circuit_breaker_trips, 0);
1132 }
1133
1134 #[tokio::test]
1135 async fn test_run_until_shutdown_with_external_cancel() {
1136 let config = SupervisorConfig::default().without_signal_handler();
1137 let sup = JanusSupervisor::new(config);
1138
1139 let (svc, count) = CountingService::new("ext-cancel", RestartPolicy::OnFailure);
1140 sup.spawn_service(Box::new(svc));
1141
1142 let cancel = sup.cancel_token().clone();
1143
1144 let handle = tokio::spawn({
1146 let sup_ref_metrics = sup.metrics().clone();
1147 async move {
1148 cancel.cancelled().await;
1151 sup_ref_metrics.snapshot()
1152 }
1153 });
1154
1155 tokio::time::sleep(Duration::from_millis(50)).await;
1156 assert_eq!(count.load(Ordering::SeqCst), 1);
1157
1158 sup.trigger_shutdown();
1159 sup.wait_for_drain().await;
1160
1161 let snap = handle.await.unwrap();
1162 assert_eq!(snap.spawned_total, 1);
1163 }
1164
1165 #[tokio::test]
1166 async fn test_shutdown_timeout() {
1167 let config = SupervisorConfig::default()
1168 .without_signal_handler()
1169 .with_shutdown_timeout(Duration::from_millis(100));
1170
1171 let sup = JanusSupervisor::new(config);
1172
1173 struct HangingService;
1175
1176 #[async_trait::async_trait]
1177 impl JanusService for HangingService {
1178 fn name(&self) -> &str {
1179 "hanger"
1180 }
1181 async fn run(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
1182 tokio::time::sleep(Duration::from_secs(3600)).await;
1185 Ok(())
1186 }
1187 }
1188
1189 sup.spawn_service(Box::new(HangingService));
1190 tokio::time::sleep(Duration::from_millis(20)).await;
1191
1192 sup.trigger_shutdown();
1193
1194 let start = std::time::Instant::now();
1196 sup.wait_for_drain().await;
1197 let elapsed = start.elapsed();
1198
1199 assert!(
1201 elapsed < Duration::from_secs(1),
1202 "drain took too long: {:?}",
1203 elapsed
1204 );
1205 }
1206
1207 #[tokio::test]
1208 async fn test_spawn_with_custom_backoff() {
1209 let config = SupervisorConfig::default().without_signal_handler();
1210 let sup = JanusSupervisor::new(config);
1211
1212 let (svc, attempts) = AlwaysFailService::new("custom-backoff");
1213
1214 let custom_backoff =
1215 BackoffConfig::new(Duration::from_millis(5), Duration::from_millis(10))
1216 .with_circuit_breaker(2, Duration::from_secs(60));
1217
1218 sup.spawn_service_with_options(Box::new(svc), SpawnOptions::with_backoff(custom_backoff));
1219
1220 tokio::time::sleep(Duration::from_millis(200)).await;
1221
1222 assert!(attempts.load(Ordering::SeqCst) >= 2);
1224
1225 let snap = sup.metrics().snapshot();
1226 assert_eq!(snap.circuit_breaker_trips, 1);
1227
1228 sup.trigger_shutdown();
1229 sup.wait_for_drain().await;
1230 }
1231
1232 #[tokio::test]
1233 async fn test_restart_policy_always_on_clean_exit() {
1234 let config = SupervisorConfig::default()
1235 .without_signal_handler()
1236 .with_default_backoff(
1237 BackoffConfig::new(Duration::from_millis(10), Duration::from_millis(50))
1238 .without_circuit_breaker(),
1239 );
1240
1241 let sup = JanusSupervisor::new(config);
1242
1243 struct ExitImmediately {
1245 count: Arc<AtomicU64>,
1246 }
1247
1248 #[async_trait::async_trait]
1249 impl JanusService for ExitImmediately {
1250 fn name(&self) -> &str {
1251 "exit-immediately"
1252 }
1253 fn restart_policy(&self) -> RestartPolicy {
1254 RestartPolicy::Always
1255 }
1256 async fn run(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
1257 self.count.fetch_add(1, Ordering::SeqCst);
1258 tokio::time::sleep(Duration::from_millis(1)).await;
1259 Ok(())
1260 }
1261 }
1262
1263 let count = Arc::new(AtomicU64::new(0));
1264 let svc = ExitImmediately {
1265 count: count.clone(),
1266 };
1267
1268 sup.spawn_service(Box::new(svc));
1269
1270 tokio::time::sleep(Duration::from_millis(500)).await;
1272
1273 let runs = count.load(Ordering::SeqCst);
1274 assert!(
1275 runs >= 2,
1276 "expected service to run multiple times with Always policy, got {}",
1277 runs
1278 );
1279
1280 sup.trigger_shutdown();
1281 sup.wait_for_drain().await;
1282 }
1283
1284 #[tokio::test]
1285 async fn test_is_shutting_down() {
1286 let sup = JanusSupervisor::with_defaults();
1287 assert!(!sup.is_shutting_down());
1288
1289 sup.trigger_shutdown();
1290 assert!(sup.is_shutting_down());
1291 }
1292
1293 #[tokio::test]
1294 async fn test_config_builder() {
1295 let config = SupervisorConfig::default()
1296 .with_shutdown_timeout(Duration::from_secs(10))
1297 .with_default_backoff(BackoffConfig::new(
1298 Duration::from_millis(200),
1299 Duration::from_secs(30),
1300 ))
1301 .without_signal_handler();
1302
1303 assert_eq!(config.shutdown_timeout, Duration::from_secs(10));
1304 assert!(!config.install_signal_handler);
1305 assert_eq!(
1306 config.default_backoff.base_delay,
1307 Duration::from_millis(200)
1308 );
1309 assert_eq!(config.default_backoff.max_delay, Duration::from_secs(30));
1310 }
1311
1312 struct LifecycleTracer {
1319 name: String,
1320 log: Arc<tokio::sync::Mutex<Vec<String>>>,
1321 policy: RestartPolicy,
1322 }
1323
1324 impl LifecycleTracer {
1325 fn new(
1326 name: &str,
1327 log: Arc<tokio::sync::Mutex<Vec<String>>>,
1328 policy: RestartPolicy,
1329 ) -> Self {
1330 Self {
1331 name: name.to_string(),
1332 log,
1333 policy,
1334 }
1335 }
1336 }
1337
1338 #[async_trait::async_trait]
1339 impl JanusService for LifecycleTracer {
1340 fn name(&self) -> &str {
1341 &self.name
1342 }
1343
1344 fn restart_policy(&self) -> RestartPolicy {
1345 self.policy
1346 }
1347
1348 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
1349 {
1350 let mut l = self.log.lock().await;
1351 l.push(format!("{}:started", self.name));
1352 }
1353 cancel.cancelled().await;
1354 {
1355 let mut l = self.log.lock().await;
1356 l.push(format!("{}:stopped", self.name));
1357 }
1358 Ok(())
1359 }
1360 }
1361
1362 #[tokio::test]
1371 async fn test_integration_graceful_shutdown_e2e() {
1372 let log = Arc::new(tokio::sync::Mutex::new(Vec::<String>::new()));
1373
1374 let config = SupervisorConfig::default()
1375 .with_shutdown_timeout(Duration::from_secs(5))
1376 .without_signal_handler();
1377
1378 let sup = JanusSupervisor::new(config);
1379
1380 sup.spawn_service(Box::new(LifecycleTracer::new(
1382 "api",
1383 log.clone(),
1384 RestartPolicy::Always,
1385 )));
1386 sup.spawn_service(Box::new(LifecycleTracer::new(
1387 "data",
1388 log.clone(),
1389 RestartPolicy::OnFailure,
1390 )));
1391 sup.spawn_service(Box::new(LifecycleTracer::new(
1392 "cns",
1393 log.clone(),
1394 RestartPolicy::OnFailure,
1395 )));
1396
1397 tokio::time::sleep(Duration::from_millis(100)).await;
1399
1400 assert_eq!(sup.service_count().await, 3);
1401 assert!(!sup.is_shutting_down());
1402
1403 sup.trigger_shutdown();
1405 assert!(sup.is_shutting_down());
1406
1407 sup.wait_for_drain().await;
1409
1410 let events = log.lock().await;
1412 for svc in &["api", "data", "cns"] {
1413 assert!(
1414 events.contains(&format!("{}:started", svc)),
1415 "service '{}' never started; events: {:?}",
1416 svc,
1417 *events,
1418 );
1419 assert!(
1420 events.contains(&format!("{}:stopped", svc)),
1421 "service '{}' never stopped; events: {:?}",
1422 svc,
1423 *events,
1424 );
1425 }
1426
1427 let snapshots = sup.lifecycle_snapshots().await;
1429 assert_eq!(snapshots.len(), 3);
1430 for snap in &snapshots {
1431 assert_eq!(
1432 snap.phase,
1433 ServicePhase::Terminated,
1434 "service '{}' should be Terminated, was {}",
1435 snap.service_name,
1436 snap.phase,
1437 );
1438 assert_eq!(
1439 snap.termination_reason.as_deref(),
1440 Some("cancelled"),
1441 "service '{}' should have been cancelled, got {:?}",
1442 snap.service_name,
1443 snap.termination_reason,
1444 );
1445 assert!(snap.start_count >= 1);
1446 }
1447
1448 let metrics = sup.metrics().snapshot();
1450 assert_eq!(metrics.spawned_total, 3);
1451 assert_eq!(metrics.terminated_total, 3);
1452 assert_eq!(metrics.active_services, 0);
1453 }
1454
1455 struct ChaosService {
1464 name: String,
1465 fail_times: u32,
1466 current: Arc<std::sync::atomic::AtomicU32>,
1467 attempt_times: Arc<tokio::sync::Mutex<Vec<std::time::Instant>>>,
1468 policy: RestartPolicy,
1469 }
1470
1471 impl ChaosService {
1472 fn new(name: &str, fail_times: u32, policy: RestartPolicy) -> Self {
1473 Self {
1474 name: name.to_string(),
1475 fail_times,
1476 current: Arc::new(std::sync::atomic::AtomicU32::new(0)),
1477 attempt_times: Arc::new(tokio::sync::Mutex::new(Vec::new())),
1478 policy,
1479 }
1480 }
1481 }
1482
1483 #[async_trait::async_trait]
1484 impl JanusService for ChaosService {
1485 fn name(&self) -> &str {
1486 &self.name
1487 }
1488
1489 fn restart_policy(&self) -> RestartPolicy {
1490 self.policy
1491 }
1492
1493 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
1494 {
1495 let mut ts = self.attempt_times.lock().await;
1496 ts.push(std::time::Instant::now());
1497 }
1498
1499 let n = self
1500 .current
1501 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1502
1503 if n < self.fail_times {
1504 anyhow::bail!("chaos failure #{}", n + 1);
1505 }
1506
1507 cancel.cancelled().await;
1509 Ok(())
1510 }
1511 }
1512
1513 #[tokio::test]
1522 async fn test_chaos_exponential_backoff() {
1523 let backoff = BackoffConfig::new(
1524 Duration::from_millis(20), Duration::from_secs(2), )
1527 .without_circuit_breaker(); let config = SupervisorConfig::default()
1530 .with_shutdown_timeout(Duration::from_secs(5))
1531 .with_default_backoff(backoff)
1532 .without_signal_handler();
1533
1534 let sup = JanusSupervisor::new(config);
1535
1536 let chaos = ChaosService::new("chaos-backoff", 3, RestartPolicy::OnFailure);
1537 let attempts_arc = chaos.attempt_times.clone();
1538 let current_arc = chaos.current.clone();
1539
1540 sup.spawn_service(Box::new(chaos));
1541
1542 let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
1544 loop {
1545 let count = current_arc.load(std::sync::atomic::Ordering::SeqCst);
1546 if count >= 4 {
1547 break; }
1549 if tokio::time::Instant::now() > deadline {
1550 panic!("chaos service did not recover in time; attempts={}", count,);
1551 }
1552 tokio::time::sleep(Duration::from_millis(50)).await;
1553 }
1554
1555 tokio::time::sleep(Duration::from_millis(100)).await;
1557
1558 let timestamps = attempts_arc.lock().await;
1560 assert!(
1561 timestamps.len() >= 4,
1562 "expected ≥4 attempts, got {}",
1563 timestamps.len(),
1564 );
1565
1566 let delays: Vec<Duration> = timestamps
1569 .windows(2)
1570 .map(|w| w[1].duration_since(w[0]))
1571 .collect();
1572
1573 for (i, d) in delays.iter().enumerate().skip(1) {
1583 assert!(
1584 *d >= Duration::from_millis(1),
1585 "delay[{}] too short: {:?} — backoff may not be working",
1586 i,
1587 d,
1588 );
1589 }
1590
1591 let metrics = sup.metrics().snapshot();
1593 assert!(
1594 metrics.restarts_total >= 3,
1595 "expected ≥3 restarts, got {}",
1596 metrics.restarts_total,
1597 );
1598
1599 sup.trigger_shutdown();
1601 sup.wait_for_drain().await;
1602
1603 let snap = sup.service_lifecycle("chaos-backoff").await;
1605 assert!(snap.is_some());
1606 let snap = snap.unwrap();
1607 assert_eq!(snap.phase, ServicePhase::Terminated);
1608 }
1609
1610 #[tokio::test]
1619 async fn test_chaos_circuit_breaker_trips() {
1620 let backoff = BackoffConfig::new(Duration::from_millis(10), Duration::from_millis(50))
1621 .with_circuit_breaker(3, Duration::from_secs(60));
1622
1623 let config = SupervisorConfig::default()
1624 .with_shutdown_timeout(Duration::from_secs(5))
1625 .with_default_backoff(backoff)
1626 .without_signal_handler();
1627
1628 let sup = JanusSupervisor::new(config);
1629
1630 let chaos = ChaosService::new("chaos-cb", 1000, RestartPolicy::OnFailure);
1632 let current_arc = chaos.current.clone();
1633
1634 sup.spawn_service(Box::new(chaos));
1635
1636 let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
1638 loop {
1639 if let Some(snap) = sup.service_lifecycle("chaos-cb").await
1640 && snap.phase == ServicePhase::Terminated
1641 {
1642 break;
1643 }
1644 if tokio::time::Instant::now() > deadline {
1645 panic!(
1646 "circuit breaker did not trip in time; attempts={}",
1647 current_arc.load(std::sync::atomic::Ordering::SeqCst),
1648 );
1649 }
1650 tokio::time::sleep(Duration::from_millis(50)).await;
1651 }
1652
1653 let snap = sup.service_lifecycle("chaos-cb").await.unwrap();
1654 assert_eq!(snap.phase, ServicePhase::Terminated);
1655
1656 let reason = snap
1658 .termination_reason
1659 .as_deref()
1660 .expect("expected a termination reason");
1661 assert!(
1662 reason.contains("circuit breaker"),
1663 "expected circuit breaker termination, got: {}",
1664 reason,
1665 );
1666
1667 let metrics = sup.metrics().snapshot();
1669 assert!(
1670 metrics.circuit_breaker_trips >= 1,
1671 "expected ≥1 circuit breaker trip, got {}",
1672 metrics.circuit_breaker_trips,
1673 );
1674
1675 let attempts_at_trip = current_arc.load(std::sync::atomic::Ordering::SeqCst);
1678 tokio::time::sleep(Duration::from_millis(200)).await;
1679 let attempts_after = current_arc.load(std::sync::atomic::Ordering::SeqCst);
1680 assert_eq!(
1681 attempts_at_trip, attempts_after,
1682 "service should NOT restart after circuit breaker trips",
1683 );
1684
1685 sup.trigger_shutdown();
1686 sup.wait_for_drain().await;
1687 }
1688
1689 #[tokio::test]
1697 async fn test_chaos_mixed_fleet() {
1698 let backoff = BackoffConfig::new(Duration::from_millis(10), Duration::from_millis(100))
1699 .with_circuit_breaker(3, Duration::from_secs(60));
1700
1701 let config = SupervisorConfig::default()
1702 .with_shutdown_timeout(Duration::from_secs(5))
1703 .with_default_backoff(backoff)
1704 .without_signal_handler();
1705
1706 let sup = JanusSupervisor::new(config);
1707
1708 let log = Arc::new(tokio::sync::Mutex::new(Vec::<String>::new()));
1709
1710 sup.spawn_service(Box::new(LifecycleTracer::new(
1712 "healthy-api",
1713 log.clone(),
1714 RestartPolicy::OnFailure,
1715 )));
1716
1717 let chaos = ChaosService::new("bad-data", 1000, RestartPolicy::OnFailure);
1719 sup.spawn_service(Box::new(chaos));
1720
1721 let recovering = ChaosService::new("flaky-cns", 2, RestartPolicy::OnFailure);
1723 let recovering_attempts = recovering.current.clone();
1724 sup.spawn_service(Box::new(recovering));
1725
1726 let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
1729 loop {
1730 if sup.service_count().await == 3 {
1731 break;
1732 }
1733 if tokio::time::Instant::now() > deadline {
1734 panic!(
1735 "timed out waiting for 3 services to register; got {}",
1736 sup.service_count().await,
1737 );
1738 }
1739 tokio::time::sleep(Duration::from_millis(10)).await;
1740 }
1741
1742 let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
1745 loop {
1746 let bad_done = sup
1747 .service_lifecycle("bad-data")
1748 .await
1749 .is_some_and(|s| s.phase == ServicePhase::Terminated);
1750
1751 let recovered = recovering_attempts.load(std::sync::atomic::Ordering::SeqCst) >= 3;
1752
1753 if bad_done && recovered {
1754 break;
1755 }
1756 if tokio::time::Instant::now() > deadline {
1757 panic!("mixed fleet did not reach expected state in time");
1758 }
1759 tokio::time::sleep(Duration::from_millis(50)).await;
1760 }
1761
1762 let healthy_snap = sup.service_lifecycle("healthy-api").await.unwrap();
1764 assert!(
1765 healthy_snap.phase.is_alive(),
1766 "healthy service should still be alive, was {}",
1767 healthy_snap.phase,
1768 );
1769
1770 let bad_snap = sup.service_lifecycle("bad-data").await.unwrap();
1772 assert_eq!(bad_snap.phase, ServicePhase::Terminated);
1773 assert!(
1774 bad_snap
1775 .termination_reason
1776 .as_deref()
1777 .is_some_and(|r| r.contains("circuit breaker")),
1778 "bad-data should have circuit-broken, got {:?}",
1779 bad_snap.termination_reason,
1780 );
1781
1782 let flaky_snap = sup.service_lifecycle("flaky-cns").await.unwrap();
1784 assert!(
1785 flaky_snap.phase.is_alive(),
1786 "flaky service should have recovered, was {}",
1787 flaky_snap.phase,
1788 );
1789 assert!(
1790 flaky_snap.start_count >= 3,
1791 "flaky service should have started ≥3 times, got {}",
1792 flaky_snap.start_count,
1793 );
1794
1795 sup.trigger_shutdown();
1797 sup.wait_for_drain().await;
1798
1799 for name in &["healthy-api", "bad-data", "flaky-cns"] {
1801 let snap = sup.service_lifecycle(name).await.unwrap();
1802 assert_eq!(
1803 snap.phase,
1804 ServicePhase::Terminated,
1805 "service '{}' should be Terminated after shutdown",
1806 name,
1807 );
1808 }
1809
1810 let metrics = sup.metrics().snapshot();
1811 assert_eq!(metrics.active_services, 0);
1812 assert_eq!(metrics.spawned_total, 3);
1813 assert_eq!(metrics.terminated_total, 3);
1814 }
1815}