Skip to main content

ff_engine/scanner/
attempt_timeout.rs

//! Attempt timeout scanner.
//!
//! Iterates `ff:idx:{p:N}:attempt_timeout` for each partition, finding
//! attempts whose timeout score is <= now (server time). For each, calls
//! `FCALL ff_expire_execution`, which handles all lifecycle phases
//! (active, runnable, suspended) and transitions the execution to
//! terminal (expired).
//!
//! Reference: RFC-010 §6, function #29c
9
10use std::time::Duration;
11
12use ff_core::keys::IndexKeys;
13use ff_core::partition::{Partition, PartitionFamily};
14use ff_core::types::LaneId;
15
16use super::{FailureTracker, ScanResult, Scanner};
17
18const BATCH_SIZE: u32 = 50;
19
20pub struct AttemptTimeoutScanner {
21    interval: Duration,
22    failures: FailureTracker,
23}
24
25impl AttemptTimeoutScanner {
26    pub fn new(interval: Duration, _lanes: Vec<LaneId>) -> Self {
27        Self { interval, failures: FailureTracker::new() }
28    }
29}
30
31impl Scanner for AttemptTimeoutScanner {
32    fn name(&self) -> &'static str {
33        "attempt_timeout"
34    }
35
36    fn interval(&self) -> Duration {
37        self.interval
38    }
39
40    async fn scan_partition(
41        &self,
42        client: &ferriskey::Client,
43        partition: u16,
44    ) -> ScanResult {
45        let p = Partition {
46            family: PartitionFamily::Execution,
47            index: partition,
48        };
49        let idx = IndexKeys::new(&p);
50        let timeout_key = idx.attempt_timeout();
51
52        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
53            Ok(t) => t,
54            Err(e) => {
55                tracing::warn!(partition, error = %e, "attempt_timeout: failed to get server time");
56                return ScanResult { processed: 0, errors: 1 };
57            }
58        };
59
60        // ZRANGEBYSCORE attempt_timeout -inf now LIMIT 0 batch_size
61        let timed_out: Vec<String> = match client
62            .cmd("ZRANGEBYSCORE")
63            .arg(&timeout_key)
64            .arg("-inf")
65            .arg(now_ms.to_string().as_str())
66            .arg("LIMIT")
67            .arg("0")
68            .arg(BATCH_SIZE.to_string().as_str())
69            .execute()
70            .await
71        {
72            Ok(ids) => ids,
73            Err(e) => {
74                tracing::warn!(partition, error = %e, "attempt_timeout: ZRANGEBYSCORE failed");
75                return ScanResult { processed: 0, errors: 1 };
76            }
77        };
78
79        if partition == 0 {
80            self.failures.advance_cycle();
81        }
82
83        if timed_out.is_empty() {
84            return ScanResult { processed: 0, errors: 0 };
85        }
86
87        let mut processed: u32 = 0;
88        let mut errors: u32 = 0;
89
90        for eid_str in &timed_out {
91            if self.failures.should_skip(eid_str) {
92                continue;
93            }
94
95            match expire_execution_raw(client, &p, &idx, eid_str, "attempt_timeout").await {
96                Ok(()) => {
97                    self.failures.record_success(eid_str);
98                    processed += 1;
99                }
100                Err(e) => {
101                    tracing::warn!(
102                        partition,
103                        execution_id = eid_str.as_str(),
104                        error = %e,
105                        "attempt_timeout: ff_expire_execution failed"
106                    );
107                    self.failures.record_failure(eid_str, "attempt_timeout");
108                    errors += 1;
109                }
110            }
111        }
112
113        ScanResult { processed, errors }
114    }
115}
116
117/// Call ff_expire_execution for one execution.
118///
119/// Lua KEYS (14): exec_core, attempt_hash, stream_meta, lease_current,
120///                lease_history, lease_expiry_zset, worker_leases,
121///                active_index, terminal_zset, attempt_timeout_zset,
122///                execution_deadline_zset, suspended_zset,
123///                suspension_timeout_zset, suspension_current
124/// ARGV (2): execution_id, expire_reason
125///
126/// Pre-reads `lane_id` from exec_core so that lane-scoped index keys
127/// (active, terminal, suspended) point to the correct ZSET. Same pattern
128/// as suspension_timeout::expire_suspension.
129///
130/// Public so execution_deadline scanner can reuse it.
131pub async fn expire_execution_raw(
132    client: &ferriskey::Client,
133    partition: &Partition,
134    idx: &IndexKeys,
135    eid_str: &str,
136    reason: &str,
137) -> Result<(), ferriskey::Error> {
138    let tag = partition.hash_tag();
139
140    // Entity-level keys
141    let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
142    let lease_current = format!("ff:exec:{}:{}:lease:current", tag, eid_str);
143    let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
144    let susp_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
145
146    // Pre-read lane_id and current_attempt_index from exec_core
147    // (same pattern as suspension_timeout — need real attempt index for
148    // correct attempt_hash and stream_meta keys)
149    let pre_fields: Vec<Option<String>> = client
150        .cmd("HMGET")
151        .arg(&exec_core)
152        .arg("lane_id")
153        .arg("current_attempt_index")
154        .execute()
155        .await?;
156    let lane = ff_core::types::LaneId::new(
157        pre_fields.first()
158            .and_then(|v| v.as_deref())
159            .unwrap_or("default"),
160    );
161    let att_idx = pre_fields.get(1)
162        .and_then(|v| v.as_deref())
163        .unwrap_or("0");
164
165    let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
166    let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
167
168    // Partition-level index keys
169    let lease_expiry = idx.lease_expiry();
170    let worker_leases = idx.worker_leases(&ff_core::types::WorkerInstanceId::new(""));
171    let active = idx.lane_active(&lane);
172    let terminal = idx.lane_terminal(&lane);
173    let attempt_timeout = idx.attempt_timeout();
174    let execution_deadline = idx.execution_deadline();
175    let suspended = idx.lane_suspended(&lane);
176    let suspension_timeout = idx.suspension_timeout();
177
178    // KEYS must match Lua's positional order exactly
179    let keys: [&str; 14] = [
180        &exec_core,           // 1  K.core_key
181        &attempt_hash,        // 2  K.attempt_hash
182        &stream_meta,         // 3  K.stream_meta
183        &lease_current,       // 4  K.lease_current_key
184        &lease_history,       // 5  K.lease_history_key
185        &lease_expiry,        // 6  K.lease_expiry_key
186        &worker_leases,       // 7  K.worker_leases_key
187        &active,              // 8  K.active_index_key
188        &terminal,            // 9  K.terminal_key
189        &attempt_timeout,     // 10 K.attempt_timeout_key
190        &execution_deadline,  // 11 K.execution_deadline_key
191        &suspended,           // 12 K.suspended_zset
192        &suspension_timeout,  // 13 K.suspension_timeout_key
193        &susp_current,        // 14 K.suspension_current
194    ];
195
196    let argv: [&str; 2] = [eid_str, reason];
197
198    let _: ferriskey::Value = client
199        .fcall("ff_expire_execution", &keys, &argv)
200        .await?;
201
202    Ok(())
203}