Skip to main content

zeph_common/
task_supervisor.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Supervised lifecycle task manager for long-running named services.
5//!
6//! [`TaskSupervisor`] manages named, long-lived background tasks (config watcher,
7//! scheduler loop, gateway, MCP connections, etc.) with restart policies, health
8//! snapshots, and graceful shutdown. Unlike `BackgroundSupervisor`
9//! (which is `&mut self`-only, lossy, and turn-scoped), `TaskSupervisor` is
10//! `Clone + Send + Sync` and designed for the full agent session lifetime.
11//!
12//! # Design rationale
13//!
14//! - **Shared handle**: `Arc<Inner>` interior allows passing the supervisor to bootstrap
15//!   code, TUI status display, and shutdown orchestration without lifetime coupling.
16//! - **Event-driven reap**: An internal mpsc channel delivers completion events to a
17//!   reap driver task; no polling interval required.
18//! - **No `JoinSet`**: Individual `JoinHandle`s per task enable per-name abort, status
19//!   tracking, and restart policies — `JoinSet` is better for homogeneous work.
20//! - **Mutex held briefly**: `parking_lot::Mutex` guards only bookkeeping operations
21//!   (insert/remove from `HashMap`). The lock is **never held across `.await`**.
22//!
23//! # Examples
24//!
25//! ```rust,no_run
26//! use std::time::Duration;
27//! use tokio_util::sync::CancellationToken;
28//! use zeph_common::task_supervisor::{RestartPolicy, TaskDescriptor, TaskSupervisor};
29//!
30//! # #[tokio::main]
31//! # async fn main() {
32//! let cancel = CancellationToken::new();
33//! let supervisor = TaskSupervisor::new(cancel.clone());
34//!
35//! supervisor.spawn(TaskDescriptor {
36//!     name: "my-service",
37//!     restart: RestartPolicy::Restart { max: 3, base_delay: Duration::from_secs(1) },
38//!     factory: || async { /* service loop */ },
39//! });
40//!
41//! // Graceful shutdown — waits up to 5 s for all tasks to stop.
42//! supervisor.shutdown_all(Duration::from_secs(5)).await;
43//! # }
44//! ```
45
46use std::collections::HashMap;
47use std::future::Future;
48use std::pin::Pin;
49use std::sync::Arc;
50use std::time::{Duration, Instant};
51
52use tokio::sync::{mpsc, oneshot};
53use tokio::task::AbortHandle;
54use tokio_util::sync::CancellationToken;
55use tracing::Instrument as _;
56
57use crate::BlockingSpawner;
58
59// ── Public types ─────────────────────────────────────────────────────────────
60
/// What the supervisor does once a supervised task finishes or panics.
///
/// Chosen per task via the `restart` field of [`TaskDescriptor`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RestartPolicy {
    /// Run the task a single time; when it completes normally its registry entry is removed.
    RunOnce,
    /// Re-create the task after a **panic**, at most `max` times.
    ///
    /// A normal return (the future resolves to `()`) is not a restart trigger:
    /// the entry is simply removed from the registry.
    ///
    /// With `max == 0` the task is supervised but never re-created; after a panic
    /// its entry stays in the registry as `Failed` so it remains observable.
    /// Prefer `RunOnce` if the entry should disappear on completion.
    ///
    /// Delays between attempts grow **exponentially**: attempt `n` waits
    /// `base_delay * 2^(n-1)`, never more than [`MAX_RESTART_DELAY`].
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    /// use zeph_common::task_supervisor::RestartPolicy;
    ///
    /// // Up to three restarts, backing off from one second.
    /// let policy = RestartPolicy::Restart { max: 3, base_delay: Duration::from_secs(1) };
    /// ```
    Restart { max: u32, base_delay: Duration },
}
91
/// Upper bound on the pause between two restart attempts; exponential backoff never exceeds it.
pub const MAX_RESTART_DELAY: Duration = Duration::from_secs(60);
94
/// Longest time the reap driver keeps draining completion events once cancellation fired.
///
/// INVARIANT: this must stay below the runner's shutdown grace period (defined in
/// runner.rs, around line 2387 at time of writing; 10 s at time of writing).
/// Lowering that grace period requires lowering this value proportionally.
const SHUTDOWN_DRAIN_TIMEOUT: Duration = Duration::from_millis(5_000);
100
101/// Configuration passed to [`TaskSupervisor::spawn`] to describe a supervised task.
102///
103/// `F` must be `Fn` (not `FnOnce`) to support restarts: the factory is called once on
104/// initial spawn and once per restart attempt.
105pub struct TaskDescriptor<F> {
106    /// Unique name for this task (e.g., `"config-watcher"`, `"scheduler-loop"`).
107    ///
108    /// Names must be `'static` — they are typically compile-time string literals.
109    /// Spawning a task with a name that already exists aborts the prior instance.
110    pub name: &'static str,
111    /// Restart policy applied when the task exits unexpectedly.
112    pub restart: RestartPolicy,
113    /// Factory called to produce a new future. Must be `Fn` for restart support.
114    pub factory: F,
115}
116
117/// Opaque handle to a single supervised task.
118///
119/// Can be used to abort the task by name independently of the supervisor.
120#[derive(Debug, Clone)]
121pub struct TaskHandle {
122    name: &'static str,
123    abort: AbortHandle,
124}
125
126impl TaskHandle {
127    /// Abort the task immediately.
128    pub fn abort(&self) {
129        tracing::debug!(task.name = self.name, "task aborted via handle");
130        self.abort.abort();
131    }
132
133    /// Return the task's name.
134    #[must_use]
135    pub const fn name(&self) -> &'static str {
136        self.name
137    }
138}
139
/// Failure reported by [`BlockingHandle::join`].
#[derive(Debug, Eq, PartialEq)]
pub enum BlockingError {
    /// The task panicked and never produced a value.
    Panicked,
    /// The task (or its abort handle / supervisor) went away before a value was produced.
    SupervisorDropped,
}

impl std::fmt::Display for BlockingError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Panicked => "supervised blocking task panicked",
            Self::SupervisorDropped => "supervisor dropped before task completed",
        };
        f.write_str(message)
    }
}

