ff_engine/scanner/mod.rs
1//! Background scanner infrastructure.
2//!
3//! Scanners iterate execution partitions at fixed intervals, checking for
4//! conditions that require action (expired leases, due delays, index drift).
5//! Each scanner type implements the `Scanner` trait; the `ScannerRunner`
6//! drives them as tokio tasks.
7
8pub mod attempt_timeout;
9pub mod budget_reconciler;
10pub mod execution_deadline;
11pub mod budget_reset;
12pub mod delayed_promoter;
13pub mod cancel_reconciler;
14pub mod dependency_reconciler;
15pub mod edge_cancel_dispatcher;
16pub mod edge_cancel_reconciler;
17pub mod flow_projector;
18pub mod index_reconciler;
19pub mod lease_expiry;
20pub mod pending_wp_expiry;
21pub mod quota_reconciler;
22pub mod retention_trimmer;
23pub mod suspension_timeout;
24pub mod unblock;
25
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::Mutex;
30use std::time::Duration;
31
32use ff_core::backend::ScannerFilter;
33use ff_core::engine_backend::EngineBackend;
34use ff_core::types::ExecutionId;
35use tokio::sync::watch;
36use tokio::task::JoinHandle;
37
/// Result of scanning one partition.
///
/// Scanners return one of these per `scan_partition` call; the runner sums
/// the counters across all partitions of a cycle for its log line.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanResult {
    /// Number of candidates the scanner acted on in this partition.
    pub processed: u32,
    /// Number of candidates whose handling failed in this partition.
    pub errors: u32,
}
43
// ── Failure tracking for persistent FCALL errors ──

/// Number of consecutive failures for one key at which
/// `FailureTracker::record_failure` puts that item into backoff.
const FAILURE_THRESHOLD: u32 = 3;
/// Number of scan cycles an item is skipped once it enters backoff
/// (its `skip_until_cycle` is set to the current cycle plus this value).
const BACKOFF_CYCLES: u64 = 10;
/// Tracked-entry count above which `FailureTracker::advance_cycle` prunes
/// the map. The size check itself only happens on every 50th cycle.
const GC_THRESHOLD: usize = 500;
52
/// Per-key failure bookkeeping stored inside [`FailureTracker`].
#[derive(Debug)]
struct FailureEntry {
    /// Failures recorded for this key since the entry was created or last
    /// reset by an expired backoff.
    consecutive_failures: u32,
    /// Cycle number before which the key is skipped; `0` while the key is
    /// not in backoff.
    skip_until_cycle: u64,
}

