jflow_core/supervisor/service.rs
1//! JanusService trait — the contract every supervised service must implement.
2//!
3//! This trait is the cornerstone of the Janus Supervisor Model. Every service
4//! managed by the [`JanusSupervisor`](super::JanusSupervisor) must implement
5//! this trait so the supervisor can:
6//!
7//! - Start the service in a tracked task
8//! - Monitor its lifecycle (detect unexpected exits)
9//! - Propagate shutdown signals via [`CancellationToken`]
10//! - Restart the service according to the configured backoff strategy
11//!
12//! # Design Decisions
13//!
14//! - **`async_trait`**: We use `async_trait` to allow dynamic dispatch
15//! (`Box<dyn JanusService>`). The one-time heap allocation for the boxed
16//! future is negligible for long-running service loops that start once.
17//!
18//! - **`CancellationToken`**: Passed explicitly into `run()` so the service
19//! *must* listen for cancellation. This is superior to relying on `Drop`
20//! semantics, which are unpredictable in async contexts.
21//!
22//! - **`anyhow::Result`**: Allows services to return diverse error types that
23//! the supervisor can uniformly log and use to decide on restart strategies.
24//!
25//! - **`Send + Sync + 'static`**: Required because services are spawned onto
26//! the Tokio runtime via `TaskTracker::spawn`.
27//!
//! - **`&self` (not `&mut self`) on `run()`**: The trait takes `&self` so
//!   that services can be wrapped in `Arc`, shared across boundaries, or
//!   composed without requiring exclusive access. Services that need
//!   mutable state across restarts should use thread-safe interior
//!   mutability (`AtomicU64`, `Mutex`, etc.); the `Send + Sync` bounds
//!   already rule out non-thread-safe alternatives such as `RefCell`.
34
35use async_trait::async_trait;
36use tokio_util::sync::CancellationToken;
37
/// How the supervisor reacts once a supervised service has exited.
///
/// Each service reports its own policy; the supervisor consults it when the
/// service stops unexpectedly. The [`Default`] policy is
/// [`RestartPolicy::OnFailure`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum RestartPolicy {
    /// Restart the service whenever it fails, regardless of whether the
    /// failure is transient or permanent. Attempts are spaced out by the
    /// supervisor using exponential backoff.
    ///
    /// NOTE(review): presumably this also restarts after a clean `Ok(())`
    /// exit, which would be what separates it from `OnFailure` — confirm
    /// against the supervisor implementation.
    Always,

    /// Restart only when the service returns an error. An `Ok(())` exit
    /// means the service finished its work, so it is left stopped.
    #[default]
    OnFailure,

    /// Run the service exactly once; the supervisor merely logs how it
    /// exited. Intended for one-shot initialization tasks.
    Never,
}
56
57impl std::fmt::Display for RestartPolicy {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 match self {
60 Self::Always => write!(f, "always"),
61 Self::OnFailure => write!(f, "on_failure"),
62 Self::Never => write!(f, "never"),
63 }
64 }
65}
66
67/// The core trait every supervised service must implement.
68///
69/// # Example
70///
71/// ```rust,ignore
72/// use janus_core::supervisor::{JanusService, RestartPolicy};
73/// use tokio_util::sync::CancellationToken;
74/// use std::sync::atomic::{AtomicU64, Ordering};
75///
76/// pub struct MarketDataFeed {
77/// exchange: String,
78/// polls: AtomicU64,
79/// }
80///
81/// #[async_trait::async_trait]
82/// impl JanusService for MarketDataFeed {
83/// fn name(&self) -> &str {
84/// "market-data-feed"
85/// }
86///
87/// fn restart_policy(&self) -> RestartPolicy {
88/// RestartPolicy::OnFailure
89/// }
90///
91/// async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
92/// loop {
93/// tokio::select! {
94/// _ = cancel.cancelled() => {
95/// tracing::info!("Market data feed shutting down");
96/// break;
97/// }
98/// _ = self.poll_exchange() => {
99/// self.polls.fetch_add(1, Ordering::Relaxed);
100/// }
101/// }
102/// }
103/// Ok(())
104/// }
105/// }
106///
107/// impl MarketDataFeed {
108/// async fn poll_exchange(&self) {
109/// tokio::time::sleep(std::time::Duration::from_millis(100)).await;
110/// }
111/// }
112/// ```
113#[async_trait]
114pub trait JanusService: Send + Sync + 'static {
115 /// Returns the unique name of the service for logging, metrics, and
116 /// supervisor identification.
117 ///
118 /// This name is used in:
119 /// - `tracing` spans (`service = name`)
120 /// - Prometheus metric labels (`janus_supervisor_restarts_total{service="..."}`)
121 /// - The supervisor's internal service registry
122 fn name(&self) -> &str;
123
124 /// The restart policy for this service.
125 ///
126 /// Defaults to [`RestartPolicy::OnFailure`], meaning the supervisor will
127 /// restart the service if `run()` returns an `Err`, but will treat a
128 /// clean `Ok(())` exit as intentional completion.
129 fn restart_policy(&self) -> RestartPolicy {
130 RestartPolicy::OnFailure
131 }
132
133 /// The main execution loop of the service.
134 ///
135 /// This method should run until either:
136 /// 1. The service completes its work naturally (returns `Ok(())`)
137 /// 2. The `cancel` token is cancelled (graceful shutdown)
138 /// 3. An unrecoverable error occurs (returns `Err(...)`)
139 ///
140 /// # Cancellation Contract
141 ///
142 /// Implementations **must** select on `cancel.cancelled()` in their main
143 /// loop. Failure to do so will cause the service to hang during shutdown,
144 /// ultimately hitting the supervisor's shutdown timeout.
145 ///
146 /// ```rust,ignore
147 /// async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
148 /// loop {
149 /// tokio::select! {
150 /// _ = cancel.cancelled() => break,
151 /// result = self.do_work() => {
152 /// result?;
153 /// }
154 /// }
155 /// }
156 /// Ok(())
157 /// }
158 /// ```
159 ///
160 /// # Interior Mutability
161 ///
162 /// Because `run()` takes `&self`, services that need to track mutable
163 /// state (e.g., restart counters, connection handles) should use
164 /// interior mutability primitives like [`AtomicU64`](std::sync::atomic::AtomicU64),
165 /// [`Mutex`](tokio::sync::Mutex), or [`RwLock`](tokio::sync::RwLock).
166 /// This is consistent with the `Send + Sync` requirements and allows
167 /// services to be wrapped in `Arc` or composed without requiring
168 /// exclusive ownership.
169 ///
170 /// # Errors
171 ///
172 /// Returning an error signals the supervisor that the service has failed.
173 /// The supervisor will then apply the service's [`RestartPolicy`] and
174 /// backoff strategy to decide whether and when to restart.
175 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()>;
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Smallest possible [`JanusService`]: it records each `run()` call and
    /// then parks until cancelled.
    ///
    /// The counter is an [`AtomicU32`] because `run()` only gets `&self`.
    struct DummyService {
        name: String,
        run_count: AtomicU32,
    }

    impl DummyService {
        /// Builds a service called `name` whose run counter starts at zero.
        fn new(name: &str) -> Self {
            let name = name.to_owned();
            let run_count = AtomicU32::new(0);
            Self { name, run_count }
        }

        /// Number of times `run()` has been entered so far.
        fn run_count(&self) -> u32 {
            self.run_count.load(Ordering::SeqCst)
        }
    }

    #[async_trait]
    impl JanusService for DummyService {
        fn name(&self) -> &str {
            self.name.as_str()
        }

        async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
            // Count the invocation first, then wait for shutdown.
            self.run_count.fetch_add(1, Ordering::SeqCst);
            cancel.cancelled().await;
            Ok(())
        }
    }

    /// A service that does not override `restart_policy` gets `OnFailure`.
    #[test]
    fn test_default_restart_policy() {
        let service = DummyService::new("test");
        assert_eq!(service.restart_policy(), RestartPolicy::OnFailure);
    }

    /// Every policy renders as its documented lowercase label.
    #[test]
    fn test_restart_policy_display() {
        let expected = [
            (RestartPolicy::Always, "always"),
            (RestartPolicy::OnFailure, "on_failure"),
            (RestartPolicy::Never, "never"),
        ];
        for (policy, label) in expected {
            assert_eq!(policy.to_string(), label);
        }
    }

    /// `run()` returns `Ok(())` once the token is cancelled from another task.
    #[tokio::test]
    async fn test_dummy_service_runs_and_cancels() {
        let service = DummyService::new("test-svc");
        let shutdown = CancellationToken::new();

        // Trigger the cancellation from a separate task after a short delay.
        let canceller = {
            let trigger = shutdown.clone();
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                trigger.cancel();
            })
        };

        let outcome = service.run(shutdown).await;
        assert!(outcome.is_ok());
        assert_eq!(service.run_count(), 1);

        canceller.await.unwrap();
    }

    /// `name()` hands back the string given to the constructor.
    #[test]
    fn test_service_name() {
        let service = DummyService::new("market-data");
        assert_eq!(service.name(), "market-data");
    }
}