nodedb_cluster/subsystem/trait.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! The `ClusterSubsystem` trait — the shared interface every subsystem implements.
4
5use std::time::Instant;
6
7use async_trait::async_trait;
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use super::context::BootstrapCtx;
12use super::errors::{BootstrapError, ShutdownError};
13use super::health::SubsystemHealth;
14
15/// A running subsystem handle.
16///
17/// Dropping this handle sends the shutdown signal and detaches the task.
18/// Callers that want to wait for clean termination should call
19/// `shutdown_and_wait` with a deadline before dropping.
20pub struct SubsystemHandle {
21 /// The background task running this subsystem's main loop.
22 ///
23 /// Stored as `Option` so `shutdown_and_wait` (which takes `self` by value)
24 /// can extract the `JoinHandle` without conflicting with our `Drop` impl.
25 task: Option<JoinHandle<()>>,
26 /// Sending `true` on this channel requests the subsystem to shut down.
27 pub shutdown_tx: watch::Sender<bool>,
28 /// The name of the subsystem, for diagnostics.
29 pub name: &'static str,
30}
31
32impl SubsystemHandle {
33 /// Create a new handle from an already-spawned task and a shutdown signal.
34 pub fn new(name: &'static str, task: JoinHandle<()>, shutdown_tx: watch::Sender<bool>) -> Self {
35 Self {
36 task: Some(task),
37 shutdown_tx,
38 name,
39 }
40 }
41
42 /// Signal shutdown and wait for the task to finish, respecting `deadline`.
43 ///
44 /// If the task does not finish by `deadline`, the task is aborted and
45 /// `ShutdownError::DeadlineExceeded` is returned.
46 pub async fn shutdown_and_wait(mut self, deadline: Instant) -> Result<(), ShutdownError> {
47 // Send the shutdown signal — ignore send errors (task may have exited already).
48 let _ = self.shutdown_tx.send(true);
49
50 let task = match self.task.take() {
51 Some(t) => t,
52 None => return Ok(()),
53 };
54
55 let timeout = deadline.saturating_duration_since(Instant::now());
56 match tokio::time::timeout(timeout, task).await {
57 Ok(Ok(())) => Ok(()),
58 Ok(Err(join_err)) if join_err.is_panic() => {
59 Err(ShutdownError::Panicked { name: self.name })
60 }
61 Ok(Err(_)) => {
62 // Task was cancelled — treat as clean stop.
63 Ok(())
64 }
65 Err(_elapsed) => Err(ShutdownError::DeadlineExceeded { name: self.name }),
66 }
67 }
68}
69
70impl Drop for SubsystemHandle {
71 fn drop(&mut self) {
72 // Best-effort: send the shutdown signal so the task notices even if
73 // the caller drops without calling `shutdown_and_wait`. The task
74 // itself is dropped here, detaching it from the runtime.
75 let _ = self.shutdown_tx.send(true);
76 }
77}
78
79/// The contract every cluster subsystem must satisfy.
80///
81/// # Dependency ordering
82///
83/// `dependencies()` returns the *names* of subsystems that must have
84/// completed `start()` successfully before this subsystem is started.
85/// The registry performs a topo-sort and enforces this ordering.
86///
87/// # Shutdown
88///
89/// `shutdown()` is called in *reverse* start order when any sibling
90/// fails during bootstrap, or during graceful cluster teardown. Each
91/// subsystem should stop its background work and release resources
92/// before the deadline.
93///
94/// # Health
95///
96/// `health()` returns a point-in-time `SubsystemHealth`. The registry
97/// does not poll this — individual subsystems update the shared
98/// `ClusterHealth` aggregator directly via `BootstrapCtx`.
99#[async_trait]
100pub trait ClusterSubsystem: Send + Sync {
101 /// A unique, human-readable name for this subsystem.
102 ///
103 /// Used as the key in dependency declarations and health maps.
104 fn name(&self) -> &'static str;
105
106 /// Names of subsystems that must be started before this one.
107 ///
108 /// Return `&[]` if there are no prerequisites.
109 fn dependencies(&self) -> &'static [&'static str];
110
111 /// Start the subsystem and return a handle to its background task.
112 ///
113 /// This method is called exactly once, after all declared
114 /// dependencies have been started successfully.
115 async fn start(&self, ctx: &BootstrapCtx) -> Result<SubsystemHandle, BootstrapError>;
116
117 /// Gracefully stop the subsystem before `deadline`.
118 ///
119 /// Implementations should signal their background tasks via the
120 /// `SubsystemHandle::shutdown_tx` and wait for them to exit. If
121 /// the deadline is exceeded, they should abort and return
122 /// `ShutdownError::DeadlineExceeded`.
123 async fn shutdown(&self, deadline: Instant) -> Result<(), ShutdownError>;
124
125 /// Return the current health state of this subsystem.
126 fn health(&self) -> SubsystemHealth;
127}