// ff_engine/scanner/mod.rs

//! Background scanner infrastructure.
//!
//! Scanners iterate execution partitions at fixed intervals, checking for
//! conditions that require action (expired leases, due delays, index drift).
//! Each scanner type implements the `Scanner` trait; the `ScannerRunner`
//! drives them as tokio tasks.

8pub mod attempt_timeout;
9pub mod budget_reconciler;
10pub mod execution_deadline;
11pub mod budget_reset;
12pub mod delayed_promoter;
13pub mod cancel_reconciler;
14pub mod dependency_reconciler;
15pub mod flow_projector;
16pub mod index_reconciler;
17pub mod lease_expiry;
18pub mod pending_wp_expiry;
19pub mod quota_reconciler;
20pub mod retention_trimmer;
21pub mod suspension_timeout;
22pub mod unblock;
23
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::Mutex;
28use std::time::Duration;
29
30use ff_core::backend::ScannerFilter;
31use ff_core::partition::{Partition, PartitionFamily};
32use ff_core::types::Namespace;
33use tokio::sync::watch;
34use tokio::task::JoinHandle;
35
36/// Result of scanning one partition.
37pub struct ScanResult {
38    pub processed: u32,
39    pub errors: u32,
40}
41
// ── Failure tracking for persistent FCALL errors ──

/// Max consecutive failures before an item enters backoff.
const FAILURE_THRESHOLD: u32 = 3;
/// Number of scan cycles to skip after hitting the threshold.
const BACKOFF_CYCLES: u64 = 10;
/// Max tracked entries before GC runs (bounds `FailureTracker` memory).
const GC_THRESHOLD: usize = 500;

/// Per-key failure state tracked by `FailureTracker`.
struct FailureEntry {
    // Failures since the last success (or backoff-expiry reset).
    consecutive_failures: u32,
    // Scan-cycle number until which the item is skipped; 0 = not in backoff.
    skip_until_cycle: u64,
}

/// Tracks persistently-failing items so they don't permanently consume
/// batch slots. After [`FAILURE_THRESHOLD`] consecutive failures for the
/// same key, the item is skipped for [`BACKOFF_CYCLES`] scan cycles.
#[derive(Default)]
pub struct FailureTracker {
    // Failure state keyed by item id. Guarded by a std Mutex; all
    // accessors below are synchronous, so the lock is never held
    // across an await point.
    inner: Mutex<HashMap<String, FailureEntry>>,
    // Monotonic scan-cycle counter; advanced once per full scan cycle.
    cycle: AtomicU64,
}

65impl FailureTracker {
66    pub fn new() -> Self {
67        Self::default()
68    }
69
70    /// Call once per full scan cycle (e.g., when partition == 0).
71    pub fn advance_cycle(&self) {
72        let cycle = self.cycle.fetch_add(1, Ordering::Relaxed) + 1;
73        // Periodic GC: remove entries whose backoff has expired
74        if cycle.is_multiple_of(50) {
75            let mut map = self.inner.lock().unwrap();
76            if map.len() > GC_THRESHOLD {
77                map.retain(|_, e| {
78                    e.consecutive_failures >= FAILURE_THRESHOLD
79                        && e.skip_until_cycle > cycle
80                });
81            }
82        }
83    }
84
85    /// Returns true if this item should be skipped (in backoff).
86    /// Also resets the entry when backoff expires, giving it another chance.
87    pub fn should_skip(&self, key: &str) -> bool {
88        let mut map = self.inner.lock().unwrap();
89        if let Some(entry) = map.get_mut(key)
90            && entry.consecutive_failures >= FAILURE_THRESHOLD
91        {
92            let cycle = self.cycle.load(Ordering::Relaxed);
93            if entry.skip_until_cycle > cycle {
94                return true;
95            }
96            // Backoff expired — reset and allow retry
97            entry.consecutive_failures = 0;
98            entry.skip_until_cycle = 0;
99        }
100        false
101    }
102
103    /// Record a failure. After [`FAILURE_THRESHOLD`] consecutive failures,
104    /// logs an error and puts the item into backoff.
105    pub fn record_failure(&self, key: &str, scanner_name: &str) {
106        let mut map = self.inner.lock().unwrap();
107        let entry = map.entry(key.to_owned()).or_insert(FailureEntry {
108            consecutive_failures: 0,
109            skip_until_cycle: 0,
110        });
111        entry.consecutive_failures += 1;
112        if entry.consecutive_failures == FAILURE_THRESHOLD {
113            let cycle = self.cycle.load(Ordering::Relaxed);
114            entry.skip_until_cycle = cycle + BACKOFF_CYCLES;
115            tracing::error!(
116                scanner = scanner_name,
117                item = key,
118                failures = entry.consecutive_failures,
119                backoff_cycles = BACKOFF_CYCLES,
120                "persistent FCALL failure — skipping for {BACKOFF_CYCLES} scan cycles"
121            );
122        }
123    }
124
125    /// Record a success — clears any tracked failure state.
126    pub fn record_success(&self, key: &str) {
127        let mut map = self.inner.lock().unwrap();
128        map.remove(key);
129    }
130}
131
/// Trait for background partition scanners.
///
/// Each implementation scans one aspect of execution state (lease expiry,
/// delayed promotion, index consistency) across all partitions at a
/// configured interval. Implementations are driven by `ScannerRunner`.
pub trait Scanner: Send + Sync + 'static {
    /// Human-readable name for logging and metrics labels.
    fn name(&self) -> &'static str;

    /// How often to run a full scan across all partitions.
    fn interval(&self) -> Duration;

    /// Per-consumer filter applied by execution-shaped scanners to
    /// restrict the set of candidates they act on (issue #122).
    ///
    /// Default returns [`ScannerFilter::NOOP`] — pre-#122 behaviour.
    /// Implementations override by storing a `ScannerFilter` on the
    /// struct (constructed via `Self::with_filter(..)`) and
    /// returning `&self.filter`.
    fn filter(&self) -> &ScannerFilter {
        &ScannerFilter::NOOP
    }

    /// Scan a single partition. Called once per partition per cycle.
    /// Returned counts are summed across partitions for cycle logging.
    fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> impl std::future::Future<Output = ScanResult> + Send;

    /// PR-94: per-cycle gauge sample. Returns `Some(depth)` summed
    /// across all partitions by the scanner runner to produce a
    /// single gauge value (today only `cancel_reconciler` exports
    /// one, feeding `ff_cancel_backlog_depth`). Default: `None` so
    /// scanners that don't export a gauge compile unchanged.
    ///
    /// Runs AFTER `scan_partition` for the same `partition` within
    /// the same cycle, so implementations can reuse cached state.
    /// The trivial default implementation returns `None` for every
    /// partition and the runner writes nothing.
    fn sample_backlog_depth(
        &self,
        _client: &ferriskey::Client,
        _partition: u16,
    ) -> impl std::future::Future<Output = Option<u64>> + Send {
        async { None }
    }
}

