Skip to main content

ff_engine/scanner/
attempt_timeout.rs

1//! Attempt timeout scanner.
2//!
3//! Iterates `ff:idx:{p:N}:attempt_timeout` for each partition, finding
4//! attempts whose timeout score is <= now. For each, calls
5//! `FCALL ff_expire_execution` which handles all lifecycle phases
6//! (active, runnable, suspended) and transitions to terminal(expired).
7//!
8//! Reference: RFC-010 §6, function #29c
9
10use std::sync::Arc;
11use std::time::Duration;
12
13use ff_core::backend::ScannerFilter;
14use ff_core::engine_backend::{EngineBackend, ExpirePhase};
15use ff_core::keys::IndexKeys;
16use ff_core::partition::{Partition, PartitionFamily};
17use ff_core::types::{ExecutionId, LaneId, TimestampMs};
18
19use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
20
/// Upper bound on the number of timed-out entries fetched from a
/// partition's attempt-timeout ZSET in one scan (passed as the `LIMIT`
/// count of the `ZRANGEBYSCORE` call in `scan_partition`).
const BATCH_SIZE: u32 = 50;
22
// ─── Postgres branch (wave 6c) ──────────────────────────────────────────
//
// Entry point used by deployments that run the Postgres backend. The
// Valkey scan path (`AttemptTimeoutScanner` in this module) is not
// affected by it. The engine's driver selects this branch when the
// configured backend is Postgres; it has the same shape as
// `dispatch_via_postgres` (partition_router.rs).

