// zeph_core/task_supervisor.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Supervised lifecycle task manager for long-running named services.
5//!
6//! [`TaskSupervisor`] manages named, long-lived background tasks (config watcher,
7//! scheduler loop, gateway, MCP connections, etc.) with restart policies, health
8//! snapshots, and graceful shutdown. Unlike [`crate::agent::agent_supervisor::BackgroundSupervisor`]
9//! (which is `&mut self`-only, lossy, and turn-scoped), `TaskSupervisor` is
10//! `Clone + Send + Sync` and designed for the full agent session lifetime.
11//!
12//! # Design rationale
13//!
14//! - **Shared handle**: `Arc<Inner>` interior allows passing the supervisor to bootstrap
15//!   code, TUI status display, and shutdown orchestration without lifetime coupling.
16//! - **Event-driven reap**: An internal mpsc channel delivers completion events to a
17//!   reap driver task; no polling interval required.
18//! - **No `JoinSet`**: Individual `JoinHandle`s per task enable per-name abort, status
19//!   tracking, and restart policies — `JoinSet` is better for homogeneous work.
20//! - **Mutex held briefly**: `parking_lot::Mutex` guards only bookkeeping operations
21//!   (insert/remove from `HashMap`). The lock is **never held across `.await`**.
22//!
23//! # Examples
24//!
25//! ```rust,no_run
26//! use std::time::Duration;
27//! use tokio_util::sync::CancellationToken;
28//! use zeph_core::task_supervisor::{RestartPolicy, TaskDescriptor, TaskSupervisor};
29//!
30//! # #[tokio::main]
31//! # async fn main() {
32//! let cancel = CancellationToken::new();
33//! let supervisor = TaskSupervisor::new(cancel.clone());
34//!
35//! supervisor.spawn(TaskDescriptor {
36//!     name: "my-service",
37//!     restart: RestartPolicy::Restart { max: 3, base_delay: Duration::from_secs(1) },
38//!     factory: || async { /* service loop */ },
39//! });
40//!
41//! // Graceful shutdown — waits up to 5 s for all tasks to stop.
42//! supervisor.shutdown_all(Duration::from_secs(5)).await;
43//! # }
44//! ```
45
46use std::collections::HashMap;
47use std::future::Future;
48use std::pin::Pin;
49use std::sync::Arc;
50use std::time::{Duration, Instant};
51
52use tokio::sync::{mpsc, oneshot};
53use tokio::task::AbortHandle;
54use tokio_util::sync::CancellationToken;
55use tracing::Instrument as _;
56use zeph_common::BlockingSpawner;
57
58// ── Public types ─────────────────────────────────────────────────────────────
59
/// Policy governing what happens when a supervised task completes or panics.
///
/// Used in [`TaskDescriptor`] to configure restart behaviour for a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartPolicy {
    /// Task runs once; normal completion removes it from the registry.
    RunOnce,
    /// Task is restarted **only on panic**, up to `max` times.
    ///
    /// Normal completion (the future returns `()`) does **not** trigger a restart.
    /// The task is removed from the registry on normal exit.
    ///
    /// A `max` of `0` means the task is monitored but **never** restarted —
    /// a panic leaves the entry as `Failed` in the registry for observability.
    /// Use `RunOnce` when you want the entry removed on completion.
    ///
    /// Restart delays follow **exponential backoff**: the delay before attempt `n`
    /// is `base_delay * 2^(n-1)`, capped at [`MAX_RESTART_DELAY`].
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    /// use zeph_core::task_supervisor::RestartPolicy;
    ///
    /// // Restart up to 3 times with exponential backoff starting at 1 s.
    /// let policy = RestartPolicy::Restart { max: 3, base_delay: Duration::from_secs(1) };
    /// ```
    Restart {
        /// Maximum number of restart attempts before the task is marked `Failed`.
        max: u32,
        /// Delay before the first restart attempt; doubled on each subsequent attempt.
        base_delay: Duration,
    },
}

/// Maximum delay between restart attempts (caps exponential backoff).
pub const MAX_RESTART_DELAY: Duration = Duration::from_secs(60);
93
/// Configuration passed to [`TaskSupervisor::spawn`] to describe a supervised task.
///
/// `F` must be `Fn` (not `FnOnce`) to support restarts: the factory is called once on
/// initial spawn and once per restart attempt.
pub struct TaskDescriptor<F> {
    /// Unique name for this task (e.g., `"config-watcher"`, `"scheduler-loop"`).
    ///
    /// Names must be `'static` — they are typically compile-time string literals.
    /// Spawning a task with a name that already exists aborts the prior instance.
    pub name: &'static str,
    /// Restart policy applied when the task exits unexpectedly.
    pub restart: RestartPolicy,
    /// Factory called to produce a new future. Must be `Fn` for restart support;
    /// it is invoked once per (re)start to build a fresh future.
    pub factory: F,
}
109
/// Opaque handle to a single supervised task.
///
/// Can be used to abort the task by name independently of the supervisor.
#[derive(Debug, Clone)]
pub struct TaskHandle {
    // Static name the task was registered under.
    name: &'static str,
    // Abort handle for the currently running tokio task instance.
    abort: AbortHandle,
}
118
119impl TaskHandle {
120    /// Abort the task immediately.
121    pub fn abort(&self) {
122        tracing::debug!(task.name = self.name, "task aborted via handle");
123        self.abort.abort();
124    }
125
126    /// Return the task's name.
127    #[must_use]
128    pub fn name(&self) -> &'static str {
129        self.name
130    }
131}
132
/// Error returned by [`BlockingHandle::join`].
#[derive(Debug, PartialEq, Eq)]
pub enum BlockingError {
    /// The task panicked before producing a result.
    Panicked,
    /// The supervisor (or the task's abort handle) was dropped before the task completed.
    SupervisorDropped,
}
141
142impl std::fmt::Display for BlockingError {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        match self {
145            Self::Panicked => write!(f, "supervised blocking task panicked"),
146            Self::SupervisorDropped => write!(f, "supervisor dropped before task completed"),
147        }
148    }
149}
150
151impl std::error::Error for BlockingError {}
152
/// Handle returned by [`TaskSupervisor::spawn_blocking`].
///
/// Awaiting [`BlockingHandle::join`] blocks until the OS-thread task produces a
/// value. Dropping the handle without joining does **not** cancel the task — it
/// continues to run on the blocking thread pool but the result is discarded.
///
/// A panic inside the closure is captured and returned as
/// [`BlockingError::Panicked`] rather than propagating to the caller.
pub struct BlockingHandle<R> {
    // Receives exactly one result (or is closed, signalling SupervisorDropped).
    rx: oneshot::Receiver<Result<R, BlockingError>>,
    // Aborts the wrapper task driving the blocking closure.
    abort: AbortHandle,
}
165
166impl<R> BlockingHandle<R> {
167    /// Await the task result.
168    ///
169    /// # Errors
170    ///
171    /// - [`BlockingError::Panicked`] — the task closure panicked.
172    /// - [`BlockingError::SupervisorDropped`] — the task was aborted or the
173    ///   supervisor was dropped before a value was produced.
174    pub async fn join(self) -> Result<R, BlockingError> {
175        self.rx
176            .await
177            .unwrap_or(Err(BlockingError::SupervisorDropped))
178    }
179
180    /// Abort the underlying task immediately.
181    pub fn abort(&self) {
182        self.abort.abort();
183    }
184}
185
/// Point-in-time state of a supervised task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskStatus {
    /// Task is actively running.
    Running,
    /// Task is waiting for the restart delay before the next attempt.
    Restarting {
        /// The upcoming restart attempt number (1-based).
        attempt: u32,
        /// Maximum attempts allowed by the task's [`RestartPolicy`].
        max: u32,
    },
    /// Task completed normally.
    Completed,
    /// Task was force-aborted during shutdown.
    Aborted,
    /// Task exhausted all restart attempts and is permanently failed.
    Failed {
        /// Human-readable description of the final failure.
        reason: String,
    },
}
200
201/// Point-in-time snapshot of a supervised task, returned by [`TaskSupervisor::snapshot`].
202#[derive(Debug, Clone)]
203/// Observability surface per field:
204///
205/// | Field | tokio-console | Jaeger / OTLP | TUI | `metrics` histogram |
206/// |-------|--------------|--------------|-----|---------------------|
207/// | `name` | span name | span name | task list | label `"task"` |
208/// | `task.wall_time_ms` | — | span field (`task-metrics`) | — | `zeph.task.wall_time_ms` |
209/// | `task.cpu_time_ms` | — | span field (`task-metrics`) | — | `zeph.task.cpu_time_ms` |
210/// | `status` | — | — | task list | — |
211/// | `restart_count` | — | — | task list | — |
212///
213/// The `task.wall_time_ms` and `task.cpu_time_ms` fields are only populated when
214/// the crate is compiled with the `task-metrics` feature.
215pub struct TaskSnapshot {
216    /// Task name.
217    pub name: Arc<str>,
218    /// Current status.
219    pub status: TaskStatus,
220    /// Instant the task was first spawned.
221    pub started_at: Instant,
222    /// Number of times the task has been restarted.
223    pub restart_count: u32,
224}
225
// ── Internal types ───────────────────────────────────────────────────────────