impl std::error::Error for BlockingError {}
159
160/// Handle returned by [`TaskSupervisor::spawn_blocking`].
161///
162/// Awaiting [`BlockingHandle::join`] blocks until the OS-thread task produces a
163/// value. Dropping the handle without joining does **not** cancel the task — it
164/// continues to run on the blocking thread pool but the result is discarded.
165///
166/// A panic inside the closure is captured and returned as
167/// [`BlockingError::Panicked`] rather than propagating to the caller.
168pub struct BlockingHandle<R> {
169    rx: oneshot::Receiver<Result<R, BlockingError>>,
170    abort: AbortHandle,
171}
172
173impl<R> BlockingHandle<R> {
174    /// Await the task result.
175    ///
176    /// # Errors
177    ///
178    /// - [`BlockingError::Panicked`] — the task closure panicked.
179    /// - [`BlockingError::SupervisorDropped`] — the task was aborted or the
180    ///   supervisor was dropped before a value was produced.
181    pub async fn join(self) -> Result<R, BlockingError> {
182        self.rx
183            .await
184            .unwrap_or(Err(BlockingError::SupervisorDropped))
185    }
186
187    /// Non-blocking poll: return the result if the task has already finished, or `None`
188    /// if it is still running.
189    ///
190    /// This is the `BlockingHandle` equivalent of `FutureExt::now_or_never` on a
191    /// [`tokio::task::JoinHandle`]. Call this inside a synchronous context (e.g., between
192    /// agent turns) to apply a completed background result without blocking.
193    ///
194    /// The handle is consumed on success. If the task is not yet done, the handle
195    /// is returned as `Err(self)` so the caller can re-store it.
196    ///
197    /// # Examples
198    ///
199    /// ```rust,no_run
200    /// # use zeph_common::task_supervisor::{BlockingHandle, BlockingError};
201    /// async fn example(mut handle: BlockingHandle<u32>) {
202    ///     // Try to get the result without blocking.
203    ///     match handle.try_join() {
204    ///         Ok(result) => println!("done: {result:?}"),
205    ///         Err(handle) => {
206    ///             // Task still running — `handle` is returned for re-storage.
207    ///             drop(handle);
208    ///         }
209    ///     }
210    /// }
211    /// ```
212    ///
213    /// # Errors
214    ///
215    /// Returns `Err(self)` when the task has not yet produced a result (still running).
216    /// The inner `Ok(Err(BlockingError::...))` variants are returned when the task
217    /// panicked or the supervisor was dropped before the task completed.
218    pub fn try_join(mut self) -> Result<Result<R, BlockingError>, Self> {
219        match self.rx.try_recv() {
220            Ok(result) => Ok(result),
221            Err(tokio::sync::oneshot::error::TryRecvError::Empty) => Err(self),
222            Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
223                Ok(Err(BlockingError::SupervisorDropped))
224            }
225        }
226    }
227
228    /// Abort the underlying task immediately.
229    pub fn abort(&self) {
230        self.abort.abort();
231    }
232}
233
/// Lifecycle state of a supervised task at the moment it was observed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TaskStatus {
    /// The task is currently executing.
    Running,
    /// The task is sleeping through its restart delay before attempt `attempt` of `max`.
    Restarting { attempt: u32, max: u32 },
    /// The task finished normally.
    Completed,
    /// The task was forcibly aborted during shutdown.
    Aborted,
    /// The task used up all its restart attempts and will not run again.
    Failed { reason: String },
}
248
249/// Point-in-time snapshot of a supervised task, returned by [`TaskSupervisor::snapshot`].
250#[derive(Debug, Clone)]
251/// Observability surface per field:
252///
253/// | Field | tokio-console | Jaeger / OTLP | TUI | `metrics` histogram |
254/// |-------|--------------|--------------|-----|---------------------|
255/// | `name` | span name | span name | task list | label `"task"` |
256/// | `task.wall_time_ms` | — | span field | — | `zeph.task.wall_time_ms` |
257/// | `task.cpu_time_ms` | — | span field | — | `zeph.task.cpu_time_ms` |
258/// | `status` | — | — | task list | — |
259/// | `restart_count` | — | — | task list | — |
260pub struct TaskSnapshot {
261    /// Task name.
262    pub name: Arc<str>,
263    /// Current status.
264    pub status: TaskStatus,
265    /// Instant the task was first spawned.
266    pub started_at: Instant,
267    /// Number of times the task has been restarted.
268    pub restart_count: u32,
269}
270
271// ── Internal types ───────────────────────────────────────────────────────────
272
/// Type-erased, heap-pinned unit future that can be moved across threads.
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
/// Type-erased task factory shared by the supervisor; each call yields a new [`BoxFuture`].
type BoxFactory = Box<dyn Fn() -> BoxFuture + Send + Sync + 'static>;
275
276struct TaskEntry {
277    name: Arc<str>,
278    status: TaskStatus,
279    started_at: Instant,
280    restart_count: u32,
281    restart_policy: RestartPolicy,
282    abort_handle: AbortHandle,
283    /// `Some` only for `Restart` policy tasks.
284    factory: Option<BoxFactory>,
285}
286
/// Way in which a supervised task came to an end.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CompletionKind {
    /// The future ran to completion.
    Normal,
    /// The future panicked.
    Panicked,
    /// The future was stopped by the cancellation token or an abort handle.
    Cancelled,
}

