sochdb_storage/supervisor.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Supervised background workers.
19//!
20//! Long-running detached workers (LSM compaction, GC, the event-driven flusher,
21//! dirty-tracking aggregation, …) historically ran as a bare
22//! [`std::thread::spawn`] containing a `loop { … }`. If any iteration panicked —
23//! a poisoned lock, an arithmetic overflow in debug, an `unwrap()` on a
24//! transient error — the thread unwound and **died silently**. Compaction would
25//! stop, GC would stop, and the only symptom would be slowly growing disk usage
26//! or unbounded version chains, with no signal that anything was wrong.
27//!
28//! [`Supervisor`] wraps a worker's per-iteration body in
29//! [`std::panic::catch_unwind`] so a panic in one iteration is *contained*:
30//! the panic is counted, the worker is marked unhealthy, a bounded backoff is
31//! applied, and the loop is **restarted** rather than torn down. Callers can
32//! observe liveness via [`WorkerHealth`].
33//!
34//! # Contract
35//!
36//! - The worker body is a closure returning [`WorkerStep`]. Returning
37//! [`WorkerStep::Continue`] runs another iteration; [`WorkerStep::Stop`] ends
38//! the worker cleanly.
39//! - Shutdown is cooperative: callers flip the shared `running` flag (or return
40//! [`WorkerStep::Stop`] from the body) and then [`SupervisedWorker::join`].
41//! - Backoff after a panic grows geometrically from `base_backoff` up to
42//! `max_backoff` and resets to zero after any successful (non-panicking)
43//! iteration. This prevents a tight panic loop from pinning a core.
44//!
45//! # Example
46//!
47//! ```ignore
48//! let running = Arc::new(AtomicBool::new(true));
49//! let worker = Supervisor::new("compaction")
50//! .spawn(running.clone(), move || {
51//! do_one_compaction_pass();
52//! WorkerStep::Continue
53//! });
54//! // … later …
//! running.store(false, Ordering::SeqCst);
//! let health = worker.health(); // grab the handle first: `join` consumes the worker
//! worker.join();
//! assert!(health.panics() == 0);
58//! ```
59
60use std::panic::{AssertUnwindSafe, catch_unwind};
61use std::sync::Arc;
62use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
63use std::thread::JoinHandle;
64use std::time::Duration;
65
/// What the supervisor should do after one call to the worker body.
///
/// The body returns this value from every iteration; the supervisor loop
/// matches on it to decide whether to call the body again.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerStep {
    /// Keep going: the supervisor calls the body again (as long as the shared
    /// `running` flag is still set).
    Continue,
    /// Finish cleanly: the supervisor leaves its loop without calling the body
    /// again.
    Stop,
}
74
/// Shared view of a supervised worker's liveness flags and fault counters.
///
/// A handle is just an [`Arc`] around a set of atomics: cloning it bumps a
/// reference count, and every accessor is a single atomic load. That makes it
/// suitable for polling from another thread, e.g. a metrics exporter or a
/// health endpoint.
#[derive(Debug, Clone)]
pub struct WorkerHealth {
    inner: Arc<WorkerHealthInner>,
}

/// Atomic state behind a [`WorkerHealth`] handle; written by the worker
/// thread, read through the accessors on [`WorkerHealth`].
#[derive(Debug)]
struct WorkerHealthInner {
    /// How many iterations ended in a (contained) panic.
    panics: AtomicU64,
    /// How many times the supervisor scheduled a restart; bumped together
    /// with `panics` whenever a panic is contained.
    restarts: AtomicU64,
    /// How many iterations returned `WorkerStep::Continue` without panicking.
    iterations: AtomicU64,
    /// Set to `false` right after a panic and back to `true` by the next
    /// successful iteration; starts out `true`.
    healthy: AtomicBool,
    /// Set to `true` once the worker loop has been left for good.
    finished: AtomicBool,
}

impl WorkerHealth {
    /// Fresh state: all counters zero, healthy, not finished.
    fn new() -> Self {
        let state = WorkerHealthInner {
            panics: AtomicU64::default(),
            restarts: AtomicU64::default(),
            iterations: AtomicU64::default(),
            healthy: AtomicBool::new(true),
            finished: AtomicBool::default(),
        };
        Self {
            inner: Arc::new(state),
        }
    }

    /// Count of iterations whose panic was caught by the supervisor.
    #[inline]
    pub fn panics(&self) -> u64 {
        let counter = &self.inner.panics;
        counter.load(Ordering::Acquire)
    }

    /// Count of post-panic restarts recorded by the supervisor.
    #[inline]
    pub fn restarts(&self) -> u64 {
        let counter = &self.inner.restarts;
        counter.load(Ordering::Acquire)
    }

