// ff_engine/scanner/mod.rs
1//! Background scanner infrastructure.
2//!
3//! Scanners iterate execution partitions at fixed intervals, checking for
4//! conditions that require action (expired leases, due delays, index drift).
5//! Each scanner type implements the `Scanner` trait; the `ScannerRunner`
6//! drives them as tokio tasks.
7
8pub mod attempt_timeout;
9pub mod budget_reconciler;
10pub mod execution_deadline;
11pub mod budget_reset;
12pub mod delayed_promoter;
13pub mod dependency_reconciler;
14pub mod flow_projector;
15pub mod index_reconciler;
16pub mod lease_expiry;
17pub mod pending_wp_expiry;
18pub mod quota_reconciler;
19pub mod retention_trimmer;
20pub mod suspension_timeout;
21pub mod unblock;
22
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Mutex;
27use std::time::Duration;
28
29use tokio::sync::watch;
30use tokio::task::JoinHandle;
31
/// Result of scanning one partition.
///
/// Aggregated per cycle by the scanner runner: `processed` counts items
/// acted on, `errors` counts items that failed during the scan.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanResult {
    /// Number of items successfully processed in this partition.
    pub processed: u32,
    /// Number of items that errored in this partition.
    pub errors: u32,
}
37
// ── Failure tracking for persistent FCALL errors ──

