ff_engine/scanner/mod.rs
1//! Background scanner infrastructure.
2//!
3//! Scanners iterate execution partitions at fixed intervals, checking for
4//! conditions that require action (expired leases, due delays, index drift).
5//! Each scanner type implements the `Scanner` trait; the `ScannerRunner`
6//! drives them as tokio tasks.
7
8pub mod attempt_timeout;
9pub mod budget_reconciler;
10pub mod execution_deadline;
11pub mod budget_reset;
12pub mod delayed_promoter;
13pub mod cancel_reconciler;
14pub mod dependency_reconciler;
15pub mod edge_cancel_dispatcher;
16pub mod edge_cancel_reconciler;
17pub mod flow_projector;
18pub mod index_reconciler;
19pub mod lease_expiry;
20pub mod pending_wp_expiry;
21pub mod quota_reconciler;
22pub mod retention_trimmer;
23pub mod suspension_timeout;
24pub mod unblock;
25
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::Mutex;
30use std::time::Duration;
31
32use ff_core::backend::ScannerFilter;
33use ff_core::partition::{Partition, PartitionFamily};
34use ff_core::types::Namespace;
35use tokio::sync::watch;
36use tokio::task::JoinHandle;
37
/// Result of scanning one partition.
///
/// The [`ScannerRunner`] sums these across all partitions into
/// per-cycle totals for logging.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanResult {
    /// Items the scanner acted on in this partition.
    pub processed: u32,
    /// Items that failed during processing in this partition.
    pub errors: u32,
}
43
// ── Failure tracking for persistent FCALL errors ──

/// Max consecutive failures before an item enters backoff.
const FAILURE_THRESHOLD: u32 = 3;
/// Number of scan cycles to skip after hitting the threshold.
const BACKOFF_CYCLES: u64 = 10;
/// Map size above which the periodic GC (every 50th cycle) actually
/// prunes entries; below this the map is left untouched.
const GC_THRESHOLD: usize = 500;

