1use std::sync::Arc;
4use std::sync::atomic::{AtomicU32, Ordering};
5
6use tokio::sync::watch;
7use tokio::task::JoinHandle;
8
9use crate::scanner::{Scanner, ScannerRunner};
10
11pub fn supervised_spawn<S: Scanner>(
16 scanner: Arc<S>,
17 client: ferriskey::Client,
18 num_partitions: u16,
19 shutdown: watch::Receiver<bool>,
20 metrics: Arc<ff_observability::Metrics>,
21) -> JoinHandle<()> {
22 let restart_count = Arc::new(AtomicU32::new(0));
23 let name = scanner.name();
24
25 let rc = restart_count.clone();
26 tokio::spawn(async move {
27 loop {
28 if *shutdown.borrow() {
29 return;
30 }
31
32 let handle = ScannerRunner::spawn(
33 scanner.clone(),
34 client.clone(),
35 num_partitions,
36 shutdown.clone(),
37 metrics.clone(),
38 );
39
40 match handle.await {
41 Ok(()) => {
42 return;
44 }
45 Err(e) if e.is_panic() => {
46 let count = rc.fetch_add(1, Ordering::Relaxed) + 1;
47 tracing::error!(
48 scanner = name,
49 restart_count = count,
50 "scanner panicked — restarting in 5s"
51 );
52 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
53
54 if *shutdown.borrow() {
55 return;
56 }
57 }
58 Err(e) => {
59 let count = rc.fetch_add(1, Ordering::Relaxed) + 1;
60 tracing::error!(
61 scanner = name,
62 restart_count = count,
63 error = %e,
64 "scanner task failed — restarting in 1s"
65 );
66 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
67
68 if *shutdown.borrow() {
69 return;
70 }
71 }
72 }
73 }
74 })
75}