181/// Issue #122: per-candidate filter helper shared by all
182/// execution-shaped scanners.
183///
184/// Returns true iff the candidate `eid` on `partition` should be
185/// SKIPPED by the scanner (i.e. the filter rejects it). A no-op
186/// filter never rejects — returns false without issuing any HGET.
187///
188/// # `eid` format
189///
190/// Callers pass `eid` as the **full** `{fp:N}:<uuid>` ExecutionId
191/// string — identical to what Lua stores as a ZSET member via
192/// `ZADD ... A.execution_id` and what every scanner reads out of
193/// per-partition index ZSETs with `ZRANGEBYSCORE`. The helper
194/// formats `format!("ff:exec:{}:{}:core", tag, eid)` which produces
195/// the canonical **double-tagged** production key shape
196/// `ff:exec:{fp:N}:{fp:N}:<uuid>:core` — the first tag comes from
197/// the partition's routing hash-tag; the second is the tag baked
198/// into the ExecutionId string. Matches
199/// [`ff_core::keys::ExecKeyContext::core`] / [`::tags`] exactly.
200/// Do not strip the prefix — doing so would build a
201/// single-tagged key that does not exist in production.
202///
203/// [`::tags`]: ff_core::keys::ExecKeyContext::tags
204///
205/// # Cost
206///
207/// 0 HGET if `filter.is_noop()`; 1 HGET when only `namespace` is
208/// set (on `ff:exec:{fp:N}:{fp:N}:<uuid>:core`); 1 HGET when only
209/// `instance_tag` is set (on `ff:exec:{fp:N}:{fp:N}:<uuid>:tags`);
210/// 2 HGETs when both are set. Namespace is checked first (cheaper
211/// — touches the already-hot `core` hash) and short-circuits on
212/// mismatch.
213///
214/// # Failure mode
215///
216/// On HGET failure the helper returns true (skip), conservatively:
217/// leaking a cross-tenant candidate due to a transient read error is
218/// worse than the scanner temporarily underclaiming — the next cycle
219/// picks it back up once the backend recovers.
220pub async fn should_skip_candidate(
221    client: &ferriskey::Client,
222    filter: &ScannerFilter,
223    partition: u16,
224    eid: &str,
225) -> bool {
226    if filter.is_noop() {
227        return false;
228    }
229    let p = Partition {
230        family: PartitionFamily::Execution,
231        index: partition,
232    };
233    let tag = p.hash_tag();
234
235    if let Some(ref want_ns) = filter.namespace {
236        let core_key = format!("ff:exec:{}:{}:core", tag, eid);
237        match client
238            .cmd("HGET")
239            .arg(&core_key)
240            .arg("namespace")
241            .execute::<Option<String>>()
242            .await
243        {
244            Ok(Some(s)) => {
245                if &Namespace::new(s) != want_ns {
246                    return true;
247                }
248            }
249            // nil or transport error — skip conservatively.
250            _ => return true,
251        }
252    }
253
254    if let Some((ref tag_key, ref want_value)) = filter.instance_tag {
255        let tags_key = format!("ff:exec:{}:{}:tags", tag, eid);
256        match client
257            .cmd("HGET")
258            .arg(&tags_key)
259            .arg(tag_key.as_str())
260            .execute::<Option<String>>()
261            .await
262        {
263            Ok(Some(v)) if &v == want_value => {}
264            _ => return true,
265        }
266    }
267
268    false
269}
270
/// Drives a scanner across all execution partitions in a loop.
///
/// Stateless unit struct; see [`ScannerRunner::spawn`].
pub struct ScannerRunner;