/// Per-item failure state held inside [`FailureTracker`].
struct FailureEntry {
    // Failures recorded since the last success (success removes the
    // entry entirely) or since the last backoff-expiry reset.
    consecutive_failures: u32,
    // Cycle number until which the item is skipped; 0 when the item
    // is not (yet) in backoff.
    skip_until_cycle: u64,
}
57
/// Tracks persistently-failing items so they don't permanently consume
/// batch slots. After [`FAILURE_THRESHOLD`] consecutive failures for the
/// same key, the item is skipped for [`BACKOFF_CYCLES`] scan cycles.
#[derive(Default)]
pub struct FailureTracker {
    // Per-key failure state; keys are caller-chosen item identifiers.
    inner: Mutex<HashMap<String, FailureEntry>>,
    // Monotonic scan-cycle counter, bumped by
    // [`FailureTracker::advance_cycle`]. Relaxed ordering is used
    // throughout — the map mutex is the real synchronization point.
    cycle: AtomicU64,
}
66
67impl FailureTracker {
68 pub fn new() -> Self {
69 Self::default()
70 }
71
72 /// Call once per full scan cycle (e.g., when partition == 0).
73 pub fn advance_cycle(&self) {
74 let cycle = self.cycle.fetch_add(1, Ordering::Relaxed) + 1;
75 // Periodic GC: remove entries whose backoff has expired
76 if cycle.is_multiple_of(50) {
77 let mut map = self.inner.lock().unwrap();
78 if map.len() > GC_THRESHOLD {
79 map.retain(|_, e| {
80 e.consecutive_failures >= FAILURE_THRESHOLD
81 && e.skip_until_cycle > cycle
82 });
83 }
84 }
85 }
86
87 /// Returns true if this item should be skipped (in backoff).
88 /// Also resets the entry when backoff expires, giving it another chance.
89 pub fn should_skip(&self, key: &str) -> bool {
90 let mut map = self.inner.lock().unwrap();
91 if let Some(entry) = map.get_mut(key)
92 && entry.consecutive_failures >= FAILURE_THRESHOLD
93 {
94 let cycle = self.cycle.load(Ordering::Relaxed);
95 if entry.skip_until_cycle > cycle {
96 return true;
97 }
98 // Backoff expired — reset and allow retry
99 entry.consecutive_failures = 0;
100 entry.skip_until_cycle = 0;
101 }
102 false
103 }
104
105 /// Record a failure. After [`FAILURE_THRESHOLD`] consecutive failures,
106 /// logs an error and puts the item into backoff.
107 pub fn record_failure(&self, key: &str, scanner_name: &str) {
108 let mut map = self.inner.lock().unwrap();
109 let entry = map.entry(key.to_owned()).or_insert(FailureEntry {
110 consecutive_failures: 0,
111 skip_until_cycle: 0,
112 });
113 entry.consecutive_failures += 1;
114 if entry.consecutive_failures == FAILURE_THRESHOLD {
115 let cycle = self.cycle.load(Ordering::Relaxed);
116 entry.skip_until_cycle = cycle + BACKOFF_CYCLES;
117 tracing::error!(
118 scanner = scanner_name,
119 item = key,
120 failures = entry.consecutive_failures,
121 backoff_cycles = BACKOFF_CYCLES,
122 "persistent FCALL failure — skipping for {BACKOFF_CYCLES} scan cycles"
123 );
124 }
125 }
126
127 /// Record a success — clears any tracked failure state.
128 pub fn record_success(&self, key: &str) {
129 let mut map = self.inner.lock().unwrap();
130 map.remove(key);
131 }
132}
133
/// Trait for background partition scanners.
///
/// Each implementation scans one aspect of execution state (lease expiry,
/// delayed promotion, index consistency) across all partitions at a
/// configured interval.
pub trait Scanner: Send + Sync + 'static {
    /// Human-readable name for logging.
    fn name(&self) -> &'static str;

    /// How often to run a full scan across all partitions.
    ///
    /// The runner clamps this to a minimum of 100ms, so returning a
    /// smaller duration will not make the scanner spin faster.
    fn interval(&self) -> Duration;

    /// Per-consumer filter applied by execution-shaped scanners to
    /// restrict the set of candidates they act on (issue #122).
    ///
    /// Default returns [`ScannerFilter::NOOP`] — pre-#122 behaviour.
    /// Implementations override by storing a `ScannerFilter` on the
    /// struct (constructed via `Self::with_filter(..)`) and
    /// returning `&self.filter`.
    fn filter(&self) -> &ScannerFilter {
        &ScannerFilter::NOOP
    }

    /// Scan a single partition. Called once per partition per cycle.
    ///
    /// The returned future must be `Send` because the runner drives
    /// it from a spawned tokio task.
    fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> impl std::future::Future<Output = ScanResult> + Send;

    /// PR-94: per-cycle gauge sample. Returns `Some(depth)` summed
    /// across all partitions by the scanner runner to produce a
    /// single gauge value (today only `cancel_reconciler` exports
    /// one, feeding `ff_cancel_backlog_depth`). Default: `None` so
    /// scanners that don't export a gauge compile unchanged.
    ///
    /// Runs AFTER `scan_partition` for the same `partition` within
    /// the same cycle, so implementations can reuse cached state.
    /// The trivial default implementation returns `None` for every
    /// partition and the runner writes nothing.
    fn sample_backlog_depth(
        &self,
        _client: &ferriskey::Client,
        _partition: u16,
    ) -> impl std::future::Future<Output = Option<u64>> + Send {
        async { None }
    }
}
182
183/// Issue #122: per-candidate filter helper shared by all
184/// execution-shaped scanners.
185///
186/// Returns true iff the candidate `eid` on `partition` should be
187/// SKIPPED by the scanner (i.e. the filter rejects it). A no-op
188/// filter never rejects — returns false without issuing any HGET.
189///
190/// # `eid` format
191///
192/// Callers pass `eid` as the **full** `{fp:N}:<uuid>` ExecutionId
193/// string — identical to what Lua stores as a ZSET member via
194/// `ZADD ... A.execution_id` and what every scanner reads out of
195/// per-partition index ZSETs with `ZRANGEBYSCORE`. The helper
196/// formats `format!("ff:exec:{}:{}:core", tag, eid)` which produces
197/// the canonical **double-tagged** production key shape
198/// `ff:exec:{fp:N}:{fp:N}:<uuid>:core` — the first tag comes from
199/// the partition's routing hash-tag; the second is the tag baked
200/// into the ExecutionId string. Matches
201/// [`ff_core::keys::ExecKeyContext::core`] / [`::tags`] exactly.
202/// Do not strip the prefix — doing so would build a
203/// single-tagged key that does not exist in production.
204///
205/// [`::tags`]: ff_core::keys::ExecKeyContext::tags
206///
207/// # Cost
208///
209/// 0 HGET if `filter.is_noop()`; 1 HGET when only `namespace` is
210/// set (on `ff:exec:{fp:N}:{fp:N}:<uuid>:core`); 1 HGET when only
211/// `instance_tag` is set (on `ff:exec:{fp:N}:{fp:N}:<uuid>:tags`);
212/// 2 HGETs when both are set. Namespace is checked first (cheaper
213/// — touches the already-hot `core` hash) and short-circuits on
214/// mismatch.
215///
216/// # Failure mode
217///
218/// On HGET failure the helper returns true (skip), conservatively:
219/// leaking a cross-tenant candidate due to a transient read error is
220/// worse than the scanner temporarily underclaiming — the next cycle
221/// picks it back up once the backend recovers.
222pub async fn should_skip_candidate(
223 client: &ferriskey::Client,
224 filter: &ScannerFilter,
225 partition: u16,
226 eid: &str,
227) -> bool {
228 if filter.is_noop() {
229 return false;
230 }
231 let p = Partition {
232 family: PartitionFamily::Execution,
233 index: partition,
234 };
235 let tag = p.hash_tag();
236
237 if let Some(ref want_ns) = filter.namespace {
238 let core_key = format!("ff:exec:{}:{}:core", tag, eid);
239 match client
240 .cmd("HGET")
241 .arg(&core_key)
242 .arg("namespace")
243 .execute::<Option<String>>()
244 .await
245 {
246 Ok(Some(s)) => {
247 if &Namespace::new(s) != want_ns {
248 return true;
249 }
250 }
251 // nil or transport error — skip conservatively.
252 _ => return true,
253 }
254 }
255
256 if let Some((ref tag_key, ref want_value)) = filter.instance_tag {
257 let tags_key = format!("ff:exec:{}:{}:tags", tag, eid);
258 match client
259 .cmd("HGET")
260 .arg(&tags_key)
261 .arg(tag_key.as_str())
262 .execute::<Option<String>>()
263 .await
264 {
265 Ok(Some(v)) if &v == want_value => {}
266 _ => return true,
267 }
268 }
269
270 false
271}
272
/// Drives a scanner across all execution partitions in a loop.
///
/// Stateless: all behaviour lives in the associated `spawn` function,
/// which launches one tokio task per scanner.
pub struct ScannerRunner;
275
276impl ScannerRunner {
277 /// Spawn a tokio task that runs the scanner forever until shutdown.
278 ///
279 /// PR-94: the `metrics` handle records per-cycle duration +
280 /// cycle-total counter. Under the no-op shim (`observability`
281 /// feature off) the recorder calls compile to nothing.
282 pub fn spawn<S: Scanner>(
283 scanner: Arc<S>,
284 client: ferriskey::Client,
285 num_partitions: u16,
286 mut shutdown: watch::Receiver<bool>,
287 metrics: Arc<ff_observability::Metrics>,
288 ) -> JoinHandle<()> {
289 tokio::spawn(async move {
290 let name = scanner.name();
291 let interval = scanner.interval().max(Duration::from_millis(100));
292 tracing::info!(scanner = name, ?interval, partitions = num_partitions, "scanner started");
293
294 loop {
295 let cycle_start = tokio::time::Instant::now();
296 let mut total_processed: u32 = 0;
297 let mut total_errors: u32 = 0;
298 // Gauge aggregation strategy: require **every**
299 // partition to return a sample before writing.
300 // A partial sum (some partitions sampled, some
301 // returned None on error) would under-report and
302 // make the gauge jump below the true backlog, which
303 // an operator could mis-interpret as drain progress.
304 // On any missing sample we set `sample_valid = false`
305 // and skip the gauge write — the previous value
306 // stands until a full cycle succeeds.
307 //
308 // First partition returning `Some(_)` sets
309 // `sampled = true`; a subsequent `None` on any
310 // partition invalidates the cycle's write.
311 let mut total_backlog_depth: u64 = 0;
312 let mut sampled = false;
313 let mut sample_valid = true;
314
315 for p in 0..num_partitions {
316 // Check for shutdown between partitions
317 if *shutdown.borrow() {
318 tracing::info!(scanner = name, "shutdown requested, stopping");
319 return;
320 }
321
322 let result = scanner.scan_partition(&client, p).await;
323 total_processed += result.processed;
324 total_errors += result.errors;
325
326 // Only query the gauge-sample hook on scanners
327 // that override it (default returns None for
328 // every partition — the check short-circuits).
329 match scanner.sample_backlog_depth(&client, p).await {
330 Some(d) => {
331 sampled = true;
332 total_backlog_depth =
333 total_backlog_depth.saturating_add(d);
334 }
335 None => {
336 // A non-overriding scanner returns None
337 // on every partition → `sampled` stays
338 // false → gauge write skipped anyway.
339 // An overriding scanner with a transient
340 // failure invalidates the cycle only if
341 // it had already sampled a partition.
342 if sampled {
343 sample_valid = false;
344 }
345 }
346 }
347 }
348
349 let elapsed = cycle_start.elapsed();
350 // PR-94: scanner cycle metrics. Recorded every cycle,
351 // regardless of whether the cycle did any work, so
352 // operators can see cadence drift (a stuck scanner
353 // stops producing data points).
354 metrics.record_scanner_cycle(name, elapsed);
355 if sampled && sample_valid {
356 metrics.set_cancel_backlog_depth(total_backlog_depth);
357 } else if sampled && !sample_valid {
358 // At least one partition sampled but another
359 // failed — leave the gauge at its prior value.
360 // Log at debug so operators investigating a
361 // flat gauge can correlate to the cycle where
362 // sampling partially failed.
363 tracing::debug!(
364 scanner = name,
365 "skipping cancel_backlog_depth gauge write this cycle \
366 (partial partition sample)"
367 );
368 }
369 if total_processed > 0 || total_errors > 0 {
370 tracing::info!(
371 scanner = name,
372 processed = total_processed,
373 errors = total_errors,
374 elapsed_ms = elapsed.as_millis() as u64,
375 "scan cycle complete"
376 );
377 } else {
378 tracing::trace!(
379 scanner = name,
380 elapsed_ms = elapsed.as_millis() as u64,
381 "scan cycle complete (nothing to do)"
382 );
383 }
384
385 // Sleep for the remaining interval (or immediately if scan took longer)
386 let sleep_dur = interval.saturating_sub(elapsed);
387 tokio::select! {
388 _ = tokio::time::sleep(sleep_dur) => {}
389 _ = shutdown.changed() => {
390 if *shutdown.borrow() {
391 tracing::info!(scanner = name, "shutdown requested, stopping");
392 return;
393 }
394 }
395 }
396 }
397 })
398 }
399}