Skip to main content

jflow_core/supervisor/
mod.rs

1//! Janus Supervisor — hierarchical service lifecycle management.
2//!
3//! Prometheus metrics are automatically published to the global
4//! [`JanusMetrics`](crate::metrics::JanusMetrics) registry whenever
5//! the supervisor records a spawn, restart, termination, or circuit
6//! breaker trip.  This means the `/metrics` endpoint exposes
7//! `janus_supervisor_*` counters/gauges with zero additional wiring.
8//!
9//! This module implements the **Janus Supervisor Model** described in the
10//! architecture refactor document. It replaces the old "fire and forget"
11//! `tokio::spawn` pattern with a structured supervision tree built on:
12//!
13//! - [`TaskTracker`] — tracks spawned tasks without accumulating results
14//!   (unlike `JoinSet`), preventing memory leaks in long-running processes.
15//! - [`CancellationToken`] — propagates graceful shutdown signals through
16//!   the service hierarchy.
17//!
18//! # Architecture
19//!
20//! ```text
21//! ┌─────────────────────────────────────────────────┐
22//! │                JanusSupervisor                   │
23//! │                                                  │
24//! │  ┌──────────┐  ┌──────────┐  ┌──────────┐      │
25//! │  │  Data    │  │   CNS    │  │Execution │ ...   │
26//! │  │ Service  │  │ Service  │  │ Service  │      │
27//! │  └──────────┘  └──────────┘  └──────────┘      │
28//! │                                                  │
29//! │  TaskTracker  ◄── tracks all spawned tasks       │
30//! │  CancellationToken ◄── shutdown signal tree      │
31//! │  BackoffState[] ◄── per-service restart state    │
32//! │  ServiceLifecycle[] ◄── per-service state machine│
33//! └─────────────────────────────────────────────────┘
34//! ```
35//!
36//! # Usage
37//!
38//! ```rust,ignore
39//! use janus_core::supervisor::{JanusSupervisor, SupervisorConfig};
40//!
41//! let config = SupervisorConfig::default();
//! let supervisor = JanusSupervisor::new(config);
43//!
44//! supervisor.spawn_service(Box::new(my_data_service));
45//! supervisor.spawn_service(Box::new(my_cns_service));
46//!
47//! // Blocks until Ctrl+C / SIGTERM, then orchestrates graceful shutdown
48//! supervisor.run_until_shutdown().await?;
49//! ```
50
51pub mod adapters;
52pub mod backoff;
53pub mod lifecycle;
54pub mod service;
55
56// Re-exports
57pub use adapters::{ApiModuleAdapter, ModuleAdapter};
58pub use backoff::{BackoffAction, BackoffConfig, BackoffState};
59pub use lifecycle::{
60    ServiceLifecycle, ServiceLifecycleSnapshot, ServicePhase, TerminationReason, TransitionError,
61};
62pub use service::{JanusService, RestartPolicy};
63
64use std::collections::HashMap;
65use std::sync::Arc;
66use std::sync::atomic::{AtomicU64, Ordering};
67use std::time::Duration;
68
69use tokio::sync::RwLock;
70use tokio_util::sync::CancellationToken;
71use tokio_util::task::TaskTracker;
72
73// ---------------------------------------------------------------------------
74// SupervisorConfig
75// ---------------------------------------------------------------------------
76
77/// Configuration for the [`JanusSupervisor`].
78#[derive(Debug, Clone)]
79pub struct SupervisorConfig {
80    /// Default backoff configuration applied to services that don't
81    /// provide their own. Individual services can override via the
82    /// per-service spawn options.
83    pub default_backoff: BackoffConfig,
84
85    /// Maximum time to wait for all services to drain during shutdown.
86    /// After this deadline the supervisor will log a warning and exit.
87    pub shutdown_timeout: Duration,
88
89    /// Whether to install a Ctrl+C / SIGTERM handler automatically.
90    /// Set to `false` if you want to manage signals externally and call
91    /// [`JanusSupervisor::trigger_shutdown`] manually.
92    pub install_signal_handler: bool,
93}
94
95impl Default for SupervisorConfig {
96    fn default() -> Self {
97        Self {
98            default_backoff: BackoffConfig::default(),
99            shutdown_timeout: Duration::from_secs(30),
100            install_signal_handler: true,
101        }
102    }
103}
104
105impl SupervisorConfig {
106    /// Builder: set the shutdown timeout.
107    pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self {
108        self.shutdown_timeout = timeout;
109        self
110    }
111
112    /// Builder: set the default backoff config.
113    pub fn with_default_backoff(mut self, backoff: BackoffConfig) -> Self {
114        self.default_backoff = backoff;
115        self
116    }
117
118    /// Builder: disable automatic signal handler installation.
119    pub fn without_signal_handler(mut self) -> Self {
120        self.install_signal_handler = false;
121        self
122    }
123}
124
125// ---------------------------------------------------------------------------
126// SupervisorMetrics
127// ---------------------------------------------------------------------------
128
129/// Atomic counters for supervisor-level Prometheus-compatible metrics.
130///
131/// These map directly to the metrics specified in the architecture doc:
132/// - `janus_supervisor_restarts_total`
133/// - `janus_supervisor_active_services`
134/// - `janus_supervisor_spawned_total`
135#[derive(Debug, Default)]
136pub struct SupervisorMetrics {
137    /// Total number of service restarts across all services.
138    pub restarts_total: AtomicU64,
139
140    /// Number of services currently in a non-terminal phase.
141    pub active_services: AtomicU64,
142
143    /// Total number of services ever spawned (including restarts).
144    pub spawned_total: AtomicU64,
145
146    /// Total number of services that have terminated.
147    pub terminated_total: AtomicU64,
148
149    /// Total number of circuit breaker trips.
150    pub circuit_breaker_trips: AtomicU64,
151}
152
153impl SupervisorMetrics {
154    fn new() -> Self {
155        Self::default()
156    }
157
158    fn record_spawn(&self) {
159        self.spawned_total.fetch_add(1, Ordering::Relaxed);
160        let new_active = self.active_services.fetch_add(1, Ordering::Relaxed) + 1;
161
162        // Push to global Prometheus registry.
163        // The atomics above are the single source of truth for
164        // `active_services` — we `set()` the Prometheus gauge from
165        // the authoritative atomic value instead of calling the
166        // independent `inc()` / `dec()` helpers on JanusMetrics,
167        // which avoids a TOCTOU divergence between the two stores.
168        let prom = crate::metrics::metrics();
169        prom.supervisor_spawned_total.inc();
170        prom.supervisor_active_services.set(new_active as f64);
171    }
172
173    fn record_restart(&self) {
174        self.restarts_total.fetch_add(1, Ordering::Relaxed);
175        crate::metrics::metrics().supervisor_restarts_total.inc();
176    }
177
178    fn record_termination(&self) {
179        self.terminated_total.fetch_add(1, Ordering::Relaxed);
180        // Saturating subtract to avoid underflow from race conditions.
181        // `fetch_update` returns the *previous* value on success.
182        let prev = self
183            .active_services
184            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
185                Some(v.saturating_sub(1))
186            })
187            .unwrap_or(0);
188        let new_active = prev.saturating_sub(1);
189
190        // Set the Prometheus gauge authoritatively from our atomic
191        // rather than using the independent get()+dec() path in
192        // JanusMetrics, which has a TOCTOU race on the gauge.
193        let prom = crate::metrics::metrics();
194        prom.supervisor_terminated_total.inc();
195        prom.supervisor_active_services.set(new_active as f64);
196    }
197
198    fn record_termination_with_uptime(&self, service_name: &str, uptime_secs: f64) {
199        self.record_termination();
200        crate::metrics::metrics()
201            .supervisor_uptime_seconds
202            .with_label_values(&[service_name])
203            .observe(uptime_secs);
204    }
205
206    fn record_circuit_breaker_trip(&self) {
207        self.circuit_breaker_trips.fetch_add(1, Ordering::Relaxed);
208        crate::metrics::metrics()
209            .supervisor_circuit_breaker_trips
210            .inc();
211    }
212
213    /// Snapshot the current metric values.
214    pub fn snapshot(&self) -> MetricsSnapshot {
215        MetricsSnapshot {
216            restarts_total: self.restarts_total.load(Ordering::Relaxed),
217            active_services: self.active_services.load(Ordering::Relaxed),
218            spawned_total: self.spawned_total.load(Ordering::Relaxed),
219            terminated_total: self.terminated_total.load(Ordering::Relaxed),
220            circuit_breaker_trips: self.circuit_breaker_trips.load(Ordering::Relaxed),
221        }
222    }
223}
224
225/// Plain-data snapshot of supervisor metrics.
226#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
227pub struct MetricsSnapshot {
228    pub restarts_total: u64,
229    pub active_services: u64,
230    pub spawned_total: u64,
231    pub terminated_total: u64,
232    pub circuit_breaker_trips: u64,
233}
234
235// ---------------------------------------------------------------------------
236// SpawnOptions
237// ---------------------------------------------------------------------------
238
239/// Per-service spawn configuration.
240#[derive(Debug, Clone, Default)]
241pub struct SpawnOptions {
242    /// Override the supervisor's default backoff config for this service.
243    pub backoff: Option<BackoffConfig>,
244}
245
246impl SpawnOptions {
247    /// Create options with a custom backoff config.
248    pub fn with_backoff(backoff: BackoffConfig) -> Self {
249        Self {
250            backoff: Some(backoff),
251        }
252    }
253}
254
255// ---------------------------------------------------------------------------
256// JanusSupervisor
257// ---------------------------------------------------------------------------
258
259/// The central supervisor for the Janus system.
260///
261/// Manages the lifecycle of all [`JanusService`] implementations using
262/// [`TaskTracker`] for structured concurrency and [`CancellationToken`]
263/// for graceful shutdown propagation.
264///
265/// # Memory Safety
266///
267/// Unlike `JoinSet`, `TaskTracker` does **not** accumulate task return
268/// values. Completed task memory is reclaimed immediately, making this
269/// safe for long-running processes that may restart services hundreds of
270/// times over weeks of operation.
271pub struct JanusSupervisor {
272    config: SupervisorConfig,
273    tracker: TaskTracker,
274    cancel_token: CancellationToken,
275    metrics: Arc<SupervisorMetrics>,
276    lifecycles: Arc<RwLock<HashMap<String, ServiceLifecycle>>>,
277}
278
279impl JanusSupervisor {
280    /// Create a new supervisor with the given configuration.
281    pub fn new(config: SupervisorConfig) -> Self {
282        Self {
283            config,
284            tracker: TaskTracker::new(),
285            cancel_token: CancellationToken::new(),
286            metrics: Arc::new(SupervisorMetrics::new()),
287            lifecycles: Arc::new(RwLock::new(HashMap::new())),
288        }
289    }
290
291    /// Create a new supervisor with default configuration.
292    pub fn with_defaults() -> Self {
293        Self::new(SupervisorConfig::default())
294    }
295
296    /// Get a reference to the supervisor's cancellation token.
297    ///
298    /// Useful for external code that needs to observe or trigger shutdown.
299    pub fn cancel_token(&self) -> &CancellationToken {
300        &self.cancel_token
301    }
302
303    /// Get a reference to the supervisor's metrics.
304    pub fn metrics(&self) -> &Arc<SupervisorMetrics> {
305        &self.metrics
306    }
307
308    /// Get a snapshot of all service lifecycles.
309    pub async fn lifecycle_snapshots(&self) -> Vec<ServiceLifecycleSnapshot> {
310        let lifecycles = self.lifecycles.read().await;
311        lifecycles
312            .values()
313            .map(ServiceLifecycleSnapshot::from)
314            .collect()
315    }
316
317    /// Get the lifecycle snapshot for a specific service by name.
318    pub async fn service_lifecycle(&self, name: &str) -> Option<ServiceLifecycleSnapshot> {
319        let lifecycles = self.lifecycles.read().await;
320        lifecycles.get(name).map(ServiceLifecycleSnapshot::from)
321    }
322
323    /// Number of services currently tracked (alive + terminated).
324    pub async fn service_count(&self) -> usize {
325        self.lifecycles.read().await.len()
326    }
327
328    /// Trigger a graceful shutdown of all managed services.
329    ///
330    /// This cancels the root cancellation token, which propagates to all
331    /// child tokens held by running services. The supervisor's
332    /// `run_until_shutdown` (or `wait_for_drain`) will then wait for
333    /// tasks to complete up to the configured timeout.
334    #[tracing::instrument(skip(self))]
335    pub fn trigger_shutdown(&self) {
336        tracing::info!("Supervisor: shutdown triggered");
337        self.cancel_token.cancel();
338    }
339
340    /// Returns `true` if the supervisor's shutdown has been triggered.
341    pub fn is_shutting_down(&self) -> bool {
342        self.cancel_token.is_cancelled()
343    }
344
345    // ── Spawn ─────────────────────────────────────────────────────────
346
347    /// Spawn a service into the supervisor with default options.
348    ///
349    /// The service will be wrapped in a restart loop governed by its
350    /// [`RestartPolicy`] and the supervisor's default [`BackoffConfig`].
351    pub fn spawn_service(&self, service: Box<dyn JanusService>) {
352        self.spawn_service_with_options(service, SpawnOptions::default());
353    }
354
355    /// Spawn a service with custom per-service options.
356    #[tracing::instrument(skip(self, service, options), fields(service = %service.name(), policy = %service.restart_policy()))]
357    pub fn spawn_service_with_options(
358        &self,
359        service: Box<dyn JanusService>,
360        options: SpawnOptions,
361    ) {
362        let service_name = service.name().to_string();
363        let restart_policy = service.restart_policy();
364        let backoff_config = options
365            .backoff
366            .unwrap_or_else(|| self.config.default_backoff.clone());
367
368        let cancel = self.cancel_token.child_token();
369        let metrics = self.metrics.clone();
370        let lifecycles = self.lifecycles.clone();
371
372        metrics.record_spawn();
373
374        self.tracker.spawn(Self::service_loop(
375            service,
376            service_name,
377            restart_policy,
378            backoff_config,
379            cancel,
380            metrics,
381            lifecycles,
382        ));
383    }
384
385    /// The core restart loop for a single service.
386    ///
387    /// This is the heart of the supervisor — it:
388    /// 1. Runs the service
389    /// 2. Catches failures
390    /// 3. Applies the restart policy and backoff strategy
391    /// 4. Loops until cancelled or the circuit breaker trips
392    #[tracing::instrument(
393        skip_all,
394        fields(service = %service_name, policy = %restart_policy)
395    )]
396    async fn service_loop(
397        service: Box<dyn JanusService>,
398        service_name: String,
399        restart_policy: RestartPolicy,
400        backoff_config: BackoffConfig,
401        cancel: CancellationToken,
402        metrics: Arc<SupervisorMetrics>,
403        lifecycles: Arc<RwLock<HashMap<String, ServiceLifecycle>>>,
404    ) {
405        let mut backoff = BackoffState::new(backoff_config);
406        let mut lifecycle = ServiceLifecycle::new(&service_name);
407
408        // Store the lifecycle
409        {
410            let mut lc_map = lifecycles.write().await;
411            lc_map.insert(service_name.clone(), lifecycle.clone());
412        }
413
414        loop {
415            // Check cancellation before each attempt
416            if cancel.is_cancelled() {
417                tracing::info!(service = %service_name, "cancellation detected, not starting service");
418                let _ = lifecycle.transition_to_stopping();
419                let _ = lifecycle.transition_to_terminated(TerminationReason::Cancelled);
420                Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
421                let uptime = lifecycle.cumulative_running_time().as_secs_f64();
422                metrics.record_termination_with_uptime(&service_name, uptime);
423                return;
424            }
425
426            // Transition to Running
427            if lifecycle.phase() == ServicePhase::Starting {
428                let _ = lifecycle.transition_to_running();
429            } else if lifecycle.phase() == ServicePhase::BackingOff {
430                // Restarting from backoff
431                let _ = lifecycle.transition_to_restarting();
432                let _ = lifecycle.transition_to_running();
433                metrics.record_restart();
434            }
435
436            backoff.record_start();
437            Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
438
439            tracing::info!(
440                service = %service_name,
441                attempt = lifecycle.start_count(),
442                "running service"
443            );
444
445            // Run the service.  The cancel token is passed into `run()` so
446            // cooperative services can detect shutdown and return promptly.
447            // We do NOT race `cancel.cancelled()` against `service.run()`
448            // here — doing so (especially with `biased`) would drop the
449            // service's future before it can perform cleanup.  The real
450            // safety net for non-responsive services is the shutdown
451            // timeout in `wait_for_drain`.
452            let result = service.run(cancel.clone()).await;
453
454            // If the service returned because the cancel token fired,
455            // treat it as a clean cancellation regardless of the result.
456            if cancel.is_cancelled() {
457                tracing::info!(service = %service_name, "service exited after cancellation");
458                let _ = lifecycle.transition_to_stopping();
459                let _ = lifecycle.transition_to_terminated(TerminationReason::Cancelled);
460                Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
461                let uptime = lifecycle.cumulative_running_time().as_secs_f64();
462                metrics.record_termination_with_uptime(&service_name, uptime);
463                return;
464            }
465
466            // Handle the result
467            match result {
468                Ok(()) => {
469                    tracing::info!(service = %service_name, "service exited cleanly");
470                    backoff.maybe_reset_on_cooldown();
471
472                    match restart_policy {
473                        RestartPolicy::Always => {
474                            // A clean exit means the service completed successfully,
475                            // so explicitly reset the backoff state. This prevents
476                            // stale attempt counts from prior error paths from
477                            // bleeding into subsequent clean-exit restart cycles.
478                            backoff.reset();
479
480                            // Always restart, even on clean exit
481                            tracing::info!(
482                                service = %service_name,
483                                "restart_policy=always, will restart after backoff"
484                            );
485                            // For clean exits with Always, we use a minimal delay
486                            // rather than the exponential backoff (which is for errors)
487                            let delay = Duration::from_millis(100);
488                            let _ = lifecycle
489                                .transition_to_backing_off("clean exit, policy=always", delay);
490                            Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
491
492                            tokio::select! {
493                                _ = cancel.cancelled() => {
494                                    let _ = lifecycle.transition_to_stopping();
495                                    let _ = lifecycle.transition_to_terminated(TerminationReason::Cancelled);
496                                    Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
497                                    let uptime = lifecycle.cumulative_running_time().as_secs_f64();
498                                    metrics.record_termination_with_uptime(&service_name, uptime);
499                                    return;
500                                }
501                                _ = tokio::time::sleep(delay) => {}
502                            }
503                            continue;
504                        }
505                        RestartPolicy::OnFailure | RestartPolicy::Never => {
506                            // Clean exit — service completed its work
507                            let _ =
508                                lifecycle.transition_to_terminated(TerminationReason::Completed);
509                            Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
510                            let uptime = lifecycle.cumulative_running_time().as_secs_f64();
511                            metrics.record_termination_with_uptime(&service_name, uptime);
512                            return;
513                        }
514                    }
515                }
516
517                Err(err) => {
518                    let error_msg = format!("{err:#}");
519                    tracing::error!(
520                        service = %service_name,
521                        error = %error_msg,
522                        "service failed"
523                    );
524
525                    backoff.maybe_reset_on_cooldown();
526
527                    match restart_policy {
528                        RestartPolicy::Never => {
529                            tracing::warn!(
530                                service = %service_name,
531                                "restart_policy=never, service will not be restarted"
532                            );
533                            let _ = lifecycle.transition_to_terminated(
534                                TerminationReason::Unrecoverable(error_msg),
535                            );
536                            Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
537                            let uptime = lifecycle.cumulative_running_time().as_secs_f64();
538                            metrics.record_termination_with_uptime(&service_name, uptime);
539                            return;
540                        }
541
542                        RestartPolicy::OnFailure | RestartPolicy::Always => {
543                            // Compute backoff
544                            match backoff.next_backoff() {
545                                BackoffAction::Retry(delay) => {
546                                    tracing::info!(
547                                        service = %service_name,
548                                        delay_ms = delay.as_millis() as u64,
549                                        attempt = backoff.attempt(),
550                                        "scheduling restart after backoff"
551                                    );
552
553                                    let _ = lifecycle.transition_to_backing_off(&error_msg, delay);
554                                    Self::update_lifecycle(&lifecycles, &service_name, &lifecycle)
555                                        .await;
556
557                                    // Sleep for the backoff duration, but respect cancellation
558                                    tokio::select! {
559                                        _ = cancel.cancelled() => {
560                                            tracing::info!(
561                                                service = %service_name,
562                                                "cancellation during backoff"
563                                            );
564                                            let _ = lifecycle.transition_to_stopping();
565                                            let _ = lifecycle.transition_to_terminated(
566                                                TerminationReason::Cancelled,
567                                            );
568                                            Self::update_lifecycle(&lifecycles, &service_name, &lifecycle).await;
569                                            let uptime = lifecycle.cumulative_running_time().as_secs_f64();
570                                            metrics.record_termination_with_uptime(&service_name, uptime);
571                                            return;
572                                        }
573                                        _ = tokio::time::sleep(delay) => {
574                                            // Backoff complete, loop will restart
575                                        }
576                                    }
577                                }
578
579                                BackoffAction::CircuitOpen {
580                                    failures,
581                                    max_retries,
582                                } => {
583                                    tracing::error!(
584                                        service = %service_name,
585                                        failures = failures,
586                                        max_retries = max_retries,
587                                        "CIRCUIT BREAKER OPEN — too many failures, giving up"
588                                    );
589                                    metrics.record_circuit_breaker_trip();
590
591                                    let _ = lifecycle.transition_to_terminated(
592                                        TerminationReason::CircuitBreakerOpen {
593                                            failures,
594                                            max_retries,
595                                        },
596                                    );
597                                    Self::update_lifecycle(&lifecycles, &service_name, &lifecycle)
598                                        .await;
599                                    let uptime = lifecycle.cumulative_running_time().as_secs_f64();
600                                    metrics.record_termination_with_uptime(&service_name, uptime);
601                                    return;
602                                }
603                            }
604                        }
605                    }
606                }
607            }
608        }
609    }
610
611    /// Update the lifecycle state in the shared map.
612    async fn update_lifecycle(
613        lifecycles: &Arc<RwLock<HashMap<String, ServiceLifecycle>>>,
614        name: &str,
615        lifecycle: &ServiceLifecycle,
616    ) {
617        let mut lc_map = lifecycles.write().await;
618        lc_map.insert(name.to_string(), lifecycle.clone());
619    }
620
621    // ── Shutdown coordination ─────────────────────────────────────────
622
623    /// Close the tracker and wait for all tasks to complete, with timeout.
624    ///
625    /// Call this after triggering shutdown (or after `run_until_shutdown`
626    /// returns) to ensure all tasks have drained.
627    #[tracing::instrument(skip(self), fields(timeout_secs = self.config.shutdown_timeout.as_secs()))]
628    pub async fn wait_for_drain(&self) {
629        self.tracker.close();
630
631        tracing::info!(
632            timeout_secs = self.config.shutdown_timeout.as_secs(),
633            "waiting for all services to drain"
634        );
635
636        match tokio::time::timeout(self.config.shutdown_timeout, self.tracker.wait()).await {
637            Ok(()) => {
638                tracing::info!("all services drained successfully");
639            }
640            Err(_) => {
641                tracing::warn!(
642                    timeout_secs = self.config.shutdown_timeout.as_secs(),
643                    "shutdown timeout exceeded, some services may not have exited cleanly"
644                );
645            }
646        }
647    }
648
649    /// Run the supervisor until a shutdown signal is received.
650    ///
651    /// If `install_signal_handler` is `true` in the config (the default),
652    /// this will listen for Ctrl+C / SIGTERM and trigger shutdown.
653    ///
654    /// Returns after all services have drained (or the shutdown timeout
655    /// has elapsed).
656    #[tracing::instrument(skip(self), fields(signal_handler = self.config.install_signal_handler))]
657    pub async fn run_until_shutdown(&self) -> anyhow::Result<()> {
658        if self.config.install_signal_handler {
659            self.wait_for_signal_and_shutdown().await?;
660        } else {
661            // Just wait for the token to be cancelled externally
662            self.cancel_token.cancelled().await;
663            tracing::info!("external shutdown signal received");
664        }
665
666        self.wait_for_drain().await;
667
668        // Log final metrics
669        let snap = self.metrics.snapshot();
670        tracing::info!(
671            restarts = snap.restarts_total,
672            spawned = snap.spawned_total,
673            terminated = snap.terminated_total,
674            circuit_trips = snap.circuit_breaker_trips,
675            "supervisor shutdown complete"
676        );
677
678        Ok(())
679    }
680
681    /// Wait for OS signals (Ctrl+C, SIGTERM) and trigger shutdown.
682    async fn wait_for_signal_and_shutdown(&self) -> anyhow::Result<()> {
683        #[cfg(unix)]
684        {
685            use tokio::signal::unix::{SignalKind, signal};
686
687            let mut sigterm = signal(SignalKind::terminate())?;
688            let mut sigint = signal(SignalKind::interrupt())?;
689
690            tokio::select! {
691                _ = sigterm.recv() => {
692                    tracing::info!("received SIGTERM");
693                }
694                _ = sigint.recv() => {
695                    tracing::info!("received SIGINT");
696                }
697                _ = self.cancel_token.cancelled() => {
698                    tracing::info!("shutdown triggered programmatically");
699                    return Ok(());
700                }
701            }
702        }
703
704        #[cfg(not(unix))]
705        {
706            tokio::select! {
707                result = tokio::signal::ctrl_c() => {
708                    result?;
709                    tracing::info!("received Ctrl+C");
710                }
711                _ = self.cancel_token.cancelled() => {
712                    tracing::info!("shutdown triggered programmatically");
713                    return Ok(());
714                }
715            }
716        }
717
718        self.cancel_token.cancel();
719        Ok(())
720    }
721}
722
723// ---------------------------------------------------------------------------
724// Tests
725// ---------------------------------------------------------------------------
726
727#[cfg(test)]
728mod tests {
729    use super::*;
730
731    // ── Helpers ───────────────────────────────────────────────────────
732
733    /// A simple service that counts how many times it ran and exits
734    /// cleanly on cancellation.
735    struct CountingService {
736        name: String,
737        policy: RestartPolicy,
738        run_count: Arc<AtomicU64>,
739    }
740
741    impl CountingService {
742        fn new(name: &str, policy: RestartPolicy) -> (Self, Arc<AtomicU64>) {
743            let count = Arc::new(AtomicU64::new(0));
744            (
745                Self {
746                    name: name.to_string(),
747                    policy,
748                    run_count: count.clone(),
749                },
750                count,
751            )
752        }
753    }
754
755    #[async_trait::async_trait]
756    impl JanusService for CountingService {
757        fn name(&self) -> &str {
758            &self.name
759        }
760
761        fn restart_policy(&self) -> RestartPolicy {
762            self.policy
763        }
764
765        async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
766            self.run_count.fetch_add(1, Ordering::SeqCst);
767            cancel.cancelled().await;
768            Ok(())
769        }
770    }
771
772    /// A service that fails N times before succeeding.
773    struct FailNTimes {
774        name: String,
775        fail_count: u32,
776        current: Arc<AtomicU64>,
777    }
778
779    impl FailNTimes {
780        fn new(name: &str, fail_count: u32) -> (Self, Arc<AtomicU64>) {
781            let current = Arc::new(AtomicU64::new(0));
782            (
783                Self {
784                    name: name.to_string(),
785                    fail_count,
786                    current: current.clone(),
787                },
788                current,
789            )
790        }
791    }
792
793    #[async_trait::async_trait]
794    impl JanusService for FailNTimes {
795        fn name(&self) -> &str {
796            &self.name
797        }
798
799        fn restart_policy(&self) -> RestartPolicy {
800            RestartPolicy::OnFailure
801        }
802
803        async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
804            let attempt = self.current.fetch_add(1, Ordering::SeqCst) as u32;
805            if attempt < self.fail_count {
806                // Yield briefly so tests aren't instant-looping
807                tokio::time::sleep(Duration::from_millis(1)).await;
808                anyhow::bail!("simulated failure #{}", attempt + 1);
809            }
810            // After N failures, run until cancelled
811            cancel.cancelled().await;
812            Ok(())
813        }
814    }
815
816    /// A service that immediately returns Ok (one-shot task).
817    struct OneShotService {
818        name: String,
819        ran: Arc<AtomicU64>,
820    }
821
822    impl OneShotService {
823        fn new(name: &str) -> (Self, Arc<AtomicU64>) {
824            let ran = Arc::new(AtomicU64::new(0));
825            (
826                Self {
827                    name: name.to_string(),
828                    ran: ran.clone(),
829                },
830                ran,
831            )
832        }
833    }
834
835    #[async_trait::async_trait]
836    impl JanusService for OneShotService {
837        fn name(&self) -> &str {
838            &self.name
839        }
840
841        fn restart_policy(&self) -> RestartPolicy {
842            RestartPolicy::Never
843        }
844
845        async fn run(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
846            self.ran.fetch_add(1, Ordering::SeqCst);
847            Ok(())
848        }
849    }
850
851    /// A service that always fails (for circuit breaker testing).
852    struct AlwaysFailService {
853        name: String,
854        attempts: Arc<AtomicU64>,
855    }
856
857    impl AlwaysFailService {
858        fn new(name: &str) -> (Self, Arc<AtomicU64>) {
859            let attempts = Arc::new(AtomicU64::new(0));
860            (
861                Self {
862                    name: name.to_string(),
863                    attempts: attempts.clone(),
864                },
865                attempts,
866            )
867        }
868    }
869
870    #[async_trait::async_trait]
871    impl JanusService for AlwaysFailService {
872        fn name(&self) -> &str {
873            &self.name
874        }
875
876        fn restart_policy(&self) -> RestartPolicy {
877            RestartPolicy::OnFailure
878        }
879
880        async fn run(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
881            self.attempts.fetch_add(1, Ordering::SeqCst);
882            tokio::time::sleep(Duration::from_millis(1)).await;
883            anyhow::bail!("permanent failure");
884        }
885    }
886
887    // ── Tests ─────────────────────────────────────────────────────────
888
889    #[tokio::test]
890    async fn test_supervisor_creation() {
891        let sup = JanusSupervisor::with_defaults();
892        assert!(!sup.is_shutting_down());
893        assert_eq!(sup.service_count().await, 0);
894    }
895
896    #[tokio::test]
897    async fn test_spawn_and_cancel_single_service() {
898        let config = SupervisorConfig::default().without_signal_handler();
899        let sup = JanusSupervisor::new(config);
900
901        let (svc, count) = CountingService::new("test-svc", RestartPolicy::OnFailure);
902        sup.spawn_service(Box::new(svc));
903
904        // Give the service time to start
905        tokio::time::sleep(Duration::from_millis(50)).await;
906
907        assert_eq!(count.load(Ordering::SeqCst), 1);
908        assert_eq!(sup.metrics().active_services.load(Ordering::Relaxed), 1);
909
910        // Trigger shutdown
911        sup.trigger_shutdown();
912        sup.wait_for_drain().await;
913
914        assert_eq!(count.load(Ordering::SeqCst), 1);
915        let snap = sup.metrics().snapshot();
916        assert_eq!(snap.spawned_total, 1);
917        assert_eq!(snap.terminated_total, 1);
918        assert_eq!(snap.active_services, 0);
919    }
920
921    #[tokio::test]
922    async fn test_spawn_multiple_services() {
923        let config = SupervisorConfig::default().without_signal_handler();
924        let sup = JanusSupervisor::new(config);
925
926        let (svc1, count1) = CountingService::new("svc-1", RestartPolicy::OnFailure);
927        let (svc2, count2) = CountingService::new("svc-2", RestartPolicy::OnFailure);
928        let (svc3, count3) = CountingService::new("svc-3", RestartPolicy::OnFailure);
929
930        sup.spawn_service(Box::new(svc1));
931        sup.spawn_service(Box::new(svc2));
932        sup.spawn_service(Box::new(svc3));
933
934        tokio::time::sleep(Duration::from_millis(50)).await;
935
936        assert_eq!(count1.load(Ordering::SeqCst), 1);
937        assert_eq!(count2.load(Ordering::SeqCst), 1);
938        assert_eq!(count3.load(Ordering::SeqCst), 1);
939
940        sup.trigger_shutdown();
941        sup.wait_for_drain().await;
942
943        let snap = sup.metrics().snapshot();
944        assert_eq!(snap.spawned_total, 3);
945        assert_eq!(snap.terminated_total, 3);
946    }
947
948    #[tokio::test]
949    async fn test_service_restart_on_failure() {
950        let config = SupervisorConfig::default()
951            .without_signal_handler()
952            .with_default_backoff(
953                BackoffConfig::new(Duration::from_millis(10), Duration::from_millis(50))
954                    .without_circuit_breaker(),
955            );
956
957        let sup = JanusSupervisor::new(config);
958
959        // Service fails 3 times, then runs cleanly
960        let (svc, attempts) = FailNTimes::new("fail-3", 3);
961        sup.spawn_service(Box::new(svc));
962
963        // Give enough time for 3 failures + backoffs + stable run
964        tokio::time::sleep(Duration::from_millis(500)).await;
965
966        // Should have attempted at least 4 times (3 failures + 1 success)
967        assert!(
968            attempts.load(Ordering::SeqCst) >= 4,
969            "expected >= 4 attempts, got {}",
970            attempts.load(Ordering::SeqCst)
971        );
972
973        sup.trigger_shutdown();
974        sup.wait_for_drain().await;
975
976        let snap = sup.metrics().snapshot();
977        assert!(snap.restarts_total >= 3);
978    }
979
980    #[tokio::test]
981    async fn test_one_shot_service_no_restart() {
982        let config = SupervisorConfig::default().without_signal_handler();
983        let sup = JanusSupervisor::new(config);
984
985        let (svc, ran) = OneShotService::new("one-shot");
986        sup.spawn_service(Box::new(svc));
987
988        // Wait for it to complete
989        tokio::time::sleep(Duration::from_millis(100)).await;
990
991        // Should have run exactly once
992        assert_eq!(ran.load(Ordering::SeqCst), 1);
993
994        // And terminated
995        let snap = sup.metrics().snapshot();
996        assert_eq!(snap.terminated_total, 1);
997        assert_eq!(snap.restarts_total, 0);
998
999        sup.trigger_shutdown();
1000        sup.wait_for_drain().await;
1001    }
1002
1003    #[tokio::test]
1004    async fn test_restart_policy_never_on_failure() {
1005        let config = SupervisorConfig::default().without_signal_handler();
1006        let sup = JanusSupervisor::new(config);
1007
1008        // A service with Never policy that fails
1009        struct FailOnce {
1010            ran: Arc<AtomicU64>,
1011        }
1012
1013        #[async_trait::async_trait]
1014        impl JanusService for FailOnce {
1015            fn name(&self) -> &str {
1016                "fail-once-never"
1017            }
1018            fn restart_policy(&self) -> RestartPolicy {
1019                RestartPolicy::Never
1020            }
1021            async fn run(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
1022                self.ran.fetch_add(1, Ordering::SeqCst);
1023                anyhow::bail!("intentional failure");
1024            }
1025        }
1026
1027        let ran = Arc::new(AtomicU64::new(0));
1028        let svc = FailOnce { ran: ran.clone() };
1029        sup.spawn_service(Box::new(svc));
1030
1031        tokio::time::sleep(Duration::from_millis(100)).await;
1032
1033        // Should have run exactly once (no restart)
1034        assert_eq!(ran.load(Ordering::SeqCst), 1);
1035
1036        let snap = sup.metrics().snapshot();
1037        assert_eq!(snap.terminated_total, 1);
1038        assert_eq!(snap.restarts_total, 0);
1039
1040        sup.trigger_shutdown();
1041        sup.wait_for_drain().await;
1042    }
1043
1044    #[tokio::test]
1045    async fn test_circuit_breaker_trips() {
1046        let config = SupervisorConfig::default()
1047            .without_signal_handler()
1048            .with_default_backoff(
1049                BackoffConfig::new(Duration::from_millis(5), Duration::from_millis(20))
1050                    .with_circuit_breaker(3, Duration::from_secs(60)),
1051            );
1052
1053        let sup = JanusSupervisor::new(config);
1054
1055        let (svc, attempts) = AlwaysFailService::new("always-fail");
1056        sup.spawn_service(Box::new(svc));
1057
1058        // Wait for circuit breaker to trip
1059        tokio::time::sleep(Duration::from_millis(500)).await;
1060
1061        // Circuit breaker should have tripped at 3 failures
1062        let att = attempts.load(Ordering::SeqCst);
1063        assert!(att >= 3, "expected at least 3 attempts, got {}", att);
1064
1065        let snap = sup.metrics().snapshot();
1066        assert_eq!(snap.circuit_breaker_trips, 1);
1067        assert_eq!(snap.terminated_total, 1);
1068
1069        sup.trigger_shutdown();
1070        sup.wait_for_drain().await;
1071    }
1072
1073    #[tokio::test]
1074    async fn test_lifecycle_snapshots() {
1075        let config = SupervisorConfig::default().without_signal_handler();
1076        let sup = JanusSupervisor::new(config);
1077
1078        let (svc, _) = CountingService::new("lifecycle-test", RestartPolicy::OnFailure);
1079        sup.spawn_service(Box::new(svc));
1080
1081        tokio::time::sleep(Duration::from_millis(50)).await;
1082
1083        let snapshots = sup.lifecycle_snapshots().await;
1084        assert_eq!(snapshots.len(), 1);
1085
1086        let snap = &snapshots[0];
1087        assert_eq!(snap.service_name, "lifecycle-test");
1088        assert_eq!(snap.phase, ServicePhase::Running);
1089        assert_eq!(snap.start_count, 1);
1090        assert_eq!(snap.total_failures, 0);
1091
1092        sup.trigger_shutdown();
1093        sup.wait_for_drain().await;
1094
1095        // After shutdown, should be terminated
1096        let snapshots = sup.lifecycle_snapshots().await;
1097        let snap = &snapshots[0];
1098        assert_eq!(snap.phase, ServicePhase::Terminated);
1099    }
1100
1101    #[tokio::test]
1102    async fn test_service_lifecycle_by_name() {
1103        let config = SupervisorConfig::default().without_signal_handler();
1104        let sup = JanusSupervisor::new(config);
1105
1106        let (svc, _) = CountingService::new("named-svc", RestartPolicy::OnFailure);
1107        sup.spawn_service(Box::new(svc));
1108
1109        tokio::time::sleep(Duration::from_millis(50)).await;
1110
1111        let snap = sup.service_lifecycle("named-svc").await;
1112        assert!(snap.is_some());
1113        assert_eq!(snap.unwrap().service_name, "named-svc");
1114
1115        let missing = sup.service_lifecycle("nonexistent").await;
1116        assert!(missing.is_none());
1117
1118        sup.trigger_shutdown();
1119        sup.wait_for_drain().await;
1120    }
1121
1122    #[tokio::test]
1123    async fn test_metrics_snapshot() {
1124        let sup = JanusSupervisor::with_defaults();
1125        let snap = sup.metrics().snapshot();
1126
1127        assert_eq!(snap.restarts_total, 0);
1128        assert_eq!(snap.active_services, 0);
1129        assert_eq!(snap.spawned_total, 0);
1130        assert_eq!(snap.terminated_total, 0);
1131        assert_eq!(snap.circuit_breaker_trips, 0);
1132    }
1133
1134    #[tokio::test]
1135    async fn test_run_until_shutdown_with_external_cancel() {
1136        let config = SupervisorConfig::default().without_signal_handler();
1137        let sup = JanusSupervisor::new(config);
1138
1139        let (svc, count) = CountingService::new("ext-cancel", RestartPolicy::OnFailure);
1140        sup.spawn_service(Box::new(svc));
1141
1142        let cancel = sup.cancel_token().clone();
1143
1144        // Spawn the run loop
1145        let handle = tokio::spawn({
1146            let sup_ref_metrics = sup.metrics().clone();
1147            async move {
1148                // We can't move the supervisor into a spawn because it's not Send
1149                // in all cases, so just wait for the cancel token
1150                cancel.cancelled().await;
1151                sup_ref_metrics.snapshot()
1152            }
1153        });
1154
1155        tokio::time::sleep(Duration::from_millis(50)).await;
1156        assert_eq!(count.load(Ordering::SeqCst), 1);
1157
1158        sup.trigger_shutdown();
1159        sup.wait_for_drain().await;
1160
1161        let snap = handle.await.unwrap();
1162        assert_eq!(snap.spawned_total, 1);
1163    }
1164
1165    #[tokio::test]
1166    async fn test_shutdown_timeout() {
1167        let config = SupervisorConfig::default()
1168            .without_signal_handler()
1169            .with_shutdown_timeout(Duration::from_millis(100));
1170
1171        let sup = JanusSupervisor::new(config);
1172
1173        // Spawn a service that ignores cancellation (badly behaved)
1174        struct HangingService;
1175
1176        #[async_trait::async_trait]
1177        impl JanusService for HangingService {
1178            fn name(&self) -> &str {
1179                "hanger"
1180            }
1181            async fn run(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
1182                // Intentionally ignores cancellation — simulates a stuck service.
1183                // Sleep for a very long time.
1184                tokio::time::sleep(Duration::from_secs(3600)).await;
1185                Ok(())
1186            }
1187        }
1188
1189        sup.spawn_service(Box::new(HangingService));
1190        tokio::time::sleep(Duration::from_millis(20)).await;
1191
1192        sup.trigger_shutdown();
1193
1194        // This should complete within ~100ms (the timeout), not 3600s
1195        let start = std::time::Instant::now();
1196        sup.wait_for_drain().await;
1197        let elapsed = start.elapsed();
1198
1199        // Allow some slack but should be well under 1 second
1200        assert!(
1201            elapsed < Duration::from_secs(1),
1202            "drain took too long: {:?}",
1203            elapsed
1204        );
1205    }
1206
1207    #[tokio::test]
1208    async fn test_spawn_with_custom_backoff() {
1209        let config = SupervisorConfig::default().without_signal_handler();
1210        let sup = JanusSupervisor::new(config);
1211
1212        let (svc, attempts) = AlwaysFailService::new("custom-backoff");
1213
1214        let custom_backoff =
1215            BackoffConfig::new(Duration::from_millis(5), Duration::from_millis(10))
1216                .with_circuit_breaker(2, Duration::from_secs(60));
1217
1218        sup.spawn_service_with_options(Box::new(svc), SpawnOptions::with_backoff(custom_backoff));
1219
1220        tokio::time::sleep(Duration::from_millis(200)).await;
1221
1222        // Circuit breaker with max_retries=2 should trip quickly
1223        assert!(attempts.load(Ordering::SeqCst) >= 2);
1224
1225        let snap = sup.metrics().snapshot();
1226        assert_eq!(snap.circuit_breaker_trips, 1);
1227
1228        sup.trigger_shutdown();
1229        sup.wait_for_drain().await;
1230    }
1231
1232    #[tokio::test]
1233    async fn test_restart_policy_always_on_clean_exit() {
1234        let config = SupervisorConfig::default()
1235            .without_signal_handler()
1236            .with_default_backoff(
1237                BackoffConfig::new(Duration::from_millis(10), Duration::from_millis(50))
1238                    .without_circuit_breaker(),
1239            );
1240
1241        let sup = JanusSupervisor::new(config);
1242
1243        /// A service that exits Ok immediately each time (policy: Always)
1244        struct ExitImmediately {
1245            count: Arc<AtomicU64>,
1246        }
1247
1248        #[async_trait::async_trait]
1249        impl JanusService for ExitImmediately {
1250            fn name(&self) -> &str {
1251                "exit-immediately"
1252            }
1253            fn restart_policy(&self) -> RestartPolicy {
1254                RestartPolicy::Always
1255            }
1256            async fn run(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
1257                self.count.fetch_add(1, Ordering::SeqCst);
1258                tokio::time::sleep(Duration::from_millis(1)).await;
1259                Ok(())
1260            }
1261        }
1262
1263        let count = Arc::new(AtomicU64::new(0));
1264        let svc = ExitImmediately {
1265            count: count.clone(),
1266        };
1267
1268        sup.spawn_service(Box::new(svc));
1269
1270        // Let it restart a few times
1271        tokio::time::sleep(Duration::from_millis(500)).await;
1272
1273        let runs = count.load(Ordering::SeqCst);
1274        assert!(
1275            runs >= 2,
1276            "expected service to run multiple times with Always policy, got {}",
1277            runs
1278        );
1279
1280        sup.trigger_shutdown();
1281        sup.wait_for_drain().await;
1282    }
1283
1284    #[tokio::test]
1285    async fn test_is_shutting_down() {
1286        let sup = JanusSupervisor::with_defaults();
1287        assert!(!sup.is_shutting_down());
1288
1289        sup.trigger_shutdown();
1290        assert!(sup.is_shutting_down());
1291    }
1292
1293    #[tokio::test]
1294    async fn test_config_builder() {
1295        let config = SupervisorConfig::default()
1296            .with_shutdown_timeout(Duration::from_secs(10))
1297            .with_default_backoff(BackoffConfig::new(
1298                Duration::from_millis(200),
1299                Duration::from_secs(30),
1300            ))
1301            .without_signal_handler();
1302
1303        assert_eq!(config.shutdown_timeout, Duration::from_secs(10));
1304        assert!(!config.install_signal_handler);
1305        assert_eq!(
1306            config.default_backoff.base_delay,
1307            Duration::from_millis(200)
1308        );
1309        assert_eq!(config.default_backoff.max_delay, Duration::from_secs(30));
1310    }
1311
1312    // =====================================================================
1313    // Integration Tests — Graceful Shutdown E2E
1314    // =====================================================================
1315
1316    /// A service that logs lifecycle events into a shared Vec so the test
1317    /// can verify the exact sequence of operations.
1318    struct LifecycleTracer {
1319        name: String,
1320        log: Arc<tokio::sync::Mutex<Vec<String>>>,
1321        policy: RestartPolicy,
1322    }
1323
1324    impl LifecycleTracer {
1325        fn new(
1326            name: &str,
1327            log: Arc<tokio::sync::Mutex<Vec<String>>>,
1328            policy: RestartPolicy,
1329        ) -> Self {
1330            Self {
1331                name: name.to_string(),
1332                log,
1333                policy,
1334            }
1335        }
1336    }
1337
1338    #[async_trait::async_trait]
1339    impl JanusService for LifecycleTracer {
1340        fn name(&self) -> &str {
1341            &self.name
1342        }
1343
1344        fn restart_policy(&self) -> RestartPolicy {
1345            self.policy
1346        }
1347
1348        async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
1349            {
1350                let mut l = self.log.lock().await;
1351                l.push(format!("{}:started", self.name));
1352            }
1353            cancel.cancelled().await;
1354            {
1355                let mut l = self.log.lock().await;
1356                l.push(format!("{}:stopped", self.name));
1357            }
1358            Ok(())
1359        }
1360    }
1361
1362    /// **Integration Test — Graceful Shutdown E2E**
1363    ///
1364    /// Verifies:
1365    ///   1. Multiple services start and register in the supervisor.
1366    ///   2. Programmatic shutdown cancels all services.
1367    ///   3. All services terminate with `Cancelled` reason.
1368    ///   4. Supervisor drains within the timeout.
1369    ///   5. Final metrics are consistent.
1370    #[tokio::test]
1371    async fn test_integration_graceful_shutdown_e2e() {
1372        let log = Arc::new(tokio::sync::Mutex::new(Vec::<String>::new()));
1373
1374        let config = SupervisorConfig::default()
1375            .with_shutdown_timeout(Duration::from_secs(5))
1376            .without_signal_handler();
1377
1378        let sup = JanusSupervisor::new(config);
1379
1380        // Spawn three traced services with different policies
1381        sup.spawn_service(Box::new(LifecycleTracer::new(
1382            "api",
1383            log.clone(),
1384            RestartPolicy::Always,
1385        )));
1386        sup.spawn_service(Box::new(LifecycleTracer::new(
1387            "data",
1388            log.clone(),
1389            RestartPolicy::OnFailure,
1390        )));
1391        sup.spawn_service(Box::new(LifecycleTracer::new(
1392            "cns",
1393            log.clone(),
1394            RestartPolicy::OnFailure,
1395        )));
1396
1397        // Give services time to start
1398        tokio::time::sleep(Duration::from_millis(100)).await;
1399
1400        assert_eq!(sup.service_count().await, 3);
1401        assert!(!sup.is_shutting_down());
1402
1403        // Trigger graceful shutdown
1404        sup.trigger_shutdown();
1405        assert!(sup.is_shutting_down());
1406
1407        // Wait for all tasks to drain
1408        sup.wait_for_drain().await;
1409
1410        // Verify lifecycle events — each service should have started & stopped
1411        let events = log.lock().await;
1412        for svc in &["api", "data", "cns"] {
1413            assert!(
1414                events.contains(&format!("{}:started", svc)),
1415                "service '{}' never started; events: {:?}",
1416                svc,
1417                *events,
1418            );
1419            assert!(
1420                events.contains(&format!("{}:stopped", svc)),
1421                "service '{}' never stopped; events: {:?}",
1422                svc,
1423                *events,
1424            );
1425        }
1426
1427        // Verify all services terminated
1428        let snapshots = sup.lifecycle_snapshots().await;
1429        assert_eq!(snapshots.len(), 3);
1430        for snap in &snapshots {
1431            assert_eq!(
1432                snap.phase,
1433                ServicePhase::Terminated,
1434                "service '{}' should be Terminated, was {}",
1435                snap.service_name,
1436                snap.phase,
1437            );
1438            assert_eq!(
1439                snap.termination_reason.as_deref(),
1440                Some("cancelled"),
1441                "service '{}' should have been cancelled, got {:?}",
1442                snap.service_name,
1443                snap.termination_reason,
1444            );
1445            assert!(snap.start_count >= 1);
1446        }
1447
1448        // Verify final metrics
1449        let metrics = sup.metrics().snapshot();
1450        assert_eq!(metrics.spawned_total, 3);
1451        assert_eq!(metrics.terminated_total, 3);
1452        assert_eq!(metrics.active_services, 0);
1453    }
1454
1455    // =====================================================================
1456    // Chaos Tests — Backoff & Circuit Breaker Validation
1457    // =====================================================================
1458
1459    /// A configurable chaos service that:
1460    /// - Fails `fail_times` before succeeding (or keeps failing forever).
1461    /// - Records each attempt timestamp for backoff analysis.
1462    /// - On success, waits for cancellation.
1463    struct ChaosService {
1464        name: String,
1465        fail_times: u32,
1466        current: Arc<std::sync::atomic::AtomicU32>,
1467        attempt_times: Arc<tokio::sync::Mutex<Vec<std::time::Instant>>>,
1468        policy: RestartPolicy,
1469    }
1470
1471    impl ChaosService {
1472        fn new(name: &str, fail_times: u32, policy: RestartPolicy) -> Self {
1473            Self {
1474                name: name.to_string(),
1475                fail_times,
1476                current: Arc::new(std::sync::atomic::AtomicU32::new(0)),
1477                attempt_times: Arc::new(tokio::sync::Mutex::new(Vec::new())),
1478                policy,
1479            }
1480        }
1481    }
1482
1483    #[async_trait::async_trait]
1484    impl JanusService for ChaosService {
1485        fn name(&self) -> &str {
1486            &self.name
1487        }
1488
1489        fn restart_policy(&self) -> RestartPolicy {
1490            self.policy
1491        }
1492
1493        async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
1494            {
1495                let mut ts = self.attempt_times.lock().await;
1496                ts.push(std::time::Instant::now());
1497            }
1498
1499            let n = self
1500                .current
1501                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1502
1503            if n < self.fail_times {
1504                anyhow::bail!("chaos failure #{}", n + 1);
1505            }
1506
1507            // Success — wait for cancellation
1508            cancel.cancelled().await;
1509            Ok(())
1510        }
1511    }
1512
1513    /// **Chaos Test — Exponential Backoff Verification**
1514    ///
1515    /// Spawns a service that fails 3 times then succeeds.
1516    /// Verifies that:
1517    ///   1. The service is restarted automatically.
1518    ///   2. Delays between attempts grow (exponential backoff).
1519    ///   3. The service stabilises after recovery.
1520    ///   4. Metrics count 3 restarts.
1521    #[tokio::test]
1522    async fn test_chaos_exponential_backoff() {
1523        let backoff = BackoffConfig::new(
1524            Duration::from_millis(20), // tiny base for fast test
1525            Duration::from_secs(2),    // cap
1526        )
1527        .without_circuit_breaker(); // no circuit breaker — let it retry
1528
1529        let config = SupervisorConfig::default()
1530            .with_shutdown_timeout(Duration::from_secs(5))
1531            .with_default_backoff(backoff)
1532            .without_signal_handler();
1533
1534        let sup = JanusSupervisor::new(config);
1535
1536        let chaos = ChaosService::new("chaos-backoff", 3, RestartPolicy::OnFailure);
1537        let attempts_arc = chaos.attempt_times.clone();
1538        let current_arc = chaos.current.clone();
1539
1540        sup.spawn_service(Box::new(chaos));
1541
1542        // Wait until the service has succeeded (attempt 4+) or timeout
1543        let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
1544        loop {
1545            let count = current_arc.load(std::sync::atomic::Ordering::SeqCst);
1546            if count >= 4 {
1547                break; // 3 failures + 1 success
1548            }
1549            if tokio::time::Instant::now() > deadline {
1550                panic!("chaos service did not recover in time; attempts={}", count,);
1551            }
1552            tokio::time::sleep(Duration::from_millis(50)).await;
1553        }
1554
1555        // Give it a moment to stabilise
1556        tokio::time::sleep(Duration::from_millis(100)).await;
1557
1558        // Verify exponential backoff: successive delays should grow
1559        let timestamps = attempts_arc.lock().await;
1560        assert!(
1561            timestamps.len() >= 4,
1562            "expected ≥4 attempts, got {}",
1563            timestamps.len(),
1564        );
1565
1566        // Check that delay between attempts 2→3 >= delay between 1→2
1567        // (accounting for jitter, we just check monotonic growth trend)
1568        let delays: Vec<Duration> = timestamps
1569            .windows(2)
1570            .map(|w| w[1].duration_since(w[0]))
1571            .collect();
1572
1573        // Skip delays[0] — that's the gap between the initial spawn and the
1574        // first restart.  On a busy CI runner the service can fail and be
1575        // re-spawned faster than the base backoff because the first attempt
1576        // has no preceding failure to back off from.
1577        //
1578        // Starting from delays[1] onward we're measuring actual backoff
1579        // intervals (after failure 2, 3, …).  We use a 1 ms floor instead
1580        // of 5 ms to tolerate scheduler jitter on overloaded runners while
1581        // still catching "no backoff at all" regressions.
1582        for (i, d) in delays.iter().enumerate().skip(1) {
1583            assert!(
1584                *d >= Duration::from_millis(1),
1585                "delay[{}] too short: {:?} — backoff may not be working",
1586                i,
1587                d,
1588            );
1589        }
1590
1591        // Verify restart metrics
1592        let metrics = sup.metrics().snapshot();
1593        assert!(
1594            metrics.restarts_total >= 3,
1595            "expected ≥3 restarts, got {}",
1596            metrics.restarts_total,
1597        );
1598
1599        // Shutdown cleanly
1600        sup.trigger_shutdown();
1601        sup.wait_for_drain().await;
1602
1603        // Service should be terminated
1604        let snap = sup.service_lifecycle("chaos-backoff").await;
1605        assert!(snap.is_some());
1606        let snap = snap.unwrap();
1607        assert_eq!(snap.phase, ServicePhase::Terminated);
1608    }
1609
1610    /// **Chaos Test — Circuit Breaker Trips**
1611    ///
1612    /// Spawns a service that always fails with a tight circuit breaker
1613    /// (max 3 retries). Verifies:
1614    ///   1. The circuit breaker opens after 3 failures.
1615    ///   2. The service terminates with `CircuitBreakerOpen` reason.
1616    ///   3. No further restarts occur after the trip.
1617    ///   4. Metrics record exactly 1 circuit breaker trip.
1618    #[tokio::test]
1619    async fn test_chaos_circuit_breaker_trips() {
1620        let backoff = BackoffConfig::new(Duration::from_millis(10), Duration::from_millis(50))
1621            .with_circuit_breaker(3, Duration::from_secs(60));
1622
1623        let config = SupervisorConfig::default()
1624            .with_shutdown_timeout(Duration::from_secs(5))
1625            .with_default_backoff(backoff)
1626            .without_signal_handler();
1627
1628        let sup = JanusSupervisor::new(config);
1629
1630        // Service that always fails (fail_times = u32::MAX effectively)
1631        let chaos = ChaosService::new("chaos-cb", 1000, RestartPolicy::OnFailure);
1632        let current_arc = chaos.current.clone();
1633
1634        sup.spawn_service(Box::new(chaos));
1635
1636        // Wait until the circuit breaker trips (service terminates)
1637        let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
1638        loop {
1639            if let Some(snap) = sup.service_lifecycle("chaos-cb").await
1640                && snap.phase == ServicePhase::Terminated
1641            {
1642                break;
1643            }
1644            if tokio::time::Instant::now() > deadline {
1645                panic!(
1646                    "circuit breaker did not trip in time; attempts={}",
1647                    current_arc.load(std::sync::atomic::Ordering::SeqCst),
1648                );
1649            }
1650            tokio::time::sleep(Duration::from_millis(50)).await;
1651        }
1652
1653        let snap = sup.service_lifecycle("chaos-cb").await.unwrap();
1654        assert_eq!(snap.phase, ServicePhase::Terminated);
1655
1656        // Verify termination reason is CircuitBreakerOpen
1657        let reason = snap
1658            .termination_reason
1659            .as_deref()
1660            .expect("expected a termination reason");
1661        assert!(
1662            reason.contains("circuit breaker"),
1663            "expected circuit breaker termination, got: {}",
1664            reason,
1665        );
1666
1667        // Verify metrics
1668        let metrics = sup.metrics().snapshot();
1669        assert!(
1670            metrics.circuit_breaker_trips >= 1,
1671            "expected ≥1 circuit breaker trip, got {}",
1672            metrics.circuit_breaker_trips,
1673        );
1674
1675        // Record the attempt count at trip time, wait, and verify no
1676        // further attempts occurred (no restarts after CB open).
1677        let attempts_at_trip = current_arc.load(std::sync::atomic::Ordering::SeqCst);
1678        tokio::time::sleep(Duration::from_millis(200)).await;
1679        let attempts_after = current_arc.load(std::sync::atomic::Ordering::SeqCst);
1680        assert_eq!(
1681            attempts_at_trip, attempts_after,
1682            "service should NOT restart after circuit breaker trips",
1683        );
1684
1685        sup.trigger_shutdown();
1686        sup.wait_for_drain().await;
1687    }
1688
1689    /// **Chaos Test — Mixed Fleet (healthy + failing services)**
1690    ///
1691    /// Spawns a mix of healthy and failing services and verifies that
1692    /// the supervisor handles the fleet correctly:
1693    ///   - Healthy services stay running and respond to shutdown.
1694    ///   - Failing services trigger backoff and eventually circuit-break.
1695    ///   - Shutdown cleanly drains all services regardless of state.
1696    #[tokio::test]
1697    async fn test_chaos_mixed_fleet() {
1698        let backoff = BackoffConfig::new(Duration::from_millis(10), Duration::from_millis(100))
1699            .with_circuit_breaker(3, Duration::from_secs(60));
1700
1701        let config = SupervisorConfig::default()
1702            .with_shutdown_timeout(Duration::from_secs(5))
1703            .with_default_backoff(backoff)
1704            .without_signal_handler();
1705
1706        let sup = JanusSupervisor::new(config);
1707
1708        let log = Arc::new(tokio::sync::Mutex::new(Vec::<String>::new()));
1709
1710        // Healthy service — runs until cancelled
1711        sup.spawn_service(Box::new(LifecycleTracer::new(
1712            "healthy-api",
1713            log.clone(),
1714            RestartPolicy::OnFailure,
1715        )));
1716
1717        // Failing service — always fails, will circuit-break
1718        let chaos = ChaosService::new("bad-data", 1000, RestartPolicy::OnFailure);
1719        sup.spawn_service(Box::new(chaos));
1720
1721        // Recovering service — fails twice then stabilises
1722        let recovering = ChaosService::new("flaky-cns", 2, RestartPolicy::OnFailure);
1723        let recovering_attempts = recovering.current.clone();
1724        sup.spawn_service(Box::new(recovering));
1725
1726        // Wait for all three services to register in the lifecycle map
1727        // (spawned tasks may not have started yet).
1728        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
1729        loop {
1730            if sup.service_count().await == 3 {
1731                break;
1732            }
1733            if tokio::time::Instant::now() > deadline {
1734                panic!(
1735                    "timed out waiting for 3 services to register; got {}",
1736                    sup.service_count().await,
1737                );
1738            }
1739            tokio::time::sleep(Duration::from_millis(10)).await;
1740        }
1741
1742        // Wait for the failing service to circuit-break AND the recovering
1743        // service to stabilise
1744        let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
1745        loop {
1746            let bad_done = sup
1747                .service_lifecycle("bad-data")
1748                .await
1749                .is_some_and(|s| s.phase == ServicePhase::Terminated);
1750
1751            let recovered = recovering_attempts.load(std::sync::atomic::Ordering::SeqCst) >= 3;
1752
1753            if bad_done && recovered {
1754                break;
1755            }
1756            if tokio::time::Instant::now() > deadline {
1757                panic!("mixed fleet did not reach expected state in time");
1758            }
1759            tokio::time::sleep(Duration::from_millis(50)).await;
1760        }
1761
1762        // Healthy service should still be alive
1763        let healthy_snap = sup.service_lifecycle("healthy-api").await.unwrap();
1764        assert!(
1765            healthy_snap.phase.is_alive(),
1766            "healthy service should still be alive, was {}",
1767            healthy_snap.phase,
1768        );
1769
1770        // Bad service should be circuit-broken
1771        let bad_snap = sup.service_lifecycle("bad-data").await.unwrap();
1772        assert_eq!(bad_snap.phase, ServicePhase::Terminated);
1773        assert!(
1774            bad_snap
1775                .termination_reason
1776                .as_deref()
1777                .is_some_and(|r| r.contains("circuit breaker")),
1778            "bad-data should have circuit-broken, got {:?}",
1779            bad_snap.termination_reason,
1780        );
1781
1782        // Flaky service should have recovered (alive or running)
1783        let flaky_snap = sup.service_lifecycle("flaky-cns").await.unwrap();
1784        assert!(
1785            flaky_snap.phase.is_alive(),
1786            "flaky service should have recovered, was {}",
1787            flaky_snap.phase,
1788        );
1789        assert!(
1790            flaky_snap.start_count >= 3,
1791            "flaky service should have started ≥3 times, got {}",
1792            flaky_snap.start_count,
1793        );
1794
1795        // Shutdown everything
1796        sup.trigger_shutdown();
1797        sup.wait_for_drain().await;
1798
1799        // All services should be terminated now
1800        for name in &["healthy-api", "bad-data", "flaky-cns"] {
1801            let snap = sup.service_lifecycle(name).await.unwrap();
1802            assert_eq!(
1803                snap.phase,
1804                ServicePhase::Terminated,
1805                "service '{}' should be Terminated after shutdown",
1806                name,
1807            );
1808        }
1809
1810        let metrics = sup.metrics().snapshot();
1811        assert_eq!(metrics.active_services, 0);
1812        assert_eq!(metrics.spawned_total, 3);
1813        assert_eq!(metrics.terminated_total, 3);
1814    }
1815}