// ff_engine/scanner/mod.rs
1//! Background scanner infrastructure.
2//!
3//! Scanners iterate execution partitions at fixed intervals, checking for
4//! conditions that require action (expired leases, due delays, index drift).
5//! Each scanner type implements the `Scanner` trait; the `ScannerRunner`
6//! drives them as tokio tasks.
7
8pub mod attempt_timeout;
9pub mod budget_reconciler;
10pub mod execution_deadline;
11pub mod budget_reset;
12pub mod delayed_promoter;
13pub mod cancel_reconciler;
14pub mod dependency_reconciler;
15pub mod flow_projector;
16pub mod index_reconciler;
17pub mod lease_expiry;
18pub mod pending_wp_expiry;
19pub mod quota_reconciler;
20pub mod retention_trimmer;
21pub mod suspension_timeout;
22pub mod unblock;
23
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::Mutex;
28use std::time::Duration;
29
30use ff_core::backend::ScannerFilter;
31use ff_core::partition::{Partition, PartitionFamily};
32use ff_core::types::Namespace;
33use tokio::sync::watch;
34use tokio::task::JoinHandle;
35
/// Result of scanning one partition.
///
/// Aggregated across partitions by [`ScannerRunner`] for cycle-level
/// logging.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanResult {
    /// Number of items successfully acted on in this partition pass.
    pub processed: u32,
    /// Number of items that errored in this partition pass.
    pub errors: u32,
}
41
// ── Failure tracking for persistent FCALL errors ──

/// Max consecutive failures before an item enters backoff.
const FAILURE_THRESHOLD: u32 = 3;
/// Number of scan cycles to skip after hitting the threshold.
const BACKOFF_CYCLES: u64 = 10;
/// Max tracked entries before GC runs.
const GC_THRESHOLD: usize = 500;

/// Per-item failure state kept by [`FailureTracker`].
struct FailureEntry {
    // Failures observed in a row with no intervening success.
    consecutive_failures: u32,
    // Scan-cycle number until which the item stays in backoff
    // (0 = not currently in backoff).
    skip_until_cycle: u64,
}

