Skip to main content

rustrade_supervisor/
service.rs

//! The [`TradingService`] trait — every supervised unit must implement this.
//!
//! This is a thin rename of `janus-core`'s `JanusService` trait. The design
//! is carried over unchanged; the full rationale lives in that crate's
//! `service.rs` doc comments.
6
7use async_trait::async_trait;
8use tokio_util::sync::CancellationToken;
9
/// Governs whether the supervisor relaunches a [`TradingService`] once its
/// `run()` call has returned.
///
/// The `Default` variant is [`RestartPolicy::OnFailure`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum RestartPolicy {
    /// Restart after every exit, whether `run()` succeeded or failed.
    Always,
    /// Restart only when `run()` came back with `Err`.
    #[default]
    OnFailure,
    /// One-shot: the service is never restarted.
    Never,
}
21
22impl std::fmt::Display for RestartPolicy {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        match self {
25            Self::Always => write!(f, "always"),
26            Self::OnFailure => write!(f, "on_failure"),
27            Self::Never => write!(f, "never"),
28        }
29    }
30}
31
32/// A long-running task managed by the [`crate::Supervisor`].
33///
34/// # Cancellation contract
35///
36/// Implementations **must** select on `cancel.cancelled()` in their main loop.
37/// A service that doesn't respond to cancellation will hang the whole
38/// shutdown process until the supervisor's shutdown timeout fires.
39///
40/// # Interior mutability
41///
42/// `run` takes `&self`, so services wrapped in `Arc` work naturally. Mutable
43/// state (counters, connection handles, etc.) should use atomics, `Mutex`,
44/// or `RwLock`. This is required anyway by the `Send + Sync + 'static` bound.
45///
46/// # Example
47///
48/// A counter service that ticks every second until cancelled. Note the
49/// `tokio::select!` pattern that interleaves real work with the
50/// cancellation watch.
51///
52/// ```
53/// use std::sync::atomic::{AtomicU64, Ordering};
54/// use std::time::Duration;
55/// use async_trait::async_trait;
56/// use rustrade_supervisor::{RestartPolicy, TradingService};
57/// use tokio_util::sync::CancellationToken;
58///
59/// struct CounterService {
60///     ticks: AtomicU64,
61/// }
62///
63/// #[async_trait]
64/// impl TradingService for CounterService {
65///     fn name(&self) -> &str { "counter" }
66///     fn restart_policy(&self) -> RestartPolicy { RestartPolicy::OnFailure }
67///     async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
68///         let mut tick = tokio::time::interval(Duration::from_secs(1));
69///         loop {
70///             tokio::select! {
71///                 _ = cancel.cancelled() => return Ok(()),
72///                 _ = tick.tick() => {
73///                     self.ticks.fetch_add(1, Ordering::Relaxed);
74///                 }
75///             }
76///         }
77///     }
78/// }
79/// ```
80#[async_trait]
81pub trait TradingService: Send + Sync + 'static {
82    /// Unique service name for logs, metrics, and supervisor identification.
83    fn name(&self) -> &str;
84
85    /// When should the supervisor restart this service on exit?
86    fn restart_policy(&self) -> RestartPolicy {
87        RestartPolicy::OnFailure
88    }
89
90    /// Main execution loop. Must honour the cancellation token.
91    async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()>;
92}
93
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// Minimal service for the tests: records each `run` invocation, then
    /// waits until the cancellation token fires.
    struct DummyService {
        name: String,
        run_count: AtomicU32,
    }

    impl DummyService {
        /// Builds a service called `name` with a zeroed run counter.
        fn new(name: &str) -> Self {
            Self {
                name: name.to_owned(),
                run_count: AtomicU32::default(),
            }
        }

        /// Number of times `run` has been entered so far.
        fn run_count(&self) -> u32 {
            self.run_count.load(Ordering::SeqCst)
        }
    }

    #[async_trait]
    impl TradingService for DummyService {
        fn name(&self) -> &str {
            self.name.as_str()
        }

        // `restart_policy` is deliberately not overridden so the trait's
        // default can be checked below.

        async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
            self.run_count.fetch_add(1, Ordering::SeqCst);
            cancel.cancelled().await;
            Ok(())
        }
    }

    #[test]
    fn test_default_restart_policy() {
        let policy = DummyService::new("test").restart_policy();
        assert_eq!(policy, RestartPolicy::OnFailure);
    }

    #[test]
    fn test_restart_policy_display() {
        let expected = [
            (RestartPolicy::Always, "always"),
            (RestartPolicy::OnFailure, "on_failure"),
            (RestartPolicy::Never, "never"),
        ];
        for (policy, label) in expected {
            assert_eq!(policy.to_string(), label);
        }
    }

    #[tokio::test]
    async fn test_dummy_service_runs_and_cancels() {
        let service = DummyService::new("test-svc");
        let token = CancellationToken::new();

        // Background task that fires the token shortly after `run` starts.
        let canceller = tokio::spawn({
            let trigger = token.clone();
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                trigger.cancel();
            }
        });

        let outcome = service.run(token).await;
        assert!(outcome.is_ok());
        assert_eq!(service.run_count(), 1);
        canceller.await.unwrap();
    }

    #[test]
    fn test_service_name() {
        let service = DummyService::new("market-data");
        assert_eq!(service.name(), "market-data");
    }
}