koi_common/runtime_state.rs
//! Shared start/stop runtime state machine for domain background loops.
//!
//! Several domains (DNS, health) wrap a core in a controller that can start a single
//! background loop, stop it via a [`CancellationToken`], and report whether it is running.
//! That ~80-line `Mutex<RuntimeState { running, cancel }>` machine used to be duplicated
//! verbatim in each domain; [`DomainRuntime`] is now the single shared copy.
//!
//! Lifecycles that are *not* a single start/stop loop (proxy's per-entry listeners, udp's
//! reaper-on-construction, the runtime adapter's external-token watcher) are deliberately
//! left bespoke — forcing them onto this abstraction would distort their semantics.
//!
//! ## `running` flag semantics (mirrors the original hand-written DNS/health machines)
//!
//! - [`start`](DomainRuntime::start) sets `running = true` *synchronously* (before it
//!   returns) and stores the cancel token.
//! - When the spawned loop finishes on its own, a watcher task flips `running = false` and
//!   clears the token — just as the old machines did by appending that cleanup after the loop.
//! - [`stop`](DomainRuntime::stop) cancels the token and sets `running = false` immediately;
//!   it does not wait for the loop to wind down, again matching the old behaviour.

21use std::sync::Arc;
22
23use tokio::sync::Mutex;
24use tokio::task::JoinHandle;
25use tokio_util::sync::CancellationToken;
26
/// Marker error meaning "the runtime loop is already running".
///
/// [`DomainRuntime::start`] currently reports the already-running case as `Ok(false)`
/// instead of `Err(AlreadyRunning)`, so this value is never produced today. It stays in
/// `start`'s signature so that a fallible launcher can be introduced later without
/// breaking callers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct AlreadyRunning;

// No underlying cause: this error never wraps another one.
impl std::error::Error for AlreadyRunning {}

impl std::fmt::Display for AlreadyRunning {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "runtime is already running")
    }
}
42
43/// Snapshot of a [`DomainRuntime`]'s state.
44#[derive(Debug, Clone, Copy, serde::Serialize)]
45pub struct RuntimeStatus {
46 pub running: bool,
47}
48
49struct State {
50 running: bool,
51 cancel: Option<CancellationToken>,
52}
53
54/// A start/stop controller around a shared core `C`.
55///
56/// `C` is the domain core the loop operates on; it is held as an `Arc<C>` so the controller
57/// can hand it to the spawned loop and to [`core`](Self::core) callers.
58pub struct DomainRuntime<C> {
59 core: Arc<C>,
60 state: Arc<Mutex<State>>,
61}
62
63impl<C> Clone for DomainRuntime<C> {
64 fn clone(&self) -> Self {
65 Self {
66 core: Arc::clone(&self.core),
67 state: Arc::clone(&self.state),
68 }
69 }
70}
71
72impl<C> DomainRuntime<C> {
73 /// Wrap a core. The controller starts in the stopped state.
74 pub fn new(core: Arc<C>) -> Self {
75 Self {
76 core,
77 state: Arc::new(Mutex::new(State {
78 running: false,
79 cancel: None,
80 })),
81 }
82 }
83
84 /// The shared core.
85 pub fn core(&self) -> Arc<C> {
86 Arc::clone(&self.core)
87 }
88
89 /// Start the background loop.
90 ///
91 /// `mk` is called with a fresh [`CancellationToken`] and must spawn the domain loop,
92 /// returning its [`JoinHandle`]. On success the controller marks itself running and
93 /// stores the token; a watcher task flips `running` back to `false` when the loop's
94 /// handle completes. Returns `Ok(false)` (a no-op) if already running.
95 pub async fn start<F>(&self, mk: F) -> Result<bool, AlreadyRunning>
96 where
97 F: FnOnce(CancellationToken) -> JoinHandle<()>,
98 {
99 let mut state = self.state.lock().await;
100 if state.running {
101 return Ok(false);
102 }
103
104 let token = CancellationToken::new();
105 let handle = mk(token.clone());
106 state.cancel = Some(token);
107 state.running = true;
108 drop(state);
109
110 // Watcher: when the loop's handle finishes (cancelled or done), clear running/cancel
111 // exactly as the old hand-written machines did at the tail of their spawned task.
112 let state = Arc::clone(&self.state);
113 tokio::spawn(async move {
114 let _ = handle.await;
115 let mut guard = state.lock().await;
116 guard.running = false;
117 guard.cancel = None;
118 });
119
120 Ok(true)
121 }
122
123 /// Stop the background loop by cancelling its token.
124 ///
125 /// Returns `true` if a token was present (i.e. the loop was running), `false` otherwise.
126 /// Marks `running = false` immediately without waiting for the loop to wind down.
127 pub async fn stop(&self) -> bool {
128 let mut state = self.state.lock().await;
129 if let Some(token) = state.cancel.take() {
130 token.cancel();
131 state.running = false;
132 true
133 } else {
134 false
135 }
136 }
137
138 /// Current running state.
139 pub async fn status(&self) -> RuntimeStatus {
140 let state = self.state.lock().await;
141 RuntimeStatus {
142 running: state.running,
143 }
144 }
145}
146
147#[cfg(test)]
148mod tests {
149 use super::*;
150
151 struct Core;
152
153 fn never_ending(token: CancellationToken) -> JoinHandle<()> {
154 tokio::spawn(async move {
155 token.cancelled().await;
156 })
157 }
158
159 #[tokio::test]
160 async fn start_then_status_running() {
161 let rt = DomainRuntime::new(Arc::new(Core));
162 assert!(!rt.status().await.running);
163
164 let started = rt.start(never_ending).await.unwrap();
165 assert!(started);
166 assert!(rt.status().await.running);
167 }
168
169 #[tokio::test]
170 async fn double_start_is_noop() {
171 let rt = DomainRuntime::new(Arc::new(Core));
172 assert!(rt.start(never_ending).await.unwrap());
173 // Second start while running returns Ok(false) and does not spawn another loop.
174 assert!(!rt.start(never_ending).await.unwrap());
175 assert!(rt.status().await.running);
176 }
177
178 #[tokio::test]
179 async fn stop_clears_running() {
180 let rt = DomainRuntime::new(Arc::new(Core));
181 rt.start(never_ending).await.unwrap();
182 assert!(rt.stop().await);
183 assert!(!rt.status().await.running);
184 // Stopping again with no live loop returns false.
185 assert!(!rt.stop().await);
186 }
187
188 #[tokio::test]
189 async fn watcher_flips_running_when_loop_finishes() {
190 let rt = DomainRuntime::new(Arc::new(Core));
191 // A loop that returns immediately; the watcher should flip running=false.
192 rt.start(|_token| tokio::spawn(async {})).await.unwrap();
193 // Give the watcher a chance to observe completion.
194 for _ in 0..50 {
195 if !rt.status().await.running {
196 break;
197 }
198 tokio::task::yield_now().await;
199 }
200 assert!(!rt.status().await.running);
201 }
202
203 #[test]
204 fn already_running_display() {
205 assert_eq!(AlreadyRunning.to_string(), "runtime is already running");
206 }
207}