/// Run one attempt-timeout scan tick against Postgres.
///
/// Pure delegation to
/// `ff_backend_postgres::reconcilers::attempt_timeout::scan_tick`: the
/// pool, partition key and filter are forwarded unchanged and the
/// reconciler's result is returned as-is.
///
/// # Errors
///
/// Returns whatever `EngineError` the Postgres reconciler reports.
#[cfg(feature = "postgres")]
pub async fn scan_tick_pg(
    pool: &ff_backend_postgres::PgPool,
    partition_key: i16,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_backend_postgres::reconcilers::ScanReport, ff_core::engine_error::EngineError> {
    use ff_backend_postgres::reconcilers::attempt_timeout as pg_attempt_timeout;

    pg_attempt_timeout::scan_tick(pool, partition_key, filter).await
}
38
39pub struct AttemptTimeoutScanner {
40    interval: Duration,
41    failures: FailureTracker,
42    filter: ScannerFilter,
43    backend: Option<Arc<dyn EngineBackend>>,
44}
45
46impl AttemptTimeoutScanner {
47    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
48        Self::with_filter(interval, lanes, ScannerFilter::default())
49    }
50
51    /// Construct with a [`ScannerFilter`] applied per candidate
52    /// (issue #122).
53    pub fn with_filter(
54        interval: Duration,
55        _lanes: Vec<LaneId>,
56        filter: ScannerFilter,
57    ) -> Self {
58        Self {
59            interval,
60            failures: FailureTracker::new(),
61            filter,
62            backend: None,
63        }
64    }
65
66    /// PR-7b Cluster 1: wire an `EngineBackend` for trait-routed FCALLs.
67    pub fn with_filter_and_backend(
68        interval: Duration,
69        _lanes: Vec<LaneId>,
70        filter: ScannerFilter,
71        backend: Arc<dyn EngineBackend>,
72    ) -> Self {
73        Self {
74            interval,
75            failures: FailureTracker::new(),
76            filter,
77            backend: Some(backend),
78        }
79    }
80}
81
82impl Scanner for AttemptTimeoutScanner {
83    fn name(&self) -> &'static str {
84        "attempt_timeout"
85    }
86
87    fn interval(&self) -> Duration {
88        self.interval
89    }
90
91    fn filter(&self) -> &ScannerFilter {
92        &self.filter
93    }
94
95    async fn scan_partition(
96        &self,
97        client: &ferriskey::Client,
98        partition: u16,
99    ) -> ScanResult {
100        let p = Partition {
101            family: PartitionFamily::Execution,
102            index: partition,
103        };
104        let idx = IndexKeys::new(&p);
105        let timeout_key = idx.attempt_timeout();
106
107        let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
108            b.server_time_ms().await.map_err(|e| e.to_string())
109        } else {
110            crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
111        };
112        let now_ms = match now_ms_res {
113            Ok(t) => t,
114            Err(e) => {
115                tracing::warn!(partition, error = %e, "attempt_timeout: failed to get server time");
116                return ScanResult { processed: 0, errors: 1 };
117            }
118        };
119
120        // ZRANGEBYSCORE attempt_timeout -inf now LIMIT 0 batch_size
121        let timed_out: Vec<String> = match client
122            .cmd("ZRANGEBYSCORE")
123            .arg(&timeout_key)
124            .arg("-inf")
125            .arg(now_ms.to_string().as_str())
126            .arg("LIMIT")
127            .arg("0")
128            .arg(BATCH_SIZE.to_string().as_str())
129            .execute()
130            .await
131        {
132            Ok(ids) => ids,
133            Err(e) => {
134                tracing::warn!(partition, error = %e, "attempt_timeout: ZRANGEBYSCORE failed");
135                return ScanResult { processed: 0, errors: 1 };
136            }
137        };
138
139        if partition == 0 {
140            self.failures.advance_cycle();
141        }
142
143        if timed_out.is_empty() {
144            return ScanResult { processed: 0, errors: 0 };
145        }
146
147        let mut processed: u32 = 0;
148        let mut errors: u32 = 0;
149
150        for eid_str in &timed_out {
151            if self.failures.should_skip(eid_str) {
152                continue;
153            }
154            if should_skip_candidate(self.backend.as_ref(), &self.filter, partition, eid_str).await {
155                continue;
156            }
157
158            let res = if let Some(ref backend) = self.backend {
159                let Ok(eid) = ExecutionId::parse(eid_str) else { tracing::warn!(execution_id=%eid_str, "malformed eid; skipping"); continue; };
160                backend
161                    .expire_execution(p, &eid, ExpirePhase::AttemptTimeout, TimestampMs(now_ms as i64))
162                    .await
163                    .map_err(|e| e.to_string())
164            } else {
165                expire_execution_raw(client, &p, &idx, eid_str, "attempt_timeout")
166                    .await
167                    .map_err(|e| e.to_string())
168            };
169            match res {
170                Ok(()) => {
171                    self.failures.record_success(eid_str);
172                    processed += 1;
173                }
174                Err(e) => {
175                    tracing::warn!(
176                        partition,
177                        execution_id = eid_str.as_str(),
178                        error = %e,
179                        "attempt_timeout: expire_execution failed"
180                    );
181                    self.failures.record_failure(eid_str, "attempt_timeout");
182                    errors += 1;
183                }
184            }
185        }
186
187        ScanResult { processed, errors }
188    }
189}
190
191/// Call ff_expire_execution for one execution.
192///
193/// Lua KEYS (14): exec_core, attempt_hash, stream_meta, lease_current,
194///                lease_history, lease_expiry_zset, worker_leases,
195///                active_index, terminal_zset, attempt_timeout_zset,
196///                execution_deadline_zset, suspended_zset,
197///                suspension_timeout_zset, suspension_current
198/// ARGV (2): execution_id, expire_reason
199///
200/// Pre-reads `lane_id` from exec_core so that lane-scoped index keys
201/// (active, terminal, suspended) point to the correct ZSET. Same pattern
202/// as suspension_timeout::expire_suspension.
203///
204/// Public so execution_deadline scanner can reuse it.
205pub async fn expire_execution_raw(
206    client: &ferriskey::Client,
207    partition: &Partition,
208    idx: &IndexKeys,
209    eid_str: &str,
210    reason: &str,
211) -> Result<(), ferriskey::Error> {
212    let tag = partition.hash_tag();
213
214    // Entity-level keys
215    let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
216    let lease_current = format!("ff:exec:{}:{}:lease:current", tag, eid_str);
217    let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
218    let susp_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
219
220    // Pre-read lane_id and current_attempt_index from exec_core
221    // (same pattern as suspension_timeout — need real attempt index for
222    // correct attempt_hash and stream_meta keys)
223    let pre_fields: Vec<Option<String>> = client
224        .cmd("HMGET")
225        .arg(&exec_core)
226        .arg("lane_id")
227        .arg("current_attempt_index")
228        .execute()
229        .await?;
230    let lane = ff_core::types::LaneId::new(
231        pre_fields.first()
232            .and_then(|v| v.as_deref())
233            .unwrap_or("default"),
234    );
235    let att_idx = pre_fields.get(1)
236        .and_then(|v| v.as_deref())
237        .unwrap_or("0");
238
239    let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
240    let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
241
242    // Partition-level index keys
243    let lease_expiry = idx.lease_expiry();
244    let worker_leases = idx.worker_leases(&ff_core::types::WorkerInstanceId::new(""));
245    let active = idx.lane_active(&lane);
246    let terminal = idx.lane_terminal(&lane);
247    let attempt_timeout = idx.attempt_timeout();
248    let execution_deadline = idx.execution_deadline();
249    let suspended = idx.lane_suspended(&lane);
250    let suspension_timeout = idx.suspension_timeout();
251
252    // KEYS must match Lua's positional order exactly
253    let keys: [&str; 14] = [
254        &exec_core,           // 1  K.core_key
255        &attempt_hash,        // 2  K.attempt_hash
256        &stream_meta,         // 3  K.stream_meta
257        &lease_current,       // 4  K.lease_current_key
258        &lease_history,       // 5  K.lease_history_key
259        &lease_expiry,        // 6  K.lease_expiry_key
260        &worker_leases,       // 7  K.worker_leases_key
261        &active,              // 8  K.active_index_key
262        &terminal,            // 9  K.terminal_key
263        &attempt_timeout,     // 10 K.attempt_timeout_key
264        &execution_deadline,  // 11 K.execution_deadline_key
265        &suspended,           // 12 K.suspended_zset
266        &suspension_timeout,  // 13 K.suspension_timeout_key
267        &susp_current,        // 14 K.suspension_current
268    ];
269
270    let argv: [&str; 2] = [eid_str, reason];
271
272    let _: ferriskey::Value = client
273        .fcall("ff_expire_execution", &keys, &argv)
274        .await?;
275
276    Ok(())
277}