/// Completion event sent from a task's reporter to the reap driver.
struct Completion {
    /// Registry name of the task that finished.
    name: Arc<str>,
    /// How it finished.
    kind: CompletionKind,
}
302
303struct SupervisorState {
304    tasks: HashMap<Arc<str>, TaskEntry>,
305}
306
307struct Inner {
308    state: parking_lot::Mutex<SupervisorState>,
309    /// Completion events from spawned tasks → reap driver.
310    /// Lives in `Inner` (not `SupervisorState`) to avoid double mutex acquisition
311    /// — callers clone it once during spawn without re-locking state.
312    completion_tx: mpsc::UnboundedSender<Completion>,
313    cancel: CancellationToken,
314    /// Limits the number of concurrently running `spawn_blocking` tasks to prevent
315    /// runaway thread-pool growth under burst load.
316    blocking_semaphore: Arc<tokio::sync::Semaphore>,
317    /// Notified when `active_count()` reaches zero so `shutdown_all` wakes immediately.
318    shutdown_notify: Arc<tokio::sync::Notify>,
319}
320
321// ── Main type ────────────────────────────────────────────────────────────────
322
323/// Shared, cloneable handle to the supervised lifecycle task registry.
324///
325/// `TaskSupervisor` manages named, long-lived background tasks with restart
326/// policies, health snapshots, and graceful shutdown. It is `Clone + Send + Sync`
327/// so it can be distributed to bootstrap code, TUI, and shutdown orchestration
328/// without any additional synchronisation.
329///
330/// # Thread safety
331///
332/// Interior state is guarded by a `parking_lot::Mutex`. The lock is **never**
333/// held across `.await` points.
334///
335/// # Examples
336///
337/// ```rust,no_run
338/// use std::time::Duration;
339/// use tokio_util::sync::CancellationToken;
340/// use zeph_common::task_supervisor::{RestartPolicy, TaskDescriptor, TaskSupervisor};
341///
342/// # #[tokio::main]
343/// # async fn main() {
344/// let cancel = CancellationToken::new();
345/// let sup = TaskSupervisor::new(cancel.clone());
346///
347/// let _handle = sup.spawn(TaskDescriptor {
348///     name: "watcher",
349///     restart: RestartPolicy::RunOnce,
350///     factory: || async { tokio::time::sleep(std::time::Duration::from_secs(1)).await },
351/// });
352///
353/// sup.shutdown_all(Duration::from_secs(5)).await;
354/// # }
355/// ```
356#[derive(Clone)]
357pub struct TaskSupervisor {
358    inner: Arc<Inner>,
359}
360
361impl TaskSupervisor {
362    /// Create a new supervisor and start its reap driver.
363    ///
364    /// The `cancel` token is propagated into every spawned task via `tokio::select!`.
365    /// When the token is cancelled, all tasks exit cooperatively on their next
366    /// cancellation check. Call [`shutdown_all`][Self::shutdown_all] to wait for
367    /// them to finish.
368    ///
369    /// When called outside a Tokio runtime context (e.g. in synchronous unit tests),
370    /// the reap driver is skipped. The supervisor still accepts task registrations but
371    /// completion callbacks are not processed — safe because no tasks can actually be
372    /// spawned without a runtime.
373    #[must_use]
374    pub fn new(cancel: CancellationToken) -> Self {
375        // NOTE: unbounded channel is acceptable here because supervised tasks are
376        // O(10–20) lifecycle services, not high-throughput work. Backpressure would
377        // complicate the spawn path without practical benefit.
378        let (completion_tx, completion_rx) = mpsc::unbounded_channel();
379        let inner = Arc::new(Inner {
380            state: parking_lot::Mutex::new(SupervisorState {
381                tasks: HashMap::new(),
382            }),
383            completion_tx,
384            cancel: cancel.clone(),
385            blocking_semaphore: Arc::new(tokio::sync::Semaphore::new(8)),
386            shutdown_notify: Arc::new(tokio::sync::Notify::new()),
387        });
388
389        // Only start the reap driver when a Tokio runtime is available. In synchronous
390        // unit tests that construct Agent/LifecycleState directly there is no reactor,
391        // so we skip the spawn. Without a runtime no tasks can be spawned either, so
392        // the driver is not needed.
393        if tokio::runtime::Handle::try_current().is_ok() {
394            Self::start_reap_driver(Arc::clone(&inner), completion_rx, cancel);
395        }
396
397        Self { inner }
398    }
399
400    /// Spawn a named, supervised async task.
401    ///
402    /// If a task with the same `name` already exists, it is aborted before the
403    /// new one is started.
404    ///
405    /// # Examples
406    ///
407    /// ```rust,no_run
408    /// use std::time::Duration;
409    /// use tokio_util::sync::CancellationToken;
410    /// use zeph_common::task_supervisor::{RestartPolicy, TaskDescriptor, TaskHandle, TaskSupervisor};
411    ///
412    /// # #[tokio::main]
413    /// # async fn main() {
414    /// let cancel = CancellationToken::new();
415    /// let sup = TaskSupervisor::new(cancel.clone());
416    ///
417    /// let handle: TaskHandle = sup.spawn(TaskDescriptor {
418    ///     name: "config-watcher",
419    ///     restart: RestartPolicy::Restart { max: 3, base_delay: Duration::from_secs(1) },
420    ///     factory: || async { /* watch loop */ },
421    /// });
422    /// # }
423    /// ```
424    pub fn spawn<F, Fut>(&self, desc: TaskDescriptor<F>) -> TaskHandle
425    where
426        F: Fn() -> Fut + Send + Sync + 'static,
427        Fut: Future<Output = ()> + Send + 'static,
428    {
429        let factory: BoxFactory = Box::new(move || Box::pin((desc.factory)()));
430        let cancel = self.inner.cancel.clone();
431        let completion_tx = self.inner.completion_tx.clone();
432        let name: Arc<str> = Arc::from(desc.name);
433
434        let (abort_handle, jh) = Self::do_spawn(desc.name, &factory, cancel);
435        Self::wire_completion_reporter(Arc::clone(&name), jh, completion_tx);
436
437        let entry = TaskEntry {
438            name: Arc::clone(&name),
439            status: TaskStatus::Running,
440            started_at: Instant::now(),
441            restart_count: 0,
442            restart_policy: desc.restart,
443            abort_handle: abort_handle.clone(),
444            factory: match desc.restart {
445                RestartPolicy::RunOnce => None,
446                RestartPolicy::Restart { .. } => Some(factory),
447            },
448        };
449
450        {
451            let mut state = self.inner.state.lock();
452            if let Some(old) = state.tasks.remove(&name) {
453                old.abort_handle.abort();
454            }
455            state.tasks.insert(Arc::clone(&name), entry);
456        }
457
458        TaskHandle {
459            name: desc.name,
460            abort: abort_handle,
461        }
462    }
463
464    /// Spawn a CPU-bound closure on the OS blocking thread pool.
465    ///
466    /// The closure runs via [`tokio::task::spawn_blocking`] — it is never polled
467    /// on tokio worker threads and cannot block async I/O. The task is registered
468    /// in the supervisor registry and is visible to [`snapshot`][Self::snapshot]
469    /// and [`shutdown_all`][Self::shutdown_all].
470    ///
471    /// Dropping the returned [`BlockingHandle`] without calling `.join()` does
472    /// **not** cancel the task; it runs to completion but the result is discarded.
473    ///
474    /// A panic inside `f` is captured and returned as [`BlockingError::Panicked`]
475    /// rather than propagating to the caller.
476    ///
477    /// # Examples
478    ///
479    /// ```rust,no_run
480    /// use std::sync::Arc;
481    /// use tokio_util::sync::CancellationToken;
482    /// use zeph_common::task_supervisor::{BlockingHandle, TaskSupervisor};
483    ///
484    /// # #[tokio::main]
485    /// # async fn main() {
486    /// let cancel = CancellationToken::new();
487    /// let sup = TaskSupervisor::new(cancel);
488    ///
489    /// let handle: BlockingHandle<u32> = sup.spawn_blocking(Arc::from("compute"), || {
490    ///     // CPU-bound work — safe to block here
491    ///     42_u32
492    /// });
493    /// let result = handle.join().await.unwrap();
494    /// assert_eq!(result, 42);
495    /// # }
496    /// ```
497    ///
498    /// # Capacity limit
499    ///
500    /// At most 8 `spawn_blocking` tasks run concurrently. Additional tasks wait for a
501    /// semaphore permit, bounding thread-pool growth under burst load.
502    ///
503    /// # Panics
504    ///
505    /// Panics inside `f` are captured and returned as [`BlockingError::Panicked`] — they
506    /// do not propagate to the caller.
507    #[allow(clippy::needless_pass_by_value)] // `name` is cloned into async task and registry
508    pub fn spawn_blocking<F, R>(&self, name: Arc<str>, f: F) -> BlockingHandle<R>
509    where
510        F: FnOnce() -> R + Send + 'static,
511        R: Send + 'static,
512    {
513        let (tx, rx) = oneshot::channel::<Result<R, BlockingError>>();
514        let span = tracing::info_span!(
515            "supervised_blocking_task",
516            task.name = %name,
517            task.wall_time_ms = tracing::field::Empty,
518            task.cpu_time_ms = tracing::field::Empty,
519        );
520
521        let semaphore = Arc::clone(&self.inner.blocking_semaphore);
522        let inner = Arc::clone(&self.inner);
523        let name_clone = Arc::clone(&name);
524        let completion_tx = self.inner.completion_tx.clone();
525
526        // Wrap the blocking spawn in an async task that first acquires a semaphore
527        // permit, bounding the number of concurrently running blocking tasks to 8.
528        let outer = tokio::spawn(async move {
529            let _permit = semaphore
530                .acquire_owned()
531                .await
532                .expect("blocking semaphore closed");
533
534            let name_for_measure = Arc::clone(&name_clone);
535            let join_handle = tokio::task::spawn_blocking(move || {
536                let _enter = span.enter();
537                measure_blocking(&name_for_measure, f)
538            });
539            let abort = join_handle.abort_handle();
540
541            // Update registry with the real abort handle now that spawn_blocking is live.
542            {
543                let mut state = inner.state.lock();
544                if let Some(entry) = state.tasks.get_mut(&name_clone) {
545                    entry.abort_handle = abort;
546                }
547            }
548
549            let kind = match join_handle.await {
550                Ok(val) => {
551                    let _ = tx.send(Ok(val));
552                    CompletionKind::Normal
553                }
554                Err(e) if e.is_panic() => {
555                    let _ = tx.send(Err(BlockingError::Panicked));
556                    CompletionKind::Panicked
557                }
558                Err(_) => {
559                    // Aborted — drop tx so rx returns SupervisorDropped.
560                    CompletionKind::Cancelled
561                }
562            };
563            // _permit released here, freeing the semaphore slot.
564            let _ = completion_tx.send(Completion {
565                name: name_clone,
566                kind,
567            });
568        });
569        let abort = outer.abort_handle();
570
571        // Register in registry so snapshot/shutdown sees the task.
572        {
573            let mut state = self.inner.state.lock();
574            if let Some(old) = state.tasks.remove(&name) {
575                old.abort_handle.abort();
576            }
577            state.tasks.insert(
578                Arc::clone(&name),
579                TaskEntry {
580                    name: Arc::clone(&name),
581                    status: TaskStatus::Running,
582                    started_at: Instant::now(),
583                    restart_count: 0,
584                    restart_policy: RestartPolicy::RunOnce,
585                    abort_handle: abort.clone(),
586                    factory: None,
587                },
588            );
589        }
590
591        BlockingHandle { rx, abort }
592    }
593
594    /// Spawn an async task that produces a typed result value (runs on tokio worker thread).
595    ///
596    /// Unlike [`spawn`][Self::spawn], no restart policy is supported — the task
597    /// runs once. The task is registered in the supervisor registry under the
598    /// provided `name` and is visible to [`snapshot`][Self::snapshot] and
599    /// [`shutdown_all`][Self::shutdown_all].
600    ///
601    /// For CPU-bound work that must not block tokio workers, use
602    /// [`spawn_blocking`][Self::spawn_blocking] instead.
603    ///
604    /// # Examples
605    ///
606    /// ```rust,no_run
607    /// use std::sync::Arc;
608    /// use tokio_util::sync::CancellationToken;
609    /// use zeph_common::task_supervisor::{BlockingHandle, TaskSupervisor};
610    ///
611    /// # #[tokio::main]
612    /// # async fn main() {
613    /// let cancel = CancellationToken::new();
614    /// let sup = TaskSupervisor::new(cancel.clone());
615    ///
616    /// let handle: BlockingHandle<u32> = sup.spawn_oneshot(Arc::from("compute"), || async { 42_u32 });
617    /// let result = handle.join().await.unwrap();
618    /// assert_eq!(result, 42);
619    /// # }
620    /// ```
621    pub fn spawn_oneshot<F, Fut, R>(&self, name: Arc<str>, factory: F) -> BlockingHandle<R>
622    where
623        F: FnOnce() -> Fut + Send + 'static,
624        Fut: Future<Output = R> + Send + 'static,
625        R: Send + 'static,
626    {
627        let (tx, rx) = oneshot::channel::<Result<R, BlockingError>>();
628        let cancel = self.inner.cancel.clone();
629        let span = tracing::info_span!("supervised_task", task.name = %name);
630        let join_handle: tokio::task::JoinHandle<Option<R>> = tokio::spawn(
631            async move {
632                let fut = factory();
633                tokio::select! {
634                    result = fut => Some(result),
635                    () = cancel.cancelled() => None,
636                }
637            }
638            .instrument(span),
639        );
640        let abort = join_handle.abort_handle();
641
642        {
643            let mut state = self.inner.state.lock();
644            if let Some(old) = state.tasks.remove(&name) {
645                old.abort_handle.abort();
646            }
647            state.tasks.insert(
648                Arc::clone(&name),
649                TaskEntry {
650                    name: Arc::clone(&name),
651                    status: TaskStatus::Running,
652                    started_at: Instant::now(),
653                    restart_count: 0,
654                    restart_policy: RestartPolicy::RunOnce,
655                    abort_handle: abort.clone(),
656                    factory: None,
657                },
658            );
659        }
660
661        let completion_tx = self.inner.completion_tx.clone();
662        tokio::spawn(async move {
663            let kind = match join_handle.await {
664                Ok(Some(val)) => {
665                    let _ = tx.send(Ok(val));
666                    CompletionKind::Normal
667                }
668                Err(e) if e.is_panic() => {
669                    let _ = tx.send(Err(BlockingError::Panicked));
670                    CompletionKind::Panicked
671                }
672                _ => CompletionKind::Cancelled,
673            };
674            let _ = completion_tx.send(Completion { name, kind });
675        });
676        BlockingHandle { rx, abort }
677    }
678
679    /// Abort a task by name. No-op if no task with that name is registered.
680    pub fn abort(&self, name: &'static str) {
681        let state = self.inner.state.lock();
682        let key: Arc<str> = Arc::from(name);
683        if let Some(entry) = state.tasks.get(&key) {
684            entry.abort_handle.abort();
685            tracing::debug!(task.name = name, "task aborted via supervisor");
686        }
687    }
688
689    /// Gracefully shut down all supervised tasks.
690    ///
691    /// Cancels the supervisor's [`CancellationToken`] and waits up to `timeout`
692    /// for all tasks to exit. Tasks that do not exit within the timeout are
693    /// aborted forcefully and their registry entries updated to [`TaskStatus::Aborted`].
694    ///
695    /// # Note
696    ///
697    /// This cancels the token passed to [`TaskSupervisor::new`]. If you share
698    /// that token with other subsystems, they will be cancelled too. Use a child
699    /// token (`cancel.child_token()`) when the supervisor should not affect
700    /// unrelated components.
701    pub async fn shutdown_all(&self, timeout: Duration) {
702        self.inner.cancel.cancel();
703        let sleep = tokio::time::sleep(timeout);
704        tokio::pin!(sleep);
705        loop {
706            let active = self.active_count();
707            if active == 0 {
708                break;
709            }
710            // Subscribe before re-checking so we cannot miss a notification that
711            // fires between the active_count() call above and the select below.
712            let notified = self.inner.shutdown_notify.notified();
713            tokio::select! {
714                biased;
715                () = notified => {
716                    // reap driver decremented active count — re-check at top of loop
717                }
718                () = &mut sleep => {
719                    let mut remaining_names: Vec<Arc<str>> = Vec::new();
720                    {
721                        let mut state = self.inner.state.lock();
722                        for entry in state.tasks.values_mut() {
723                            if matches!(
724                                entry.status,
725                                TaskStatus::Running | TaskStatus::Restarting { .. }
726                            ) {
727                                remaining_names.push(Arc::clone(&entry.name));
728                                entry.abort_handle.abort();
729                                entry.status = TaskStatus::Aborted;
730                            }
731                        }
732                    }
733                    tracing::warn!(
734                        remaining = active,
735                        tasks = ?remaining_names,
736                        "shutdown timeout — aborting remaining tasks"
737                    );
738                    break;
739                }
740            }
741        }
742    }
743
744    /// Return a point-in-time snapshot of all registered tasks.
745    ///
746    /// Suitable for TUI status panels and structured logging. The returned
747    /// list is sorted by `started_at` ascending.
748    #[must_use]
749    pub fn snapshot(&self) -> Vec<TaskSnapshot> {
750        let state = self.inner.state.lock();
751        let mut snaps: Vec<TaskSnapshot> = state
752            .tasks
753            .values()
754            .map(|e| TaskSnapshot {
755                name: Arc::clone(&e.name),
756                status: e.status.clone(),
757                started_at: e.started_at,
758                restart_count: e.restart_count,
759            })
760            .collect();
761        snaps.sort_by_key(|s| s.started_at);
762        snaps
763    }
764
765    /// Return the number of tasks currently in `Running` or `Restarting` state.
766    #[must_use]
767    pub fn active_count(&self) -> usize {
768        let state = self.inner.state.lock();
769        state
770            .tasks
771            .values()
772            .filter(|e| {
773                matches!(
774                    e.status,
775                    TaskStatus::Running | TaskStatus::Restarting { .. }
776                )
777            })
778            .count()
779    }
780
781    /// Return a clone of the supervisor's [`CancellationToken`].
782    ///
783    /// Callers can use this to check whether shutdown has been initiated.
784    #[must_use]
785    pub fn cancellation_token(&self) -> CancellationToken {
786        self.inner.cancel.clone()
787    }
788
789    // ── Internal helpers ──────────────────────────────────────────────────────
790
791    /// Spawn the actual tokio task. Returns `(AbortHandle, JoinHandle)`.
792    fn do_spawn(
793        name: &'static str,
794        factory: &BoxFactory,
795        cancel: CancellationToken,
796    ) -> (AbortHandle, tokio::task::JoinHandle<()>) {
797        let fut = factory();
798        let span = tracing::info_span!("supervised_task", task.name = name);
799        let jh = tokio::spawn(
800            async move {
801                tokio::select! {
802                    () = fut => {},
803                    () = cancel.cancelled() => {},
804                }
805            }
806            .instrument(span),
807        );
808        let abort = jh.abort_handle();
809        (abort, jh)
810    }
811
812    /// Wire a completion reporter: drives `jh` and sends the result to `completion_tx`.
813    fn wire_completion_reporter(
814        name: Arc<str>,
815        jh: tokio::task::JoinHandle<()>,
816        completion_tx: mpsc::UnboundedSender<Completion>,
817    ) {
818        tokio::spawn(async move {
819            let kind = match jh.await {
820                Ok(()) => CompletionKind::Normal,
821                Err(e) if e.is_panic() => CompletionKind::Panicked,
822                Err(_) => CompletionKind::Cancelled,
823            };
824            let _ = completion_tx.send(Completion { name, kind });
825        });
826    }
827
828    /// Spawn the reap driver. The driver processes completion events from the mpsc channel.
829    ///
830    /// After the cancellation token fires, the driver continues draining the channel
831    /// until it is empty — this ensures that tasks which completed just before cancel
832    /// have their registry entries updated, allowing `shutdown_all` to observe
833    /// `active_count() == 0` correctly.
834    fn start_reap_driver(
835        inner: Arc<Inner>,
836        mut completion_rx: mpsc::UnboundedReceiver<Completion>,
837        cancel: CancellationToken,
838    ) {
839        tokio::spawn(async move {
840            // Phase 1: normal operation — process completions until cancel fires.
841            loop {
842                tokio::select! {
843                    biased;
844                    Some(completion) = completion_rx.recv() => {
845                        Self::handle_completion(&inner, completion).await;
846                    }
847                    () = cancel.cancelled() => break,
848                }
849            }
850
851            // Phase 2: post-cancel drain — keep receiving completions until the
852            // registry reports no active tasks, or the channel closes, or the safety
853            // deadline expires. This prevents losing completions that arrive after
854            // tasks observe cancellation (#3161).
855            let drain_deadline = tokio::time::Instant::now() + SHUTDOWN_DRAIN_TIMEOUT;
856            let active = Self::has_active_tasks(&inner);
857            tracing::debug!(active, "reap driver entered post-cancel drain phase");
858            loop {
859                if !Self::has_active_tasks(&inner) {
860                    break;
861                }
862                let remaining =
863                    drain_deadline.saturating_duration_since(tokio::time::Instant::now());
864                if remaining.is_zero() {
865                    break;
866                }
867                match tokio::time::timeout(remaining, completion_rx.recv()).await {
868                    Ok(Some(completion)) => Self::handle_completion(&inner, completion).await,
869                    // channel closed (unreachable in practice — senders live in Inner), or deadline elapsed
870                    Ok(None) | Err(_) => break,
871                }
872            }
873            tracing::debug!(
874                active = Self::has_active_tasks(&inner),
875                "reap driver drain phase complete"
876            );
877        });
878    }
879
880    /// Returns `true` if any task is in `Running` or `Restarting` state.
881    fn has_active_tasks(inner: &Arc<Inner>) -> bool {
882        let state = inner.state.lock();
883        state.tasks.values().any(|e| {
884            matches!(
885                e.status,
886                TaskStatus::Running | TaskStatus::Restarting { .. }
887            )
888        })
889    }
890
891    /// Process a single task completion event.
892    ///
893    /// Lock is never held across `.await`. Phase 1 classifies the completion
894    /// under lock; Phase 2 sleeps with exponential backoff without a lock;
895    /// Phase 3 spawns the next instance and updates the registry.
896    async fn handle_completion(inner: &Arc<Inner>, completion: Completion) {
897        // Short-circuit: once cancellation has fired, never schedule restarts.
898        // Without this, Restart-policy tasks re-register as Running, causing
899        // has_active_tasks() to stay true and the drain loop to spin until timeout.
900        if inner.cancel.is_cancelled() {
901            {
902                let mut state = inner.state.lock();
903                state.tasks.remove(&completion.name);
904            }
905            inner.shutdown_notify.notify_waiters();
906            return;
907        }
908
909        let Some((attempt, max, delay)) = Self::classify_completion(inner, &completion) else {
910            // Task removed from registry (RunOnce completed or Restart exhausted) —
911            // wake shutdown_all so it can re-check active_count immediately.
912            inner.shutdown_notify.notify_waiters();
913            return;
914        };
915
916        tracing::warn!(
917            task.name = %completion.name,
918            attempt,
919            max,
920            delay_ms = delay.as_millis(),
921            "restarting supervised task"
922        );
923
924        if !delay.is_zero() {
925            tokio::time::sleep(delay).await;
926        }
927
928        Self::do_restart(inner, &completion.name, attempt);
929    }
930
931    /// Phase 1: classify the completion under lock and return restart parameters if needed.
932    ///
933    /// Returns `Some((attempt, max, backoff_delay))` when a restart should be scheduled.
934    fn classify_completion(
935        inner: &Arc<Inner>,
936        completion: &Completion,
937    ) -> Option<(u32, u32, Duration)> {
938        let mut state = inner.state.lock();
939        let entry = state.tasks.get_mut(&completion.name)?;
940
941        match completion.kind {
942            CompletionKind::Panicked => {
943                tracing::warn!(task.name = %completion.name, "supervised task panicked");
944            }
945            CompletionKind::Normal => {
946                tracing::info!(task.name = %completion.name, "supervised task completed");
947            }
948            CompletionKind::Cancelled => {
949                tracing::debug!(task.name = %completion.name, "supervised task cancelled");
950            }
951        }
952
953        match entry.restart_policy {
954            RestartPolicy::RunOnce => {
955                entry.status = TaskStatus::Completed;
956                state.tasks.remove(&completion.name);
957                None
958            }
959            RestartPolicy::Restart { max, base_delay } => {
960                // Only restart on panic — normal exit and cancellation are not errors.
961                if completion.kind != CompletionKind::Panicked {
962                    entry.status = TaskStatus::Completed;
963                    state.tasks.remove(&completion.name);
964                    return None;
965                }
966                if entry.restart_count >= max {
967                    let reason = format!("panicked after {max} restart(s)");
968                    tracing::error!(
969                        task.name = %completion.name,
970                        attempts = max,
971                        "task failed permanently"
972                    );
973                    entry.status = TaskStatus::Failed { reason };
974                    None
975                } else {
976                    let attempt = entry.restart_count + 1;
977                    entry.status = TaskStatus::Restarting { attempt, max };
978                    // Exponential backoff: base_delay * 2^(attempt-1), capped at MAX_RESTART_DELAY.
979                    let multiplier = 1_u32
980                        .checked_shl(attempt.saturating_sub(1))
981                        .unwrap_or(u32::MAX);
982                    let delay = base_delay.saturating_mul(multiplier).min(MAX_RESTART_DELAY);
983                    Some((attempt, max, delay))
984                }
985            }
986        }
987        // lock released here
988    }
989
990    /// Phase 3: TOCTOU check, collect spawn params under lock, then spawn outside.
991    fn do_restart(inner: &Arc<Inner>, name: &Arc<str>, attempt: u32) {
992        let spawn_params = {
993            let mut state = inner.state.lock();
994            let Some(entry) = state.tasks.get_mut(name.as_ref()) else {
995                tracing::debug!(
996                    task.name = %name,
997                    "task removed during restart delay — skipping"
998                );
999                return;
1000            };
1001            if !matches!(entry.status, TaskStatus::Restarting { .. }) {
1002                return;
1003            }
1004            let Some(factory) = &entry.factory else {
1005                return;
1006            };
1007            // Wrap factory() in catch_unwind to prevent a factory panic from crashing
1008            // the reap driver and orphaning the registry.
1009            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(factory)) {
1010                Err(_) => {
1011                    let reason = format!("factory panicked on restart attempt {attempt}");
1012                    tracing::error!(task.name = %name, attempt, "factory panicked during restart");
1013                    entry.status = TaskStatus::Failed { reason };
1014                    None
1015                }
1016                Ok(fut) => Some((
1017                    fut,
1018                    inner.cancel.clone(),
1019                    inner.completion_tx.clone(),
1020                    name.clone(),
1021                )),
1022            }
1023            // lock released here
1024        };
1025
1026        let Some((fut, cancel, completion_tx, name)) = spawn_params else {
1027            return;
1028        };
1029
1030        let span = tracing::info_span!("supervised_task", task.name = %name);
1031        let jh = tokio::spawn(
1032            async move {
1033                tokio::select! {
1034                    () = fut => {},
1035                    () = cancel.cancelled() => {},
1036                }
1037            }
1038            .instrument(span),
1039        );
1040        let new_abort = jh.abort_handle();
1041
1042        {
1043            let mut state = inner.state.lock();
1044            if let Some(entry) = state.tasks.get_mut(name.as_ref()) {
1045                entry.restart_count = attempt;
1046                entry.status = TaskStatus::Running;
1047                entry.abort_handle = new_abort;
1048            }
1049        }
1050
1051        Self::wire_completion_reporter(name, jh, completion_tx);
1052    }
1053}
1054
1055// ── Task metrics helpers ──────────────────────────────────────────────────────
1056
1057/// Run `f` and record wall-time and CPU-time metrics via `metrics` crate.
1058#[inline]
1059fn measure_blocking<F, R>(name: &str, f: F) -> R
1060where
1061    F: FnOnce() -> R,
1062{
1063    use cpu_time::ThreadTime;
1064    let wall_start = std::time::Instant::now();
1065    let cpu_start = ThreadTime::now();
1066    let result = f();
1067    let wall_ms = wall_start.elapsed().as_secs_f64() * 1000.0;
1068    let cpu_ms = cpu_start.elapsed().as_secs_f64() * 1000.0;
1069    metrics::histogram!("zeph.task.wall_time_ms", "task" => name.to_owned()).record(wall_ms);
1070    metrics::histogram!("zeph.task.cpu_time_ms", "task" => name.to_owned()).record(cpu_ms);
1071    tracing::Span::current().record("task.wall_time_ms", wall_ms);
1072    tracing::Span::current().record("task.cpu_time_ms", cpu_ms);
1073    result
1074}
1075
1076// ── BlockingSpawner impl ──────────────────────────────────────────────────────
1077
1078impl BlockingSpawner for TaskSupervisor {
1079    /// Spawn a named blocking closure through the supervisor.
1080    ///
1081    /// The task is registered in the supervisor registry (visible in
1082    /// [`snapshot`][Self::snapshot] and subject to graceful shutdown) before
1083    /// the closure begins executing.
1084    fn spawn_blocking_named(
1085        &self,
1086        name: Arc<str>,
1087        f: Box<dyn FnOnce() + Send + 'static>,
1088    ) -> tokio::task::JoinHandle<()> {
1089        let handle = self.spawn_blocking(Arc::clone(&name), f);
1090        tokio::spawn(async move {
1091            if let Err(e) = handle.join().await {
1092                tracing::error!(task.name = %name, error = %e, "supervised blocking task failed");
1093            }
1094        })
1095    }
1096}
1097
1098// ── Unit tests ────────────────────────────────────────────────────────────────
1099
1100#[cfg(test)]
1101mod tests {
1102    use std::sync::Arc;
1103    use std::sync::atomic::{AtomicU32, Ordering};
1104    use std::time::Duration;
1105
1106    use tokio_util::sync::CancellationToken;
1107
1108    use super::*;
1109
1110    fn make_supervisor() -> (TaskSupervisor, CancellationToken) {
1111        let cancel = CancellationToken::new();
1112        let sup = TaskSupervisor::new(cancel.clone());
1113        (sup, cancel)
1114    }
1115
    /// A `RunOnce` task runs its future once and is removed from the registry
    /// after it completes.
    #[tokio::test]
    async fn test_spawn_and_complete() {
        let (sup, _cancel) = make_supervisor();

        // Signalled from inside the task body so the test knows the future actually ran.
        let done = Arc::new(tokio::sync::Notify::new());
        let done2 = Arc::clone(&done);

        sup.spawn(TaskDescriptor {
            name: "simple",
            restart: RestartPolicy::RunOnce,
            factory: move || {
                let d = Arc::clone(&done2);
                async move {
                    d.notify_one();
                }
            },
        });

        tokio::time::timeout(Duration::from_secs(2), done.notified())
            .await
            .expect("task should complete");

        // Give the reap driver time to process the completion event before
        // inspecting the registry (timing-based).
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            sup.active_count(),
            0,
            "RunOnce task should be removed after completion"
        );
    }
