Skip to main content

net/adapter/net/behavior/lifecycle/
daemon.rs

//! [`LifecycleDaemon`] — async lifecycle trait + RAII handle.
//!
//! See the module-level doc on [`super`] for the rationale on
//! why this lives separately from
//! [`MeshDaemon`](crate::adapter::net::compute::MeshDaemon).
//!
//! # Trait shape
//!
//! - `on_start(self: Arc<Self>)` — spawn whatever background
//!   work the daemon needs. Called exactly once per
//!   [`LifecycleHandle::start`], before any other lifecycle
//!   method. Receives `Arc<Self>` so implementations can move
//!   the daemon into a tokio task without weak-ref gymnastics.
//! - `on_stop(&self)` — signal the background work to stop.
//!   Idempotent in practice: [`LifecycleHandle`] calls it at
//!   most once — awaited by [`LifecycleHandle::stop`], or
//!   scheduled on a detached task when the handle is dropped
//!   inside a tokio runtime. A handle dropped outside a
//!   runtime skips the hook (and logs a warning), so
//!   implementations should not treat it as their only
//!   cleanup path.
//!
//! The trait surface is intentionally minimal so future
//! lifecycle hooks (`on_pause`, `on_drain`, …) can land
//! without breaking existing impls.
21
22use async_trait::async_trait;
23use std::sync::Arc;
24
25use crate::adapter::net::behavior::capability::CapabilityFilter;
26
/// Health report for a single replica, as produced by
/// [`LifecycleDaemon::health`].
///
/// Kept separate from the substrate's `DaemonHealth` so that
/// lifecycle daemons can attach a diagnostic string without
/// pulling in cross-module dependencies.
/// [`LifecycleGroup::health`](super::group::LifecycleGroup::health)
/// yields one of these per replica, in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaHealth {
    /// `true` while the daemon's most recent heartbeat falls
    /// inside its liveness window. Daemons with no notion of
    /// liveness may simply report `true` forever, which is what
    /// the default [`LifecycleDaemon::health`] does.
    pub healthy: bool,
    /// Daemon-specific explanation accompanying
    /// `healthy == false`. Operator surfaces display it
    /// verbatim.
    pub diagnostic: Option<String>,
}

impl ReplicaHealth {
    /// Build the "all good" snapshot: `healthy` set and no
    /// diagnostic attached. This is the value the default
    /// [`LifecycleDaemon::health`] impl reports.
    pub fn healthy() -> Self {
        Self {
            diagnostic: None,
            healthy: true,
        }
    }

