Skip to main content

nodedb_cluster/subsystem/
trait.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! The `ClusterSubsystem` trait — the shared interface every subsystem implements.
4
5use std::time::Instant;
6
7use async_trait::async_trait;
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use super::context::BootstrapCtx;
12use super::errors::{BootstrapError, ShutdownError};
13use super::health::SubsystemHealth;
14
15/// A running subsystem handle.
16///
17/// Dropping this handle sends the shutdown signal and detaches the task.
18/// Callers that want to wait for clean termination should call
19/// `shutdown_and_wait` with a deadline before dropping.
20pub struct SubsystemHandle {
21    /// The background task running this subsystem's main loop.
22    ///
23    /// Stored as `Option` so `shutdown_and_wait` (which takes `self` by value)
24    /// can extract the `JoinHandle` without conflicting with our `Drop` impl.
25    task: Option<JoinHandle<()>>,
26    /// Sending `true` on this channel requests the subsystem to shut down.
27    pub shutdown_tx: watch::Sender<bool>,
28    /// The name of the subsystem, for diagnostics.
29    pub name: &'static str,
30}
31
32impl SubsystemHandle {
33    /// Create a new handle from an already-spawned task and a shutdown signal.
34    pub fn new(name: &'static str, task: JoinHandle<()>, shutdown_tx: watch::Sender<bool>) -> Self {
35        Self {
36            task: Some(task),
37            shutdown_tx,
38            name,
39        }
40    }
41
42    /// Signal shutdown and wait for the task to finish, respecting `deadline`.
43    ///
44    /// If the task does not finish by `deadline`, the task is aborted and
45    /// `ShutdownError::DeadlineExceeded` is returned.
46    pub async fn shutdown_and_wait(mut self, deadline: Instant) -> Result<(), ShutdownError> {
47        // Send the shutdown signal — ignore send errors (task may have exited already).
48        let _ = self.shutdown_tx.send(true);
49
50        let task = match self.task.take() {
51            Some(t) => t,
52            None => return Ok(()),
53        };
54
55        let timeout = deadline.saturating_duration_since(Instant::now());
56        match tokio::time::timeout(timeout, task).await {
57            Ok(Ok(())) => Ok(()),
58            Ok(Err(join_err)) if join_err.is_panic() => {
59                Err(ShutdownError::Panicked { name: self.name })
60            }
61            Ok(Err(_)) => {
62                // Task was cancelled — treat as clean stop.
63                Ok(())
64            }
65            Err(_elapsed) => Err(ShutdownError::DeadlineExceeded { name: self.name }),
66        }
67    }
68}
69
70impl Drop for SubsystemHandle {
71    fn drop(&mut self) {
72        // Best-effort: send the shutdown signal so the task notices even if
73        // the caller drops without calling `shutdown_and_wait`. The task
74        // itself is dropped here, detaching it from the runtime.
75        let _ = self.shutdown_tx.send(true);
76    }
77}
78
/// The contract every cluster subsystem must satisfy.
///
/// Implementors must be `Send + Sync`; the async methods are provided
/// through `#[async_trait]`.
///
/// # Dependency ordering
///
/// `dependencies()` returns the *names* of subsystems that must have
/// completed `start()` successfully before this subsystem is started.
/// The registry performs a topo-sort and enforces this ordering.
///
/// # Shutdown
///
/// `shutdown()` is called in *reverse* start order when any sibling
/// fails during bootstrap, or during graceful cluster teardown. Each
/// subsystem should stop its background work and release resources
/// before the deadline.
///
/// # Health
///
/// `health()` returns a point-in-time `SubsystemHealth`. The registry
/// does not poll this — individual subsystems update the shared
/// `ClusterHealth` aggregator directly via `BootstrapCtx`.
#[async_trait]
pub trait ClusterSubsystem: Send + Sync {
    /// A unique, human-readable name for this subsystem.
    ///
    /// Used as the key in dependency declarations and health maps, so it
    /// must match the strings other subsystems return from `dependencies()`.
    fn name(&self) -> &'static str;

    /// Names of subsystems that must be started before this one.
    ///
    /// Each entry is the `name()` of another subsystem.
    /// Return `&[]` if there are no prerequisites.
    fn dependencies(&self) -> &'static [&'static str];

    /// Start the subsystem and return a handle to its background task.
    ///
    /// This method is called exactly once, after all declared
    /// dependencies have been started successfully.
    ///
    /// # Errors
    ///
    /// Returns a `BootstrapError` if the subsystem could not be started.
    async fn start(&self, ctx: &BootstrapCtx) -> Result<SubsystemHandle, BootstrapError>;

    /// Gracefully stop the subsystem before `deadline`.
    ///
    /// Implementations should signal their background tasks via the
    /// `SubsystemHandle::shutdown_tx` and wait for them to exit. If
    /// the deadline is exceeded, they should abort and return
    /// `ShutdownError::DeadlineExceeded`.
    ///
    /// NOTE(review): `start` returns the `SubsystemHandle` to the caller and
    /// this method only receives `&self`, so it is not visible from this file
    /// how an implementation reaches `shutdown_tx` — confirm against the
    /// registry and the concrete subsystems.
    ///
    /// # Errors
    ///
    /// Returns a `ShutdownError` when the subsystem could not be stopped
    /// cleanly, e.g. `ShutdownError::DeadlineExceeded` on a missed deadline.
    async fn shutdown(&self, deadline: Instant) -> Result<(), ShutdownError>;

    /// Return the current health state of this subsystem.
    ///
    /// This is a point-in-time snapshot and may be stale as soon as it is
    /// returned.
    fn health(&self) -> SubsystemHealth;
}