1145
    /// A panic inside a `RunOnce` task does not escape to the test. The entry
    /// is reaped and no task remains active.
    #[tokio::test]
    async fn test_panic_capture() {
        let (sup, _cancel) = make_supervisor();

        sup.spawn(TaskDescriptor {
            name: "panicking",
            restart: RestartPolicy::RunOnce,
            factory: || async { panic!("intentional test panic") },
        });

        // Allow time for the panic to be observed and the completion reaped (timing-based).
        tokio::time::sleep(Duration::from_millis(200)).await;

        let snaps = sup.snapshot();
        assert!(
            snaps.iter().all(|s| s.name.as_ref() != "panicking"),
            "entry should be reaped"
        );
        assert_eq!(
            sup.active_count(),
            0,
            "active count must be 0 after RunOnce panic"
        );
    }
1169
1170    /// Regression test for S2: Restart-policy tasks must only restart on panic,
1171    /// not on normal completion.
1172    #[tokio::test]
1173    async fn test_restart_only_on_panic() {
1174        let (sup, _cancel) = make_supervisor();
1175
1176        // Part 1: normal completion — must NOT restart.
1177        let normal_counter = Arc::new(AtomicU32::new(0));
1178        let nc = Arc::clone(&normal_counter);
1179        sup.spawn(TaskDescriptor {
1180            name: "normal-exit",
1181            restart: RestartPolicy::Restart {
1182                max: 3,
1183                base_delay: Duration::from_millis(10),
1184            },
1185            factory: move || {
1186                let c = Arc::clone(&nc);
1187                async move {
1188                    c.fetch_add(1, Ordering::SeqCst);
1189                    // Returns normally — no panic.
1190                }
1191            },
1192        });
1193
1194        tokio::time::sleep(Duration::from_millis(300)).await;
1195        assert_eq!(
1196            normal_counter.load(Ordering::SeqCst),
1197            1,
1198            "normal exit must not restart"
1199        );
1200        assert!(
1201            sup.snapshot()
1202                .iter()
1203                .all(|s| s.name.as_ref() != "normal-exit"),
1204            "entry removed after normal exit"
1205        );
1206
1207        // Part 2: panic — MUST restart up to max times.
1208        let panic_counter = Arc::new(AtomicU32::new(0));
1209        let pc = Arc::clone(&panic_counter);
1210        sup.spawn(TaskDescriptor {
1211            name: "panic-exit",
1212            restart: RestartPolicy::Restart {
1213                max: 2,
1214                base_delay: Duration::from_millis(10),
1215            },
1216            factory: move || {
1217                let c = Arc::clone(&pc);
1218                async move {
1219                    c.fetch_add(1, Ordering::SeqCst);
1220                    panic!("test panic");
1221                }
1222            },
1223        });
1224
1225        // initial + 2 restarts = 3 total
1226        tokio::time::sleep(Duration::from_millis(500)).await;
1227        assert!(
1228            panic_counter.load(Ordering::SeqCst) >= 3,
1229            "panicking task must restart max times"
1230        );
1231        let snap = sup
1232            .snapshot()
1233            .into_iter()
1234            .find(|s| s.name.as_ref() == "panic-exit");
1235        assert!(
1236            matches!(snap.unwrap().status, TaskStatus::Failed { .. }),
1237            "task must be Failed after exhausting restarts"
1238        );
1239    }
1240
1241    #[tokio::test]
1242    async fn test_restart_policy() {
1243        let (sup, _cancel) = make_supervisor();
1244
1245        let counter = Arc::new(AtomicU32::new(0));
1246        let counter2 = Arc::clone(&counter);
1247
1248        sup.spawn(TaskDescriptor {
1249            name: "restartable",
1250            restart: RestartPolicy::Restart {
1251                max: 2,
1252                base_delay: Duration::from_millis(10),
1253            },
1254            factory: move || {
1255                let c = Arc::clone(&counter2);
1256                async move {
1257                    c.fetch_add(1, Ordering::SeqCst);
1258                    panic!("always panic");
1259                }
1260            },
1261        });
1262
1263        tokio::time::sleep(Duration::from_millis(500)).await;
1264
1265        let runs = counter.load(Ordering::SeqCst);
1266        assert!(
1267            runs >= 3,
1268            "expected at least 3 invocations (initial + 2 restarts), got {runs}"
1269        );
1270
1271        let snaps = sup.snapshot();
1272        let snap = snaps.iter().find(|s| s.name.as_ref() == "restartable");
1273        assert!(snap.is_some(), "failed task should remain in registry");
1274        assert!(
1275            matches!(snap.unwrap().status, TaskStatus::Failed { .. }),
1276            "task should be Failed after exhausting retries"
1277        );
1278    }
1279
1280    /// Verify exponential backoff: delay doubles on each restart attempt.
1281    #[tokio::test]
1282    async fn test_exponential_backoff() {
1283        let (sup, _cancel) = make_supervisor();
1284
1285        let timestamps = Arc::new(parking_lot::Mutex::new(Vec::<std::time::Instant>::new()));
1286        let ts = Arc::clone(&timestamps);
1287
1288        sup.spawn(TaskDescriptor {
1289            name: "backoff-task",
1290            restart: RestartPolicy::Restart {
1291                max: 3,
1292                base_delay: Duration::from_millis(50),
1293            },
1294            factory: move || {
1295                let t = Arc::clone(&ts);
1296                async move {
1297                    t.lock().push(std::time::Instant::now());
1298                    panic!("always panic");
1299                }
1300            },
1301        });
1302
1303        // Wait long enough for all restarts: 50 + 100 + 200 ms = 350 ms + overhead
1304        tokio::time::sleep(Duration::from_millis(800)).await;
1305
1306        let ts = timestamps.lock();
1307        assert!(
1308            ts.len() >= 3,
1309            "expected at least 3 invocations, got {}",
1310            ts.len()
1311        );
1312
1313        // Verify delays are roughly doubling (within 2x tolerance for CI jitter).
1314        if ts.len() >= 3 {
1315            let d1 = ts[1].duration_since(ts[0]);
1316            let d2 = ts[2].duration_since(ts[1]);
1317            // d2 should be at least 1.5x d1 (allowing for jitter).
1318            assert!(
1319                d2 >= d1.mul_f64(1.5),
1320                "expected exponential backoff: d1={d1:?} d2={d2:?}"
1321            );
1322        }
1323    }
1324
1325    #[tokio::test]
1326    async fn test_graceful_shutdown() {
1327        let (sup, _cancel) = make_supervisor();
1328
1329        for name in ["svc-a", "svc-b", "svc-c"] {
1330            sup.spawn(TaskDescriptor {
1331                name,
1332                restart: RestartPolicy::RunOnce,
1333                factory: || async {
1334                    tokio::time::sleep(Duration::from_mins(1)).await;
1335                },
1336            });
1337        }
1338
1339        assert_eq!(sup.active_count(), 3);
1340
1341        tokio::time::timeout(
1342            Duration::from_secs(2),
1343            sup.shutdown_all(Duration::from_secs(1)),
1344        )
1345        .await
1346        .expect("shutdown should complete within timeout");
1347    }
1348
    /// Verify that force-aborted tasks get `TaskStatus::Aborted` in the registry (A2 fix).
    ///
    /// The supervised future is also raced against the cancellation token at
    /// spawn time, so cooperative cancellation may win. In that case the entry
    /// may already have been reaped, and the test accepts either outcome.
    #[tokio::test]
    async fn test_force_abort_marks_aborted() {
        let cancel = CancellationToken::new();
        let sup = TaskSupervisor::new(cancel.clone());

        sup.spawn(TaskDescriptor {
            name: "stubborn-for-abort",
            restart: RestartPolicy::RunOnce,
            factory: || async {
                // Does not cooperate with cancellation.
                std::future::pending::<()>().await;
            },
        });

        // Use a very short timeout to trigger force-abort.
        sup.shutdown_all(Duration::from_millis(1)).await;

        // Entry should be Aborted, not Running.
        let snaps = sup.snapshot();
        if let Some(snap) = snaps
            .iter()
            .find(|s| s.name.as_ref() == "stubborn-for-abort")
        {
            assert_eq!(
                snap.status,
                TaskStatus::Aborted,
                "force-aborted task must have Aborted status"
            );
        }
        // If entry was already reaped (cooperative cancel won), that's also acceptable.
    }