/// Type-erased supervised-task future; always resolves to `()`.
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Type-erased task factory; invoked once per (re)start to build a fresh future.
type BoxFactory = Box<dyn Fn() -> BoxFuture + Send + Sync>;

/// Registry bookkeeping for one supervised task.
struct TaskEntry {
    /// Shared task name (same `Arc` as the registry key).
    name: Arc<str>,
    /// Current lifecycle status.
    status: TaskStatus,
    /// Instant the task was first spawned.
    started_at: Instant,
    /// Number of restarts performed so far.
    restart_count: u32,
    /// Policy applied when the task exits.
    restart_policy: RestartPolicy,
    /// Handle used to force-abort the currently running instance.
    abort_handle: AbortHandle,
    /// `Some` only for `Restart` policy tasks.
    factory: Option<BoxFactory>,
}

/// How a supervised task ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompletionKind {
    /// Future returned normally.
    Normal,
    /// Future panicked.
    Panicked,
    /// Future was cancelled via the cancellation token or abort handle.
    Cancelled,
}

/// One completion event delivered from a task's reporter to the reap driver.
struct Completion {
    /// Name of the task that ended.
    name: Arc<str>,
    /// How it ended.
    kind: CompletionKind,
}

/// Mutex-guarded registry of all supervised tasks, keyed by name.
struct SupervisorState {
    tasks: HashMap<Arc<str>, TaskEntry>,
}

/// Shared interior of [`TaskSupervisor`]; all clones point at the same `Inner`.
struct Inner {
    /// Bookkeeping registry. The lock is never held across `.await`.
    state: parking_lot::Mutex<SupervisorState>,
    /// Completion events from spawned tasks → reap driver.
    /// Lives in `Inner` (not `SupervisorState`) to avoid double mutex acquisition
    /// — callers clone it once during spawn without re-locking state.
    completion_tx: mpsc::UnboundedSender<Completion>,
    /// Token propagated into every spawned task for cooperative shutdown.
    cancel: CancellationToken,
    /// Limits the number of concurrently running `spawn_blocking` tasks to prevent
    /// runaway thread-pool growth under burst load.
    blocking_semaphore: Arc<tokio::sync::Semaphore>,
}
273
// ── Main type ────────────────────────────────────────────────────────────────