    /// Count of iterations that completed without panicking.
    #[inline]
    pub fn iterations(&self) -> u64 {
        let counter = &self.inner.iterations;
        counter.load(Ordering::Acquire)
    }

    /// `true` while the most recent outcome was not a panic.
    ///
    /// Reads `false` from the moment a panic is contained until the next
    /// iteration succeeds.
    #[inline]
    pub fn is_healthy(&self) -> bool {
        let flag = &self.inner.healthy;
        flag.load(Ordering::Acquire)
    }

    /// `true` once the worker loop has exited and will not run again.
    #[inline]
    pub fn is_finished(&self) -> bool {
        let flag = &self.inner.finished;
        flag.load(Ordering::Acquire)
    }
}
144
145/// Builder/configuration for a supervised worker.
146pub struct Supervisor {
147 name: String,
148 base_backoff: Duration,
149 max_backoff: Duration,
150}
151
152impl Supervisor {
153 /// Create a supervisor for a worker with the given diagnostic name.
154 ///
155 /// Defaults: `base_backoff = 10ms`, `max_backoff = 1s`.
156 pub fn new(name: impl Into<String>) -> Self {
157 Self {
158 name: name.into(),
159 base_backoff: Duration::from_millis(10),
160 max_backoff: Duration::from_secs(1),
161 }
162 }
163
164 /// Override the initial post-panic backoff.
165 pub fn base_backoff(mut self, d: Duration) -> Self {
166 self.base_backoff = d;
167 self
168 }
169
170 /// Override the maximum post-panic backoff.
171 pub fn max_backoff(mut self, d: Duration) -> Self {
172 self.max_backoff = d;
173 self
174 }
175
176 /// Spawn the supervised worker.
177 ///
178 /// The worker runs `body` repeatedly while `running` is `true` and the body
179 /// keeps returning [`WorkerStep::Continue`]. Each call to `body` is isolated
180 /// with [`catch_unwind`]; a panic is counted, the worker is marked unhealthy,
181 /// a bounded backoff is applied, and the loop continues.
182 pub fn spawn<F>(self, running: Arc<AtomicBool>, mut body: F) -> SupervisedWorker
183 where
184 F: FnMut() -> WorkerStep + Send + 'static,
185 {
186 let health = WorkerHealth::new();
187 let thread_health = health.clone();
188 let name = self.name.clone();
189 let base = self.base_backoff;
190 let max = self.max_backoff;
191
192 let handle = std::thread::Builder::new()
193 .name(format!("soch-sup-{name}"))
194 .spawn(move || {
195 let mut backoff = Duration::ZERO;
196 while running.load(Ordering::Acquire) {
197 // Isolate one iteration. AssertUnwindSafe is sound here:
198 // on panic we do not observe any half-updated state owned by
199 // `body` — we simply re-enter the loop on the next iteration.
200 let result = catch_unwind(AssertUnwindSafe(&mut body));
201 match result {
202 Ok(WorkerStep::Continue) => {
203 thread_health
204 .inner
205 .iterations
206 .fetch_add(1, Ordering::AcqRel);
207 thread_health.inner.healthy.store(true, Ordering::Release);
208 backoff = Duration::ZERO;
209 }
210 Ok(WorkerStep::Stop) => break,
211 Err(_panic) => {
212 thread_health.inner.panics.fetch_add(1, Ordering::AcqRel);
213 thread_health.inner.restarts.fetch_add(1, Ordering::AcqRel);
214 thread_health.inner.healthy.store(false, Ordering::Release);
215
216 // Geometric backoff, clamped to `max`, to avoid a hot
217 // panic loop pinning a CPU. Reset on next success.
218 backoff = if backoff.is_zero() {
219 base
220 } else {
221 (backoff * 2).min(max)
222 };
223 if running.load(Ordering::Acquire) {
224 std::thread::sleep(backoff);
225 }
226 }
227 }
228 }
229 thread_health.inner.finished.store(true, Ordering::Release);
230 })
231 .expect("failed to spawn supervised worker thread");
232
233 SupervisedWorker {
234 handle: Some(handle),
235 health,
236 }
237 }
238}
239
240/// Handle to a running supervised worker.
241pub struct SupervisedWorker {
242 handle: Option<JoinHandle<()>>,
243 health: WorkerHealth,
244}
245
246impl SupervisedWorker {
247 /// Health/liveness counters shared with the worker thread.
248 #[inline]
249 pub fn health(&self) -> WorkerHealth {
250 self.health.clone()
251 }
252
253 /// Join the worker thread, blocking until it exits.
254 ///
255 /// The caller is responsible for first signalling shutdown (flipping the
256 /// `running` flag passed to [`Supervisor::spawn`]) so the loop can observe
257 /// it and exit; otherwise this blocks indefinitely.
258 pub fn join(mut self) {
259 if let Some(handle) = self.handle.take() {
260 let _ = handle.join();
261 }
262 }
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;
    use std::time::Instant;

    /// Polls `done` every 2ms and fails the test with `timeout_msg` if it has
    /// not become true within 5 seconds.
    fn wait_until(timeout_msg: &str, mut done: impl FnMut() -> bool) {
        let deadline = Instant::now() + Duration::from_secs(5);
        while !done() {
            assert!(Instant::now() < deadline, "{}", timeout_msg);
            std::thread::sleep(Duration::from_millis(2));
        }
    }

    /// Clearing the shared flag ends a worker that would otherwise loop forever.
    #[test]
    fn test_runs_until_running_cleared() {
        let running = Arc::new(AtomicBool::new(true));
        let counter = Arc::new(AtomicU64::new(0));

        let worker = {
            let ticks = Arc::clone(&counter);
            Supervisor::new("count").spawn(Arc::clone(&running), move || {
                ticks.fetch_add(1, Ordering::Relaxed);
                std::thread::sleep(Duration::from_millis(1));
                WorkerStep::Continue
            })
        };

        // Give the worker some time to iterate before asking it to stop.
        std::thread::sleep(Duration::from_millis(50));
        running.store(false, Ordering::SeqCst);
        let health = worker.health();
        worker.join();

        assert!(health.is_finished());
        assert!(counter.load(Ordering::Relaxed) > 0, "worker never ran");
        assert_eq!(health.panics(), 0);
    }

    /// Returning `WorkerStep::Stop` ends the loop even though `running` stays set.
    #[test]
    fn test_stop_step_exits_cleanly() {
        let running = Arc::new(AtomicBool::new(true));
        let counter = Arc::new(AtomicU64::new(0));

        let calls = Arc::clone(&counter);
        let worker = Supervisor::new("stopper").spawn(running, move || {
            // Calls 0 and 1 continue; call 2 stops.
            if calls.fetch_add(1, Ordering::Relaxed) < 2 {
                WorkerStep::Continue
            } else {
                WorkerStep::Stop
            }
        });

        let health = worker.health();
        worker.join();
        assert!(health.is_finished());
        assert_eq!(counter.load(Ordering::Relaxed), 3); // 0,1,2 -> stop at 2
    }

    /// A panic in the first iteration is contained and later iterations still run.
    #[test]
    fn test_panic_is_contained_and_loop_survives() {
        let running = Arc::new(AtomicBool::new(true));
        let attempts = Arc::new(AtomicU64::new(0));
        let progress = Arc::new(AtomicU64::new(0));

        let worker = {
            let tries = Arc::clone(&attempts);
            let done = Arc::clone(&progress);
            Supervisor::new("panicker")
                .base_backoff(Duration::from_millis(1))
                .max_backoff(Duration::from_millis(5))
                .spawn(Arc::clone(&running), move || {
                    // Only the very first call blows up.
                    if tries.fetch_add(1, Ordering::SeqCst) == 0 {
                        panic!("boom on first iteration");
                    }
                    done.fetch_add(1, Ordering::SeqCst);
                    WorkerStep::Continue
                })
        };

        let health = worker.health();
        // Block until at least one post-panic iteration has completed.
        wait_until("worker did not recover from panic", || {
            progress.load(Ordering::SeqCst) != 0
        });

        running.store(false, Ordering::SeqCst);
        worker.join();

        assert_eq!(health.panics(), 1, "panic should have been counted once");
        assert!(health.restarts() >= 1);
        assert!(
            progress.load(Ordering::SeqCst) > 0,
            "loop must survive the panic and keep working"
        );
        // The successful iterations after the panic flip the flag back.
        assert!(health.is_healthy());
    }

    /// A body that panics on every call keeps being counted and reads unhealthy.
    #[test]
    fn test_health_unhealthy_immediately_after_panic() {
        let running = Arc::new(AtomicBool::new(true));
        let worker = Supervisor::new("always-panic")
            .base_backoff(Duration::from_millis(1))
            .max_backoff(Duration::from_millis(2))
            .spawn(Arc::clone(&running), || -> WorkerStep { panic!("always") });

        let health = worker.health();
        wait_until("panics were not counted", || health.panics() >= 3);
        assert!(
            !health.is_healthy(),
            "a perpetually panicking worker is unhealthy"
        );

        running.store(false, Ordering::SeqCst);
        worker.join();
    }
}