1381
1382    #[tokio::test]
1383    async fn test_registry_snapshot() {
1384        let (sup, _cancel) = make_supervisor();
1385
1386        for name in ["alpha", "beta"] {
1387            sup.spawn(TaskDescriptor {
1388                name,
1389                restart: RestartPolicy::RunOnce,
1390                factory: || async {
1391                    tokio::time::sleep(Duration::from_secs(10)).await;
1392                },
1393            });
1394        }
1395
1396        let snaps = sup.snapshot();
1397        assert_eq!(snaps.len(), 2);
1398        let names: Vec<&str> = snaps.iter().map(|s| s.name.as_ref()).collect();
1399        assert!(names.contains(&"alpha"));
1400        assert!(names.contains(&"beta"));
1401        assert!(snaps.iter().all(|s| s.status == TaskStatus::Running));
1402    }
1403
1404    #[tokio::test]
1405    async fn test_blocking_returns_value() {
1406        let (sup, cancel) = make_supervisor();
1407
1408        let handle: BlockingHandle<u32> = sup.spawn_blocking(Arc::from("compute"), || 42_u32);
1409        let result = handle.join().await.expect("should return value");
1410        assert_eq!(result, 42);
1411        cancel.cancel();
1412    }
1413
1414    #[tokio::test]
1415    async fn test_blocking_panic() {
1416        let (sup, _cancel) = make_supervisor();
1417
1418        let handle: BlockingHandle<u32> =
1419            sup.spawn_blocking(Arc::from("panicking-compute"), || panic!("intentional"));
1420        let err = handle
1421            .join()
1422            .await
1423            .expect_err("should return error on panic");
1424        assert_eq!(err, BlockingError::Panicked);
1425    }
1426
    /// Verify `spawn_blocking` tasks appear in registry (M3 fix).
    ///
    /// The closure is held open by a std (synchronous) channel so the test can
    /// observe the task while it is live, then release it and check it is reaped.
    #[tokio::test]
    async fn test_blocking_registered_in_registry() {
        let (sup, cancel) = make_supervisor();

        let (tx, rx) = std::sync::mpsc::channel::<()>();
        let _handle: BlockingHandle<()> =
            sup.spawn_blocking(Arc::from("blocking-task"), move || {
                // Block until signalled.
                let _ = rx.recv();
            });

        // Timing-based: give the task a moment to be registered and start.
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert_eq!(
            sup.active_count(),
            1,
            "blocking task must appear in active_count"
        );

        // Release the closure, then allow time for the completion to be reaped.
        let _ = tx.send(());
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(
            sup.active_count(),
            0,
            "blocking task must be removed after completion"
        );

        cancel.cancel();
    }
