Skip to main content

ff_engine/scanner/
mod.rs

1//! Background scanner infrastructure.
2//!
3//! Scanners iterate execution partitions at fixed intervals, checking for
4//! conditions that require action (expired leases, due delays, index drift).
5//! Each scanner type implements the `Scanner` trait; the `ScannerRunner`
6//! drives them as tokio tasks.
7
8pub mod attempt_timeout;
9pub mod budget_reconciler;
10pub mod execution_deadline;
11pub mod budget_reset;
12pub mod delayed_promoter;
13pub mod cancel_reconciler;
14pub mod dependency_reconciler;
15pub mod edge_cancel_dispatcher;
16pub mod edge_cancel_reconciler;
17pub mod flow_projector;
18pub mod index_reconciler;
19pub mod lease_expiry;
20pub mod pending_wp_expiry;
21pub mod quota_reconciler;
22pub mod retention_trimmer;
23pub mod suspension_timeout;
24pub mod unblock;
25
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::Mutex;
30use std::time::Duration;
31
32use ff_core::backend::ScannerFilter;
33use ff_core::engine_backend::EngineBackend;
34use ff_core::types::ExecutionId;
35use tokio::sync::watch;
36use tokio::task::JoinHandle;
37
/// Result of scanning one partition.
///
/// Returned by [`Scanner::scan_partition`]; the runner sums both counters
/// across all partitions to produce the per-cycle log line.
///
/// The derives make the type loggable (`{:?}`), comparable in tests and
/// default-constructible as the "nothing happened" result (both counters 0).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ScanResult {
    /// Number of items the scanner acted on in this partition.
    pub processed: u32,
    /// Number of items whose processing failed in this partition.
    pub errors: u32,
}
43
44// ── Failure tracking for persistent FCALL errors ──
45
/// Consecutive-failure count at which an item is put into backoff.
const FAILURE_THRESHOLD: u32 = 3;
/// Length of a backoff, measured in scan cycles during which the item is
/// skipped.
const BACKOFF_CYCLES: u64 = 10;
/// Tracked-entry count above which the periodic GC pass actually prunes
/// the failure map.
const GC_THRESHOLD: usize = 500;
52
/// Failure bookkeeping for a single tracked item.
struct FailureEntry {
    /// Failures recorded in a row since the last success or backoff reset.
    consecutive_failures: u32,
    /// Cycle number before which the item is skipped; `0` when the item is
    /// not in backoff.
    skip_until_cycle: u64,
}
57
58/// Tracks persistently-failing items so they don't permanently consume
59/// batch slots. After [`FAILURE_THRESHOLD`] consecutive failures for the
60/// same key, the item is skipped for [`BACKOFF_CYCLES`] scan cycles.
61#[derive(Default)]
62pub struct FailureTracker {
63    inner: Mutex<HashMap<String, FailureEntry>>,
64    cycle: AtomicU64,
65}
66
67impl FailureTracker {
68    pub fn new() -> Self {
69        Self::default()
70    }
71
72    /// Call once per full scan cycle (e.g., when partition == 0).
73    pub fn advance_cycle(&self) {
74        let cycle = self.cycle.fetch_add(1, Ordering::Relaxed) + 1;
75        // Periodic GC: remove entries whose backoff has expired
76        if cycle.is_multiple_of(50) {
77            let mut map = self.inner.lock().unwrap();
78            if map.len() > GC_THRESHOLD {
79                map.retain(|_, e| {
80                    e.consecutive_failures >= FAILURE_THRESHOLD
81                        && e.skip_until_cycle > cycle
82                });
83            }
84        }
85    }
86
87    /// Returns true if this item should be skipped (in backoff).
88    /// Also resets the entry when backoff expires, giving it another chance.
89    pub fn should_skip(&self, key: &str) -> bool {
90        let mut map = self.inner.lock().unwrap();
91        if let Some(entry) = map.get_mut(key)
92            && entry.consecutive_failures >= FAILURE_THRESHOLD
93        {
94            let cycle = self.cycle.load(Ordering::Relaxed);
95            if entry.skip_until_cycle > cycle {
96                return true;
97            }
98            // Backoff expired — reset and allow retry
99            entry.consecutive_failures = 0;
100            entry.skip_until_cycle = 0;
101        }
102        false
103    }
104
105    /// Record a failure. After [`FAILURE_THRESHOLD`] consecutive failures,
106    /// logs an error and puts the item into backoff.
107    pub fn record_failure(&self, key: &str, scanner_name: &str) {
108        let mut map = self.inner.lock().unwrap();
109        let entry = map.entry(key.to_owned()).or_insert(FailureEntry {
110            consecutive_failures: 0,
111            skip_until_cycle: 0,
112        });
113        entry.consecutive_failures += 1;
114        if entry.consecutive_failures == FAILURE_THRESHOLD {
115            let cycle = self.cycle.load(Ordering::Relaxed);
116            entry.skip_until_cycle = cycle + BACKOFF_CYCLES;
117            tracing::error!(
118                scanner = scanner_name,
119                item = key,
120                failures = entry.consecutive_failures,
121                backoff_cycles = BACKOFF_CYCLES,
122                "persistent FCALL failure — skipping for {BACKOFF_CYCLES} scan cycles"
123            );
124        }
125    }
126
127    /// Record a success — clears any tracked failure state.
128    pub fn record_success(&self, key: &str) {
129        let mut map = self.inner.lock().unwrap();
130        map.remove(key);
131    }
132}
133
134/// Trait for background partition scanners.
135///
136/// Each implementation scans one aspect of execution state (lease expiry,
137/// delayed promotion, index consistency) across all partitions at a
138/// configured interval.
139pub trait Scanner: Send + Sync + 'static {
140    /// Human-readable name for logging.
141    fn name(&self) -> &'static str;
142
143    /// How often to run a full scan across all partitions.
144    fn interval(&self) -> Duration;
145
146    /// Per-consumer filter applied by execution-shaped scanners to
147    /// restrict the set of candidates they act on (issue #122).
148    ///
149    /// Default returns [`ScannerFilter::NOOP`] — pre-#122 behaviour.
150    /// Implementations override by storing a `ScannerFilter` on the
151    /// struct (constructed via `Self::with_filter(..)`) and
152    /// returning `&self.filter`.
153    fn filter(&self) -> &ScannerFilter {
154        &ScannerFilter::NOOP
155    }
156
157    /// Scan a single partition. Called once per partition per cycle.
158    fn scan_partition(
159        &self,
160        client: &ferriskey::Client,
161        partition: u16,
162    ) -> impl std::future::Future<Output = ScanResult> + Send;
163
164    /// PR-94: per-cycle gauge sample. Returns `Some(depth)` summed
165    /// across all partitions by the scanner runner to produce a
166    /// single gauge value (today only `cancel_reconciler` exports
167    /// one, feeding `ff_cancel_backlog_depth`). Default: `None` so
168    /// scanners that don't export a gauge compile unchanged.
169    ///
170    /// Runs AFTER `scan_partition` for the same `partition` within
171    /// the same cycle, so implementations can reuse cached state.
172    /// The trivial default implementation returns `None` for every
173    /// partition and the runner writes nothing.
174    fn sample_backlog_depth(
175        &self,
176        _client: &ferriskey::Client,
177        _partition: u16,
178    ) -> impl std::future::Future<Output = Option<u64>> + Send {
179        async { None }
180    }
181}
182
183/// Issue #122: per-candidate filter helper shared by all
184/// execution-shaped scanners.
185///
186/// Returns true iff the candidate `eid` on `partition` should be
187/// SKIPPED by the scanner (i.e. the filter rejects it). A no-op
188/// filter never rejects — returns false without issuing any HGET.
189///
190/// # `eid` format
191///
192/// Callers pass `eid` as the **full** `{fp:N}:<uuid>` ExecutionId
193/// string — identical to what Lua stores as a ZSET member via
194/// `ZADD ... A.execution_id` and what every scanner reads out of
195/// per-partition index ZSETs with `ZRANGEBYSCORE`. The helper
196/// formats `format!("ff:exec:{}:{}:core", tag, eid)` which produces
197/// the canonical **double-tagged** production key shape
198/// `ff:exec:{fp:N}:{fp:N}:<uuid>:core` — the first tag comes from
199/// the partition's routing hash-tag; the second is the tag baked
200/// into the ExecutionId string. Matches
201/// [`ff_core::keys::ExecKeyContext::core`] / [`::tags`] exactly.
202/// Do not strip the prefix — doing so would build a
203/// single-tagged key that does not exist in production.
204///
205/// [`::tags`]: ff_core::keys::ExecKeyContext::tags
206///
207/// # Cost
208///
209/// 0 HGET if `filter.is_noop()`; 1 HGET when only `namespace` is
210/// set (on `ff:exec:{fp:N}:{fp:N}:<uuid>:core`); 1 HGET when only
211/// `instance_tag` is set (on `ff:exec:{fp:N}:{fp:N}:<uuid>:tags`);
212/// 2 HGETs when both are set. Namespace is checked first (cheaper
213/// — touches the already-hot `core` hash) and short-circuits on
214/// mismatch.
215///
216/// # Failure mode
217///
218/// On HGET failure the helper returns true (skip), conservatively:
219/// leaking a cross-tenant candidate due to a transient read error is
220/// worse than the scanner temporarily underclaiming — the next cycle
221/// picks it back up once the backend recovers.
222pub async fn should_skip_candidate(
223    backend: Option<&Arc<dyn EngineBackend>>,
224    filter: &ScannerFilter,
225    _partition: u16,
226    eid: &str,
227) -> bool {
228    if filter.is_noop() {
229        return false;
230    }
231    // With a non-noop filter but no backend plumbed, skip
232    // conservatively — same posture as a transport error. The
233    // in-tree engine path (`Engine::start_internal`) always plumbs a
234    // backend; `None` only reaches here from test-only scanner
235    // constructors that use `ScannerFilter::default()` (noop), so
236    // the first branch above already short-circuits.
237    let Some(backend) = backend else {
238        return true;
239    };
240    let Ok(exec_id) = ExecutionId::parse(eid) else {
241        // Malformed eid → skip conservatively (matches pre-PR-7b
242        // posture: anything we can't validate gets filtered out).
243        return true;
244    };
245
246    if let Some(ref want_ns) = filter.namespace {
247        // Dedicated point-read — preserves the 1-HGET cost contract
248        // documented above. `describe_execution` would be an N-field
249        // HGETALL / full-snapshot read and is the wrong tool when only
250        // the namespace scalar is needed.
251        match backend.get_execution_namespace(&exec_id).await {
252            Ok(Some(ref got)) if got == want_ns.as_str() => {}
253            _ => return true,
254        }
255    }
256
257    if let Some((ref tag_key, ref want_value)) = filter.instance_tag {
258        match backend.get_execution_tag(&exec_id, tag_key.as_str()).await {
259            Ok(Some(v)) if &v == want_value => {}
260            _ => return true,
261        }
262    }
263
264    false
265}
266
267/// Drives a scanner across all execution partitions in a loop.
268pub struct ScannerRunner;
269
270impl ScannerRunner {
271    /// Spawn a tokio task that runs the scanner forever until shutdown.
272    ///
273    /// PR-94: the `metrics` handle records per-cycle duration +
274    /// cycle-total counter. Under the no-op shim (`observability`
275    /// feature off) the recorder calls compile to nothing.
276    pub fn spawn<S: Scanner>(
277        scanner: Arc<S>,
278        client: ferriskey::Client,
279        num_partitions: u16,
280        mut shutdown: watch::Receiver<bool>,
281        metrics: Arc<ff_observability::Metrics>,
282    ) -> JoinHandle<()> {
283        tokio::spawn(async move {
284            let name = scanner.name();
285            let interval = scanner.interval().max(Duration::from_millis(100));
286            tracing::info!(scanner = name, ?interval, partitions = num_partitions, "scanner started");
287
288            loop {
289                let cycle_start = tokio::time::Instant::now();
290                let mut total_processed: u32 = 0;
291                let mut total_errors: u32 = 0;
292                // Gauge aggregation strategy: require **every**
293                // partition to return a sample before writing.
294                // A partial sum (some partitions sampled, some
295                // returned None on error) would under-report and
296                // make the gauge jump below the true backlog, which
297                // an operator could mis-interpret as drain progress.
298                // On any missing sample we set `sample_valid = false`
299                // and skip the gauge write — the previous value
300                // stands until a full cycle succeeds.
301                //
302                // First partition returning `Some(_)` sets
303                // `sampled = true`; a subsequent `None` on any
304                // partition invalidates the cycle's write.
305                let mut total_backlog_depth: u64 = 0;
306                let mut sampled = false;
307                let mut sample_valid = true;
308
309                for p in 0..num_partitions {
310                    // Check for shutdown between partitions
311                    if *shutdown.borrow() {
312                        tracing::info!(scanner = name, "shutdown requested, stopping");
313                        return;
314                    }
315
316                    let result = scanner.scan_partition(&client, p).await;
317                    total_processed += result.processed;
318                    total_errors += result.errors;
319
320                    // Only query the gauge-sample hook on scanners
321                    // that override it (default returns None for
322                    // every partition — the check short-circuits).
323                    match scanner.sample_backlog_depth(&client, p).await {
324                        Some(d) => {
325                            sampled = true;
326                            total_backlog_depth =
327                                total_backlog_depth.saturating_add(d);
328                        }
329                        None => {
330                            // A non-overriding scanner returns None
331                            // on every partition → `sampled` stays
332                            // false → gauge write skipped anyway.
333                            // An overriding scanner with a transient
334                            // failure invalidates the cycle only if
335                            // it had already sampled a partition.
336                            if sampled {
337                                sample_valid = false;
338                            }
339                        }
340                    }
341                }
342
343                let elapsed = cycle_start.elapsed();
344                // PR-94: scanner cycle metrics. Recorded every cycle,
345                // regardless of whether the cycle did any work, so
346                // operators can see cadence drift (a stuck scanner
347                // stops producing data points).
348                metrics.record_scanner_cycle(name, elapsed);
349                if sampled && sample_valid {
350                    metrics.set_cancel_backlog_depth(total_backlog_depth);
351                } else if sampled && !sample_valid {
352                    // At least one partition sampled but another
353                    // failed — leave the gauge at its prior value.
354                    // Log at debug so operators investigating a
355                    // flat gauge can correlate to the cycle where
356                    // sampling partially failed.
357                    tracing::debug!(
358                        scanner = name,
359                        "skipping cancel_backlog_depth gauge write this cycle \
360                         (partial partition sample)"
361                    );
362                }
363                if total_processed > 0 || total_errors > 0 {
364                    tracing::info!(
365                        scanner = name,
366                        processed = total_processed,
367                        errors = total_errors,
368                        elapsed_ms = elapsed.as_millis() as u64,
369                        "scan cycle complete"
370                    );
371                } else {
372                    tracing::trace!(
373                        scanner = name,
374                        elapsed_ms = elapsed.as_millis() as u64,
375                        "scan cycle complete (nothing to do)"
376                    );
377                }
378
379                // Sleep for the remaining interval (or immediately if scan took longer)
380                let sleep_dur = interval.saturating_sub(elapsed);
381                tokio::select! {
382                    _ = tokio::time::sleep(sleep_dur) => {}
383                    _ = shutdown.changed() => {
384                        if *shutdown.borrow() {
385                            tracing::info!(scanner = name, "shutdown requested, stopping");
386                            return;
387                        }
388                    }
389                }
390            }
391        })
392    }
393}
394