1use std::sync::Arc;
4use std::sync::atomic::{AtomicU32, Ordering};
5
6use tokio::sync::watch;
7use tokio::task::JoinHandle;
8
9use crate::scanner::{Scanner, ScannerRunner};
10
11pub fn supervised_spawn<S: Scanner>(
16 scanner: Arc<S>,
17 client: ferriskey::Client,
18 num_partitions: u16,
19 shutdown: watch::Receiver<bool>,
20) -> JoinHandle<()> {
21 let restart_count = Arc::new(AtomicU32::new(0));
22 let name = scanner.name();
23
24 let rc = restart_count.clone();
25 tokio::spawn(async move {
26 loop {
27 if *shutdown.borrow() {
28 return;
29 }
30
31 let handle = ScannerRunner::spawn(
32 scanner.clone(),
33 client.clone(),
34 num_partitions,
35 shutdown.clone(),
36 );
37
38 match handle.await {
39 Ok(()) => {
40 return;
42 }
43 Err(e) if e.is_panic() => {
44 let count = rc.fetch_add(1, Ordering::Relaxed) + 1;
45 tracing::error!(
46 scanner = name,
47 restart_count = count,
48 "scanner panicked — restarting in 5s"
49 );
50 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
51
52 if *shutdown.borrow() {
53 return;
54 }
55 }
56 Err(e) => {
57 let count = rc.fetch_add(1, Ordering::Relaxed) + 1;
58 tracing::error!(
59 scanner = name,
60 restart_count = count,
61 error = %e,
62 "scanner task failed — restarting in 1s"
63 );
64 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
65
66 if *shutdown.borrow() {
67 return;
68 }
69 }
70 }
71 }
72 })
73}