/// Shared, cloneable handle to the supervised lifecycle task registry.
///
/// `TaskSupervisor` manages named, long-lived background tasks with restart
/// policies, health snapshots, and graceful shutdown. It is `Clone + Send + Sync`
/// so it can be distributed to bootstrap code, TUI, and shutdown orchestration
/// without any additional synchronisation.
///
/// # Thread safety
///
/// Interior state is guarded by a `parking_lot::Mutex`. The lock is **never**
/// held across `.await` points.
///
/// # Examples
///
/// ```rust,no_run
/// use std::time::Duration;
/// use tokio_util::sync::CancellationToken;
/// use zeph_core::task_supervisor::{RestartPolicy, TaskDescriptor, TaskSupervisor};
///
/// # #[tokio::main]
/// # async fn main() {
/// let cancel = CancellationToken::new();
/// let sup = TaskSupervisor::new(cancel.clone());
///
/// let _handle = sup.spawn(TaskDescriptor {
///     name: "watcher",
///     restart: RestartPolicy::RunOnce,
///     factory: || async { tokio::time::sleep(std::time::Duration::from_secs(1)).await },
/// });
///
/// sup.shutdown_all(Duration::from_secs(5)).await;
/// # }
/// ```
#[derive(Clone)]
pub struct TaskSupervisor {
    // Cloning the supervisor clones this Arc — all clones share one registry.
    inner: Arc<Inner>,
}
313
314impl TaskSupervisor {
315    /// Create a new supervisor and start its reap driver.
316    ///
317    /// The `cancel` token is propagated into every spawned task via `tokio::select!`.
318    /// When the token is cancelled, all tasks exit cooperatively on their next
319    /// cancellation check. Call [`shutdown_all`][Self::shutdown_all] to wait for
320    /// them to finish.
321    #[must_use]
322    pub fn new(cancel: CancellationToken) -> Self {
323        // NOTE: unbounded channel is acceptable here because supervised tasks are
324        // O(10–20) lifecycle services, not high-throughput work. Backpressure would
325        // complicate the spawn path without practical benefit.
326        let (completion_tx, completion_rx) = mpsc::unbounded_channel();
327        let inner = Arc::new(Inner {
328            state: parking_lot::Mutex::new(SupervisorState {
329                tasks: HashMap::new(),
330            }),
331            completion_tx,
332            cancel: cancel.clone(),
333            blocking_semaphore: Arc::new(tokio::sync::Semaphore::new(8)),
334        });
335
336        Self::start_reap_driver(Arc::clone(&inner), completion_rx, cancel);
337
338        Self { inner }
339    }
340
341    /// Spawn a named, supervised async task.
342    ///
343    /// If a task with the same `name` already exists, it is aborted before the
344    /// new one is started.
345    ///
346    /// # Examples
347    ///
348    /// ```rust,no_run
349    /// use std::time::Duration;
350    /// use tokio_util::sync::CancellationToken;
351    /// use zeph_core::task_supervisor::{RestartPolicy, TaskDescriptor, TaskHandle, TaskSupervisor};
352    ///
353    /// # #[tokio::main]
354    /// # async fn main() {
355    /// let cancel = CancellationToken::new();
356    /// let sup = TaskSupervisor::new(cancel.clone());
357    ///
358    /// let handle: TaskHandle = sup.spawn(TaskDescriptor {
359    ///     name: "config-watcher",
360    ///     restart: RestartPolicy::Restart { max: 3, base_delay: Duration::from_secs(1) },
361    ///     factory: || async { /* watch loop */ },
362    /// });
363    /// # }
364    /// ```
365    pub fn spawn<F, Fut>(&self, desc: TaskDescriptor<F>) -> TaskHandle
366    where
367        F: Fn() -> Fut + Send + Sync + 'static,
368        Fut: Future<Output = ()> + Send + 'static,
369    {
370        let factory: BoxFactory = Box::new(move || Box::pin((desc.factory)()));
371        let cancel = self.inner.cancel.clone();
372        let completion_tx = self.inner.completion_tx.clone();
373        let name: Arc<str> = Arc::from(desc.name);
374
375        let (abort_handle, jh) = Self::do_spawn(desc.name, &factory, cancel);
376        Self::wire_completion_reporter(Arc::clone(&name), jh, completion_tx);
377
378        let entry = TaskEntry {
379            name: Arc::clone(&name),
380            status: TaskStatus::Running,
381            started_at: Instant::now(),
382            restart_count: 0,
383            restart_policy: desc.restart,
384            abort_handle: abort_handle.clone(),
385            factory: match desc.restart {
386                RestartPolicy::RunOnce => None,
387                RestartPolicy::Restart { .. } => Some(factory),
388            },
389        };
390
391        {
392            let mut state = self.inner.state.lock();
393            if let Some(old) = state.tasks.remove(&name) {
394                old.abort_handle.abort();
395            }
396            state.tasks.insert(Arc::clone(&name), entry);
397        }
398
399        TaskHandle {
400            name: desc.name,
401            abort: abort_handle,
402        }
403    }
404
    /// Spawn a CPU-bound closure on the OS blocking thread pool.
    ///
    /// The closure runs via [`tokio::task::spawn_blocking`] — it is never polled
    /// on tokio worker threads and cannot block async I/O. The task is registered
    /// in the supervisor registry and is visible to [`snapshot`][Self::snapshot]
    /// and [`shutdown_all`][Self::shutdown_all].
    ///
    /// Dropping the returned [`BlockingHandle`] without calling `.join()` does
    /// **not** cancel the task; it runs to completion but the result is discarded.
    ///
    /// A panic inside `f` is captured and returned as [`BlockingError::Panicked`]
    /// rather than propagating to the caller.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::sync::Arc;
    /// use tokio_util::sync::CancellationToken;
    /// use zeph_core::task_supervisor::{BlockingHandle, TaskSupervisor};
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let cancel = CancellationToken::new();
    /// let sup = TaskSupervisor::new(cancel);
    ///
    /// let handle: BlockingHandle<u32> = sup.spawn_blocking(Arc::from("compute"), || {
    ///     // CPU-bound work — safe to block here
    ///     42_u32
    /// });
    /// let result = handle.join().await.unwrap();
    /// assert_eq!(result, 42);
    /// # }
    /// ```
    ///
    /// # Capacity limit
    ///
    /// At most 8 `spawn_blocking` tasks run concurrently. Additional tasks wait for a
    /// semaphore permit, bounding thread-pool growth under burst load.
    ///
    /// # Panics
    ///
    /// Panics inside `f` are captured and returned as [`BlockingError::Panicked`] — they
    /// do not propagate to the caller.
    #[allow(clippy::needless_pass_by_value)] // `name` is cloned into async task and registry
    pub fn spawn_blocking<F, R>(&self, name: Arc<str>, f: F) -> BlockingHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        // One-shot result channel: the task sends exactly one Ok/Err, or drops
        // the sender on abort (the receiver then reports SupervisorDropped).
        let (tx, rx) = oneshot::channel::<Result<R, BlockingError>>();
        #[cfg(feature = "task-metrics")]
        let span = tracing::info_span!(
            "supervised_blocking_task",
            task.name = %name,
            task.wall_time_ms = tracing::field::Empty,
            task.cpu_time_ms = tracing::field::Empty,
        );
        #[cfg(not(feature = "task-metrics"))]
        let span = tracing::info_span!("supervised_blocking_task", task.name = %name);

        let semaphore = Arc::clone(&self.inner.blocking_semaphore);
        let inner = Arc::clone(&self.inner);
        let name_clone = Arc::clone(&name);
        let completion_tx = self.inner.completion_tx.clone();

        // Wrap the blocking spawn in an async task that first acquires a semaphore
        // permit, bounding the number of concurrently running blocking tasks to 8.
        //
        // NOTE(review): this outer task is spawned before the registry insert at
        // the bottom of this function. The permit `.await` yields first, which in
        // practice lets the insert win the race — but if the task completes before
        // the insert, its completion event may be dropped by the reap driver and a
        // stale `Running` entry inserted afterwards. Confirm ordering guarantees.
        let outer = tokio::spawn(async move {
            let _permit = semaphore
                .acquire_owned()
                .await
                .expect("blocking semaphore closed");

            let name_for_measure = Arc::clone(&name_clone);
            let join_handle = tokio::task::spawn_blocking(move || {
                let _enter = span.enter();
                measure_blocking(&name_for_measure, f)
            });
            let abort = join_handle.abort_handle();

            // Update registry with the real abort handle now that spawn_blocking is live.
            // Until this point the entry (if present) holds the outer task's handle.
            {
                let mut state = inner.state.lock();
                if let Some(entry) = state.tasks.get_mut(&name_clone) {
                    entry.abort_handle = abort;
                }
            }

            let kind = match join_handle.await {
                Ok(val) => {
                    let _ = tx.send(Ok(val));
                    CompletionKind::Normal
                }
                Err(e) if e.is_panic() => {
                    let _ = tx.send(Err(BlockingError::Panicked));
                    CompletionKind::Panicked
                }
                Err(_) => {
                    // Aborted — drop tx so rx returns SupervisorDropped.
                    CompletionKind::Cancelled
                }
            };
            // _permit released here, freeing the semaphore slot.
            let _ = completion_tx.send(Completion {
                name: name_clone,
                kind,
            });
        });
        let abort = outer.abort_handle();

        // Register in registry so snapshot/shutdown sees the task.
        // Blocking tasks never restart, hence RunOnce and no retained factory.
        {
            let mut state = self.inner.state.lock();
            if let Some(old) = state.tasks.remove(&name) {
                old.abort_handle.abort();
            }
            state.tasks.insert(
                Arc::clone(&name),
                TaskEntry {
                    name: Arc::clone(&name),
                    status: TaskStatus::Running,
                    started_at: Instant::now(),
                    restart_count: 0,
                    restart_policy: RestartPolicy::RunOnce,
                    abort_handle: abort.clone(),
                    factory: None,
                },
            );
        }

        BlockingHandle { rx, abort }
    }
537
    /// Spawn an async task that produces a typed result value (runs on tokio worker thread).
    ///
    /// Unlike [`spawn`][Self::spawn], no restart policy is supported — the task
    /// runs once. The task is registered in the supervisor registry under the
    /// provided `name` and is visible to [`snapshot`][Self::snapshot] and
    /// [`shutdown_all`][Self::shutdown_all].
    ///
    /// For CPU-bound work that must not block tokio workers, use
    /// [`spawn_blocking`][Self::spawn_blocking] instead.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::sync::Arc;
    /// use tokio_util::sync::CancellationToken;
    /// use zeph_core::task_supervisor::{BlockingHandle, TaskSupervisor};
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let cancel = CancellationToken::new();
    /// let sup = TaskSupervisor::new(cancel.clone());
    ///
    /// let handle: BlockingHandle<u32> = sup.spawn_oneshot(Arc::from("compute"), || async { 42_u32 });
    /// let result = handle.join().await.unwrap();
    /// assert_eq!(result, 42);
    /// # }
    /// ```
    pub fn spawn_oneshot<F, Fut, R>(&self, name: Arc<str>, factory: F) -> BlockingHandle<R>
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = R> + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel::<Result<R, BlockingError>>();
        let cancel = self.inner.cancel.clone();
        let span = tracing::info_span!("supervised_task", task.name = %name);
        // `None` signals cooperative cancellation; `Some(r)` a normal result.
        let join_handle: tokio::task::JoinHandle<Option<R>> = tokio::spawn(
            async move {
                let fut = factory();
                tokio::select! {
                    result = fut => Some(result),
                    () = cancel.cancelled() => None,
                }
            }
            .instrument(span),
        );
        let abort = join_handle.abort_handle();

        // Insert the registry entry BEFORE spawning the completion reporter below
        // — completions are only sent by that reporter, so the reap driver is
        // guaranteed to find this entry when the event arrives.
        {
            let mut state = self.inner.state.lock();
            if let Some(old) = state.tasks.remove(&name) {
                old.abort_handle.abort();
            }
            state.tasks.insert(
                Arc::clone(&name),
                TaskEntry {
                    name: Arc::clone(&name),
                    status: TaskStatus::Running,
                    started_at: Instant::now(),
                    restart_count: 0,
                    restart_policy: RestartPolicy::RunOnce,
                    abort_handle: abort.clone(),
                    factory: None,
                },
            );
        }

        let completion_tx = self.inner.completion_tx.clone();
        tokio::spawn(async move {
            let kind = match join_handle.await {
                Ok(Some(val)) => {
                    let _ = tx.send(Ok(val));
                    CompletionKind::Normal
                }
                Err(e) if e.is_panic() => {
                    let _ = tx.send(Err(BlockingError::Panicked));
                    CompletionKind::Panicked
                }
                // Ok(None) (token-cancelled) or Err(aborted): tx is dropped,
                // so `join()` reports SupervisorDropped.
                _ => CompletionKind::Cancelled,
            };
            let _ = completion_tx.send(Completion { name, kind });
        });
        BlockingHandle { rx, abort }
    }
