// ff_engine/scanner/mod.rs

1//! Background scanner infrastructure.
2//!
3//! Scanners iterate execution partitions at fixed intervals, checking for
4//! conditions that require action (expired leases, due delays, index drift).
5//! Each scanner type implements the `Scanner` trait; the `ScannerRunner`
6//! drives them as tokio tasks.
7
8pub mod attempt_timeout;
9pub mod budget_reconciler;
10pub mod execution_deadline;
11pub mod budget_reset;
12pub mod delayed_promoter;
13pub mod cancel_reconciler;
14pub mod dependency_reconciler;
15pub mod edge_cancel_dispatcher;
16pub mod edge_cancel_reconciler;
17pub mod flow_projector;
18pub mod index_reconciler;
19pub mod lease_expiry;
20pub mod pending_wp_expiry;
21pub mod quota_reconciler;
22pub mod retention_trimmer;
23pub mod suspension_timeout;
24pub mod unblock;
25
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::Mutex;
30use std::time::Duration;
31
32use ff_core::backend::ScannerFilter;
33use ff_core::partition::{Partition, PartitionFamily};
34use ff_core::types::Namespace;
35use tokio::sync::watch;
36use tokio::task::JoinHandle;
37
/// Result of scanning one partition.
///
/// The runner sums these across partitions per cycle: `processed` feeds
/// the "scan cycle complete" log line, `errors` counts items whose
/// handling failed this pass (they are retried on the next cycle).
///
/// Derives added so results can be logged (`Debug`), zero-initialized
/// (`Default`), and cheaply copied/compared in tests.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ScanResult {
    /// Number of items the scanner acted on in this partition.
    pub processed: u32,
    /// Number of items whose handling failed in this partition.
    pub errors: u32,
}
43
// ── Failure tracking for persistent FCALL errors ──

/// Max consecutive failures before an item enters backoff.
const FAILURE_THRESHOLD: u32 = 3;
/// Number of scan cycles to skip after hitting the threshold.
const BACKOFF_CYCLES: u64 = 10;
/// Max tracked entries before GC runs.
const GC_THRESHOLD: usize = 500;

