rustrade_supervisor/service.rs
1//! The [`TradingService`] trait — every supervised unit must implement this.
2//!
3//! This is a thin rename of `janus-core`'s `JanusService` trait. The design
4//! and rationale are preserved verbatim; see that crate's service.rs doc
5//! comments for the full rationale.
6
7use async_trait::async_trait;
8use tokio_util::sync::CancellationToken;
9
10/// Restart policy for a [`TradingService`] under supervisor control.
11#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
12pub enum RestartPolicy {
13 /// Always restart on exit, regardless of success/failure.
14 Always,
15 /// Only restart if `run()` returned `Err`.
16 #[default]
17 OnFailure,
18 /// Never restart. One-shot.
19 Never,
20}
21
22impl std::fmt::Display for RestartPolicy {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 match self {
25 Self::Always => write!(f, "always"),
26 Self::OnFailure => write!(f, "on_failure"),
27 Self::Never => write!(f, "never"),
28 }
29 }
30}
31
32/// A long-running task managed by the [`crate::Supervisor`].
33///
34/// # Cancellation contract
35///
36/// Implementations **must** select on `cancel.cancelled()` in their main loop.
37/// A service that doesn't respond to cancellation will hang the whole
38/// shutdown process until the supervisor's shutdown timeout fires.
39///
40/// # Interior mutability
41///
42/// `run` takes `&self`, so services wrapped in `Arc` work naturally. Mutable
43/// state (counters, connection handles, etc.) should use atomics, `Mutex`,
44/// or `RwLock`. This is required anyway by the `Send + Sync + 'static` bound.
45///
46/// # Example
47///
48/// A counter service that ticks every second until cancelled. Note the
49/// `tokio::select!` pattern that interleaves real work with the
50/// cancellation watch.
51///
52/// ```
53/// use std::sync::atomic::{AtomicU64, Ordering};
54/// use std::time::Duration;
55/// use async_trait::async_trait;
56/// use rustrade_supervisor::{RestartPolicy, TradingService};
57/// use tokio_util::sync::CancellationToken;
58///
59/// struct CounterService {
60/// ticks: AtomicU64,
61/// }
62///
63/// #[async_trait]
64/// impl TradingService for CounterService {
65/// fn name(&self) -> &str { "counter" }
66/// fn restart_policy(&self) -> RestartPolicy { RestartPolicy::OnFailure }
67/// async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
68/// let mut tick = tokio::time::interval(Duration::from_secs(1));
69/// loop {
70/// tokio::select! {
71/// _ = cancel.cancelled() => return Ok(()),
72/// _ = tick.tick() => {
73/// self.ticks.fetch_add(1, Ordering::Relaxed);
74/// }
75/// }
76/// }
77/// }
78/// }
79/// ```
80#[async_trait]
81pub trait TradingService: Send + Sync + 'static {
82 /// Unique service name for logs, metrics, and supervisor identification.
83 fn name(&self) -> &str;
84
85 /// When should the supervisor restart this service on exit?
86 fn restart_policy(&self) -> RestartPolicy {
87 RestartPolicy::OnFailure
88 }
89
90 /// Main execution loop. Must honour the cancellation token.
91 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()>;
92}
93
94#[cfg(test)]
95mod tests {
96 use super::*;
97 use std::sync::atomic::{AtomicU32, Ordering};
98
99 struct DummyService {
100 name: String,
101 run_count: AtomicU32,
102 }
103
104 impl DummyService {
105 fn new(name: &str) -> Self {
106 Self {
107 name: name.to_string(),
108 run_count: AtomicU32::new(0),
109 }
110 }
111
112 fn run_count(&self) -> u32 {
113 self.run_count.load(Ordering::SeqCst)
114 }
115 }
116
117 #[async_trait]
118 impl TradingService for DummyService {
119 fn name(&self) -> &str {
120 &self.name
121 }
122
123 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
124 self.run_count.fetch_add(1, Ordering::SeqCst);
125 cancel.cancelled().await;
126 Ok(())
127 }
128 }
129
130 #[test]
131 fn test_default_restart_policy() {
132 let svc = DummyService::new("test");
133 assert_eq!(svc.restart_policy(), RestartPolicy::OnFailure);
134 }
135
136 #[test]
137 fn test_restart_policy_display() {
138 assert_eq!(RestartPolicy::Always.to_string(), "always");
139 assert_eq!(RestartPolicy::OnFailure.to_string(), "on_failure");
140 assert_eq!(RestartPolicy::Never.to_string(), "never");
141 }
142
143 #[tokio::test]
144 async fn test_dummy_service_runs_and_cancels() {
145 let svc = DummyService::new("test-svc");
146 let token = CancellationToken::new();
147
148 let token_clone = token.clone();
149 let handle = tokio::spawn(async move {
150 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
151 token_clone.cancel();
152 });
153
154 let result = svc.run(token).await;
155 assert!(result.is_ok());
156 assert_eq!(svc.run_count(), 1);
157 handle.await.unwrap();
158 }
159
160 #[test]
161 fn test_service_name() {
162 let svc = DummyService::new("market-data");
163 assert_eq!(svc.name(), "market-data");
164 }
165}