Skip to main content

koi_common/
runtime_state.rs

1//! Shared start/stop runtime state machine for domain background loops.
2//!
3//! Several domains (DNS, health) wrap a core in a controller that can start a single
4//! background loop, stop it via a [`CancellationToken`], and report whether it is running.
5//! That ~80-line `Mutex<RuntimeState{running, cancel}>` machine was duplicated verbatim;
6//! [`DomainRuntime`] is the one copy.
7//!
8//! Lifecycles that are *not* a single start/stop loop (proxy's per-entry listeners, udp's
9//! reaper-on-construction, the runtime adapter's external-token watcher) are deliberately
10//! left bespoke — forcing them onto this would distort their semantics.
11//!
12//! ## `running` flag semantics (matches the hand-written DNS/health machines exactly)
13//!
14//! - [`start`](DomainRuntime::start) sets `running = true` *synchronously* (before returning)
15//!   and stores the cancel token.
16//! - When the spawned loop finishes on its own, a watcher flips `running = false` and clears
17//!   the token — identical to the old machines, which appended that cleanup after the loop.
18//! - [`stop`](DomainRuntime::stop) cancels the token and sets `running = false` immediately
19//!   (it does not wait for the loop to wind down), again matching the old behaviour.
20
21use std::sync::Arc;
22
23use tokio::sync::Mutex;
24use tokio::task::JoinHandle;
25use tokio_util::sync::CancellationToken;
26
/// Error marker meaning "the runtime's background loop is already running".
///
/// [`DomainRuntime::start`] currently reports the already-running case as `Ok(false)` and
/// never returns this error. The type exists so that `start` can keep its
/// `Result<bool, AlreadyRunning>` signature if a fallible launcher is added later.
///
/// It is a unit struct, so it can be constructed anywhere without arguments.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct AlreadyRunning;

impl std::fmt::Display for AlreadyRunning {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("runtime is already running")
    }
}

impl std::error::Error for AlreadyRunning {}
42
43/// Snapshot of a [`DomainRuntime`]'s state.
44#[derive(Debug, Clone, Copy, serde::Serialize)]
45pub struct RuntimeStatus {
46    pub running: bool,
47}
48
49struct State {
50    running: bool,
51    cancel: Option<CancellationToken>,
52}
53
54/// A start/stop controller around a shared core `C`.
55///
56/// `C` is the domain core the loop operates on; it is held as an `Arc<C>` so the controller
57/// can hand it to the spawned loop and to [`core`](Self::core) callers.
58pub struct DomainRuntime<C> {
59    core: Arc<C>,
60    state: Arc<Mutex<State>>,
61}
62
63impl<C> Clone for DomainRuntime<C> {
64    fn clone(&self) -> Self {
65        Self {
66            core: Arc::clone(&self.core),
67            state: Arc::clone(&self.state),
68        }
69    }
70}
71
72impl<C> DomainRuntime<C> {
73    /// Wrap a core. The controller starts in the stopped state.
74    pub fn new(core: Arc<C>) -> Self {
75        Self {
76            core,
77            state: Arc::new(Mutex::new(State {
78                running: false,
79                cancel: None,
80            })),
81        }
82    }
83
84    /// The shared core.
85    pub fn core(&self) -> Arc<C> {
86        Arc::clone(&self.core)
87    }
88
89    /// Start the background loop.
90    ///
91    /// `mk` is called with a fresh [`CancellationToken`] and must spawn the domain loop,
92    /// returning its [`JoinHandle`]. On success the controller marks itself running and
93    /// stores the token; a watcher task flips `running` back to `false` when the loop's
94    /// handle completes. Returns `Ok(false)` (a no-op) if already running.
95    pub async fn start<F>(&self, mk: F) -> Result<bool, AlreadyRunning>
96    where
97        F: FnOnce(CancellationToken) -> JoinHandle<()>,
98    {
99        let mut state = self.state.lock().await;
100        if state.running {
101            return Ok(false);
102        }
103
104        let token = CancellationToken::new();
105        let handle = mk(token.clone());
106        state.cancel = Some(token);
107        state.running = true;
108        drop(state);
109
110        // Watcher: when the loop's handle finishes (cancelled or done), clear running/cancel
111        // exactly as the old hand-written machines did at the tail of their spawned task.
112        let state = Arc::clone(&self.state);
113        tokio::spawn(async move {
114            let _ = handle.await;
115            let mut guard = state.lock().await;
116            guard.running = false;
117            guard.cancel = None;
118        });
119
120        Ok(true)
121    }
122
123    /// Stop the background loop by cancelling its token.
124    ///
125    /// Returns `true` if a token was present (i.e. the loop was running), `false` otherwise.
126    /// Marks `running = false` immediately without waiting for the loop to wind down.
127    pub async fn stop(&self) -> bool {
128        let mut state = self.state.lock().await;
129        if let Some(token) = state.cancel.take() {
130            token.cancel();
131            state.running = false;
132            true
133        } else {
134            false
135        }
136    }
137
138    /// Current running state.
139    pub async fn status(&self) -> RuntimeStatus {
140        let state = self.state.lock().await;
141        RuntimeStatus {
142            running: state.running,
143        }
144    }
145}
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150
151    struct Core;
152
153    fn never_ending(token: CancellationToken) -> JoinHandle<()> {
154        tokio::spawn(async move {
155            token.cancelled().await;
156        })
157    }
158
159    #[tokio::test]
160    async fn start_then_status_running() {
161        let rt = DomainRuntime::new(Arc::new(Core));
162        assert!(!rt.status().await.running);
163
164        let started = rt.start(never_ending).await.unwrap();
165        assert!(started);
166        assert!(rt.status().await.running);
167    }
168
169    #[tokio::test]
170    async fn double_start_is_noop() {
171        let rt = DomainRuntime::new(Arc::new(Core));
172        assert!(rt.start(never_ending).await.unwrap());
173        // Second start while running returns Ok(false) and does not spawn another loop.
174        assert!(!rt.start(never_ending).await.unwrap());
175        assert!(rt.status().await.running);
176    }
177
178    #[tokio::test]
179    async fn stop_clears_running() {
180        let rt = DomainRuntime::new(Arc::new(Core));
181        rt.start(never_ending).await.unwrap();
182        assert!(rt.stop().await);
183        assert!(!rt.status().await.running);
184        // Stopping again with no live loop returns false.
185        assert!(!rt.stop().await);
186    }
187
188    #[tokio::test]
189    async fn watcher_flips_running_when_loop_finishes() {
190        let rt = DomainRuntime::new(Arc::new(Core));
191        // A loop that returns immediately; the watcher should flip running=false.
192        rt.start(|_token| tokio::spawn(async {})).await.unwrap();
193        // Give the watcher a chance to observe completion.
194        for _ in 0..50 {
195            if !rt.status().await.running {
196                break;
197            }
198            tokio::task::yield_now().await;
199        }
200        assert!(!rt.status().await.running);
201    }
202
203    #[test]
204    fn already_running_display() {
205        assert_eq!(AlreadyRunning.to_string(), "runtime is already running");
206    }
207}