622
623    /// Abort a task by name. No-op if no task with that name is registered.
624    pub fn abort(&self, name: &'static str) {
625        let state = self.inner.state.lock();
626        let key: Arc<str> = Arc::from(name);
627        if let Some(entry) = state.tasks.get(&key) {
628            entry.abort_handle.abort();
629            tracing::debug!(task.name = name, "task aborted via supervisor");
630        }
631    }
632
633    /// Gracefully shut down all supervised tasks.
634    ///
635    /// Cancels the supervisor's [`CancellationToken`] and waits up to `timeout`
636    /// for all tasks to exit. Tasks that do not exit within the timeout are
637    /// aborted forcefully and their registry entries updated to [`TaskStatus::Aborted`].
638    ///
639    /// # Note
640    ///
641    /// This cancels the token passed to [`TaskSupervisor::new`]. If you share
642    /// that token with other subsystems, they will be cancelled too. Use a child
643    /// token (`cancel.child_token()`) when the supervisor should not affect
644    /// unrelated components.
645    pub async fn shutdown_all(&self, timeout: Duration) {
646        self.inner.cancel.cancel();
647        let deadline = tokio::time::Instant::now() + timeout;
648        loop {
649            let active = self.active_count();
650            if active == 0 {
651                break;
652            }
653            if tokio::time::Instant::now() >= deadline {
654                tracing::warn!(
655                    remaining = active,
656                    "shutdown timeout — aborting remaining tasks"
657                );
658                let mut state = self.inner.state.lock();
659                for entry in state.tasks.values_mut() {
660                    if matches!(
661                        entry.status,
662                        TaskStatus::Running | TaskStatus::Restarting { .. }
663                    ) {
664                        entry.abort_handle.abort();
665                        entry.status = TaskStatus::Aborted;
666                    }
667                }
668                break;
669            }
670            tokio::time::sleep(Duration::from_millis(50)).await;
671        }
672    }
673
674    /// Return a point-in-time snapshot of all registered tasks.
675    ///
676    /// Suitable for TUI status panels and structured logging. The returned
677    /// list is sorted by `started_at` ascending.
678    #[must_use]
679    pub fn snapshot(&self) -> Vec<TaskSnapshot> {
680        let state = self.inner.state.lock();
681        let mut snaps: Vec<TaskSnapshot> = state
682            .tasks
683            .values()
684            .map(|e| TaskSnapshot {
685                name: Arc::clone(&e.name),
686                status: e.status.clone(),
687                started_at: e.started_at,
688                restart_count: e.restart_count,
689            })
690            .collect();
691        snaps.sort_by_key(|s| s.started_at);
692        snaps
693    }
694
695    /// Return the number of tasks currently in `Running` or `Restarting` state.
696    #[must_use]
697    pub fn active_count(&self) -> usize {
698        let state = self.inner.state.lock();
699        state
700            .tasks
701            .values()
702            .filter(|e| {
703                matches!(
704                    e.status,
705                    TaskStatus::Running | TaskStatus::Restarting { .. }
706                )
707            })
708            .count()
709    }
710
711    /// Return a clone of the supervisor's [`CancellationToken`].
712    ///
713    /// Callers can use this to check whether shutdown has been initiated.
714    #[must_use]
715    pub fn cancellation_token(&self) -> CancellationToken {
716        self.inner.cancel.clone()
717    }
718
719    // ── Internal helpers ──────────────────────────────────────────────────────
720
721    /// Spawn the actual tokio task. Returns `(AbortHandle, JoinHandle)`.
722    fn do_spawn(
723        name: &'static str,
724        factory: &BoxFactory,
725        cancel: CancellationToken,
726    ) -> (AbortHandle, tokio::task::JoinHandle<()>) {
727        let fut = factory();
728        let span = tracing::info_span!("supervised_task", task.name = name);
729        let jh = tokio::spawn(
730            async move {
731                tokio::select! {
732                    () = fut => {},
733                    () = cancel.cancelled() => {},
734                }
735            }
736            .instrument(span),
737        );
738        let abort = jh.abort_handle();
739        (abort, jh)
740    }
741
742    /// Wire a completion reporter: drives `jh` and sends the result to `completion_tx`.
743    fn wire_completion_reporter(
744        name: Arc<str>,
745        jh: tokio::task::JoinHandle<()>,
746        completion_tx: mpsc::UnboundedSender<Completion>,
747    ) {
748        tokio::spawn(async move {
749            let kind = match jh.await {
750                Ok(()) => CompletionKind::Normal,
751                Err(e) if e.is_panic() => CompletionKind::Panicked,
752                Err(_) => CompletionKind::Cancelled,
753            };
754            let _ = completion_tx.send(Completion { name, kind });
755        });
756    }
757
    /// Spawn the reap driver. The driver processes completion events from the mpsc channel.
    ///
    /// After the cancellation token fires, the driver continues draining the channel
    /// until it is empty — this ensures that tasks which completed just before cancel
    /// have their registry entries updated, allowing `shutdown_all` to observe
    /// `active_count() == 0` correctly.
    fn start_reap_driver(
        inner: Arc<Inner>,
        mut completion_rx: mpsc::UnboundedReceiver<Completion>,
        cancel: CancellationToken,
    ) {
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    // `biased` + first position: prefer draining completions over
                    // reacting to cancellation when both are ready.
                    biased;
                    // The `Some(...)` pattern disables this branch if `recv()`
                    // ever yields `None`. In practice it cannot while this task
                    // runs: `inner` (captured here) owns a `completion_tx`, so
                    // the channel never fully closes before the driver exits.
                    Some(completion) = completion_rx.recv() => {
                        Self::handle_completion(&inner, completion).await;
                    }
                    () = cancel.cancelled() => {
                        // Drain any completions that arrived at the same time as the cancel.
                        while let Ok(completion) = completion_rx.try_recv() {
                            Self::handle_completion(&inner, completion).await;
                        }
                        break;
                    }
                }
            }
        });
    }
