net/adapter/net/behavior/lifecycle/daemon.rs
1//! [`LifecycleDaemon`] — async lifecycle trait + RAII handle.
2//!
3//! See the module-level doc on [`super`] for the rationale on
4//! why this lives separately from
5//! [`MeshDaemon`](crate::adapter::net::compute::MeshDaemon).
6//!
7//! # Trait shape
8//!
9//! - `on_start(self: Arc<Self>)` — spawn whatever background
10//! work the daemon needs. Called exactly once per daemon
11//! before any other lifecycle method. Receives `Arc<Self>`
12//! so implementations can move the daemon into a tokio task
13//! without weak-ref gymnastics.
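//! - `on_stop(&self)` — signal the background work to stop.
//!   Called at most once per [`LifecycleHandle`]: by
//!   [`LifecycleHandle::stop`], or otherwise from the handle's
//!   `Drop` (which skips it when no tokio runtime is available).
//!   Implementations therefore need not be idempotent.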
17//!
//! The trait surface is intentionally minimal so future
//! lifecycle hooks (`on_pause`, `on_drain`, …) can land —
//! with default bodies, as `requirements` and `health` already
//! have — without breaking existing impls.
21
22use async_trait::async_trait;
23use std::sync::Arc;
24
25use crate::adapter::net::behavior::capability::CapabilityFilter;
26
/// Point-in-time health report for a single replica, as returned by
/// [`LifecycleDaemon::health`].
///
/// Kept separate from the substrate's `DaemonHealth` so lifecycle
/// daemons can attach typed diagnostic strings without pulling in
/// cross-module dependencies. The
/// [`LifecycleGroup::health`](super::group::LifecycleGroup::health)
/// accessor returns one of these per replica in declaration
/// order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaHealth {
    /// `true` when the daemon's most recent heartbeat fell inside
    /// its liveness window. Daemons without any liveness notion may
    /// leave this permanently `true`, which is exactly what the
    /// default [`LifecycleDaemon::health`] implementation reports.
    pub healthy: bool,
    /// Daemon-specific explanation, populated when `healthy` is
    /// `false`. Operator surfaces display it verbatim.
    pub diagnostic: Option<String>,
}

impl ReplicaHealth {
    /// A healthy report carrying no diagnostic — the value the
    /// default [`LifecycleDaemon::health`] implementation returns.
    pub fn healthy() -> Self {
        Self { healthy: true, diagnostic: None }
    }

    /// An unhealthy report whose `diagnostic` holds `reason`, for
    /// operators to read.
    pub fn unhealthy(reason: impl Into<String>) -> Self {
        let diagnostic = Some(reason.into());
        Self { healthy: false, diagnostic }
    }
}
67
68/// Async lifecycle trait for native mesh-aware daemons. See
69/// module doc for the trait's intent and the
70/// [`MeshDaemon`](crate::adapter::net::compute::MeshDaemon)
71/// distinction.
72#[async_trait]
73pub trait LifecycleDaemon: Send + Sync + 'static {
74 /// Human-readable name — used in tracing spans + operator
75 /// surfaces (`net aggregator inspect`, the Deck panel
76 /// header). Stable across the daemon's lifetime; no
77 /// per-replica differentiation.
78 fn name(&self) -> &str;
79
80 /// Capability requirements for placement. Mirrors
81 /// [`MeshDaemon::requirements`](crate::adapter::net::compute::MeshDaemon::requirements)
82 /// so the same scheduler primitives
83 /// ([`Scheduler::place`](crate::adapter::net::compute::Scheduler::place),
84 /// [`GroupCoordinator::place_with_spread`](crate::adapter::net::compute::GroupCoordinator::place_with_spread))
85 /// apply to lifecycle daemons without duplicating the
86 /// filter type. Returns `CapabilityFilter::default()` to
87 /// run anywhere.
88 ///
89 /// Used by
90 /// [`LifecycleGroup::spawn_with_placement`](super::group::LifecycleGroup::spawn_with_placement)
91 /// — invoked once before placement to read the requirements
92 /// applied to every replica. Daemons whose requirements are
93 /// uniform across replicas (the common case) can leave the
94 /// default empty filter and pass requirements directly to
95 /// `spawn_with_placement`.
96 fn requirements(&self) -> CapabilityFilter {
97 CapabilityFilter::default()
98 }
99
100 /// Called once when a [`LifecycleHandle`] wrapping `self`
101 /// is created. Implementations spawn whatever long-running
102 /// background work they need (a tokio interval loop, a
103 /// subscription handler, etc.). Receives `Arc<Self>` so
104 /// implementations can move the daemon into a spawned task
105 /// without weak-ref gymnastics. Errors abort the lifecycle
106 /// — the handle isn't created.
107 async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError>;
108
109 /// Called once when a [`LifecycleHandle`] wrapping `self`
110 /// is dropped. Implementations signal their background work
111 /// to stop. Awaited by the handle's drop / shutdown path;
112 /// implementations that need to wait for full teardown should
113 /// hold a `JoinHandle` internally and await it here.
114 async fn on_stop(&self);
115
116 /// Liveness check polled by
117 /// [`LifecycleGroup::health`](super::group::LifecycleGroup::health)
118 /// and the auto-respawn monitor. Default: report healthy
119 /// — daemons that have a heartbeat / tick / generation
120 /// notion override to surface stuck loops to operators.
121 ///
122 /// `async` because some daemons may need to await an
123 /// internal RwLock or query the runtime; most impls are
124 /// fast and non-blocking.
125 async fn health(&self) -> ReplicaHealth {
126 ReplicaHealth::healthy()
127 }
128}
129
/// Failure reported by a [`LifecycleDaemon`] hook.
///
/// Deliberately separate from the substrate-wide `AdapterError`,
/// so trait implementors can return typed failures without taking
/// on cross-module dependencies.
#[derive(Debug)]
pub enum LifecycleError {
    /// `on_start` could not complete for a daemon-specific reason.
    /// The free-form string is the diagnostic the lifecycle harness
    /// shows to the operator.
    StartFailed(String),
}

impl std::fmt::Display for LifecycleError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::StartFailed(reason) => {
                f.write_str("start failed: ")?;
                f.write_str(reason)
            }
        }
    }
}

impl std::error::Error for LifecycleError {}
150
151/// RAII handle that runs a [`LifecycleDaemon`]'s lifecycle.
152/// Construction calls `on_start`; drop schedules `on_stop` on a
153/// detached task so the synchronous Drop impl can fire-and-forget
154/// the async shutdown.
155///
156/// For deterministic shutdown ordering (`net aggregator shutdown`
157/// waiting on the loop to fully drain before returning), use
158/// [`LifecycleHandle::stop`] instead of dropping.
159pub struct LifecycleHandle {
160 daemon: Arc<dyn LifecycleDaemon>,
161 /// `Some` until `stop()` consumes the handle or Drop runs.
162 /// Lets `stop()` move ownership without conflicting with
163 /// Drop's fallback.
164 daemon_for_drop: Option<Arc<dyn LifecycleDaemon>>,
165}
166
167impl LifecycleHandle {
168 /// Construct a handle and run `on_start` synchronously
169 /// against the async runtime. Errors abort — the handle is
170 /// never created if start fails.
171 pub async fn start(daemon: Arc<dyn LifecycleDaemon>) -> Result<Self, LifecycleError> {
172 Arc::clone(&daemon).on_start().await?;
173 Ok(Self {
174 daemon: daemon.clone(),
175 daemon_for_drop: Some(daemon),
176 })
177 }
178
179 /// Borrow the underlying daemon for introspection. Operator
180 /// tooling that wants type-erased access reads through this
181 /// — the lifecycle handle alone doesn't expose concrete
182 /// daemon state.
183 pub fn daemon(&self) -> &Arc<dyn LifecycleDaemon> {
184 &self.daemon
185 }
186
187 /// Shut the daemon down and await the teardown. Consumes
188 /// the handle so a subsequent Drop doesn't double-stop.
189 pub async fn stop(mut self) {
190 let daemon = self.daemon_for_drop.take();
191 if let Some(d) = daemon {
192 d.on_stop().await;
193 }
194 }
195}
196
197impl Drop for LifecycleHandle {
198 fn drop(&mut self) {
199 if let Some(daemon) = self.daemon_for_drop.take() {
200 match tokio::runtime::Handle::try_current() {
201 Ok(handle) => {
202 handle.spawn(async move {
203 daemon.on_stop().await;
204 });
205 }
206 Err(_) => {
207 // No tokio runtime in scope (e.g. synchronous
208 // test teardown). The daemon's internal task
209 // is expected to clean itself up via its
210 // shutdown flag once its own `Arc` is dropped
211 // — but flag the skipped lifecycle hook so the
212 // contract is visible at operator log level.
213 tracing::warn!(
214 daemon = daemon.name(),
215 "LifecycleHandle dropped outside a tokio runtime; \
216 skipping on_stop. Daemon must self-clean via its \
217 shutdown flag.",
218 );
219 }
220 }
221 }
222 }
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU8, Ordering};
    use std::time::Duration;

    /// Records how many times each lifecycle hook fired.
    struct CountingDaemon {
        starts: AtomicU8,
        stops: AtomicU8,
    }

    /// Fresh `CountingDaemon` with both counters at zero.
    fn counting_daemon() -> Arc<CountingDaemon> {
        Arc::new(CountingDaemon {
            starts: AtomicU8::new(0),
            stops: AtomicU8::new(0),
        })
    }

    #[async_trait]
    impl LifecycleDaemon for CountingDaemon {
        fn name(&self) -> &str {
            "counting"
        }
        async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
            self.starts.fetch_add(1, Ordering::AcqRel);
            Ok(())
        }
        async fn on_stop(&self) {
            self.stops.fetch_add(1, Ordering::AcqRel);
        }
    }

    /// Yield to the runtime until `on_stop` has fired at least once.
    /// Bounded by a generous timeout so a regression fails loudly
    /// instead of hanging, without relying on a fixed sleep that
    /// can be too short on a loaded CI machine.
    async fn wait_for_stop(daemon: &CountingDaemon) {
        tokio::time::timeout(Duration::from_secs(5), async {
            while daemon.stops.load(Ordering::Acquire) == 0 {
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("on_stop was never invoked");
    }

    #[tokio::test]
    async fn start_fires_on_start_exactly_once() {
        let daemon = counting_daemon();
        let handle = LifecycleHandle::start(daemon.clone()).await.expect("start");
        assert_eq!(daemon.starts.load(Ordering::Acquire), 1);
        assert_eq!(daemon.stops.load(Ordering::Acquire), 0);
        handle.stop().await;
        assert_eq!(daemon.stops.load(Ordering::Acquire), 1);
    }

    #[tokio::test]
    async fn stop_disarms_drop_so_on_stop_runs_once() {
        let daemon = counting_daemon();
        let handle = LifecycleHandle::start(daemon.clone()).await.expect("start");
        handle.stop().await;
        // Give any (erroneously) spawned Drop task a chance to run.
        for _ in 0..16 {
            tokio::task::yield_now().await;
        }
        assert_eq!(daemon.stops.load(Ordering::Acquire), 1);
    }

    struct FailingStart;

    #[async_trait]
    impl LifecycleDaemon for FailingStart {
        fn name(&self) -> &str {
            "failing"
        }
        async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
            Err(LifecycleError::StartFailed("intentional".into()))
        }
        async fn on_stop(&self) {}
    }

    #[tokio::test]
    async fn start_failure_aborts_handle_creation() {
        let result = LifecycleHandle::start(Arc::new(FailingStart)).await;
        match result {
            Err(LifecycleError::StartFailed(msg)) => assert_eq!(msg, "intentional"),
            Ok(_) => panic!("expected StartFailed"),
        }
    }

    #[tokio::test]
    async fn drop_schedules_on_stop_under_tokio_runtime() {
        let daemon = counting_daemon();
        {
            let _handle = LifecycleHandle::start(daemon.clone()).await.expect("start");
            // Drop fires at the end of this scope.
        }
        wait_for_stop(&daemon).await;
        assert_eq!(daemon.stops.load(Ordering::Acquire), 1);
    }

    #[tokio::test]
    async fn default_health_reports_healthy() {
        let daemon = counting_daemon();
        assert_eq!(daemon.health().await, ReplicaHealth::healthy());
    }

    #[test]
    fn replica_health_constructors() {
        let ok = ReplicaHealth::healthy();
        assert!(ok.healthy);
        assert_eq!(ok.diagnostic, None);
        let bad = ReplicaHealth::unhealthy("stuck");
        assert!(!bad.healthy);
        assert_eq!(bad.diagnostic.as_deref(), Some("stuck"));
    }

    #[test]
    fn lifecycle_error_display() {
        let err = LifecycleError::StartFailed("boom".into());
        assert_eq!(err.to_string(), "start failed: boom");
    }
}