Skip to main content

ff_engine/
supervisor.rs

//! Scanner supervisor — restarts scanners after a panic or task failure, waiting a fixed delay between attempts.
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU32, Ordering};
5
6use tokio::sync::watch;
7use tokio::task::JoinHandle;
8
9use crate::scanner::{Scanner, ScannerRunner};
10
11/// Spawn a supervised scanner that restarts on panic.
12///
13/// - Clean shutdown (via watch channel): exits normally.
14/// - Panic: logs, sleeps 5s, increments restart counter, re-spawns.
15pub fn supervised_spawn<S: Scanner>(
16    scanner: Arc<S>,
17    client: ferriskey::Client,
18    num_partitions: u16,
19    shutdown: watch::Receiver<bool>,
20    metrics: Arc<ff_observability::Metrics>,
21) -> JoinHandle<()> {
22    let restart_count = Arc::new(AtomicU32::new(0));
23    let name = scanner.name();
24
25    let rc = restart_count.clone();
26    tokio::spawn(async move {
27        loop {
28            if *shutdown.borrow() {
29                return;
30            }
31
32            let handle = ScannerRunner::spawn(
33                scanner.clone(),
34                client.clone(),
35                num_partitions,
36                shutdown.clone(),
37                metrics.clone(),
38            );
39
40            match handle.await {
41                Ok(()) => {
42                    // Clean exit — shutdown was requested
43                    return;
44                }
45                Err(e) if e.is_panic() => {
46                    let count = rc.fetch_add(1, Ordering::Relaxed) + 1;
47                    tracing::error!(
48                        scanner = name,
49                        restart_count = count,
50                        "scanner panicked — restarting in 5s"
51                    );
52                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
53
54                    if *shutdown.borrow() {
55                        return;
56                    }
57                }
58                Err(e) => {
59                    let count = rc.fetch_add(1, Ordering::Relaxed) + 1;
60                    tracing::error!(
61                        scanner = name,
62                        restart_count = count,
63                        error = %e,
64                        "scanner task failed — restarting in 1s"
65                    );
66                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
67
68                    if *shutdown.borrow() {
69                        return;
70                    }
71                }
72            }
73        }
74    })
75}