1456
    /// Verify `spawn_oneshot` tasks appear in registry (M3 fix).
    ///
    /// The async body waits on a oneshot channel so the test controls when it finishes.
    #[tokio::test]
    async fn test_oneshot_registered_in_registry() {
        let (sup, cancel) = make_supervisor();

        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
        let _handle: BlockingHandle<()> =
            sup.spawn_oneshot(Arc::from("oneshot-task"), move || async move {
                let _ = rx.await;
            });

        // Timing-based: give the task a moment to be registered and start.
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert_eq!(
            sup.active_count(),
            1,
            "oneshot task must appear in active_count"
        );

        // Let the task finish, then allow time for the completion to be reaped.
        let _ = tx.send(());
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            sup.active_count(),
            0,
            "oneshot task must be removed after completion"
        );

        cancel.cancel();
    }
1485
1486    #[tokio::test]
1487    async fn test_restart_max_zero() {
1488        let (sup, _cancel) = make_supervisor();
1489
1490        let counter = Arc::new(AtomicU32::new(0));
1491        let counter2 = Arc::clone(&counter);
1492
1493        sup.spawn(TaskDescriptor {
1494            name: "zero-max",
1495            restart: RestartPolicy::Restart {
1496                max: 0,
1497                base_delay: Duration::from_millis(10),
1498            },
1499            factory: move || {
1500                let c = Arc::clone(&counter2);
1501                async move {
1502                    c.fetch_add(1, Ordering::SeqCst);
1503                    panic!("always panic");
1504                }
1505            },
1506        });
1507
1508        tokio::time::sleep(Duration::from_millis(200)).await;
1509
1510        assert_eq!(
1511            counter.load(Ordering::SeqCst),
1512            1,
1513            "max=0 should not restart"
1514        );
1515
1516        let snaps = sup.snapshot();
1517        let snap = snaps.iter().find(|s| s.name.as_ref() == "zero-max");
1518        assert!(snap.is_some(), "entry should remain as Failed");
1519        assert!(
1520            matches!(snap.unwrap().status, TaskStatus::Failed { .. }),
1521            "status should be Failed"
1522        );
1523    }
1524
    /// Stress test: spawn 50 tasks concurrently, all must complete and registry must be accurate.
    #[tokio::test]
    async fn test_concurrent_spawns() {
        // All task names must be 'static — pre-defined before any let statements.
        static NAMES: [&str; 50] = [
            "t00", "t01", "t02", "t03", "t04", "t05", "t06", "t07", "t08", "t09", "t10", "t11",
            "t12", "t13", "t14", "t15", "t16", "t17", "t18", "t19", "t20", "t21", "t22", "t23",
            "t24", "t25", "t26", "t27", "t28", "t29", "t30", "t31", "t32", "t33", "t34", "t35",
            "t36", "t37", "t38", "t39", "t40", "t41", "t42", "t43", "t44", "t45", "t46", "t47",
            "t48", "t49",
        ];
        let (sup, cancel) = make_supervisor();

        // Each task bumps this shared counter exactly once when it runs.
        let completed = Arc::new(AtomicU32::new(0));
        for name in &NAMES {
            let c = Arc::clone(&completed);
            sup.spawn(TaskDescriptor {
                name,
                restart: RestartPolicy::RunOnce,
                factory: move || {
                    let c = Arc::clone(&c);
                    async move {
                        c.fetch_add(1, Ordering::SeqCst);
                    }
                },
            });
        }

        // Wait for all tasks to complete.
        // The counter is polled every 10ms, bounded by a 5s overall timeout.
        tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                if completed.load(Ordering::SeqCst) == 50 {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        })
        .await
        .expect("all 50 tasks should complete");

        // Give reap driver time to process all completions.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(sup.active_count(), 0, "all tasks must be reaped");

        cancel.cancel();
    }
