Skip to main content

ff_engine/scanner/
execution_deadline.rs

//! Execution deadline scanner.
//!
//! Iterates `ff:idx:{p:N}:execution_deadline` for each partition, finding
//! executions whose absolute deadline score is <= now (server time). For each,
//! calls `FCALL ff_expire_execution`, which handles all lifecycle phases
//! (active, runnable, suspended) and transitions the execution to
//! terminal(expired).
//!
//! This is distinct from the attempt_timeout scanner: attempt_timeout
//! tracks per-attempt relative deadlines, while execution_deadline tracks
//! the absolute maximum lifetime of the entire execution regardless of
//! retries, suspensions, or delays.
//!
//! Reference: RFC-001 §execution_deadline, RFC-010 §6

15use std::time::Duration;
16
17use ff_core::keys::IndexKeys;
18use ff_core::partition::{Partition, PartitionFamily};
19
20use super::{FailureTracker, ScanResult, Scanner};
21
/// Upper bound on how many expired execution ids are read from one
/// partition's deadline index (`LIMIT 0 BATCH_SIZE`) in a single scan pass.
const BATCH_SIZE: u32 = 50_u32;
23
24pub struct ExecutionDeadlineScanner {
25    interval: Duration,
26    failures: FailureTracker,
27}
28
29impl ExecutionDeadlineScanner {
30    pub fn new(interval: Duration, _lanes: Vec<ff_core::types::LaneId>) -> Self {
31        Self { interval, failures: FailureTracker::new() }
32    }
33}
34
35impl Scanner for ExecutionDeadlineScanner {
36    fn name(&self) -> &'static str {
37        "execution_deadline"
38    }
39
40    fn interval(&self) -> Duration {
41        self.interval
42    }
43
44    async fn scan_partition(
45        &self,
46        client: &ferriskey::Client,
47        partition: u16,
48    ) -> ScanResult {
49        let p = Partition {
50            family: PartitionFamily::Execution,
51            index: partition,
52        };
53        let idx = IndexKeys::new(&p);
54        let deadline_key = idx.execution_deadline();
55
56        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
57            Ok(t) => t,
58            Err(e) => {
59                tracing::warn!(partition, error = %e, "execution_deadline: failed to get server time");
60                return ScanResult { processed: 0, errors: 1 };
61            }
62        };
63
64        // ZRANGEBYSCORE execution_deadline -inf now LIMIT 0 batch_size
65        let expired: Vec<String> = match client
66            .cmd("ZRANGEBYSCORE")
67            .arg(&deadline_key)
68            .arg("-inf")
69            .arg(now_ms.to_string().as_str())
70            .arg("LIMIT")
71            .arg("0")
72            .arg(BATCH_SIZE.to_string().as_str())
73            .execute()
74            .await
75        {
76            Ok(ids) => ids,
77            Err(e) => {
78                tracing::warn!(partition, error = %e, "execution_deadline: ZRANGEBYSCORE failed");
79                return ScanResult { processed: 0, errors: 1 };
80            }
81        };
82
83        if partition == 0 {
84            self.failures.advance_cycle();
85        }
86
87        if expired.is_empty() {
88            return ScanResult { processed: 0, errors: 0 };
89        }
90
91        let mut processed: u32 = 0;
92        let mut errors: u32 = 0;
93
94        for eid_str in &expired {
95            if self.failures.should_skip(eid_str) {
96                continue;
97            }
98
99            // Reuse the same expire_execution helper as attempt_timeout —
100            // ff_expire_execution handles all lifecycle phases and ZREMs from
101            // both attempt_timeout and execution_deadline indexes.
102            // Lane is now pre-read from exec_core inside expire_execution_raw.
103            match crate::scanner::attempt_timeout::expire_execution_raw(
104                client, &p, &idx, eid_str, "execution_deadline",
105            ).await {
106                Ok(()) => {
107                    self.failures.record_success(eid_str);
108                    processed += 1;
109                }
110                Err(e) => {
111                    tracing::warn!(
112                        partition,
113                        execution_id = eid_str.as_str(),
114                        error = %e,
115                        "execution_deadline: ff_expire_execution failed"
116                    );
117                    self.failures.record_failure(eid_str, "execution_deadline");
118                    errors += 1;
119                }
120            }
121        }
122
123        ScanResult { processed, errors }
124    }
125}