Skip to main content

jflow_core/supervisor/
service.rs

1//! JanusService trait — the contract every supervised service must implement.
2//!
3//! This trait is the cornerstone of the Janus Supervisor Model. Every service
4//! managed by the [`JanusSupervisor`](super::JanusSupervisor) must implement
5//! this trait so the supervisor can:
6//!
7//! - Start the service in a tracked task
8//! - Monitor its lifecycle (detect unexpected exits)
9//! - Propagate shutdown signals via [`CancellationToken`]
10//! - Restart the service according to the configured backoff strategy
11//!
12//! # Design Decisions
13//!
//! - **`async_trait`**: We use `async_trait` to allow dynamic dispatch
//!   (`Box<dyn JanusService>`). It boxes the future returned by `run()`,
//!   which costs one heap allocation per call (i.e. per start or restart);
//!   that is negligible for long-running service loops.
17//!
18//! - **`CancellationToken`**: Passed explicitly into `run()` so the service
19//!   *must* listen for cancellation. This is superior to relying on `Drop`
20//!   semantics, which are unpredictable in async contexts.
21//!
22//! - **`anyhow::Result`**: Allows services to return diverse error types that
23//!   the supervisor can uniformly log and use to decide on restart strategies.
24//!
25//! - **`Send + Sync + 'static`**: Required because services are spawned onto
26//!   the Tokio runtime via `TaskTracker::spawn`.
27//!
//! - **`&self` (not `&mut self`) on `run()`**: The trait takes `&self` so
//!   that services can be wrapped in `Arc`, shared across boundaries, or
//!   composed without requiring exclusive access. Services that need
//!   mutable state across restarts should use interior mutability; because
//!   of the trait's `Send + Sync` bounds, that state must live in
//!   thread-safe primitives (`AtomicU64`, `Mutex`, etc.) rather than
//!   `Cell`/`RefCell`.
34
35use async_trait::async_trait;
36use tokio_util::sync::CancellationToken;
37
/// Restart policy for a service managed by the supervisor.
///
/// Controls how the supervisor responds when a service's `run()` returns.
/// The default is [`RestartPolicy::OnFailure`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum RestartPolicy {
    /// Restart the service every time `run()` returns, whether it returns an
    /// `Err` or `Ok(())`. Unlike [`OnFailure`](Self::OnFailure), a clean exit
    /// is not treated as completion.
    /// The supervisor will apply exponential backoff between attempts.
    Always,

    /// Only restart on error returns. If the service exits with `Ok(())`,
    /// it is considered complete and will not be restarted.
    #[default]
    OnFailure,

    /// Never restart. The service runs once and the supervisor only logs
    /// its exit status. Useful for one-shot initialization tasks.
    Never,
}
56
57impl std::fmt::Display for RestartPolicy {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        match self {
60            Self::Always => write!(f, "always"),
61            Self::OnFailure => write!(f, "on_failure"),
62            Self::Never => write!(f, "never"),
63        }
64    }
65}
66
67/// The core trait every supervised service must implement.
68///
69/// # Example
70///
71/// ```rust,ignore
72/// use janus_core::supervisor::{JanusService, RestartPolicy};
73/// use tokio_util::sync::CancellationToken;
74/// use std::sync::atomic::{AtomicU64, Ordering};
75///
76/// pub struct MarketDataFeed {
77///     exchange: String,
78///     polls: AtomicU64,
79/// }
80///
81/// #[async_trait::async_trait]
82/// impl JanusService for MarketDataFeed {
83///     fn name(&self) -> &str {
84///         "market-data-feed"
85///     }
86///
87///     fn restart_policy(&self) -> RestartPolicy {
88///         RestartPolicy::OnFailure
89///     }
90///
91///     async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
92///         loop {
93///             tokio::select! {
94///                 _ = cancel.cancelled() => {
95///                     tracing::info!("Market data feed shutting down");
96///                     break;
97///                 }
98///                 _ = self.poll_exchange() => {
99///                     self.polls.fetch_add(1, Ordering::Relaxed);
100///                 }
101///             }
102///         }
103///         Ok(())
104///     }
105/// }
106///
107/// impl MarketDataFeed {
108///     async fn poll_exchange(&self) {
109///         tokio::time::sleep(std::time::Duration::from_millis(100)).await;
110///     }
111/// }
112/// ```
113#[async_trait]
114pub trait JanusService: Send + Sync + 'static {
115    /// Returns the unique name of the service for logging, metrics, and
116    /// supervisor identification.
117    ///
118    /// This name is used in:
119    /// - `tracing` spans (`service = name`)
120    /// - Prometheus metric labels (`janus_supervisor_restarts_total{service="..."}`)
121    /// - The supervisor's internal service registry
122    fn name(&self) -> &str;
123
124    /// The restart policy for this service.
125    ///
126    /// Defaults to [`RestartPolicy::OnFailure`], meaning the supervisor will
127    /// restart the service if `run()` returns an `Err`, but will treat a
128    /// clean `Ok(())` exit as intentional completion.
129    fn restart_policy(&self) -> RestartPolicy {
130        RestartPolicy::OnFailure
131    }
132
133    /// The main execution loop of the service.
134    ///
135    /// This method should run until either:
136    /// 1. The service completes its work naturally (returns `Ok(())`)
137    /// 2. The `cancel` token is cancelled (graceful shutdown)
138    /// 3. An unrecoverable error occurs (returns `Err(...)`)
139    ///
140    /// # Cancellation Contract
141    ///
142    /// Implementations **must** select on `cancel.cancelled()` in their main
143    /// loop. Failure to do so will cause the service to hang during shutdown,
144    /// ultimately hitting the supervisor's shutdown timeout.
145    ///
146    /// ```rust,ignore
147    /// async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
148    ///     loop {
149    ///         tokio::select! {
150    ///             _ = cancel.cancelled() => break,
151    ///             result = self.do_work() => {
152    ///                 result?;
153    ///             }
154    ///         }
155    ///     }
156    ///     Ok(())
157    /// }
158    /// ```
159    ///
160    /// # Interior Mutability
161    ///
162    /// Because `run()` takes `&self`, services that need to track mutable
163    /// state (e.g., restart counters, connection handles) should use
164    /// interior mutability primitives like [`AtomicU64`](std::sync::atomic::AtomicU64),
165    /// [`Mutex`](tokio::sync::Mutex), or [`RwLock`](tokio::sync::RwLock).
166    /// This is consistent with the `Send + Sync` requirements and allows
167    /// services to be wrapped in `Arc` or composed without requiring
168    /// exclusive ownership.
169    ///
170    /// # Errors
171    ///
172    /// Returning an error signals the supervisor that the service has failed.
173    /// The supervisor will then apply the service's [`RestartPolicy`] and
174    /// backoff strategy to decide whether and when to restart.
175    async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()>;
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// A minimal service used to check that the trait can be implemented and
    /// that its defaults behave as documented.
    ///
    /// `run()` bumps a counter (an [`AtomicU32`], since `run()` takes `&self`)
    /// and then waits until the cancellation token fires.
    struct DummyService {
        name: String,
        run_count: AtomicU32,
    }

    impl DummyService {
        fn new(name: &str) -> Self {
            Self {
                name: name.to_string(),
                run_count: AtomicU32::new(0),
            }
        }

        fn run_count(&self) -> u32 {
            self.run_count.load(Ordering::SeqCst)
        }
    }

    #[async_trait]
    impl JanusService for DummyService {
        fn name(&self) -> &str {
            &self.name
        }

        async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
            self.run_count.fetch_add(1, Ordering::SeqCst);
            cancel.cancelled().await;
            Ok(())
        }
    }

    #[test]
    fn test_default_restart_policy() {
        let svc = DummyService::new("test");
        assert_eq!(svc.restart_policy(), RestartPolicy::OnFailure);
        // The trait default must agree with the enum's `#[default]` variant.
        assert_eq!(svc.restart_policy(), RestartPolicy::default());
    }

    #[test]
    fn test_restart_policy_display() {
        assert_eq!(RestartPolicy::Always.to_string(), "always");
        assert_eq!(RestartPolicy::OnFailure.to_string(), "on_failure");
        assert_eq!(RestartPolicy::Never.to_string(), "never");
    }

    #[tokio::test]
    async fn test_dummy_service_runs_and_cancels() {
        let svc = DummyService::new("test-svc");
        let token = CancellationToken::new();

        let run = svc.run(token.clone());
        tokio::pin!(run);

        // Poll `run()` exactly once before cancelling: it must start (bumping
        // the counter) but stay pending. `biased` fixes the poll order, so no
        // wall-clock sleep is needed and the test is deterministic.
        tokio::select! {
            biased;
            _ = &mut run => panic!("run() returned before the token was cancelled"),
            _ = std::future::ready(()) => {}
        }
        assert_eq!(svc.run_count(), 1);

        token.cancel();
        let result = run.await;
        assert!(result.is_ok());
        assert_eq!(svc.run_count(), 1);
    }

    #[tokio::test]
    async fn test_service_as_spawned_trait_object() {
        // The module docs promise `dyn JanusService` support and spawning onto
        // Tokio. This fails to compile if the trait stops being object-safe
        // or if `run()`'s future stops being `Send + 'static`-spawnable.
        let svc: Arc<dyn JanusService> = Arc::new(DummyService::new("boxed"));
        assert_eq!(svc.name(), "boxed");
        assert_eq!(svc.restart_policy(), RestartPolicy::OnFailure);

        let token = CancellationToken::new();
        let task = tokio::spawn({
            let svc = Arc::clone(&svc);
            let token = token.clone();
            async move { svc.run(token).await }
        });

        token.cancel();
        let result = task.await.expect("service task panicked");
        assert!(result.is_ok());
    }

    #[test]
    fn test_service_name() {
        let svc = DummyService::new("market-data");
        assert_eq!(svc.name(), "market-data");
    }
}
255}