Skip to main content

sochdb_storage/
supervisor.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Supervised background workers.
19//!
20//! Long-running detached workers (LSM compaction, GC, the event-driven flusher,
21//! dirty-tracking aggregation, …) historically ran as a bare
22//! [`std::thread::spawn`] containing a `loop { … }`. If any iteration panicked —
23//! a poisoned lock, an arithmetic overflow in debug, an `unwrap()` on a
24//! transient error — the thread unwound and **died silently**. Compaction would
25//! stop, GC would stop, and the only symptom would be slowly growing disk usage
26//! or unbounded version chains, with no signal that anything was wrong.
27//!
28//! [`Supervisor`] wraps a worker's per-iteration body in
29//! [`std::panic::catch_unwind`] so a panic in one iteration is *contained*:
30//! the panic is counted, the worker is marked unhealthy, a bounded backoff is
31//! applied, and the loop is **restarted** rather than torn down. Callers can
32//! observe liveness via [`WorkerHealth`].
33//!
34//! # Contract
35//!
36//! - The worker body is a closure returning [`WorkerStep`]. Returning
37//!   [`WorkerStep::Continue`] runs another iteration; [`WorkerStep::Stop`] ends
38//!   the worker cleanly.
39//! - Shutdown is cooperative: callers flip the shared `running` flag (or return
40//!   [`WorkerStep::Stop`] from the body) and then [`SupervisedWorker::join`].
41//! - Backoff after a panic grows geometrically from `base_backoff` up to
42//!   `max_backoff` and resets to zero after any successful (non-panicking)
43//!   iteration. This prevents a tight panic loop from pinning a core.
44//!
45//! # Example
46//!
47//! ```ignore
48//! let running = Arc::new(AtomicBool::new(true));
49//! let worker = Supervisor::new("compaction")
50//!     .spawn(running.clone(), move || {
51//!         do_one_compaction_pass();
52//!         WorkerStep::Continue
53//!     });
54//! // … later …
55//! running.store(false, Ordering::SeqCst);
//! // `join` consumes the worker, so grab the health handle first.
//! let health = worker.health();
//! worker.join();
//! assert!(health.panics() == 0);
58//! ```
59
60use std::panic::{AssertUnwindSafe, catch_unwind};
61use std::sync::Arc;
62use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
63use std::thread::JoinHandle;
64use std::time::Duration;
65
/// Verdict returned by a worker body at the end of one iteration.
///
/// The supervisor inspects this value to decide whether the body should be
/// invoked again.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WorkerStep {
    /// Keep going: the body should be invoked for another iteration.
    Continue,
    /// Finish cleanly: the body must not be invoked again.
    Stop,
}
74
/// Shared view of a supervised worker's liveness flags and fault counters.
///
/// The handle is a thin wrapper around a single [`Arc`], so cloning it is
/// cheap and every clone observes the same state. All accessors are plain
/// atomic loads, which makes the handle suitable for a metrics exporter or a
/// health endpoint polling from another thread.
#[derive(Debug, Clone)]
pub struct WorkerHealth {
    inner: Arc<WorkerHealthInner>,
}

/// Atomic state shared by every clone of a [`WorkerHealth`].
#[derive(Debug)]
struct WorkerHealthInner {
    /// Count of iterations that ended in a panic.
    panics: AtomicU64,
    /// Count of restarts recorded by the supervisor following a panic.
    restarts: AtomicU64,
    /// Count of iterations that completed without panicking.
    iterations: AtomicU64,
    /// Starts `true`; cleared when an iteration panics and set again once a
    /// later iteration succeeds.
    healthy: AtomicBool,
    /// Starts `false`; set once the worker loop has exited for good.
    finished: AtomicBool,
}

impl WorkerHealth {
    /// Fresh state: all counters at zero, healthy, and not yet finished.
    fn new() -> Self {
        let initial = WorkerHealthInner {
            panics: AtomicU64::new(0),
            restarts: AtomicU64::new(0),
            iterations: AtomicU64::new(0),
            healthy: AtomicBool::new(true),
            finished: AtomicBool::new(false),
        };
        Self {
            inner: Arc::new(initial),
        }
    }

    /// Acquire-load of one of the `u64` counters.
    #[inline]
    fn read_counter(cell: &AtomicU64) -> u64 {
        cell.load(Ordering::Acquire)
    }

    /// Acquire-load of one of the boolean flags.
    #[inline]
    fn read_flag(cell: &AtomicBool) -> bool {
        cell.load(Ordering::Acquire)
    }

    /// How many iterations panicked and were contained.
    #[inline]
    pub fn panics(&self) -> u64 {
        Self::read_counter(&self.inner.panics)
    }