    /// Build a failing snapshot whose diagnostic is `reason`,
    /// converted into an owned `String` for operator rendering.
    pub fn unhealthy(reason: impl Into<String>) -> Self {
        let diagnostic = Some(reason.into());
        Self {
            healthy: false,
            diagnostic,
        }
    }
}
67
68/// Async lifecycle trait for native mesh-aware daemons. See
69/// module doc for the trait's intent and the
70/// [`MeshDaemon`](crate::adapter::net::compute::MeshDaemon)
71/// distinction.
72#[async_trait]
73pub trait LifecycleDaemon: Send + Sync + 'static {
74    /// Human-readable name — used in tracing spans + operator
75    /// surfaces (`net aggregator inspect`, the Deck panel
76    /// header). Stable across the daemon's lifetime; no
77    /// per-replica differentiation.
78    fn name(&self) -> &str;
79
80    /// Capability requirements for placement. Mirrors
81    /// [`MeshDaemon::requirements`](crate::adapter::net::compute::MeshDaemon::requirements)
82    /// so the same scheduler primitives
83    /// ([`Scheduler::place`](crate::adapter::net::compute::Scheduler::place),
84    /// [`GroupCoordinator::place_with_spread`](crate::adapter::net::compute::GroupCoordinator::place_with_spread))
85    /// apply to lifecycle daemons without duplicating the
86    /// filter type. Returns `CapabilityFilter::default()` to
87    /// run anywhere.
88    ///
89    /// Used by
90    /// [`LifecycleGroup::spawn_with_placement`](super::group::LifecycleGroup::spawn_with_placement)
91    /// — invoked once before placement to read the requirements
92    /// applied to every replica. Daemons whose requirements are
93    /// uniform across replicas (the common case) can leave the
94    /// default empty filter and pass requirements directly to
95    /// `spawn_with_placement`.
96    fn requirements(&self) -> CapabilityFilter {
97        CapabilityFilter::default()
98    }
99
100    /// Called once when a [`LifecycleHandle`] wrapping `self`
101    /// is created. Implementations spawn whatever long-running
102    /// background work they need (a tokio interval loop, a
103    /// subscription handler, etc.). Receives `Arc<Self>` so
104    /// implementations can move the daemon into a spawned task
105    /// without weak-ref gymnastics. Errors abort the lifecycle
106    /// — the handle isn't created.
107    async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError>;
108
109    /// Called once when a [`LifecycleHandle`] wrapping `self`
110    /// is dropped. Implementations signal their background work
111    /// to stop. Awaited by the handle's drop / shutdown path;
112    /// implementations that need to wait for full teardown should
113    /// hold a `JoinHandle` internally and await it here.
114    async fn on_stop(&self);
115
116    /// Liveness check polled by
117    /// [`LifecycleGroup::health`](super::group::LifecycleGroup::health)
118    /// and the auto-respawn monitor. Default: report healthy
119    /// — daemons that have a heartbeat / tick / generation
120    /// notion override to surface stuck loops to operators.
121    ///
122    /// `async` because some daemons may need to await an
123    /// internal RwLock or query the runtime; most impls are
124    /// fast and non-blocking.
125    async fn health(&self) -> ReplicaHealth {
126        ReplicaHealth::healthy()
127    }
128}
129
/// Error type returned by lifecycle hooks.
///
/// Kept distinct from the substrate-wide `AdapterError` so
/// trait implementors can report typed failures without
/// pulling in cross-module dependencies.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug`
/// so callers and tests can compare or duplicate errors
/// directly (e.g. `assert_eq!(err, LifecycleError::StartFailed(..))`),
/// matching the derives on [`ReplicaHealth`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LifecycleError {
    /// `on_start` failed for a daemon-specific reason. The
    /// payload is a free-form diagnostic that the lifecycle
    /// harness surfaces to the operator.
    StartFailed(String),
}

impl std::fmt::Display for LifecycleError {
    /// Renders `StartFailed(msg)` as `start failed: <msg>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive match on purpose: adding a variant must
        // force a decision about its user-facing text.
        match self {
            Self::StartFailed(msg) => write!(f, "start failed: {msg}"),
        }
    }
}