/// Max consecutive failures before an item enters backoff.
const FAILURE_THRESHOLD: u32 = 3;
/// Number of scan cycles to skip after hitting the threshold.
const BACKOFF_CYCLES: u64 = 10;
/// Max tracked entries before GC runs (bounds `FailureTracker` memory).
const GC_THRESHOLD: usize = 500;
46
/// Per-key failure state tracked by `FailureTracker`.
struct FailureEntry {
    // Failures in a row for this key; cleared on success or expired backoff.
    consecutive_failures: u32,
    // Scan-cycle number until which this key is skipped (0 = not in backoff,
    // since the cycle counter only moves forward from 0).
    skip_until_cycle: u64,
}
51
/// Tracks persistently-failing items so they don't permanently consume
/// batch slots. After [`FAILURE_THRESHOLD`] consecutive failures for the
/// same key, the item is skipped for [`BACKOFF_CYCLES`] scan cycles.
#[derive(Default)]
pub struct FailureTracker {
    // Failure state per item key. Guarded by a std Mutex: critical sections
    // are short and never held across an `.await`.
    inner: Mutex<HashMap<String, FailureEntry>>,
    // Monotonic scan-cycle counter, bumped by `advance_cycle`.
    cycle: AtomicU64,
}
60
61impl FailureTracker {
62    pub fn new() -> Self {
63        Self::default()
64    }
65
66    /// Call once per full scan cycle (e.g., when partition == 0).
67    pub fn advance_cycle(&self) {
68        let cycle = self.cycle.fetch_add(1, Ordering::Relaxed) + 1;
69        // Periodic GC: remove entries whose backoff has expired
70        if cycle.is_multiple_of(50) {
71            let mut map = self.inner.lock().unwrap();
72            if map.len() > GC_THRESHOLD {
73                map.retain(|_, e| {
74                    e.consecutive_failures >= FAILURE_THRESHOLD
75                        && e.skip_until_cycle > cycle
76                });
77            }
78        }
79    }
80
81    /// Returns true if this item should be skipped (in backoff).
82    /// Also resets the entry when backoff expires, giving it another chance.
83    pub fn should_skip(&self, key: &str) -> bool {
84        let mut map = self.inner.lock().unwrap();
85        if let Some(entry) = map.get_mut(key)
86            && entry.consecutive_failures >= FAILURE_THRESHOLD
87        {
88            let cycle = self.cycle.load(Ordering::Relaxed);
89            if entry.skip_until_cycle > cycle {
90                return true;
91            }
92            // Backoff expired — reset and allow retry
93            entry.consecutive_failures = 0;
94            entry.skip_until_cycle = 0;
95        }
96        false
97    }
98
99    /// Record a failure. After [`FAILURE_THRESHOLD`] consecutive failures,
100    /// logs an error and puts the item into backoff.
101    pub fn record_failure(&self, key: &str, scanner_name: &str) {
102        let mut map = self.inner.lock().unwrap();
103        let entry = map.entry(key.to_owned()).or_insert(FailureEntry {
104            consecutive_failures: 0,
105            skip_until_cycle: 0,
106        });
107        entry.consecutive_failures += 1;
108        if entry.consecutive_failures == FAILURE_THRESHOLD {
109            let cycle = self.cycle.load(Ordering::Relaxed);
110            entry.skip_until_cycle = cycle + BACKOFF_CYCLES;
111            tracing::error!(
112                scanner = scanner_name,
113                item = key,
114                failures = entry.consecutive_failures,
115                backoff_cycles = BACKOFF_CYCLES,
116                "persistent FCALL failure — skipping for {BACKOFF_CYCLES} scan cycles"
117            );
118        }
119    }
120
121    /// Record a success — clears any tracked failure state.
122    pub fn record_success(&self, key: &str) {
123        let mut map = self.inner.lock().unwrap();
124        map.remove(key);
125    }
126}
127
/// Trait for background partition scanners.
///
/// Each implementation scans one aspect of execution state (lease expiry,
/// delayed promotion, index consistency) across all partitions at a
/// configured interval. Bounds (`Send + Sync + 'static`) allow an `Arc`'d
/// scanner to be moved into a spawned tokio task.
pub trait Scanner: Send + Sync + 'static {
    /// Human-readable name for logging.
    fn name(&self) -> &'static str;

    /// How often to run a full scan across all partitions.
    fn interval(&self) -> Duration;

    /// Scan a single partition. Called once per partition per cycle.
    ///
    /// Declared as `impl Future` (RPITIT) rather than `async fn` so the
    /// `Send` bound on the returned future can be stated explicitly.
    fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> impl std::future::Future<Output = ScanResult> + Send;
}
147
/// Drives a scanner across all execution partitions in a loop.
///
/// Stateless namespace type — see `ScannerRunner::spawn`.
pub struct ScannerRunner;
150
151impl ScannerRunner {
152    /// Spawn a tokio task that runs the scanner forever until shutdown.
153    pub fn spawn<S: Scanner>(
154        scanner: Arc<S>,
155        client: ferriskey::Client,
156        num_partitions: u16,
157        mut shutdown: watch::Receiver<bool>,
158    ) -> JoinHandle<()> {
159        tokio::spawn(async move {
160            let name = scanner.name();
161            let interval = scanner.interval().max(Duration::from_millis(100));
162            tracing::info!(scanner = name, ?interval, partitions = num_partitions, "scanner started");
163
164            loop {
165                let cycle_start = tokio::time::Instant::now();
166                let mut total_processed: u32 = 0;
167                let mut total_errors: u32 = 0;
168
169                for p in 0..num_partitions {
170                    // Check for shutdown between partitions
171                    if *shutdown.borrow() {
172                        tracing::info!(scanner = name, "shutdown requested, stopping");
173                        return;
174                    }
175
176                    let result = scanner.scan_partition(&client, p).await;
177                    total_processed += result.processed;
178                    total_errors += result.errors;
179                }
180
181                let elapsed = cycle_start.elapsed();
182                if total_processed > 0 || total_errors > 0 {
183                    tracing::info!(
184                        scanner = name,
185                        processed = total_processed,
186                        errors = total_errors,
187                        elapsed_ms = elapsed.as_millis() as u64,
188                        "scan cycle complete"
189                    );
190                } else {
191                    tracing::trace!(
192                        scanner = name,
193                        elapsed_ms = elapsed.as_millis() as u64,
194                        "scan cycle complete (nothing to do)"
195                    );
196                }
197
198                // Sleep for the remaining interval (or immediately if scan took longer)
199                let sleep_dur = interval.saturating_sub(elapsed);
200                tokio::select! {
201                    _ = tokio::time::sleep(sleep_dur) => {}
202                    _ = shutdown.changed() => {
203                        if *shutdown.borrow() {
204                            tracing::info!(scanner = name, "shutdown requested, stopping");
205                            return;
206                        }
207                    }
208                }
209            }
210        })
211    }
212}