// ff_engine/scanner/attempt_timeout.rs

//! Attempt timeout scanner.
//!
//! Iterates `ff:idx:{p:N}:attempt_timeout` for each partition, finding
//! attempts whose timeout score is <= now. For each, calls
//! `FCALL ff_expire_execution` which handles all lifecycle phases
//! (active, runnable, suspended) and transitions to terminal(expired).
//!
//! Reference: RFC-010 §6, function #29c

use std::time::Duration;

use ff_core::backend::ScannerFilter;
use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionFamily};
use ff_core::types::LaneId;

use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};

const BATCH_SIZE: u32 = 50;

// ─── Postgres branch (wave 6c) ──────────────────────────────────────────
//
// Entry point used when the engine is configured with the Postgres
// backend; the Valkey scan path below is unaffected. All real work lives
// in `ff_backend_postgres::reconcilers::attempt_timeout` — this shim just
// forwards, mirroring the shape of `dispatch_via_postgres`
// (partition_router.rs) so the driver can select either branch uniformly.
#[cfg(feature = "postgres")]
pub async fn scan_tick_pg(
    pool: &ff_backend_postgres::PgPool,
    partition_key: i16,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_backend_postgres::reconcilers::ScanReport, ff_core::engine_error::EngineError> {
    // Delegate; the reconciler owns all Postgres-side scan logic.
    let tick = ff_backend_postgres::reconcilers::attempt_timeout::scan_tick(pool, partition_key, filter);
    tick.await
}

/// Scanner that expires attempts whose timeout score has elapsed
/// (RFC-010 §6, function #29c).
pub struct AttemptTimeoutScanner {
    // Tick interval reported via `Scanner::interval`.
    interval: Duration,
    // Tracks per-execution failures across cycles so repeatedly failing
    // candidates can be skipped (see `should_skip` / `record_failure`
    // usage in `scan_partition`).
    failures: FailureTracker,
    // Per-candidate filter consulted through `should_skip_candidate`
    // before expiring (issue #122).
    filter: ScannerFilter,
}

43impl AttemptTimeoutScanner {
44    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
45        Self::with_filter(interval, lanes, ScannerFilter::default())
46    }
47
48    /// Construct with a [`ScannerFilter`] applied per candidate
49    /// (issue #122).
50    pub fn with_filter(
51        interval: Duration,
52        _lanes: Vec<LaneId>,
53        filter: ScannerFilter,
54    ) -> Self {
55        Self {
56            interval,
57            failures: FailureTracker::new(),
58            filter,
59        }
60    }
61}
62
impl Scanner for AttemptTimeoutScanner {
    /// Stable identifier for this scanner (used in logs below).
    fn name(&self) -> &'static str {
        "attempt_timeout"
    }

    /// Interval between scan cycles, as supplied at construction.
    fn interval(&self) -> Duration {
        self.interval
    }

    /// The per-candidate filter supplied via `with_filter`.
    fn filter(&self) -> &ScannerFilter {
        &self.filter
    }

    /// Scan one execution partition for timed-out attempts and expire them.
    ///
    /// Flow: read the server clock, fetch up to `BATCH_SIZE` members of
    /// the partition's `attempt_timeout` ZSET with score <= now, then for
    /// each candidate not skipped by the failure tracker or the filter,
    /// call `ff_expire_execution` via [`expire_execution_raw`].
    ///
    /// Returns the processed/error counts; infrastructure failures (time
    /// read, range query) log a warning and return early with errors = 1.
    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Execution,
            index: partition,
        };
        let idx = IndexKeys::new(&p);
        let timeout_key = idx.attempt_timeout();

        // Use the server's clock rather than local wall time so the
        // comparison is against the same clock that produced the scores.
        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t,
            Err(e) => {
                tracing::warn!(partition, error = %e, "attempt_timeout: failed to get server time");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        // ZRANGEBYSCORE attempt_timeout -inf now LIMIT 0 batch_size
        let timed_out: Vec<String> = match client
            .cmd("ZRANGEBYSCORE")
            .arg(&timeout_key)
            .arg("-inf")
            .arg(now_ms.to_string().as_str())
            .arg("LIMIT")
            .arg("0")
            .arg(BATCH_SIZE.to_string().as_str())
            .execute()
            .await
        {
            Ok(ids) => ids,
            Err(e) => {
                tracing::warn!(partition, error = %e, "attempt_timeout: ZRANGEBYSCORE failed");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        // Partition 0 acts as the cycle marker for the failure tracker.
        // NOTE(review): the cycle only advances when partition 0 reaches
        // this point — an early error return above skips the advance.
        // Confirm that stalling backoff cycles on infrastructure errors
        // is the intended behavior.
        if partition == 0 {
            self.failures.advance_cycle();
        }

        if timed_out.is_empty() {
            return ScanResult { processed: 0, errors: 0 };
        }

        let mut processed: u32 = 0;
        let mut errors: u32 = 0;

        for eid_str in &timed_out {
            // Local backoff check first (no I/O) …
            if self.failures.should_skip(eid_str) {
                continue;
            }
            // … then the filter probe, which does a backend round trip.
            if should_skip_candidate(client, &self.filter, partition, eid_str).await {
                continue;
            }

            match expire_execution_raw(client, &p, &idx, eid_str, "attempt_timeout").await {
                Ok(()) => {
                    self.failures.record_success(eid_str);
                    processed += 1;
                }
                Err(e) => {
                    tracing::warn!(
                        partition,
                        execution_id = eid_str.as_str(),
                        error = %e,
                        "attempt_timeout: ff_expire_execution failed"
                    );
                    // Record for backoff so a persistently failing
                    // execution does not dominate every cycle.
                    self.failures.record_failure(eid_str, "attempt_timeout");
                    errors += 1;
                }
            }
        }

        ScanResult { processed, errors }
    }
}

156/// Call ff_expire_execution for one execution.
157///
158/// Lua KEYS (14): exec_core, attempt_hash, stream_meta, lease_current,
159///                lease_history, lease_expiry_zset, worker_leases,
160///                active_index, terminal_zset, attempt_timeout_zset,
161///                execution_deadline_zset, suspended_zset,
162///                suspension_timeout_zset, suspension_current
163/// ARGV (2): execution_id, expire_reason
164///
165/// Pre-reads `lane_id` from exec_core so that lane-scoped index keys
166/// (active, terminal, suspended) point to the correct ZSET. Same pattern
167/// as suspension_timeout::expire_suspension.
168///
169/// Public so execution_deadline scanner can reuse it.
170pub async fn expire_execution_raw(
171    client: &ferriskey::Client,
172    partition: &Partition,
173    idx: &IndexKeys,
174    eid_str: &str,
175    reason: &str,
176) -> Result<(), ferriskey::Error> {
177    let tag = partition.hash_tag();
178
179    // Entity-level keys
180    let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
181    let lease_current = format!("ff:exec:{}:{}:lease:current", tag, eid_str);
182    let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
183    let susp_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
184
185    // Pre-read lane_id and current_attempt_index from exec_core
186    // (same pattern as suspension_timeout — need real attempt index for
187    // correct attempt_hash and stream_meta keys)
188    let pre_fields: Vec<Option<String>> = client
189        .cmd("HMGET")
190        .arg(&exec_core)
191        .arg("lane_id")
192        .arg("current_attempt_index")
193        .execute()
194        .await?;
195    let lane = ff_core::types::LaneId::new(
196        pre_fields.first()
197            .and_then(|v| v.as_deref())
198            .unwrap_or("default"),
199    );
200    let att_idx = pre_fields.get(1)
201        .and_then(|v| v.as_deref())
202        .unwrap_or("0");
203
204    let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
205    let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
206
207    // Partition-level index keys
208    let lease_expiry = idx.lease_expiry();
209    let worker_leases = idx.worker_leases(&ff_core::types::WorkerInstanceId::new(""));
210    let active = idx.lane_active(&lane);
211    let terminal = idx.lane_terminal(&lane);
212    let attempt_timeout = idx.attempt_timeout();
213    let execution_deadline = idx.execution_deadline();
214    let suspended = idx.lane_suspended(&lane);
215    let suspension_timeout = idx.suspension_timeout();
216
217    // KEYS must match Lua's positional order exactly
218    let keys: [&str; 14] = [
219        &exec_core,           // 1  K.core_key
220        &attempt_hash,        // 2  K.attempt_hash
221        &stream_meta,         // 3  K.stream_meta
222        &lease_current,       // 4  K.lease_current_key
223        &lease_history,       // 5  K.lease_history_key
224        &lease_expiry,        // 6  K.lease_expiry_key
225        &worker_leases,       // 7  K.worker_leases_key
226        &active,              // 8  K.active_index_key
227        &terminal,            // 9  K.terminal_key
228        &attempt_timeout,     // 10 K.attempt_timeout_key
229        &execution_deadline,  // 11 K.execution_deadline_key
230        &suspended,           // 12 K.suspended_zset
231        &suspension_timeout,  // 13 K.suspension_timeout_key
232        &susp_current,        // 14 K.suspension_current
233    ];
234
235    let argv: [&str; 2] = [eid_str, reason];
236
237    let _: ferriskey::Value = client
238        .fcall("ff_expire_execution", &keys, &argv)
239        .await?;
240
241    Ok(())
242}