787
788    /// Process a single task completion event.
789    ///
790    /// Lock is never held across `.await`. Phase 1 classifies the completion
791    /// under lock; Phase 2 sleeps with exponential backoff without a lock;
792    /// Phase 3 spawns the next instance and updates the registry.
793    async fn handle_completion(inner: &Arc<Inner>, completion: Completion) {
794        let Some((attempt, max, delay)) = Self::classify_completion(inner, &completion) else {
795            return;
796        };
797
798        tracing::warn!(
799            task.name = %completion.name,
800            attempt,
801            max,
802            delay_ms = delay.as_millis(),
803            "restarting supervised task"
804        );
805
806        if !delay.is_zero() {
807            tokio::time::sleep(delay).await;
808        }
809
810        Self::do_restart(inner, &completion.name, attempt);
811    }
812
813    /// Phase 1: classify the completion under lock and return restart parameters if needed.
814    ///
815    /// Returns `Some((attempt, max, backoff_delay))` when a restart should be scheduled.
816    fn classify_completion(
817        inner: &Arc<Inner>,
818        completion: &Completion,
819    ) -> Option<(u32, u32, Duration)> {
820        let mut state = inner.state.lock();
821        let entry = state.tasks.get_mut(&completion.name)?;
822
823        match completion.kind {
824            CompletionKind::Panicked => {
825                tracing::warn!(task.name = %completion.name, "supervised task panicked");
826            }
827            CompletionKind::Normal => {
828                tracing::info!(task.name = %completion.name, "supervised task completed");
829            }
830            CompletionKind::Cancelled => {
831                tracing::debug!(task.name = %completion.name, "supervised task cancelled");
832            }
833        }
834
835        match entry.restart_policy {
836            RestartPolicy::RunOnce => {
837                entry.status = TaskStatus::Completed;
838                state.tasks.remove(&completion.name);
839                None
840            }
841            RestartPolicy::Restart { max, base_delay } => {
842                // Only restart on panic — normal exit and cancellation are not errors.
843                if completion.kind != CompletionKind::Panicked {
844                    entry.status = TaskStatus::Completed;
845                    state.tasks.remove(&completion.name);
846                    return None;
847                }
848                if entry.restart_count >= max {
849                    let reason = format!("panicked after {max} restart(s)");
850                    tracing::error!(
851                        task.name = %completion.name,
852                        attempts = max,
853                        "task failed permanently"
854                    );
855                    entry.status = TaskStatus::Failed { reason };
856                    None
857                } else {
858                    let attempt = entry.restart_count + 1;
859                    entry.status = TaskStatus::Restarting { attempt, max };
860                    // Exponential backoff: base_delay * 2^(attempt-1), capped at MAX_RESTART_DELAY.
861                    let multiplier = 1_u32
862                        .checked_shl(attempt.saturating_sub(1))
863                        .unwrap_or(u32::MAX);
864                    let delay = base_delay.saturating_mul(multiplier).min(MAX_RESTART_DELAY);
865                    Some((attempt, max, delay))
866                }
867            }
868        }
869        // lock released here
870    }
871
    /// Phase 3: TOCTOU check, collect spawn params under lock, then spawn outside.
    ///
    /// Between the backoff sleep and this call the task may have been removed
    /// (e.g. by shutdown) or its status changed; both cases abort the restart
    /// silently, which is the desired behavior.
    fn do_restart(inner: &Arc<Inner>, name: &Arc<str>, attempt: u32) {
        let spawn_params = {
            let mut state = inner.state.lock();
            let Some(entry) = state.tasks.get_mut(name.as_ref()) else {
                tracing::debug!(
                    task.name = %name,
                    "task removed during restart delay — skipping"
                );
                return;
            };
            // TOCTOU guard: only proceed if the entry is still in the
            // `Restarting` state set during classification.
            if !matches!(entry.status, TaskStatus::Restarting { .. }) {
                return;
            }
            let Some(factory) = &entry.factory else {
                return;
            };
            // Wrap factory() in catch_unwind to prevent a factory panic from crashing
            // the reap driver and orphaning the registry.
            // NOTE(review): factory() executes while the registry lock is held —
            // a factory that calls back into the supervisor would deadlock here.
            // Confirm this constraint is documented on `TaskDescriptor::factory`.
            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(factory)) {
                Err(_) => {
                    let reason = format!("factory panicked on restart attempt {attempt}");
                    tracing::error!(task.name = %name, attempt, "factory panicked during restart");
                    entry.status = TaskStatus::Failed { reason };
                    None
                }
                Ok(fut) => Some((
                    fut,
                    inner.cancel.clone(),
                    inner.completion_tx.clone(),
                    name.clone(),
                )),
            }
            // lock released here
        };

        let Some((fut, cancel, completion_tx, name)) = spawn_params else {
            return;
        };

        // Spawn the replacement instance; like a freshly spawned task, it races
        // the shared cancellation token.
        let span = tracing::info_span!("supervised_task", task.name = %name);
        let jh = tokio::spawn(
            async move {
                tokio::select! {
                    () = fut => {},
                    () = cancel.cancelled() => {},
                }
            }
            .instrument(span),
        );
        let new_abort = jh.abort_handle();

        // Re-acquire the lock briefly to record the new instance. The entry may
        // have been removed while we were spawning; that case is benign.
        {
            let mut state = inner.state.lock();
            if let Some(entry) = state.tasks.get_mut(name.as_ref()) {
                entry.restart_count = attempt;
                entry.status = TaskStatus::Running;
                entry.abort_handle = new_abort;
            }
        }

        Self::wire_completion_reporter(name.clone(), jh, completion_tx);
    }
