Skip to main content

ff_engine/
supervisor.rs

1//! Scanner supervisor — restarts scanners on panic with backoff.
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU32, Ordering};
5
6use tokio::sync::watch;
7use tokio::task::JoinHandle;
8
9use crate::scanner::{Scanner, ScannerRunner};
10
11/// Spawn a supervised scanner that restarts on panic.
12///
13/// - Clean shutdown (via watch channel): exits normally.
14/// - Panic: logs, sleeps 5s, increments restart counter, re-spawns.
15pub fn supervised_spawn<S: Scanner>(
16    scanner: Arc<S>,
17    client: ferriskey::Client,
18    num_partitions: u16,
19    shutdown: watch::Receiver<bool>,
20) -> JoinHandle<()> {
21    let restart_count = Arc::new(AtomicU32::new(0));
22    let name = scanner.name();
23
24    let rc = restart_count.clone();
25    tokio::spawn(async move {
26        loop {
27            if *shutdown.borrow() {
28                return;
29            }
30
31            let handle = ScannerRunner::spawn(
32                scanner.clone(),
33                client.clone(),
34                num_partitions,
35                shutdown.clone(),
36            );
37
38            match handle.await {
39                Ok(()) => {
40                    // Clean exit — shutdown was requested
41                    return;
42                }
43                Err(e) if e.is_panic() => {
44                    let count = rc.fetch_add(1, Ordering::Relaxed) + 1;
45                    tracing::error!(
46                        scanner = name,
47                        restart_count = count,
48                        "scanner panicked — restarting in 5s"
49                    );
50                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
51
52                    if *shutdown.borrow() {
53                        return;
54                    }
55                }
56                Err(e) => {
57                    let count = rc.fetch_add(1, Ordering::Relaxed) + 1;
58                    tracing::error!(
59                        scanner = name,
60                        restart_count = count,
61                        error = %e,
62                        "scanner task failed — restarting in 1s"
63                    );
64                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
65
66                    if *shutdown.borrow() {
67                        return;
68                    }
69                }
70            }
71        }
72    })
73}