1pub mod attempt_timeout;
9pub mod budget_reconciler;
10pub mod execution_deadline;
11pub mod budget_reset;
12pub mod delayed_promoter;
13pub mod dependency_reconciler;
14pub mod flow_projector;
15pub mod index_reconciler;
16pub mod lease_expiry;
17pub mod pending_wp_expiry;
18pub mod quota_reconciler;
19pub mod retention_trimmer;
20pub mod suspension_timeout;
21pub mod unblock;
22
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::time::Duration;

use tokio::sync::watch;
use tokio::task::JoinHandle;
31
/// Counters returned by a single [`Scanner::scan_partition`] call.
///
/// [`ScannerRunner::spawn`] adds these up over every partition of a cycle and logs the totals.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ScanResult {
    /// Count the scanner reports as processed during the pass.
    pub processed: u32,
    /// Count the scanner reports as errors during the pass.
    pub errors: u32,
}
37
/// Consecutive failures after which an item is put into backoff.
const FAILURE_THRESHOLD: u32 = 3;
/// Number of scan cycles an item is skipped once it reaches [`FAILURE_THRESHOLD`].
const BACKOFF_CYCLES: u64 = 10;
/// Entry count above which the periodic sweep in `FailureTracker::advance_cycle` prunes the map.
const GC_THRESHOLD: usize = 500;
46
/// Per-item failure bookkeeping stored inside [`FailureTracker`].
#[derive(Debug, Default)]
struct FailureEntry {
    /// Failures recorded in a row since the entry was created or last reset.
    consecutive_failures: u32,
    /// Cycle number before which the item is skipped; `0` while no backoff is set.
    skip_until_cycle: u64,
}
51
52#[derive(Default)]
56pub struct FailureTracker {
57 inner: Mutex<HashMap<String, FailureEntry>>,
58 cycle: AtomicU64,
59}
60
61impl FailureTracker {
62 pub fn new() -> Self {
63 Self::default()
64 }
65
66 pub fn advance_cycle(&self) {
68 let cycle = self.cycle.fetch_add(1, Ordering::Relaxed) + 1;
69 if cycle.is_multiple_of(50) {
71 let mut map = self.inner.lock().unwrap();
72 if map.len() > GC_THRESHOLD {
73 map.retain(|_, e| {
74 e.consecutive_failures >= FAILURE_THRESHOLD
75 && e.skip_until_cycle > cycle
76 });
77 }
78 }
79 }
80
81 pub fn should_skip(&self, key: &str) -> bool {
84 let mut map = self.inner.lock().unwrap();
85 if let Some(entry) = map.get_mut(key)
86 && entry.consecutive_failures >= FAILURE_THRESHOLD
87 {
88 let cycle = self.cycle.load(Ordering::Relaxed);
89 if entry.skip_until_cycle > cycle {
90 return true;
91 }
92 entry.consecutive_failures = 0;
94 entry.skip_until_cycle = 0;
95 }
96 false
97 }
98
99 pub fn record_failure(&self, key: &str, scanner_name: &str) {
102 let mut map = self.inner.lock().unwrap();
103 let entry = map.entry(key.to_owned()).or_insert(FailureEntry {
104 consecutive_failures: 0,
105 skip_until_cycle: 0,
106 });
107 entry.consecutive_failures += 1;
108 if entry.consecutive_failures == FAILURE_THRESHOLD {
109 let cycle = self.cycle.load(Ordering::Relaxed);
110 entry.skip_until_cycle = cycle + BACKOFF_CYCLES;
111 tracing::error!(
112 scanner = scanner_name,
113 item = key,
114 failures = entry.consecutive_failures,
115 backoff_cycles = BACKOFF_CYCLES,
116 "persistent FCALL failure — skipping for {BACKOFF_CYCLES} scan cycles"
117 );
118 }
119 }
120
121 pub fn record_success(&self, key: &str) {
123 let mut map = self.inner.lock().unwrap();
124 map.remove(key);
125 }
126}
127
128pub trait Scanner: Send + Sync + 'static {
134 fn name(&self) -> &'static str;
136
137 fn interval(&self) -> Duration;
139
140 fn scan_partition(
142 &self,
143 client: &ferriskey::Client,
144 partition: u16,
145 ) -> impl std::future::Future<Output = ScanResult> + Send;
146}
147
/// Namespace type for spawning the background task that drives a [`Scanner`].
#[derive(Debug, Clone, Copy, Default)]
pub struct ScannerRunner;
150
151impl ScannerRunner {
152 pub fn spawn<S: Scanner>(
154 scanner: Arc<S>,
155 client: ferriskey::Client,
156 num_partitions: u16,
157 mut shutdown: watch::Receiver<bool>,
158 ) -> JoinHandle<()> {
159 tokio::spawn(async move {
160 let name = scanner.name();
161 let interval = scanner.interval().max(Duration::from_millis(100));
162 tracing::info!(scanner = name, ?interval, partitions = num_partitions, "scanner started");
163
164 loop {
165 let cycle_start = tokio::time::Instant::now();
166 let mut total_processed: u32 = 0;
167 let mut total_errors: u32 = 0;
168
169 for p in 0..num_partitions {
170 if *shutdown.borrow() {
172 tracing::info!(scanner = name, "shutdown requested, stopping");
173 return;
174 }
175
176 let result = scanner.scan_partition(&client, p).await;
177 total_processed += result.processed;
178 total_errors += result.errors;
179 }
180
181 let elapsed = cycle_start.elapsed();
182 if total_processed > 0 || total_errors > 0 {
183 tracing::info!(
184 scanner = name,
185 processed = total_processed,
186 errors = total_errors,
187 elapsed_ms = elapsed.as_millis() as u64,
188 "scan cycle complete"
189 );
190 } else {
191 tracing::trace!(
192 scanner = name,
193 elapsed_ms = elapsed.as_millis() as u64,
194 "scan cycle complete (nothing to do)"
195 );
196 }
197
198 let sleep_dur = interval.saturating_sub(elapsed);
200 tokio::select! {
201 _ = tokio::time::sleep(sleep_dur) => {}
202 _ = shutdown.changed() => {
203 if *shutdown.borrow() {
204 tracing::info!(scanner = name, "shutdown requested, stopping");
205 return;
206 }
207 }
208 }
209 }
210 })
211 }
212}