274impl ScannerRunner {
275    /// Spawn a tokio task that runs the scanner forever until shutdown.
276    ///
277    /// PR-94: the `metrics` handle records per-cycle duration +
278    /// cycle-total counter. Under the no-op shim (`observability`
279    /// feature off) the recorder calls compile to nothing.
280    pub fn spawn<S: Scanner>(
281        scanner: Arc<S>,
282        client: ferriskey::Client,
283        num_partitions: u16,
284        mut shutdown: watch::Receiver<bool>,
285        metrics: Arc<ff_observability::Metrics>,
286    ) -> JoinHandle<()> {
287        tokio::spawn(async move {
288            let name = scanner.name();
289            let interval = scanner.interval().max(Duration::from_millis(100));
290            tracing::info!(scanner = name, ?interval, partitions = num_partitions, "scanner started");
291
292            loop {
293                let cycle_start = tokio::time::Instant::now();
294                let mut total_processed: u32 = 0;
295                let mut total_errors: u32 = 0;
296                // Gauge aggregation strategy: require **every**
297                // partition to return a sample before writing.
298                // A partial sum (some partitions sampled, some
299                // returned None on error) would under-report and
300                // make the gauge jump below the true backlog, which
301                // an operator could mis-interpret as drain progress.
302                // On any missing sample we set `sample_valid = false`
303                // and skip the gauge write — the previous value
304                // stands until a full cycle succeeds.
305                //
306                // First partition returning `Some(_)` sets
307                // `sampled = true`; a subsequent `None` on any
308                // partition invalidates the cycle's write.
309                let mut total_backlog_depth: u64 = 0;
310                let mut sampled = false;
311                let mut sample_valid = true;
312
313                for p in 0..num_partitions {
314                    // Check for shutdown between partitions
315                    if *shutdown.borrow() {
316                        tracing::info!(scanner = name, "shutdown requested, stopping");
317                        return;
318                    }
319
320                    let result = scanner.scan_partition(&client, p).await;
321                    total_processed += result.processed;
322                    total_errors += result.errors;
323
324                    // Only query the gauge-sample hook on scanners
325                    // that override it (default returns None for
326                    // every partition — the check short-circuits).
327                    match scanner.sample_backlog_depth(&client, p).await {
328                        Some(d) => {
329                            sampled = true;
330                            total_backlog_depth =
331                                total_backlog_depth.saturating_add(d);
332                        }
333                        None => {
334                            // A non-overriding scanner returns None
335                            // on every partition → `sampled` stays
336                            // false → gauge write skipped anyway.
337                            // An overriding scanner with a transient
338                            // failure invalidates the cycle only if
339                            // it had already sampled a partition.
340                            if sampled {
341                                sample_valid = false;
342                            }
343                        }
344                    }
345                }
346
347                let elapsed = cycle_start.elapsed();
348                // PR-94: scanner cycle metrics. Recorded every cycle,
349                // regardless of whether the cycle did any work, so
350                // operators can see cadence drift (a stuck scanner
351                // stops producing data points).
352                metrics.record_scanner_cycle(name, elapsed);
353                if sampled && sample_valid {
354                    metrics.set_cancel_backlog_depth(total_backlog_depth);
355                } else if sampled && !sample_valid {
356                    // At least one partition sampled but another
357                    // failed — leave the gauge at its prior value.
358                    // Log at debug so operators investigating a
359                    // flat gauge can correlate to the cycle where
360                    // sampling partially failed.
361                    tracing::debug!(
362                        scanner = name,
363                        "skipping cancel_backlog_depth gauge write this cycle \
364                         (partial partition sample)"
365                    );
366                }
367                if total_processed > 0 || total_errors > 0 {
368                    tracing::info!(
369                        scanner = name,
370                        processed = total_processed,
371                        errors = total_errors,
372                        elapsed_ms = elapsed.as_millis() as u64,
373                        "scan cycle complete"
374                    );
375                } else {
376                    tracing::trace!(
377                        scanner = name,
378                        elapsed_ms = elapsed.as_millis() as u64,
379                        "scan cycle complete (nothing to do)"
380                    );
381                }
382
383                // Sleep for the remaining interval (or immediately if scan took longer)
384                let sleep_dur = interval.saturating_sub(elapsed);
385                tokio::select! {
386                    _ = tokio::time::sleep(sleep_dur) => {}
387                    _ = shutdown.changed() => {
388                        if *shutdown.borrow() {
389                            tracing::info!(scanner = name, "shutdown requested, stopping");
390                            return;
391                        }
392                    }
393                }
394            }
395        })
396    }
397}