/// Per-key failure state stored inside [`FailureTracker`].
struct FailureEntry {
    // Failures since the last recorded success for this key.
    consecutive_failures: u32,
    // Cycle number until which the key is skipped; 0 = not in backoff.
    skip_until_cycle: u64,
}
57
/// Tracks persistently-failing items so they don't permanently consume
/// batch slots. After [`FAILURE_THRESHOLD`] consecutive failures for the
/// same key, the item is skipped for [`BACKOFF_CYCLES`] scan cycles.
///
/// All state sits behind a `Mutex` plus an atomic cycle counter, so a
/// single tracker can be shared across tasks (e.g. via `Arc`).
#[derive(Default)]
pub struct FailureTracker {
    // key -> failure state; every read/write path takes this lock.
    inner: Mutex<HashMap<String, FailureEntry>>,
    // Monotonic scan-cycle counter, bumped by `advance_cycle`.
    cycle: AtomicU64,
}
66
67impl FailureTracker {
68    pub fn new() -> Self {
69        Self::default()
70    }
71
72    /// Call once per full scan cycle (e.g., when partition == 0).
73    pub fn advance_cycle(&self) {
74        let cycle = self.cycle.fetch_add(1, Ordering::Relaxed) + 1;
75        // Periodic GC: remove entries whose backoff has expired
76        if cycle.is_multiple_of(50) {
77            let mut map = self.inner.lock().unwrap();
78            if map.len() > GC_THRESHOLD {
79                map.retain(|_, e| {
80                    e.consecutive_failures >= FAILURE_THRESHOLD
81                        && e.skip_until_cycle > cycle
82                });
83            }
84        }
85    }
86
87    /// Returns true if this item should be skipped (in backoff).
88    /// Also resets the entry when backoff expires, giving it another chance.
89    pub fn should_skip(&self, key: &str) -> bool {
90        let mut map = self.inner.lock().unwrap();
91        if let Some(entry) = map.get_mut(key)
92            && entry.consecutive_failures >= FAILURE_THRESHOLD
93        {
94            let cycle = self.cycle.load(Ordering::Relaxed);
95            if entry.skip_until_cycle > cycle {
96                return true;
97            }
98            // Backoff expired — reset and allow retry
99            entry.consecutive_failures = 0;
100            entry.skip_until_cycle = 0;
101        }
102        false
103    }
104
105    /// Record a failure. After [`FAILURE_THRESHOLD`] consecutive failures,
106    /// logs an error and puts the item into backoff.
107    pub fn record_failure(&self, key: &str, scanner_name: &str) {
108        let mut map = self.inner.lock().unwrap();
109        let entry = map.entry(key.to_owned()).or_insert(FailureEntry {
110            consecutive_failures: 0,
111            skip_until_cycle: 0,
112        });
113        entry.consecutive_failures += 1;
114        if entry.consecutive_failures == FAILURE_THRESHOLD {
115            let cycle = self.cycle.load(Ordering::Relaxed);
116            entry.skip_until_cycle = cycle + BACKOFF_CYCLES;
117            tracing::error!(
118                scanner = scanner_name,
119                item = key,
120                failures = entry.consecutive_failures,
121                backoff_cycles = BACKOFF_CYCLES,
122                "persistent FCALL failure — skipping for {BACKOFF_CYCLES} scan cycles"
123            );
124        }
125    }
126
127    /// Record a success — clears any tracked failure state.
128    pub fn record_success(&self, key: &str) {
129        let mut map = self.inner.lock().unwrap();
130        map.remove(key);
131    }
132}
133
/// Trait for background partition scanners.
///
/// Each implementation scans one aspect of execution state (lease expiry,
/// delayed promotion, index consistency) across all partitions at a
/// configured interval. Instances are driven as tokio tasks by
/// [`ScannerRunner::spawn`], which calls [`Scanner::scan_partition`]
/// once per partition per cycle.
pub trait Scanner: Send + Sync + 'static {
    /// Human-readable name for logging.
    fn name(&self) -> &'static str;

    /// How often to run a full scan across all partitions.
    /// (The runner clamps this to a 100 ms minimum.)
    fn interval(&self) -> Duration;

    /// Per-consumer filter applied by execution-shaped scanners to
    /// restrict the set of candidates they act on (issue #122).
    ///
    /// Default returns [`ScannerFilter::NOOP`] — pre-#122 behaviour.
    /// Implementations override by storing a `ScannerFilter` on the
    /// struct (constructed via `Self::with_filter(..)`) and
    /// returning `&self.filter`.
    fn filter(&self) -> &ScannerFilter {
        // NOOP is an associated const, so this borrow is promoted and
        // outlives the call — no per-call allocation.
        &ScannerFilter::NOOP
    }

    /// Scan a single partition. Called once per partition per cycle.
    /// Returns counts of processed items and errors for this pass.
    fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> impl std::future::Future<Output = ScanResult> + Send;

    /// PR-94: per-cycle gauge sample. Returns `Some(depth)` summed
    /// across all partitions by the scanner runner to produce a
    /// single gauge value (today only `cancel_reconciler` exports
    /// one, feeding `ff_cancel_backlog_depth`). Default: `None` so
    /// scanners that don't export a gauge compile unchanged.
    ///
    /// Runs AFTER `scan_partition` for the same `partition` within
    /// the same cycle, so implementations can reuse cached state.
    /// The trivial default implementation returns `None` for every
    /// partition and the runner writes nothing.
    fn sample_backlog_depth(
        &self,
        _client: &ferriskey::Client,
        _partition: u16,
    ) -> impl std::future::Future<Output = Option<u64>> + Send {
        async { None }
    }
}
182
183/// Issue #122: per-candidate filter helper shared by all
184/// execution-shaped scanners.
185///
186/// Returns true iff the candidate `eid` on `partition` should be
187/// SKIPPED by the scanner (i.e. the filter rejects it). A no-op
188/// filter never rejects — returns false without issuing any HGET.
189///
190/// # `eid` format
191///
192/// Callers pass `eid` as the **full** `{fp:N}:<uuid>` ExecutionId
193/// string — identical to what Lua stores as a ZSET member via
194/// `ZADD ... A.execution_id` and what every scanner reads out of
195/// per-partition index ZSETs with `ZRANGEBYSCORE`. The helper
196/// formats `format!("ff:exec:{}:{}:core", tag, eid)` which produces
197/// the canonical **double-tagged** production key shape
198/// `ff:exec:{fp:N}:{fp:N}:<uuid>:core` — the first tag comes from
199/// the partition's routing hash-tag; the second is the tag baked
200/// into the ExecutionId string. Matches
201/// [`ff_core::keys::ExecKeyContext::core`] / [`::tags`] exactly.
202/// Do not strip the prefix — doing so would build a
203/// single-tagged key that does not exist in production.
204///
205/// [`::tags`]: ff_core::keys::ExecKeyContext::tags
206///
207/// # Cost
208///
209/// 0 HGET if `filter.is_noop()`; 1 HGET when only `namespace` is
210/// set (on `ff:exec:{fp:N}:{fp:N}:<uuid>:core`); 1 HGET when only
211/// `instance_tag` is set (on `ff:exec:{fp:N}:{fp:N}:<uuid>:tags`);
212/// 2 HGETs when both are set. Namespace is checked first (cheaper
213/// — touches the already-hot `core` hash) and short-circuits on
214/// mismatch.
215///
216/// # Failure mode
217///
218/// On HGET failure the helper returns true (skip), conservatively:
219/// leaking a cross-tenant candidate due to a transient read error is
220/// worse than the scanner temporarily underclaiming — the next cycle
221/// picks it back up once the backend recovers.
222pub async fn should_skip_candidate(
223    client: &ferriskey::Client,
224    filter: &ScannerFilter,
225    partition: u16,
226    eid: &str,
227) -> bool {
228    if filter.is_noop() {
229        return false;
230    }
231    let p = Partition {
232        family: PartitionFamily::Execution,
233        index: partition,
234    };
235    let tag = p.hash_tag();
236
237    if let Some(ref want_ns) = filter.namespace {
238        let core_key = format!("ff:exec:{}:{}:core", tag, eid);
239        match client
240            .cmd("HGET")
241            .arg(&core_key)
242            .arg("namespace")
243            .execute::<Option<String>>()
244            .await
245        {
246            Ok(Some(s)) => {
247                if &Namespace::new(s) != want_ns {
248                    return true;
249                }
250            }
251            // nil or transport error — skip conservatively.
252            _ => return true,
253        }
254    }
255
256    if let Some((ref tag_key, ref want_value)) = filter.instance_tag {
257        let tags_key = format!("ff:exec:{}:{}:tags", tag, eid);
258        match client
259            .cmd("HGET")
260            .arg(&tags_key)
261            .arg(tag_key.as_str())
262            .execute::<Option<String>>()
263            .await
264        {
265            Ok(Some(v)) if &v == want_value => {}
266            _ => return true,
267        }
268    }
269
270    false
271}
272
273/// Drives a scanner across all execution partitions in a loop.
274pub struct ScannerRunner;
275
276impl ScannerRunner {
277    /// Spawn a tokio task that runs the scanner forever until shutdown.
278    ///
279    /// PR-94: the `metrics` handle records per-cycle duration +
280    /// cycle-total counter. Under the no-op shim (`observability`
281    /// feature off) the recorder calls compile to nothing.
282    pub fn spawn<S: Scanner>(
283        scanner: Arc<S>,
284        client: ferriskey::Client,
285        num_partitions: u16,
286        mut shutdown: watch::Receiver<bool>,
287        metrics: Arc<ff_observability::Metrics>,
288    ) -> JoinHandle<()> {
289        tokio::spawn(async move {
290            let name = scanner.name();
291            let interval = scanner.interval().max(Duration::from_millis(100));
292            tracing::info!(scanner = name, ?interval, partitions = num_partitions, "scanner started");
293
294            loop {
295                let cycle_start = tokio::time::Instant::now();
296                let mut total_processed: u32 = 0;
297                let mut total_errors: u32 = 0;
298                // Gauge aggregation strategy: require **every**
299                // partition to return a sample before writing.
300                // A partial sum (some partitions sampled, some
301                // returned None on error) would under-report and
302                // make the gauge jump below the true backlog, which
303                // an operator could mis-interpret as drain progress.
304                // On any missing sample we set `sample_valid = false`
305                // and skip the gauge write — the previous value
306                // stands until a full cycle succeeds.
307                //
308                // First partition returning `Some(_)` sets
309                // `sampled = true`; a subsequent `None` on any
310                // partition invalidates the cycle's write.
311                let mut total_backlog_depth: u64 = 0;
312                let mut sampled = false;
313                let mut sample_valid = true;
314
315                for p in 0..num_partitions {
316                    // Check for shutdown between partitions
317                    if *shutdown.borrow() {
318                        tracing::info!(scanner = name, "shutdown requested, stopping");
319                        return;
320                    }
321
322                    let result = scanner.scan_partition(&client, p).await;
323                    total_processed += result.processed;
324                    total_errors += result.errors;
325
326                    // Only query the gauge-sample hook on scanners
327                    // that override it (default returns None for
328                    // every partition — the check short-circuits).
329                    match scanner.sample_backlog_depth(&client, p).await {
330                        Some(d) => {
331                            sampled = true;
332                            total_backlog_depth =
333                                total_backlog_depth.saturating_add(d);
334                        }
335                        None => {
336                            // A non-overriding scanner returns None
337                            // on every partition → `sampled` stays
338                            // false → gauge write skipped anyway.
339                            // An overriding scanner with a transient
340                            // failure invalidates the cycle only if
341                            // it had already sampled a partition.
342                            if sampled {
343                                sample_valid = false;
344                            }
345                        }
346                    }
347                }
348
349                let elapsed = cycle_start.elapsed();
350                // PR-94: scanner cycle metrics. Recorded every cycle,
351                // regardless of whether the cycle did any work, so
352                // operators can see cadence drift (a stuck scanner
353                // stops producing data points).
354                metrics.record_scanner_cycle(name, elapsed);
355                if sampled && sample_valid {
356                    metrics.set_cancel_backlog_depth(total_backlog_depth);
357                } else if sampled && !sample_valid {
358                    // At least one partition sampled but another
359                    // failed — leave the gauge at its prior value.
360                    // Log at debug so operators investigating a
361                    // flat gauge can correlate to the cycle where
362                    // sampling partially failed.
363                    tracing::debug!(
364                        scanner = name,
365                        "skipping cancel_backlog_depth gauge write this cycle \
366                         (partial partition sample)"
367                    );
368                }
369                if total_processed > 0 || total_errors > 0 {
370                    tracing::info!(
371                        scanner = name,
372                        processed = total_processed,
373                        errors = total_errors,
374                        elapsed_ms = elapsed.as_millis() as u64,
375                        "scan cycle complete"
376                    );
377                } else {
378                    tracing::trace!(
379                        scanner = name,
380                        elapsed_ms = elapsed.as_millis() as u64,
381                        "scan cycle complete (nothing to do)"
382                    );
383                }
384
385                // Sleep for the remaining interval (or immediately if scan took longer)
386                let sleep_dur = interval.saturating_sub(elapsed);
387                tokio::select! {
388                    _ = tokio::time::sleep(sleep_dur) => {}
389                    _ = shutdown.changed() => {
390                        if *shutdown.borrow() {
391                            tracing::info!(scanner = name, "shutdown requested, stopping");
392                            return;
393                        }
394                    }
395                }
396            }
397        })
398    }
399}