1571
1572    #[tokio::test]
1573    async fn test_shutdown_timeout_expiry() {
1574        let cancel = CancellationToken::new();
1575        let sup = TaskSupervisor::new(cancel.clone());
1576
1577        sup.spawn(TaskDescriptor {
1578            name: "stubborn",
1579            restart: RestartPolicy::RunOnce,
1580            factory: || async {
1581                tokio::time::sleep(Duration::from_mins(1)).await;
1582            },
1583        });
1584
1585        assert_eq!(sup.active_count(), 1);
1586
1587        tokio::time::timeout(
1588            Duration::from_secs(2),
1589            sup.shutdown_all(Duration::from_millis(50)),
1590        )
1591        .await
1592        .expect("shutdown_all should return even on timeout expiry");
1593
1594        assert!(
1595            cancel.is_cancelled(),
1596            "cancel token must be cancelled after shutdown"
1597        );
1598    }
1599
1600    #[tokio::test]
1601    async fn test_cancellation_token() {
1602        let cancel = CancellationToken::new();
1603        let sup = TaskSupervisor::new(cancel.clone());
1604
1605        assert!(!sup.cancellation_token().is_cancelled());
1606
1607        sup.shutdown_all(Duration::from_millis(100)).await;
1608
1609        assert!(
1610            sup.cancellation_token().is_cancelled(),
1611            "token must be cancelled after shutdown"
1612        );
1613    }
1614
    /// Regression test for #3161: after `shutdown_all`, all tasks must be reaped
    /// even when they complete *after* the cancel signal.
    ///
    /// The yield loop forces the reap driver to observe cancel and exit phase-1
    /// before the tasks send their completions — reliably reproducing the race.
    #[tokio::test]
    async fn test_shutdown_drains_post_cancel_completions() {
        let cancel = CancellationToken::new();
        let sup = TaskSupervisor::new(cancel.clone());

        for name in [
            "loop-1", "loop-2", "loop-3", "loop-4", "loop-5", "loop-6", "loop-7",
        ] {
            // Each task waits on the same token the supervisor uses, so it only
            // starts finishing once shutdown has begun.
            let cancel_inner = cancel.clone();
            sup.spawn(TaskDescriptor {
                name,
                restart: RestartPolicy::RunOnce,
                factory: move || {
                    let c = cancel_inner.clone();
                    async move {
                        c.cancelled().await;
                        // Yield multiple times so the reap driver observes cancel first.
                        for _ in 0..64 {
                            tokio::task::yield_now().await;
                        }
                    }
                },
            });
        }
        assert_eq!(sup.active_count(), 7);

        sup.shutdown_all(Duration::from_secs(2)).await;

        // Completions that arrived after cancel must have been drained (phase 2).
        assert_eq!(
            sup.active_count(),
            0,
            "all tasks must be reaped after shutdown (#3161)"
        );
    }