/// Tracks persistently-failing items so they don't permanently consume
/// batch slots. After [`FAILURE_THRESHOLD`] consecutive failures for the
/// same key, the item is skipped for [`BACKOFF_CYCLES`] scan cycles.
#[derive(Debug, Default)]
pub struct FailureTracker {
    /// Failure state keyed by the caller-supplied item key.
    inner: Mutex<HashMap<String, FailureEntry>>,
    /// Monotonic scan-cycle counter, bumped by `advance_cycle`.
    cycle: AtomicU64,
}
66
67impl FailureTracker {
68 pub fn new() -> Self {
69 Self::default()
70 }
71
72 /// Call once per full scan cycle (e.g., when partition == 0).
73 pub fn advance_cycle(&self) {
74 let cycle = self.cycle.fetch_add(1, Ordering::Relaxed) + 1;
75 // Periodic GC: remove entries whose backoff has expired
76 if cycle.is_multiple_of(50) {
77 let mut map = self.inner.lock().unwrap();
78 if map.len() > GC_THRESHOLD {
79 map.retain(|_, e| {
80 e.consecutive_failures >= FAILURE_THRESHOLD
81 && e.skip_until_cycle > cycle
82 });
83 }
84 }
85 }
86
87 /// Returns true if this item should be skipped (in backoff).
88 /// Also resets the entry when backoff expires, giving it another chance.
89 pub fn should_skip(&self, key: &str) -> bool {
90 let mut map = self.inner.lock().unwrap();
91 if let Some(entry) = map.get_mut(key)
92 && entry.consecutive_failures >= FAILURE_THRESHOLD
93 {
94 let cycle = self.cycle.load(Ordering::Relaxed);
95 if entry.skip_until_cycle > cycle {
96 return true;
97 }
98 // Backoff expired — reset and allow retry
99 entry.consecutive_failures = 0;
100 entry.skip_until_cycle = 0;
101 }
102 false
103 }
104
105 /// Record a failure. After [`FAILURE_THRESHOLD`] consecutive failures,
106 /// logs an error and puts the item into backoff.
107 pub fn record_failure(&self, key: &str, scanner_name: &str) {
108 let mut map = self.inner.lock().unwrap();
109 let entry = map.entry(key.to_owned()).or_insert(FailureEntry {
110 consecutive_failures: 0,
111 skip_until_cycle: 0,
112 });
113 entry.consecutive_failures += 1;
114 if entry.consecutive_failures == FAILURE_THRESHOLD {
115 let cycle = self.cycle.load(Ordering::Relaxed);
116 entry.skip_until_cycle = cycle + BACKOFF_CYCLES;
117 tracing::error!(
118 scanner = scanner_name,
119 item = key,
120 failures = entry.consecutive_failures,
121 backoff_cycles = BACKOFF_CYCLES,
122 "persistent FCALL failure — skipping for {BACKOFF_CYCLES} scan cycles"
123 );
124 }
125 }
126
127 /// Record a success — clears any tracked failure state.
128 pub fn record_success(&self, key: &str) {
129 let mut map = self.inner.lock().unwrap();
130 map.remove(key);
131 }
132}
133
134/// Trait for background partition scanners.
135///
136/// Each implementation scans one aspect of execution state (lease expiry,
137/// delayed promotion, index consistency) across all partitions at a
138/// configured interval.
139pub trait Scanner: Send + Sync + 'static {
140 /// Human-readable name for logging.
141 fn name(&self) -> &'static str;
142
143 /// How often to run a full scan across all partitions.
144 fn interval(&self) -> Duration;
145
146 /// Per-consumer filter applied by execution-shaped scanners to
147 /// restrict the set of candidates they act on (issue #122).
148 ///
149 /// Default returns [`ScannerFilter::NOOP`] — pre-#122 behaviour.
150 /// Implementations override by storing a `ScannerFilter` on the
151 /// struct (constructed via `Self::with_filter(..)`) and
152 /// returning `&self.filter`.
153 fn filter(&self) -> &ScannerFilter {
154 &ScannerFilter::NOOP
155 }
156
157 /// Scan a single partition. Called once per partition per cycle.
158 fn scan_partition(
159 &self,
160 client: &ferriskey::Client,
161 partition: u16,
162 ) -> impl std::future::Future<Output = ScanResult> + Send;
163
164 /// PR-94: per-cycle gauge sample. Returns `Some(depth)` summed
165 /// across all partitions by the scanner runner to produce a
166 /// single gauge value (today only `cancel_reconciler` exports
167 /// one, feeding `ff_cancel_backlog_depth`). Default: `None` so
168 /// scanners that don't export a gauge compile unchanged.
169 ///
170 /// Runs AFTER `scan_partition` for the same `partition` within
171 /// the same cycle, so implementations can reuse cached state.
172 /// The trivial default implementation returns `None` for every
173 /// partition and the runner writes nothing.
174 fn sample_backlog_depth(
175 &self,
176 _client: &ferriskey::Client,
177 _partition: u16,
178 ) -> impl std::future::Future<Output = Option<u64>> + Send {
179 async { None }
180 }
181}
182
183/// Issue #122: per-candidate filter helper shared by all
184/// execution-shaped scanners.
185///
186/// Returns true iff the candidate `eid` on `partition` should be
187/// SKIPPED by the scanner (i.e. the filter rejects it). A no-op
188/// filter never rejects — returns false without issuing any HGET.
189///
190/// # `eid` format
191///
192/// Callers pass `eid` as the **full** `{fp:N}:<uuid>` ExecutionId
193/// string — identical to what Lua stores as a ZSET member via
194/// `ZADD ... A.execution_id` and what every scanner reads out of
195/// per-partition index ZSETs with `ZRANGEBYSCORE`. The helper
196/// formats `format!("ff:exec:{}:{}:core", tag, eid)` which produces
197/// the canonical **double-tagged** production key shape
198/// `ff:exec:{fp:N}:{fp:N}:<uuid>:core` — the first tag comes from
199/// the partition's routing hash-tag; the second is the tag baked
200/// into the ExecutionId string. Matches
201/// [`ff_core::keys::ExecKeyContext::core`] / [`::tags`] exactly.
202/// Do not strip the prefix — doing so would build a
203/// single-tagged key that does not exist in production.
204///
205/// [`::tags`]: ff_core::keys::ExecKeyContext::tags
206///
207/// # Cost
208///
209/// 0 HGET if `filter.is_noop()`; 1 HGET when only `namespace` is
210/// set (on `ff:exec:{fp:N}:{fp:N}:<uuid>:core`); 1 HGET when only
211/// `instance_tag` is set (on `ff:exec:{fp:N}:{fp:N}:<uuid>:tags`);
212/// 2 HGETs when both are set. Namespace is checked first (cheaper
213/// — touches the already-hot `core` hash) and short-circuits on
214/// mismatch.
215///
216/// # Failure mode
217///
218/// On HGET failure the helper returns true (skip), conservatively:
219/// leaking a cross-tenant candidate due to a transient read error is
220/// worse than the scanner temporarily underclaiming — the next cycle
221/// picks it back up once the backend recovers.
222pub async fn should_skip_candidate(
223 backend: Option<&Arc<dyn EngineBackend>>,
224 filter: &ScannerFilter,
225 _partition: u16,
226 eid: &str,
227) -> bool {
228 if filter.is_noop() {
229 return false;
230 }
231 // With a non-noop filter but no backend plumbed, skip
232 // conservatively — same posture as a transport error. The
233 // in-tree engine path (`Engine::start_internal`) always plumbs a
234 // backend; `None` only reaches here from test-only scanner
235 // constructors that use `ScannerFilter::default()` (noop), so
236 // the first branch above already short-circuits.
237 let Some(backend) = backend else {
238 return true;
239 };
240 let Ok(exec_id) = ExecutionId::parse(eid) else {
241 // Malformed eid → skip conservatively (matches pre-PR-7b
242 // posture: anything we can't validate gets filtered out).
243 return true;
244 };
245
246 if let Some(ref want_ns) = filter.namespace {
247 // Dedicated point-read — preserves the 1-HGET cost contract
248 // documented above. `describe_execution` would be an N-field
249 // HGETALL / full-snapshot read and is the wrong tool when only
250 // the namespace scalar is needed.
251 match backend.get_execution_namespace(&exec_id).await {
252 Ok(Some(ref got)) if got == want_ns.as_str() => {}
253 _ => return true,
254 }
255 }
256
257 if let Some((ref tag_key, ref want_value)) = filter.instance_tag {
258 match backend.get_execution_tag(&exec_id, tag_key.as_str()).await {
259 Ok(Some(v)) if &v == want_value => {}
260 _ => return true,
261 }
262 }
263
264 false
265}
266
267/// Drives a scanner across all execution partitions in a loop.
268pub struct ScannerRunner;
269
270impl ScannerRunner {
271 /// Spawn a tokio task that runs the scanner forever until shutdown.
272 ///
273 /// PR-94: the `metrics` handle records per-cycle duration +
274 /// cycle-total counter. Under the no-op shim (`observability`
275 /// feature off) the recorder calls compile to nothing.
276 pub fn spawn<S: Scanner>(
277 scanner: Arc<S>,
278 client: ferriskey::Client,
279 num_partitions: u16,
280 mut shutdown: watch::Receiver<bool>,
281 metrics: Arc<ff_observability::Metrics>,
282 ) -> JoinHandle<()> {
283 tokio::spawn(async move {
284 let name = scanner.name();
285 let interval = scanner.interval().max(Duration::from_millis(100));
286 tracing::info!(scanner = name, ?interval, partitions = num_partitions, "scanner started");
287
288 loop {
289 let cycle_start = tokio::time::Instant::now();
290 let mut total_processed: u32 = 0;
291 let mut total_errors: u32 = 0;
292 // Gauge aggregation strategy: require **every**
293 // partition to return a sample before writing.
294 // A partial sum (some partitions sampled, some
295 // returned None on error) would under-report and
296 // make the gauge jump below the true backlog, which
297 // an operator could mis-interpret as drain progress.
298 // On any missing sample we set `sample_valid = false`
299 // and skip the gauge write — the previous value
300 // stands until a full cycle succeeds.
301 //
302 // First partition returning `Some(_)` sets
303 // `sampled = true`; a subsequent `None` on any
304 // partition invalidates the cycle's write.
305 let mut total_backlog_depth: u64 = 0;
306 let mut sampled = false;
307 let mut sample_valid = true;
308
309 for p in 0..num_partitions {
310 // Check for shutdown between partitions
311 if *shutdown.borrow() {
312 tracing::info!(scanner = name, "shutdown requested, stopping");
313 return;
314 }
315
316 let result = scanner.scan_partition(&client, p).await;
317 total_processed += result.processed;
318 total_errors += result.errors;
319
320 // Only query the gauge-sample hook on scanners
321 // that override it (default returns None for
322 // every partition — the check short-circuits).
323 match scanner.sample_backlog_depth(&client, p).await {
324 Some(d) => {
325 sampled = true;
326 total_backlog_depth =
327 total_backlog_depth.saturating_add(d);
328 }
329 None => {
330 // A non-overriding scanner returns None
331 // on every partition → `sampled` stays
332 // false → gauge write skipped anyway.
333 // An overriding scanner with a transient
334 // failure invalidates the cycle only if
335 // it had already sampled a partition.
336 if sampled {
337 sample_valid = false;
338 }
339 }
340 }
341 }
342
343 let elapsed = cycle_start.elapsed();
344 // PR-94: scanner cycle metrics. Recorded every cycle,
345 // regardless of whether the cycle did any work, so
346 // operators can see cadence drift (a stuck scanner
347 // stops producing data points).
348 metrics.record_scanner_cycle(name, elapsed);
349 if sampled && sample_valid {
350 metrics.set_cancel_backlog_depth(total_backlog_depth);
351 } else if sampled && !sample_valid {
352 // At least one partition sampled but another
353 // failed — leave the gauge at its prior value.
354 // Log at debug so operators investigating a
355 // flat gauge can correlate to the cycle where
356 // sampling partially failed.
357 tracing::debug!(
358 scanner = name,
359 "skipping cancel_backlog_depth gauge write this cycle \
360 (partial partition sample)"
361 );
362 }
363 if total_processed > 0 || total_errors > 0 {
364 tracing::info!(
365 scanner = name,
366 processed = total_processed,
367 errors = total_errors,
368 elapsed_ms = elapsed.as_millis() as u64,
369 "scan cycle complete"
370 );
371 } else {
372 tracing::trace!(
373 scanner = name,
374 elapsed_ms = elapsed.as_millis() as u64,
375 "scan cycle complete (nothing to do)"
376 );
377 }
378
379 // Sleep for the remaining interval (or immediately if scan took longer)
380 let sleep_dur = interval.saturating_sub(elapsed);
381 tokio::select! {
382 _ = tokio::time::sleep(sleep_dur) => {}
383 _ = shutdown.changed() => {
384 if *shutdown.borrow() {
385 tracing::info!(scanner = name, "shutdown requested, stopping");
386 return;
387 }
388 }
389 }
390 }
391 })
392 }
393}
394