Skip to main content

ff_engine/scanner/
attempt_timeout.rs

1//! Attempt timeout scanner.
2//!
3//! Iterates `ff:idx:{p:N}:attempt_timeout` for each partition, finding
4//! attempts whose timeout score is <= now. For each, calls
5//! `FCALL ff_expire_execution` which handles all lifecycle phases
6//! (active, runnable, suspended) and transitions to terminal(expired).
7//!
8//! Reference: RFC-010 §6, function #29c
9
10use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::keys::IndexKeys;
14use ff_core::partition::{Partition, PartitionFamily};
15use ff_core::types::LaneId;
16
17use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
18
/// Maximum number of execution ids fetched from a partition's
/// attempt-timeout index in one scan; passed as the `LIMIT` count of the
/// `ZRANGEBYSCORE` query in `scan_partition`.
const BATCH_SIZE: u32 = 50;
20
/// Scanner that expires executions whose attempt-timeout score is at or
/// before the current server time.
pub struct AttemptTimeoutScanner {
    /// Value returned by `Scanner::interval` for this scanner.
    interval: Duration,
    /// Per-execution failure bookkeeping: consulted before each expire call
    /// and updated with that call's outcome.
    failures: FailureTracker,
    /// Filter handed to `should_skip_candidate` for every candidate id.
    filter: ScannerFilter,
}
26
27impl AttemptTimeoutScanner {
28    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
29        Self::with_filter(interval, lanes, ScannerFilter::default())
30    }
31
32    /// Construct with a [`ScannerFilter`] applied per candidate
33    /// (issue #122).
34    pub fn with_filter(
35        interval: Duration,
36        _lanes: Vec<LaneId>,
37        filter: ScannerFilter,
38    ) -> Self {
39        Self {
40            interval,
41            failures: FailureTracker::new(),
42            filter,
43        }
44    }
45}
46
47impl Scanner for AttemptTimeoutScanner {
48    fn name(&self) -> &'static str {
49        "attempt_timeout"
50    }
51
52    fn interval(&self) -> Duration {
53        self.interval
54    }
55
56    fn filter(&self) -> &ScannerFilter {
57        &self.filter
58    }
59
60    async fn scan_partition(
61        &self,
62        client: &ferriskey::Client,
63        partition: u16,
64    ) -> ScanResult {
65        let p = Partition {
66            family: PartitionFamily::Execution,
67            index: partition,
68        };
69        let idx = IndexKeys::new(&p);
70        let timeout_key = idx.attempt_timeout();
71
72        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
73            Ok(t) => t,
74            Err(e) => {
75                tracing::warn!(partition, error = %e, "attempt_timeout: failed to get server time");
76                return ScanResult { processed: 0, errors: 1 };
77            }
78        };
79
80        // ZRANGEBYSCORE attempt_timeout -inf now LIMIT 0 batch_size
81        let timed_out: Vec<String> = match client
82            .cmd("ZRANGEBYSCORE")
83            .arg(&timeout_key)
84            .arg("-inf")
85            .arg(now_ms.to_string().as_str())
86            .arg("LIMIT")
87            .arg("0")
88            .arg(BATCH_SIZE.to_string().as_str())
89            .execute()
90            .await
91        {
92            Ok(ids) => ids,
93            Err(e) => {
94                tracing::warn!(partition, error = %e, "attempt_timeout: ZRANGEBYSCORE failed");
95                return ScanResult { processed: 0, errors: 1 };
96            }
97        };
98
99        if partition == 0 {
100            self.failures.advance_cycle();
101        }
102
103        if timed_out.is_empty() {
104            return ScanResult { processed: 0, errors: 0 };
105        }
106
107        let mut processed: u32 = 0;
108        let mut errors: u32 = 0;
109
110        for eid_str in &timed_out {
111            if self.failures.should_skip(eid_str) {
112                continue;
113            }
114            if should_skip_candidate(client, &self.filter, partition, eid_str).await {
115                continue;
116            }
117
118            match expire_execution_raw(client, &p, &idx, eid_str, "attempt_timeout").await {
119                Ok(()) => {
120                    self.failures.record_success(eid_str);
121                    processed += 1;
122                }
123                Err(e) => {
124                    tracing::warn!(
125                        partition,
126                        execution_id = eid_str.as_str(),
127                        error = %e,
128                        "attempt_timeout: ff_expire_execution failed"
129                    );
130                    self.failures.record_failure(eid_str, "attempt_timeout");
131                    errors += 1;
132                }
133            }
134        }
135
136        ScanResult { processed, errors }
137    }
138}
139
140/// Call ff_expire_execution for one execution.
141///
142/// Lua KEYS (14): exec_core, attempt_hash, stream_meta, lease_current,
143///                lease_history, lease_expiry_zset, worker_leases,
144///                active_index, terminal_zset, attempt_timeout_zset,
145///                execution_deadline_zset, suspended_zset,
146///                suspension_timeout_zset, suspension_current
147/// ARGV (2): execution_id, expire_reason
148///
149/// Pre-reads `lane_id` from exec_core so that lane-scoped index keys
150/// (active, terminal, suspended) point to the correct ZSET. Same pattern
151/// as suspension_timeout::expire_suspension.
152///
153/// Public so execution_deadline scanner can reuse it.
154pub async fn expire_execution_raw(
155    client: &ferriskey::Client,
156    partition: &Partition,
157    idx: &IndexKeys,
158    eid_str: &str,
159    reason: &str,
160) -> Result<(), ferriskey::Error> {
161    let tag = partition.hash_tag();
162
163    // Entity-level keys
164    let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
165    let lease_current = format!("ff:exec:{}:{}:lease:current", tag, eid_str);
166    let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
167    let susp_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
168
169    // Pre-read lane_id and current_attempt_index from exec_core
170    // (same pattern as suspension_timeout — need real attempt index for
171    // correct attempt_hash and stream_meta keys)
172    let pre_fields: Vec<Option<String>> = client
173        .cmd("HMGET")
174        .arg(&exec_core)
175        .arg("lane_id")
176        .arg("current_attempt_index")
177        .execute()
178        .await?;
179    let lane = ff_core::types::LaneId::new(
180        pre_fields.first()
181            .and_then(|v| v.as_deref())
182            .unwrap_or("default"),
183    );
184    let att_idx = pre_fields.get(1)
185        .and_then(|v| v.as_deref())
186        .unwrap_or("0");
187
188    let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
189    let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
190
191    // Partition-level index keys
192    let lease_expiry = idx.lease_expiry();
193    let worker_leases = idx.worker_leases(&ff_core::types::WorkerInstanceId::new(""));
194    let active = idx.lane_active(&lane);
195    let terminal = idx.lane_terminal(&lane);
196    let attempt_timeout = idx.attempt_timeout();
197    let execution_deadline = idx.execution_deadline();
198    let suspended = idx.lane_suspended(&lane);
199    let suspension_timeout = idx.suspension_timeout();
200
201    // KEYS must match Lua's positional order exactly
202    let keys: [&str; 14] = [
203        &exec_core,           // 1  K.core_key
204        &attempt_hash,        // 2  K.attempt_hash
205        &stream_meta,         // 3  K.stream_meta
206        &lease_current,       // 4  K.lease_current_key
207        &lease_history,       // 5  K.lease_history_key
208        &lease_expiry,        // 6  K.lease_expiry_key
209        &worker_leases,       // 7  K.worker_leases_key
210        &active,              // 8  K.active_index_key
211        &terminal,            // 9  K.terminal_key
212        &attempt_timeout,     // 10 K.attempt_timeout_key
213        &execution_deadline,  // 11 K.execution_deadline_key
214        &suspended,           // 12 K.suspended_zset
215        &suspension_timeout,  // 13 K.suspension_timeout_key
216        &susp_current,        // 14 K.suspension_current
217    ];
218
219    let argv: [&str; 2] = [eid_str, reason];
220
221    let _: ferriskey::Value = client
222        .fcall("ff_expire_execution", &keys, &argv)
223        .await?;
224
225    Ok(())
226}