1654
1655    #[tokio::test]
1656    async fn test_blocking_spawner_task_appears_in_snapshot() {
1657        // Verify that tasks spawned via BlockingSpawner appear in supervisor.snapshot().
1658        use crate::BlockingSpawner;
1659
1660        let cancel = CancellationToken::new();
1661        let sup = TaskSupervisor::new(cancel);
1662
1663        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
1664        let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
1665
1666        let handle = sup.spawn_blocking_named(
1667            Arc::from("chunk_file"),
1668            Box::new(move || {
1669                // Signal that the task has started.
1670                let _ = ready_tx.send(());
1671                // Block until test signals release.
1672                let _ = release_rx.blocking_recv();
1673            }),
1674        );
1675
1676        // Wait until the blocking task has actually started.
1677        ready_rx.await.expect("task should start");
1678
1679        let snapshot = sup.snapshot();
1680        assert!(
1681            snapshot.iter().any(|t| t.name.as_ref() == "chunk_file"),
1682            "chunk_file task must appear in supervisor snapshot"
1683        );
1684
1685        // Release the blocking task and await completion.
1686        let _ = release_tx.send(());
1687        handle.await.expect("task should complete");
1688    }
1689
1690    /// Verify that `measure_blocking` emits wall-time and CPU-time histograms.
1691    ///
1692    /// `measure_blocking` calls `metrics::histogram!` on the current thread.
1693    /// We test it directly using a `DebuggingRecorder` installed as the thread-local
1694    /// recorder via `metrics::with_local_recorder`.
1695    #[test]
1696    fn test_measure_blocking_emits_metrics() {
1697        use metrics_util::debugging::DebuggingRecorder;
1698
1699        let recorder = DebuggingRecorder::new();
1700        let snapshotter = recorder.snapshotter();
1701
1702        // Call measure_blocking inside the local recorder scope so histogram! calls
1703        // are captured. The closure runs synchronously on this thread.
1704        metrics::with_local_recorder(&recorder, || {
1705            measure_blocking("test_task", || std::hint::black_box(42_u64));
1706        });
1707
1708        let snapshot = snapshotter.snapshot();
1709        let metric_names: Vec<String> = snapshot
1710            .into_vec()
1711            .into_iter()
1712            .map(|(k, _, _, _)| k.key().name().to_owned())
1713            .collect();
1714
1715        assert!(
1716            metric_names.iter().any(|n| n == "zeph.task.wall_time_ms"),
1717            "expected zeph.task.wall_time_ms histogram; got: {metric_names:?}"
1718        );
1719        assert!(
1720            metric_names.iter().any(|n| n == "zeph.task.cpu_time_ms"),
1721            "expected zeph.task.cpu_time_ms histogram; got: {metric_names:?}"
1722        );
1723    }
1724
1725    /// Verify that `spawn_blocking` semaphore limits concurrent OS-thread tasks to 8.
1726    ///
1727    /// Spawns 16 tasks. Each holds a barrier until 8 are waiting; then releases in order.
1728    /// If more than 8 run concurrently the test would either deadlock (waiting for 9+ to reach
1729    /// the barrier) or the counter would exceed 8 — both are caught.
1730    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1731    async fn test_spawn_blocking_semaphore_cap() {
1732        let (sup, _cancel) = make_supervisor();
1733        let concurrent = Arc::new(AtomicU32::new(0));
1734        let max_concurrent = Arc::new(AtomicU32::new(0));
1735        let barrier = Arc::new(std::sync::Barrier::new(1)); // just a sync point
1736
1737        let mut handles = Vec::new();
1738        for i in 0u32..16 {
1739            let c = Arc::clone(&concurrent);
1740            let m = Arc::clone(&max_concurrent);
1741            let name: Arc<str> = Arc::from(format!("blocking-{i}").as_str());
1742            let h = sup.spawn_blocking(name, move || {
1743                let prev = c.fetch_add(1, Ordering::SeqCst);
1744                // Update observed maximum.
1745                let mut cur_max = m.load(Ordering::SeqCst);
1746                while prev + 1 > cur_max {
1747                    match m.compare_exchange(cur_max, prev + 1, Ordering::SeqCst, Ordering::SeqCst)
1748                    {
1749                        Ok(_) => break,
1750                        Err(x) => cur_max = x,
1751                    }
1752                }
1753                // Simulate work.
1754                std::thread::sleep(std::time::Duration::from_millis(20));
1755                c.fetch_sub(1, Ordering::SeqCst);
1756            });
1757            handles.push(h);
1758        }
1759
1760        for h in handles {
1761            h.join().await.expect("blocking task should succeed");
1762        }
1763        drop(barrier);
1764
1765        let observed = max_concurrent.load(Ordering::SeqCst);
1766        assert!(
1767            observed <= 8,
1768            "observed {observed} concurrent blocking tasks; expected ≤ 8 (semaphore cap)"
1769        );
1770    }
1771}