Skip to main content

ff_engine/scanner/
execution_deadline.rs

//! Execution deadline scanner.
//!
//! Iterates `ff:idx:{p:N}:execution_deadline` for each partition, finding
//! executions whose absolute deadline score is <= now. For each one, it calls
//! `FCALL ff_expire_execution`, which handles all lifecycle phases
//! (active, runnable, suspended) and transitions the execution to
//! terminal(expired).
//!
//! This is distinct from the attempt_timeout scanner: attempt_timeout
//! tracks per-attempt relative deadlines, while execution_deadline tracks
//! the absolute maximum lifetime of the entire execution regardless of
//! retries, suspensions, or delays.
//!
//! Reference: RFC-001 §execution_deadline, RFC-010 §6
14
15use std::time::Duration;
16
17use ff_core::backend::ScannerFilter;
18use ff_core::keys::IndexKeys;
19use ff_core::partition::{Partition, PartitionFamily};
20
21use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
22
/// Maximum number of overdue execution ids fetched from one partition's
/// `execution_deadline` index per scan pass (the `LIMIT 0 <n>` page size).
const BATCH_SIZE: u32 = 50;
24
25pub struct ExecutionDeadlineScanner {
26    interval: Duration,
27    failures: FailureTracker,
28    filter: ScannerFilter,
29}
30
31impl ExecutionDeadlineScanner {
32    pub fn new(interval: Duration, lanes: Vec<ff_core::types::LaneId>) -> Self {
33        Self::with_filter(interval, lanes, ScannerFilter::default())
34    }
35
36    /// Construct with a [`ScannerFilter`] applied per candidate
37    /// (issue #122).
38    pub fn with_filter(
39        interval: Duration,
40        _lanes: Vec<ff_core::types::LaneId>,
41        filter: ScannerFilter,
42    ) -> Self {
43        Self {
44            interval,
45            failures: FailureTracker::new(),
46            filter,
47        }
48    }
49}
50
51impl Scanner for ExecutionDeadlineScanner {
52    fn name(&self) -> &'static str {
53        "execution_deadline"
54    }
55
56    fn interval(&self) -> Duration {
57        self.interval
58    }
59
60    fn filter(&self) -> &ScannerFilter {
61        &self.filter
62    }
63
64    async fn scan_partition(
65        &self,
66        client: &ferriskey::Client,
67        partition: u16,
68    ) -> ScanResult {
69        let p = Partition {
70            family: PartitionFamily::Execution,
71            index: partition,
72        };
73        let idx = IndexKeys::new(&p);
74        let deadline_key = idx.execution_deadline();
75
76        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
77            Ok(t) => t,
78            Err(e) => {
79                tracing::warn!(partition, error = %e, "execution_deadline: failed to get server time");
80                return ScanResult { processed: 0, errors: 1 };
81            }
82        };
83
84        // ZRANGEBYSCORE execution_deadline -inf now LIMIT 0 batch_size
85        let expired: Vec<String> = match client
86            .cmd("ZRANGEBYSCORE")
87            .arg(&deadline_key)
88            .arg("-inf")
89            .arg(now_ms.to_string().as_str())
90            .arg("LIMIT")
91            .arg("0")
92            .arg(BATCH_SIZE.to_string().as_str())
93            .execute()
94            .await
95        {
96            Ok(ids) => ids,
97            Err(e) => {
98                tracing::warn!(partition, error = %e, "execution_deadline: ZRANGEBYSCORE failed");
99                return ScanResult { processed: 0, errors: 1 };
100            }
101        };
102
103        if partition == 0 {
104            self.failures.advance_cycle();
105        }
106
107        if expired.is_empty() {
108            return ScanResult { processed: 0, errors: 0 };
109        }
110
111        let mut processed: u32 = 0;
112        let mut errors: u32 = 0;
113
114        for eid_str in &expired {
115            if self.failures.should_skip(eid_str) {
116                continue;
117            }
118            if should_skip_candidate(client, &self.filter, partition, eid_str).await {
119                continue;
120            }
121
122            // Reuse the same expire_execution helper as attempt_timeout —
123            // ff_expire_execution handles all lifecycle phases and ZREMs from
124            // both attempt_timeout and execution_deadline indexes.
125            // Lane is now pre-read from exec_core inside expire_execution_raw.
126            match crate::scanner::attempt_timeout::expire_execution_raw(
127                client, &p, &idx, eid_str, "execution_deadline",
128            ).await {
129                Ok(()) => {
130                    self.failures.record_success(eid_str);
131                    processed += 1;
132                }
133                Err(e) => {
134                    tracing::warn!(
135                        partition,
136                        execution_id = eid_str.as_str(),
137                        error = %e,
138                        "execution_deadline: ff_expire_execution failed"
139                    );
140                    self.failures.record_failure(eid_str, "execution_deadline");
141                    errors += 1;
142                }
143            }
144        }
145
146        ScanResult { processed, errors }
147    }
148}