/// Tracks persistently-failing items so they don't permanently consume
/// batch slots. After [`FAILURE_THRESHOLD`] consecutive failures for the
/// same key, the item is skipped for [`BACKOFF_CYCLES`] scan cycles.
#[derive(Default)]
pub struct FailureTracker {
    // Mutex-guarded map from item key to its failure state.
    inner: Mutex<HashMap<String, FailureEntry>>,
    // Monotonic scan-cycle counter, bumped by `advance_cycle`.
    cycle: AtomicU64,
}
64
65impl FailureTracker {
66 pub fn new() -> Self {
67 Self::default()
68 }
69
70 /// Call once per full scan cycle (e.g., when partition == 0).
71 pub fn advance_cycle(&self) {
72 let cycle = self.cycle.fetch_add(1, Ordering::Relaxed) + 1;
73 // Periodic GC: remove entries whose backoff has expired
74 if cycle.is_multiple_of(50) {
75 let mut map = self.inner.lock().unwrap();
76 if map.len() > GC_THRESHOLD {
77 map.retain(|_, e| {
78 e.consecutive_failures >= FAILURE_THRESHOLD
79 && e.skip_until_cycle > cycle
80 });
81 }
82 }
83 }
84
85 /// Returns true if this item should be skipped (in backoff).
86 /// Also resets the entry when backoff expires, giving it another chance.
87 pub fn should_skip(&self, key: &str) -> bool {
88 let mut map = self.inner.lock().unwrap();
89 if let Some(entry) = map.get_mut(key)
90 && entry.consecutive_failures >= FAILURE_THRESHOLD
91 {
92 let cycle = self.cycle.load(Ordering::Relaxed);
93 if entry.skip_until_cycle > cycle {
94 return true;
95 }
96 // Backoff expired — reset and allow retry
97 entry.consecutive_failures = 0;
98 entry.skip_until_cycle = 0;
99 }
100 false
101 }
102
103 /// Record a failure. After [`FAILURE_THRESHOLD`] consecutive failures,
104 /// logs an error and puts the item into backoff.
105 pub fn record_failure(&self, key: &str, scanner_name: &str) {
106 let mut map = self.inner.lock().unwrap();
107 let entry = map.entry(key.to_owned()).or_insert(FailureEntry {
108 consecutive_failures: 0,
109 skip_until_cycle: 0,
110 });
111 entry.consecutive_failures += 1;
112 if entry.consecutive_failures == FAILURE_THRESHOLD {
113 let cycle = self.cycle.load(Ordering::Relaxed);
114 entry.skip_until_cycle = cycle + BACKOFF_CYCLES;
115 tracing::error!(
116 scanner = scanner_name,
117 item = key,
118 failures = entry.consecutive_failures,
119 backoff_cycles = BACKOFF_CYCLES,
120 "persistent FCALL failure — skipping for {BACKOFF_CYCLES} scan cycles"
121 );
122 }
123 }
124
125 /// Record a success — clears any tracked failure state.
126 pub fn record_success(&self, key: &str) {
127 let mut map = self.inner.lock().unwrap();
128 map.remove(key);
129 }
130}
131
/// Trait for background partition scanners.
///
/// Each implementation scans one aspect of execution state (lease expiry,
/// delayed promotion, index consistency) across all partitions at a
/// configured interval. Implementations are driven as tokio tasks by
/// [`ScannerRunner::spawn`], which visits every partition once per cycle.
pub trait Scanner: Send + Sync + 'static {
    /// Human-readable name for logging.
    fn name(&self) -> &'static str;

    /// How often to run a full scan across all partitions.
    ///
    /// The runner clamps the value to a minimum of 100ms.
    fn interval(&self) -> Duration;

    /// Per-consumer filter applied by execution-shaped scanners to
    /// restrict the set of candidates they act on (issue #122).
    ///
    /// Default returns [`ScannerFilter::NOOP`] — pre-#122 behaviour.
    /// Implementations override by storing a `ScannerFilter` on the
    /// struct (constructed via `Self::with_filter(..)`) and
    /// returning `&self.filter`.
    fn filter(&self) -> &ScannerFilter {
        &ScannerFilter::NOOP
    }

    /// Scan a single partition. Called once per partition per cycle.
    ///
    /// Returns per-partition processed/error counts, which the runner
    /// sums across partitions for cycle-level logging.
    fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> impl std::future::Future<Output = ScanResult> + Send;

    /// PR-94: per-cycle gauge sample. Returns `Some(depth)` summed
    /// across all partitions by the scanner runner to produce a
    /// single gauge value (today only `cancel_reconciler` exports
    /// one, feeding `ff_cancel_backlog_depth`). Default: `None` so
    /// scanners that don't export a gauge compile unchanged.
    ///
    /// Runs AFTER `scan_partition` for the same `partition` within
    /// the same cycle, so implementations can reuse cached state.
    /// The trivial default implementation returns `None` for every
    /// partition and the runner writes nothing.
    fn sample_backlog_depth(
        &self,
        _client: &ferriskey::Client,
        _partition: u16,
    ) -> impl std::future::Future<Output = Option<u64>> + Send {
        async { None }
    }
}
180
181/// Issue #122: per-candidate filter helper shared by all
182/// execution-shaped scanners.
183///
184/// Returns true iff the candidate `eid` on `partition` should be
185/// SKIPPED by the scanner (i.e. the filter rejects it). A no-op
186/// filter never rejects — returns false without issuing any HGET.
187///
188/// # `eid` format
189///
190/// Callers pass `eid` as the **full** `{fp:N}:<uuid>` ExecutionId
191/// string — identical to what Lua stores as a ZSET member via
192/// `ZADD ... A.execution_id` and what every scanner reads out of
193/// per-partition index ZSETs with `ZRANGEBYSCORE`. The helper
194/// formats `format!("ff:exec:{}:{}:core", tag, eid)` which produces
195/// the canonical **double-tagged** production key shape
196/// `ff:exec:{fp:N}:{fp:N}:<uuid>:core` — the first tag comes from
197/// the partition's routing hash-tag; the second is the tag baked
198/// into the ExecutionId string. Matches
199/// [`ff_core::keys::ExecKeyContext::core`] / [`::tags`] exactly.
200/// Do not strip the prefix — doing so would build a
201/// single-tagged key that does not exist in production.
202///
203/// [`::tags`]: ff_core::keys::ExecKeyContext::tags
204///
205/// # Cost
206///
207/// 0 HGET if `filter.is_noop()`; 1 HGET when only `namespace` is
208/// set (on `ff:exec:{fp:N}:{fp:N}:<uuid>:core`); 1 HGET when only
209/// `instance_tag` is set (on `ff:exec:{fp:N}:{fp:N}:<uuid>:tags`);
210/// 2 HGETs when both are set. Namespace is checked first (cheaper
211/// — touches the already-hot `core` hash) and short-circuits on
212/// mismatch.
213///
214/// # Failure mode
215///
216/// On HGET failure the helper returns true (skip), conservatively:
217/// leaking a cross-tenant candidate due to a transient read error is
218/// worse than the scanner temporarily underclaiming — the next cycle
219/// picks it back up once the backend recovers.
220pub async fn should_skip_candidate(
221 client: &ferriskey::Client,
222 filter: &ScannerFilter,
223 partition: u16,
224 eid: &str,
225) -> bool {
226 if filter.is_noop() {
227 return false;
228 }
229 let p = Partition {
230 family: PartitionFamily::Execution,
231 index: partition,
232 };
233 let tag = p.hash_tag();
234
235 if let Some(ref want_ns) = filter.namespace {
236 let core_key = format!("ff:exec:{}:{}:core", tag, eid);
237 match client
238 .cmd("HGET")
239 .arg(&core_key)
240 .arg("namespace")
241 .execute::<Option<String>>()
242 .await
243 {
244 Ok(Some(s)) => {
245 if &Namespace::new(s) != want_ns {
246 return true;
247 }
248 }
249 // nil or transport error — skip conservatively.
250 _ => return true,
251 }
252 }
253
254 if let Some((ref tag_key, ref want_value)) = filter.instance_tag {
255 let tags_key = format!("ff:exec:{}:{}:tags", tag, eid);
256 match client
257 .cmd("HGET")
258 .arg(&tags_key)
259 .arg(tag_key.as_str())
260 .execute::<Option<String>>()
261 .await
262 {
263 Ok(Some(v)) if &v == want_value => {}
264 _ => return true,
265 }
266 }
267
268 false
269}
270
/// Drives a scanner across all execution partitions in a loop.
///
/// Stateless marker type — see [`ScannerRunner::spawn`] for the task
/// that does the work.
pub struct ScannerRunner;
273
274impl ScannerRunner {
275 /// Spawn a tokio task that runs the scanner forever until shutdown.
276 ///
277 /// PR-94: the `metrics` handle records per-cycle duration +
278 /// cycle-total counter. Under the no-op shim (`observability`
279 /// feature off) the recorder calls compile to nothing.
280 pub fn spawn<S: Scanner>(
281 scanner: Arc<S>,
282 client: ferriskey::Client,
283 num_partitions: u16,
284 mut shutdown: watch::Receiver<bool>,
285 metrics: Arc<ff_observability::Metrics>,
286 ) -> JoinHandle<()> {
287 tokio::spawn(async move {
288 let name = scanner.name();
289 let interval = scanner.interval().max(Duration::from_millis(100));
290 tracing::info!(scanner = name, ?interval, partitions = num_partitions, "scanner started");
291
292 loop {
293 let cycle_start = tokio::time::Instant::now();
294 let mut total_processed: u32 = 0;
295 let mut total_errors: u32 = 0;
296 // Gauge aggregation strategy: require **every**
297 // partition to return a sample before writing.
298 // A partial sum (some partitions sampled, some
299 // returned None on error) would under-report and
300 // make the gauge jump below the true backlog, which
301 // an operator could mis-interpret as drain progress.
302 // On any missing sample we set `sample_valid = false`
303 // and skip the gauge write — the previous value
304 // stands until a full cycle succeeds.
305 //
306 // First partition returning `Some(_)` sets
307 // `sampled = true`; a subsequent `None` on any
308 // partition invalidates the cycle's write.
309 let mut total_backlog_depth: u64 = 0;
310 let mut sampled = false;
311 let mut sample_valid = true;
312
313 for p in 0..num_partitions {
314 // Check for shutdown between partitions
315 if *shutdown.borrow() {
316 tracing::info!(scanner = name, "shutdown requested, stopping");
317 return;
318 }
319
320 let result = scanner.scan_partition(&client, p).await;
321 total_processed += result.processed;
322 total_errors += result.errors;
323
324 // Only query the gauge-sample hook on scanners
325 // that override it (default returns None for
326 // every partition — the check short-circuits).
327 match scanner.sample_backlog_depth(&client, p).await {
328 Some(d) => {
329 sampled = true;
330 total_backlog_depth =
331 total_backlog_depth.saturating_add(d);
332 }
333 None => {
334 // A non-overriding scanner returns None
335 // on every partition → `sampled` stays
336 // false → gauge write skipped anyway.
337 // An overriding scanner with a transient
338 // failure invalidates the cycle only if
339 // it had already sampled a partition.
340 if sampled {
341 sample_valid = false;
342 }
343 }
344 }
345 }
346
347 let elapsed = cycle_start.elapsed();
348 // PR-94: scanner cycle metrics. Recorded every cycle,
349 // regardless of whether the cycle did any work, so
350 // operators can see cadence drift (a stuck scanner
351 // stops producing data points).
352 metrics.record_scanner_cycle(name, elapsed);
353 if sampled && sample_valid {
354 metrics.set_cancel_backlog_depth(total_backlog_depth);
355 } else if sampled && !sample_valid {
356 // At least one partition sampled but another
357 // failed — leave the gauge at its prior value.
358 // Log at debug so operators investigating a
359 // flat gauge can correlate to the cycle where
360 // sampling partially failed.
361 tracing::debug!(
362 scanner = name,
363 "skipping cancel_backlog_depth gauge write this cycle \
364 (partial partition sample)"
365 );
366 }
367 if total_processed > 0 || total_errors > 0 {
368 tracing::info!(
369 scanner = name,
370 processed = total_processed,
371 errors = total_errors,
372 elapsed_ms = elapsed.as_millis() as u64,
373 "scan cycle complete"
374 );
375 } else {
376 tracing::trace!(
377 scanner = name,
378 elapsed_ms = elapsed.as_millis() as u64,
379 "scan cycle complete (nothing to do)"
380 );
381 }
382
383 // Sleep for the remaining interval (or immediately if scan took longer)
384 let sleep_dur = interval.saturating_sub(elapsed);
385 tokio::select! {
386 _ = tokio::time::sleep(sleep_dur) => {}
387 _ = shutdown.changed() => {
388 if *shutdown.borrow() {
389 tracing::info!(scanner = name, "shutdown requested, stopping");
390 return;
391 }
392 }
393 }
394 }
395 })
396 }
397}