935}
936
937// ── Task metrics helpers ──────────────────────────────────────────────────────
938
/// Run `f` and record wall-time and CPU-time metrics when `task-metrics` is enabled.
///
/// Both durations are recorded into the `zeph.task.wall_time_ms` /
/// `zeph.task.cpu_time_ms` histograms (labelled with the task name) and into
/// the `task.wall_time_ms` / `task.cpu_time_ms` fields of the current span.
///
/// When the feature is disabled this is a zero-overhead identity wrapper —
/// no `cpu-time` or `metrics` crates are linked.
#[cfg(feature = "task-metrics")]
#[inline]
fn measure_blocking<F, R>(name: &str, f: F) -> R
where
    F: FnOnce() -> R,
{
    use cpu_time::ThreadTime;

    // Start both clocks immediately before the closure runs.
    let started_wall = std::time::Instant::now();
    let started_cpu = ThreadTime::now();
    let output = f();
    let elapsed_wall_ms = started_wall.elapsed().as_secs_f64() * 1000.0;
    let elapsed_cpu_ms = started_cpu.elapsed().as_secs_f64() * 1000.0;

    metrics::histogram!("zeph.task.wall_time_ms", "task" => name.to_owned()).record(elapsed_wall_ms);
    metrics::histogram!("zeph.task.cpu_time_ms", "task" => name.to_owned()).record(elapsed_cpu_ms);
    tracing::Span::current().record("task.wall_time_ms", elapsed_wall_ms);
    tracing::Span::current().record("task.cpu_time_ms", elapsed_cpu_ms);
    output
}
961
/// Identity wrapper when `task-metrics` feature is disabled.
///
/// Compiles to a direct call to `f()` with no overhead.
/// Keeping the same signature as the instrumented variant lets call sites
/// remain feature-agnostic.
#[cfg(not(feature = "task-metrics"))]
#[inline]
fn measure_blocking<F, R>(_name: &str, f: F) -> R
where
    F: FnOnce() -> R,
{
    f()
}
973
974// ── BlockingSpawner impl ──────────────────────────────────────────────────────
975
976impl BlockingSpawner for TaskSupervisor {
977    /// Spawn a named blocking closure through the supervisor.
978    ///
979    /// The task is registered in the supervisor registry (visible in
980    /// [`snapshot`][Self::snapshot] and subject to graceful shutdown) before
981    /// the closure begins executing.
982    fn spawn_blocking_named(
983        &self,
984        name: Arc<str>,
985        f: Box<dyn FnOnce() + Send + 'static>,
986    ) -> tokio::task::JoinHandle<()> {
987        let handle = self.spawn_blocking(Arc::clone(&name), f);
988        tokio::spawn(async move {
989            if let Err(e) = handle.join().await {
990                tracing::error!(task.name = %name, error = %e, "supervised blocking task failed");
991            }
992        })
993    }
994}
995
996// ── Unit tests ────────────────────────────────────────────────────────────────
997
998#[cfg(test)]
999mod tests {
1000    use std::sync::Arc;
1001    use std::sync::atomic::{AtomicU32, Ordering};
1002    use std::time::Duration;
1003
1004    use tokio_util::sync::CancellationToken;
1005
1006    use super::*;
1007
1008    fn make_supervisor() -> (TaskSupervisor, CancellationToken) {
1009        let cancel = CancellationToken::new();
1010        let sup = TaskSupervisor::new(cancel.clone());
1011        (sup, cancel)
1012    }
1013
1014    #[tokio::test]
1015    async fn test_spawn_and_complete() {
1016        let (sup, _cancel) = make_supervisor();
1017
1018        let done = Arc::new(tokio::sync::Notify::new());
1019        let done2 = Arc::clone(&done);
1020
1021        sup.spawn(TaskDescriptor {
1022            name: "simple",
1023            restart: RestartPolicy::RunOnce,
1024            factory: move || {
1025                let d = Arc::clone(&done2);
1026                async move {
1027                    d.notify_one();
1028                }
1029            },
1030        });
1031
1032        tokio::time::timeout(Duration::from_secs(2), done.notified())
1033            .await
1034            .expect("task should complete");
1035
1036        tokio::time::sleep(Duration::from_millis(50)).await;
1037        assert_eq!(
1038            sup.active_count(),
1039            0,
1040            "RunOnce task should be removed after completion"
1041        );
1042    }
1043
1044    #[tokio::test]
1045    async fn test_panic_capture() {
1046        let (sup, _cancel) = make_supervisor();
1047
1048        sup.spawn(TaskDescriptor {
1049            name: "panicking",
1050            restart: RestartPolicy::RunOnce,
1051            factory: || async { panic!("intentional test panic") },
1052        });
1053
1054        tokio::time::sleep(Duration::from_millis(200)).await;
1055
1056        let snaps = sup.snapshot();
1057        assert!(
1058            snaps.iter().all(|s| s.name.as_ref() != "panicking"),
1059            "entry should be reaped"
1060        );
1061        assert_eq!(
1062            sup.active_count(),
1063            0,
1064            "active count must be 0 after RunOnce panic"
1065        );
1066    }
1067
    /// Regression test for S2: Restart-policy tasks must only restart on panic,
    /// not on normal completion.
    #[tokio::test]
    async fn test_restart_only_on_panic() {
        let (sup, _cancel) = make_supervisor();

        // Part 1: normal completion — must NOT restart.
        let normal_counter = Arc::new(AtomicU32::new(0));
        let nc = Arc::clone(&normal_counter);
        sup.spawn(TaskDescriptor {
            name: "normal-exit",
            restart: RestartPolicy::Restart {
                max: 3,
                base_delay: Duration::from_millis(10),
            },
            factory: move || {
                let c = Arc::clone(&nc);
                async move {
                    c.fetch_add(1, Ordering::SeqCst);
                    // Returns normally — no panic.
                }
            },
        });

        // 300 ms comfortably covers the 10 ms base delay — an erroneous restart
        // would have bumped the counter well within this window.
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert_eq!(
            normal_counter.load(Ordering::SeqCst),
            1,
            "normal exit must not restart"
        );
        assert!(
            sup.snapshot()
                .iter()
                .all(|s| s.name.as_ref() != "normal-exit"),
            "entry removed after normal exit"
        );

        // Part 2: panic — MUST restart up to max times.
        let panic_counter = Arc::new(AtomicU32::new(0));
        let pc = Arc::clone(&panic_counter);
        sup.spawn(TaskDescriptor {
            name: "panic-exit",
            restart: RestartPolicy::Restart {
                max: 2,
                base_delay: Duration::from_millis(10),
            },
            factory: move || {
                let c = Arc::clone(&pc);
                async move {
                    c.fetch_add(1, Ordering::SeqCst);
                    panic!("test panic");
                }
            },
        });

        // initial + 2 restarts = 3 total
        tokio::time::sleep(Duration::from_millis(500)).await;
        assert!(
            panic_counter.load(Ordering::SeqCst) >= 3,
            "panicking task must restart max times"
        );
        let snap = sup
            .snapshot()
            .into_iter()
            .find(|s| s.name.as_ref() == "panic-exit");
        assert!(
            matches!(snap.unwrap().status, TaskStatus::Failed { .. }),
            "task must be Failed after exhausting restarts"
        );
    }
1138
1139    #[tokio::test]
1140    async fn test_restart_policy() {
1141        let (sup, _cancel) = make_supervisor();
1142
1143        let counter = Arc::new(AtomicU32::new(0));
1144        let counter2 = Arc::clone(&counter);
1145
1146        sup.spawn(TaskDescriptor {
1147            name: "restartable",
1148            restart: RestartPolicy::Restart {
1149                max: 2,
1150                base_delay: Duration::from_millis(10),
1151            },
1152            factory: move || {
1153                let c = Arc::clone(&counter2);
1154                async move {
1155                    c.fetch_add(1, Ordering::SeqCst);
1156                    panic!("always panic");
1157                }
1158            },
1159        });
1160
1161        tokio::time::sleep(Duration::from_millis(500)).await;
1162
1163        let runs = counter.load(Ordering::SeqCst);
1164        assert!(
1165            runs >= 3,
1166            "expected at least 3 invocations (initial + 2 restarts), got {runs}"
1167        );
1168
1169        let snaps = sup.snapshot();
1170        let snap = snaps.iter().find(|s| s.name.as_ref() == "restartable");
1171        assert!(snap.is_some(), "failed task should remain in registry");
1172        assert!(
1173            matches!(snap.unwrap().status, TaskStatus::Failed { .. }),
1174            "task should be Failed after exhausting retries"
1175        );
1176    }
1177
    /// Verify exponential backoff: delay doubles on each restart attempt.
    #[tokio::test]
    async fn test_exponential_backoff() {
        let (sup, _cancel) = make_supervisor();

        // Record the wall-clock time of each factory invocation so the gaps
        // between restarts can be compared afterwards.
        let timestamps = Arc::new(parking_lot::Mutex::new(Vec::<std::time::Instant>::new()));
        let ts = Arc::clone(&timestamps);

        sup.spawn(TaskDescriptor {
            name: "backoff-task",
            restart: RestartPolicy::Restart {
                max: 3,
                base_delay: Duration::from_millis(50),
            },
            factory: move || {
                let t = Arc::clone(&ts);
                async move {
                    t.lock().push(std::time::Instant::now());
                    panic!("always panic");
                }
            },
        });

        // Wait long enough for all restarts: 50 + 100 + 200 ms = 350 ms + overhead
        tokio::time::sleep(Duration::from_millis(800)).await;

        let ts = timestamps.lock();
        assert!(
            ts.len() >= 3,
            "expected at least 3 invocations, got {}",
            ts.len()
        );

        // Verify delays are roughly doubling (within 2x tolerance for CI jitter).
        if ts.len() >= 3 {
            let d1 = ts[1].duration_since(ts[0]);
            let d2 = ts[2].duration_since(ts[1]);
            // d2 should be at least 1.5x d1 (allowing for jitter).
            assert!(
                d2 >= d1.mul_f64(1.5),
                "expected exponential backoff: d1={d1:?} d2={d2:?}"
            );
        }
    }
1222
1223    #[tokio::test]
1224    async fn test_graceful_shutdown() {
1225        let (sup, _cancel) = make_supervisor();
1226
1227        for name in ["svc-a", "svc-b", "svc-c"] {
1228            sup.spawn(TaskDescriptor {
1229                name,
1230                restart: RestartPolicy::RunOnce,
1231                factory: || async {
1232                    tokio::time::sleep(Duration::from_secs(60)).await;
1233                },
1234            });
1235        }
1236
1237        assert_eq!(sup.active_count(), 3);
1238
1239        tokio::time::timeout(
1240            Duration::from_secs(2),
1241            sup.shutdown_all(Duration::from_secs(1)),
1242        )
1243        .await
1244        .expect("shutdown should complete within timeout");
1245    }
1246
1247    /// Verify that force-aborted tasks get TaskStatus::Aborted in the registry (A2 fix).
1248    #[tokio::test]
1249    async fn test_force_abort_marks_aborted() {
1250        let cancel = CancellationToken::new();
1251        let sup = TaskSupervisor::new(cancel.clone());
1252
1253        sup.spawn(TaskDescriptor {
1254            name: "stubborn-for-abort",
1255            restart: RestartPolicy::RunOnce,
1256            factory: || async {
1257                // Does not cooperate with cancellation.
1258                std::future::pending::<()>().await;
1259            },
1260        });
1261
1262        // Use a very short timeout to trigger force-abort.
1263        sup.shutdown_all(Duration::from_millis(1)).await;
1264
1265        // Entry should be Aborted, not Running.
1266        let snaps = sup.snapshot();
1267        if let Some(snap) = snaps
1268            .iter()
1269            .find(|s| s.name.as_ref() == "stubborn-for-abort")
1270        {
1271            assert_eq!(
1272                snap.status,
1273                TaskStatus::Aborted,
1274                "force-aborted task must have Aborted status"
1275            );
1276        }
1277        // If entry was already reaped (cooperative cancel won), that's also acceptable.
1278    }
1279
1280    #[tokio::test]
1281    async fn test_registry_snapshot() {
1282        let (sup, _cancel) = make_supervisor();
1283
1284        for name in ["alpha", "beta"] {
1285            sup.spawn(TaskDescriptor {
1286                name,
1287                restart: RestartPolicy::RunOnce,
1288                factory: || async {
1289                    tokio::time::sleep(Duration::from_secs(10)).await;
1290                },
1291            });
1292        }
1293
1294        let snaps = sup.snapshot();
1295        assert_eq!(snaps.len(), 2);
1296        let names: Vec<&str> = snaps.iter().map(|s| s.name.as_ref()).collect();
1297        assert!(names.contains(&"alpha"));
1298        assert!(names.contains(&"beta"));
1299        assert!(snaps.iter().all(|s| s.status == TaskStatus::Running));
1300    }
1301
1302    #[tokio::test]
1303    async fn test_blocking_returns_value() {
1304        let (sup, cancel) = make_supervisor();
1305
1306        let handle: BlockingHandle<u32> = sup.spawn_blocking(Arc::from("compute"), || 42_u32);
1307        let result = handle.join().await.expect("should return value");
1308        assert_eq!(result, 42);
1309        cancel.cancel();
1310    }
1311
1312    #[tokio::test]
1313    async fn test_blocking_panic() {
1314        let (sup, _cancel) = make_supervisor();
1315
1316        let handle: BlockingHandle<u32> =
1317            sup.spawn_blocking(Arc::from("panicking-compute"), || panic!("intentional"));
1318        let err = handle
1319            .join()
1320            .await
1321            .expect_err("should return error on panic");
1322        assert_eq!(err, BlockingError::Panicked);
1323    }
1324
1325    /// Verify spawn_blocking tasks appear in registry (M3 fix).
1326    #[tokio::test]
1327    async fn test_blocking_registered_in_registry() {
1328        let (sup, cancel) = make_supervisor();
1329
1330        let (tx, rx) = std::sync::mpsc::channel::<()>();
1331        let _handle: BlockingHandle<()> =
1332            sup.spawn_blocking(Arc::from("blocking-task"), move || {
1333                // Block until signalled.
1334                let _ = rx.recv();
1335            });
1336
1337        tokio::time::sleep(Duration::from_millis(10)).await;
1338        assert_eq!(
1339            sup.active_count(),
1340            1,
1341            "blocking task must appear in active_count"
1342        );
1343
1344        let _ = tx.send(());
1345        tokio::time::sleep(Duration::from_millis(100)).await;
1346        assert_eq!(
1347            sup.active_count(),
1348            0,
1349            "blocking task must be removed after completion"
1350        );
1351
1352        cancel.cancel();
1353    }
1354
1355    /// Verify spawn_oneshot tasks appear in registry (M3 fix).
1356    #[tokio::test]
1357    async fn test_oneshot_registered_in_registry() {
1358        let (sup, cancel) = make_supervisor();
1359
1360        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
1361        let _handle: BlockingHandle<()> =
1362            sup.spawn_oneshot(Arc::from("oneshot-task"), move || async move {
1363                let _ = rx.await;
1364            });
1365
1366        tokio::time::sleep(Duration::from_millis(10)).await;
1367        assert_eq!(
1368            sup.active_count(),
1369            1,
1370            "oneshot task must appear in active_count"
1371        );
1372
1373        let _ = tx.send(());
1374        tokio::time::sleep(Duration::from_millis(50)).await;
1375        assert_eq!(
1376            sup.active_count(),
1377            0,
1378            "oneshot task must be removed after completion"
1379        );
1380
1381        cancel.cancel();
1382    }
1383
1384    #[tokio::test]
1385    async fn test_restart_max_zero() {
1386        let (sup, _cancel) = make_supervisor();
1387
1388        let counter = Arc::new(AtomicU32::new(0));
1389        let counter2 = Arc::clone(&counter);
1390
1391        sup.spawn(TaskDescriptor {
1392            name: "zero-max",
1393            restart: RestartPolicy::Restart {
1394                max: 0,
1395                base_delay: Duration::from_millis(10),
1396            },
1397            factory: move || {
1398                let c = Arc::clone(&counter2);
1399                async move {
1400                    c.fetch_add(1, Ordering::SeqCst);
1401                    panic!("always panic");
1402                }
1403            },
1404        });
1405
1406        tokio::time::sleep(Duration::from_millis(200)).await;
1407
1408        assert_eq!(
1409            counter.load(Ordering::SeqCst),
1410            1,
1411            "max=0 should not restart"
1412        );
1413
1414        let snaps = sup.snapshot();
1415        let snap = snaps.iter().find(|s| s.name.as_ref() == "zero-max");
1416        assert!(snap.is_some(), "entry should remain as Failed");
1417        assert!(
1418            matches!(snap.unwrap().status, TaskStatus::Failed { .. }),
1419            "status should be Failed"
1420        );
1421    }
1422
1423    /// Stress test: spawn 50 tasks concurrently, all must complete and registry must be accurate.
1424    #[tokio::test]
1425    async fn test_concurrent_spawns() {
1426        let (sup, cancel) = make_supervisor();
1427
1428        // All task names must be 'static — use a pre-defined slice.
1429        static NAMES: [&str; 50] = [
1430            "t00", "t01", "t02", "t03", "t04", "t05", "t06", "t07", "t08", "t09", "t10", "t11",
1431            "t12", "t13", "t14", "t15", "t16", "t17", "t18", "t19", "t20", "t21", "t22", "t23",
1432            "t24", "t25", "t26", "t27", "t28", "t29", "t30", "t31", "t32", "t33", "t34", "t35",
1433            "t36", "t37", "t38", "t39", "t40", "t41", "t42", "t43", "t44", "t45", "t46", "t47",
1434            "t48", "t49",
1435        ];
1436
1437        let completed = Arc::new(AtomicU32::new(0));
1438        for name in &NAMES {
1439            let c = Arc::clone(&completed);
1440            sup.spawn(TaskDescriptor {
1441                name,
1442                restart: RestartPolicy::RunOnce,
1443                factory: move || {
1444                    let c = Arc::clone(&c);
1445                    async move {
1446                        c.fetch_add(1, Ordering::SeqCst);
1447                    }
1448                },
1449            });
1450        }
1451
1452        // Wait for all tasks to complete.
1453        tokio::time::timeout(Duration::from_secs(5), async {
1454            loop {
1455                if completed.load(Ordering::SeqCst) == 50 {
1456                    break;
1457                }
1458                tokio::time::sleep(Duration::from_millis(10)).await;
1459            }
1460        })
1461        .await
1462        .expect("all 50 tasks should complete");
1463
1464        // Give reap driver time to process all completions.
1465        tokio::time::sleep(Duration::from_millis(100)).await;
1466        assert_eq!(sup.active_count(), 0, "all tasks must be reaped");
1467
1468        cancel.cancel();
1469    }
1470
1471    #[tokio::test]
1472    async fn test_shutdown_timeout_expiry() {
1473        let cancel = CancellationToken::new();
1474        let sup = TaskSupervisor::new(cancel.clone());
1475
1476        sup.spawn(TaskDescriptor {
1477            name: "stubborn",
1478            restart: RestartPolicy::RunOnce,
1479            factory: || async {
1480                tokio::time::sleep(Duration::from_secs(60)).await;
1481            },
1482        });
1483
1484        assert_eq!(sup.active_count(), 1);
1485
1486        tokio::time::timeout(
1487            Duration::from_secs(2),
1488            sup.shutdown_all(Duration::from_millis(50)),
1489        )
1490        .await
1491        .expect("shutdown_all should return even on timeout expiry");
1492
1493        assert!(
1494            cancel.is_cancelled(),
1495            "cancel token must be cancelled after shutdown"
1496        );
1497    }
1498
1499    #[tokio::test]
1500    async fn test_cancellation_token() {
1501        let cancel = CancellationToken::new();
1502        let sup = TaskSupervisor::new(cancel.clone());
1503
1504        assert!(!sup.cancellation_token().is_cancelled());
1505
1506        sup.shutdown_all(Duration::from_millis(100)).await;
1507
1508        assert!(
1509            sup.cancellation_token().is_cancelled(),
1510            "token must be cancelled after shutdown"
1511        );
1512    }
1513
1514    #[tokio::test]
1515    async fn test_blocking_spawner_task_appears_in_snapshot() {
1516        // Verify that tasks spawned via BlockingSpawner appear in supervisor.snapshot().
1517        use zeph_common::BlockingSpawner;
1518
1519        let cancel = CancellationToken::new();
1520        let sup = TaskSupervisor::new(cancel);
1521
1522        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
1523        let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
1524
1525        let handle = sup.spawn_blocking_named(
1526            Arc::from("chunk_file"),
1527            Box::new(move || {
1528                // Signal that the task has started.
1529                let _ = ready_tx.send(());
1530                // Block until test signals release.
1531                let _ = release_rx.blocking_recv();
1532            }),
1533        );
1534
1535        // Wait until the blocking task has actually started.
1536        ready_rx.await.expect("task should start");
1537
1538        let snapshot = sup.snapshot();
1539        assert!(
1540            snapshot.iter().any(|t| t.name.as_ref() == "chunk_file"),
1541            "chunk_file task must appear in supervisor snapshot"
1542        );
1543
1544        // Release the blocking task and await completion.
1545        let _ = release_tx.send(());
1546        handle.await.expect("task should complete");
1547    }
1548
1549    /// Verify that `measure_blocking` emits wall-time and CPU-time histograms when
1550    /// the `task-metrics` feature is enabled.
1551    ///
1552    /// `measure_blocking` calls `metrics::histogram!` on the current thread.
1553    /// We test it directly using a `DebuggingRecorder` installed as the thread-local
1554    /// recorder via `metrics::with_local_recorder`.
1555    #[cfg(feature = "task-metrics")]
1556    #[test]
1557    fn test_measure_blocking_emits_metrics() {
1558        use metrics_util::debugging::DebuggingRecorder;
1559
1560        let recorder = DebuggingRecorder::new();
1561        let snapshotter = recorder.snapshotter();
1562
1563        // Call measure_blocking inside the local recorder scope so histogram! calls
1564        // are captured. The closure runs synchronously on this thread.
1565        metrics::with_local_recorder(&recorder, || {
1566            measure_blocking("test_task", || std::hint::black_box(42_u64));
1567        });
1568
1569        let snapshot = snapshotter.snapshot();
1570        let metric_names: Vec<String> = snapshot
1571            .into_vec()
1572            .into_iter()
1573            .map(|(k, _, _, _)| k.key().name().to_owned())
1574            .collect();
1575
1576        assert!(
1577            metric_names.iter().any(|n| n == "zeph.task.wall_time_ms"),
1578            "expected zeph.task.wall_time_ms histogram; got: {metric_names:?}"
1579        );
1580        assert!(
1581            metric_names.iter().any(|n| n == "zeph.task.cpu_time_ms"),
1582            "expected zeph.task.cpu_time_ms histogram; got: {metric_names:?}"
1583        );
1584    }
1585
1586    /// Verify that `spawn_blocking` semaphore limits concurrent OS-thread tasks to 8.
1587    ///
1588    /// Spawns 16 tasks. Each holds a barrier until 8 are waiting; then releases in order.
1589    /// If more than 8 run concurrently the test would either deadlock (waiting for 9+ to reach
1590    /// the barrier) or the counter would exceed 8 — both are caught.
1591    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1592    async fn test_spawn_blocking_semaphore_cap() {
1593        let (sup, _cancel) = make_supervisor();
1594        let concurrent = Arc::new(AtomicU32::new(0));
1595        let max_concurrent = Arc::new(AtomicU32::new(0));
1596        let barrier = Arc::new(std::sync::Barrier::new(1)); // just a sync point
1597
1598        let mut handles = Vec::new();
1599        for i in 0u32..16 {
1600            let c = Arc::clone(&concurrent);
1601            let m = Arc::clone(&max_concurrent);
1602            let name: Arc<str> = Arc::from(format!("blocking-{i}").as_str());
1603            let h = sup.spawn_blocking(name, move || {
1604                let prev = c.fetch_add(1, Ordering::SeqCst);
1605                // Update observed maximum.
1606                let mut cur_max = m.load(Ordering::SeqCst);
1607                while prev + 1 > cur_max {
1608                    match m.compare_exchange(cur_max, prev + 1, Ordering::SeqCst, Ordering::SeqCst)
1609                    {
1610                        Ok(_) => break,
1611                        Err(x) => cur_max = x,
1612                    }
1613                }
1614                // Simulate work.
1615                std::thread::sleep(std::time::Duration::from_millis(20));
1616                c.fetch_sub(1, Ordering::SeqCst);
1617            });
1618            handles.push(h);
1619        }
1620
1621        for h in handles {
1622            h.join().await.expect("blocking task should succeed");
1623        }
1624        drop(barrier);
1625
1626        let observed = max_concurrent.load(Ordering::SeqCst);
1627        assert!(
1628            observed <= 8,
1629            "observed {observed} concurrent blocking tasks; expected ≤ 8 (semaphore cap)"
1630        );
1631    }
1632}