impl std::error::Error for LifecycleError {}
150
151/// RAII handle that runs a [`LifecycleDaemon`]'s lifecycle.
152/// Construction calls `on_start`; drop schedules `on_stop` on a
153/// detached task so the synchronous Drop impl can fire-and-forget
154/// the async shutdown.
155///
156/// For deterministic shutdown ordering (`net aggregator shutdown`
157/// waiting on the loop to fully drain before returning), use
158/// [`LifecycleHandle::stop`] instead of dropping.
159pub struct LifecycleHandle {
160    daemon: Arc<dyn LifecycleDaemon>,
161    /// `Some` until `stop()` consumes the handle or Drop runs.
162    /// Lets `stop()` move ownership without conflicting with
163    /// Drop's fallback.
164    daemon_for_drop: Option<Arc<dyn LifecycleDaemon>>,
165}
166
167impl LifecycleHandle {
168    /// Construct a handle and run `on_start` synchronously
169    /// against the async runtime. Errors abort — the handle is
170    /// never created if start fails.
171    pub async fn start(daemon: Arc<dyn LifecycleDaemon>) -> Result<Self, LifecycleError> {
172        Arc::clone(&daemon).on_start().await?;
173        Ok(Self {
174            daemon: daemon.clone(),
175            daemon_for_drop: Some(daemon),
176        })
177    }
178
179    /// Borrow the underlying daemon for introspection. Operator
180    /// tooling that wants type-erased access reads through this
181    /// — the lifecycle handle alone doesn't expose concrete
182    /// daemon state.
183    pub fn daemon(&self) -> &Arc<dyn LifecycleDaemon> {
184        &self.daemon
185    }
186
187    /// Shut the daemon down and await the teardown. Consumes
188    /// the handle so a subsequent Drop doesn't double-stop.
189    pub async fn stop(mut self) {
190        let daemon = self.daemon_for_drop.take();
191        if let Some(d) = daemon {
192            d.on_stop().await;
193        }
194    }
195}
196
197impl Drop for LifecycleHandle {
198    fn drop(&mut self) {
199        if let Some(daemon) = self.daemon_for_drop.take() {
200            match tokio::runtime::Handle::try_current() {
201                Ok(handle) => {
202                    handle.spawn(async move {
203                        daemon.on_stop().await;
204                    });
205                }
206                Err(_) => {
207                    // No tokio runtime in scope (e.g. synchronous
208                    // test teardown). The daemon's internal task
209                    // is expected to clean itself up via its
210                    // shutdown flag once its own `Arc` is dropped
211                    // — but flag the skipped lifecycle hook so the
212                    // contract is visible at operator log level.
213                    tracing::warn!(
214                        daemon = daemon.name(),
215                        "LifecycleHandle dropped outside a tokio runtime; \
216                         skipping on_stop. Daemon must self-clean via its \
217                         shutdown flag.",
218                    );
219                }
220            }
221        }
222    }
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU8, Ordering};

    /// Daemon that only counts how often each hook ran.
    struct CountingDaemon {
        starts: AtomicU8,
        stops: AtomicU8,
    }

    impl CountingDaemon {
        /// Fresh daemon with both counters at zero, already
        /// wrapped in the `Arc` that `LifecycleHandle::start`
        /// expects.
        fn fresh() -> Arc<Self> {
            Arc::new(Self {
                starts: AtomicU8::new(0),
                stops: AtomicU8::new(0),
            })
        }

        /// Number of `on_start` calls observed so far.
        fn start_count(&self) -> u8 {
            self.starts.load(Ordering::Acquire)
        }

        /// Number of `on_stop` calls observed so far.
        fn stop_count(&self) -> u8 {
            self.stops.load(Ordering::Acquire)
        }
    }

    #[async_trait]
    impl LifecycleDaemon for CountingDaemon {
        fn name(&self) -> &str {
            "counting"
        }
        async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
            self.starts.fetch_add(1, Ordering::AcqRel);
            Ok(())
        }
        async fn on_stop(&self) {
            self.stops.fetch_add(1, Ordering::AcqRel);
        }
    }

    /// `start` runs `on_start` once and leaves `on_stop` alone;
    /// an explicit `stop` then runs `on_stop` once.
    #[tokio::test]
    async fn start_fires_on_start_exactly_once() {
        let daemon = CountingDaemon::fresh();

        let handle = LifecycleHandle::start(daemon.clone()).await.expect("start");
        assert_eq!(daemon.start_count(), 1);
        assert_eq!(daemon.stop_count(), 0);

        handle.stop().await;
        assert_eq!(daemon.stop_count(), 1);
    }

    /// Daemon whose `on_start` always fails.
    struct FailingStart;

    #[async_trait]
    impl LifecycleDaemon for FailingStart {
        fn name(&self) -> &str {
            "failing"
        }
        async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
            Err(LifecycleError::StartFailed("intentional".into()))
        }
        async fn on_stop(&self) {}
    }

    /// A failing `on_start` surfaces its error and yields no
    /// handle.
    #[tokio::test]
    async fn start_failure_aborts_handle_creation() {
        match LifecycleHandle::start(Arc::new(FailingStart)).await {
            Ok(_) => panic!("expected StartFailed"),
            Err(LifecycleError::StartFailed(msg)) => assert_eq!(msg, "intentional"),
        }
    }

    /// Dropping the handle inside a runtime still gets `on_stop`
    /// executed, via the detached task.
    #[tokio::test]
    async fn drop_schedules_on_stop_under_tokio_runtime() {
        let daemon = CountingDaemon::fresh();

        // Start, then drop the handle straight away.
        drop(LifecycleHandle::start(daemon.clone()).await.expect("start"));

        // Give the detached task time to run.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert_eq!(daemon.stop_count(), 1);
    }
}