    /// How many times the loop was restarted after a panic.
    #[inline]
    pub fn restarts(&self) -> u64 {
        Self::read_counter(&self.inner.restarts)
    }

    /// How many iterations completed successfully (without panicking).
    #[inline]
    pub fn iterations(&self) -> u64 {
        Self::read_counter(&self.inner.iterations)
    }

    /// Whether the worker is currently making forward progress.
    ///
    /// Reads `false` from the moment a panic is recorded until the next
    /// successful iteration.
    #[inline]
    pub fn is_healthy(&self) -> bool {
        Self::read_flag(&self.inner.healthy)
    }

    /// Whether the worker loop has fully exited.
    #[inline]
    pub fn is_finished(&self) -> bool {
        Self::read_flag(&self.inner.finished)
    }
}
144
145/// Builder/configuration for a supervised worker.
146pub struct Supervisor {
147    name: String,
148    base_backoff: Duration,
149    max_backoff: Duration,
150}
151
152impl Supervisor {
153    /// Create a supervisor for a worker with the given diagnostic name.
154    ///
155    /// Defaults: `base_backoff = 10ms`, `max_backoff = 1s`.
156    pub fn new(name: impl Into<String>) -> Self {
157        Self {
158            name: name.into(),
159            base_backoff: Duration::from_millis(10),
160            max_backoff: Duration::from_secs(1),
161        }
162    }
163
164    /// Override the initial post-panic backoff.
165    pub fn base_backoff(mut self, d: Duration) -> Self {
166        self.base_backoff = d;
167        self
168    }
169
170    /// Override the maximum post-panic backoff.
171    pub fn max_backoff(mut self, d: Duration) -> Self {
172        self.max_backoff = d;
173        self
174    }
175
176    /// Spawn the supervised worker.
177    ///
178    /// The worker runs `body` repeatedly while `running` is `true` and the body
179    /// keeps returning [`WorkerStep::Continue`]. Each call to `body` is isolated
180    /// with [`catch_unwind`]; a panic is counted, the worker is marked unhealthy,
181    /// a bounded backoff is applied, and the loop continues.
182    pub fn spawn<F>(self, running: Arc<AtomicBool>, mut body: F) -> SupervisedWorker
183    where
184        F: FnMut() -> WorkerStep + Send + 'static,
185    {
186        let health = WorkerHealth::new();
187        let thread_health = health.clone();
188        let name = self.name.clone();
189        let base = self.base_backoff;
190        let max = self.max_backoff;
191
192        let handle = std::thread::Builder::new()
193            .name(format!("soch-sup-{name}"))
194            .spawn(move || {
195                let mut backoff = Duration::ZERO;
196                while running.load(Ordering::Acquire) {
197                    // Isolate one iteration. AssertUnwindSafe is sound here:
198                    // on panic we do not observe any half-updated state owned by
199                    // `body` — we simply re-enter the loop on the next iteration.
200                    let result = catch_unwind(AssertUnwindSafe(&mut body));
201                    match result {
202                        Ok(WorkerStep::Continue) => {
203                            thread_health
204                                .inner
205                                .iterations
206                                .fetch_add(1, Ordering::AcqRel);
207                            thread_health.inner.healthy.store(true, Ordering::Release);
208                            backoff = Duration::ZERO;
209                        }
210                        Ok(WorkerStep::Stop) => break,
211                        Err(_panic) => {
212                            thread_health.inner.panics.fetch_add(1, Ordering::AcqRel);
213                            thread_health.inner.restarts.fetch_add(1, Ordering::AcqRel);
214                            thread_health.inner.healthy.store(false, Ordering::Release);
215
216                            // Geometric backoff, clamped to `max`, to avoid a hot
217                            // panic loop pinning a CPU. Reset on next success.
218                            backoff = if backoff.is_zero() {
219                                base
220                            } else {
221                                (backoff * 2).min(max)
222                            };
223                            if running.load(Ordering::Acquire) {
224                                std::thread::sleep(backoff);
225                            }
226                        }
227                    }
228                }
229                thread_health.inner.finished.store(true, Ordering::Release);
230            })
231            .expect("failed to spawn supervised worker thread");
232
233        SupervisedWorker {
234            handle: Some(handle),
235            health,
236        }
237    }
238}
239
240/// Handle to a running supervised worker.
241pub struct SupervisedWorker {
242    handle: Option<JoinHandle<()>>,
243    health: WorkerHealth,
244}
245
246impl SupervisedWorker {
247    /// Health/liveness counters shared with the worker thread.
248    #[inline]
249    pub fn health(&self) -> WorkerHealth {
250        self.health.clone()
251    }
252
253    /// Join the worker thread, blocking until it exits.
254    ///
255    /// The caller is responsible for first signalling shutdown (flipping the
256    /// `running` flag passed to [`Supervisor::spawn`]) so the loop can observe
257    /// it and exit; otherwise this blocks indefinitely.
258    pub fn join(mut self) {
259        if let Some(handle) = self.handle.take() {
260            let _ = handle.join();
261        }
262    }
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;
    use std::time::Instant;

    /// Poll `done` every 2ms until it returns `true`, failing the test with
    /// `failure` if that has not happened within five seconds.
    fn poll_until(failure: &str, mut done: impl FnMut() -> bool) {
        let deadline = Instant::now() + Duration::from_secs(5);
        while !done() {
            assert!(Instant::now() < deadline, "{failure}");
            std::thread::sleep(Duration::from_millis(2));
        }
    }

    /// A well-behaved worker keeps iterating until `running` is cleared.
    #[test]
    fn test_runs_until_running_cleared() {
        let running = Arc::new(AtomicBool::new(true));
        let ticks = Arc::new(AtomicU64::new(0));

        let worker = Supervisor::new("count").spawn(Arc::clone(&running), {
            let ticks = Arc::clone(&ticks);
            move || {
                ticks.fetch_add(1, Ordering::Relaxed);
                std::thread::sleep(Duration::from_millis(1));
                WorkerStep::Continue
            }
        });
        let health = worker.health();

        // Give the worker time to run a few iterations before stopping it.
        std::thread::sleep(Duration::from_millis(50));
        running.store(false, Ordering::SeqCst);
        worker.join();

        assert!(health.is_finished());
        assert!(ticks.load(Ordering::Relaxed) > 0, "worker never ran");
        assert_eq!(health.panics(), 0);
    }

    /// Returning `WorkerStep::Stop` ends the loop without touching `running`.
    #[test]
    fn test_stop_step_exits_cleanly() {
        let running = Arc::new(AtomicBool::new(true));
        let calls = Arc::new(AtomicU64::new(0));

        let worker = Supervisor::new("stopper").spawn(running, {
            let calls = Arc::clone(&calls);
            move || match calls.fetch_add(1, Ordering::Relaxed) {
                seen if seen >= 2 => WorkerStep::Stop,
                _ => WorkerStep::Continue,
            }
        });
        let health = worker.health();
        worker.join();

        assert!(health.is_finished());
        // Calls observe 0, 1, 2; the third call returns `Stop`.
        assert_eq!(calls.load(Ordering::Relaxed), 3);
    }

    /// A panic on the first iteration is contained and later iterations still
    /// run; an unsupervised thread would have died with no progress made.
    #[test]
    fn test_panic_is_contained_and_loop_survives() {
        let running = Arc::new(AtomicBool::new(true));
        let calls = Arc::new(AtomicU64::new(0));
        let successes = Arc::new(AtomicU64::new(0));

        let worker = Supervisor::new("panicker")
            .base_backoff(Duration::from_millis(1))
            .max_backoff(Duration::from_millis(5))
            .spawn(Arc::clone(&running), {
                let calls = Arc::clone(&calls);
                let successes = Arc::clone(&successes);
                move || {
                    if calls.fetch_add(1, Ordering::SeqCst) == 0 {
                        panic!("boom on first iteration");
                    }
                    successes.fetch_add(1, Ordering::SeqCst);
                    WorkerStep::Continue
                }
            });
        let health = worker.health();

        // Block until at least one post-panic iteration has succeeded.
        poll_until("worker did not recover from panic", || {
            successes.load(Ordering::SeqCst) != 0
        });

        running.store(false, Ordering::SeqCst);
        worker.join();

        assert_eq!(health.panics(), 1, "panic should have been counted once");
        assert!(health.restarts() >= 1);
        assert!(
            successes.load(Ordering::SeqCst) > 0,
            "loop must survive the panic and keep working"
        );
        // A success after the panic flips the worker back to healthy.
        assert!(health.is_healthy());
    }

    /// A body that panics on every call keeps being counted and the worker
    /// reports unhealthy.
    #[test]
    fn test_health_unhealthy_immediately_after_panic() {
        let running = Arc::new(AtomicBool::new(true));

        let worker = Supervisor::new("always-panic")
            .base_backoff(Duration::from_millis(1))
            .max_backoff(Duration::from_millis(2))
            .spawn(Arc::clone(&running), || {
                panic!("always");
            });
        let health = worker.health();

        poll_until("panics were not counted", || health.panics() >= 3);
        assert!(
            !health.is_healthy(),
            "a perpetually panicking worker is unhealthy"
        );

        running.store(false, Ordering::SeqCst);
        worker.join();
    }
}
384}