ff_engine/scanner/
execution_deadline.rs1use std::time::Duration;
16
17use ff_core::backend::ScannerFilter;
18use ff_core::keys::IndexKeys;
19use ff_core::partition::{Partition, PartitionFamily};
20
21use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
22
/// Maximum number of expired execution IDs requested from the deadline index
/// per partition in one scan pass (sent as the `LIMIT` count of `ZRANGEBYSCORE`).
const BATCH_SIZE: u32 = 50;
24
25pub struct ExecutionDeadlineScanner {
26 interval: Duration,
27 failures: FailureTracker,
28 filter: ScannerFilter,
29}
30
31impl ExecutionDeadlineScanner {
32 pub fn new(interval: Duration, lanes: Vec<ff_core::types::LaneId>) -> Self {
33 Self::with_filter(interval, lanes, ScannerFilter::default())
34 }
35
36 pub fn with_filter(
39 interval: Duration,
40 _lanes: Vec<ff_core::types::LaneId>,
41 filter: ScannerFilter,
42 ) -> Self {
43 Self {
44 interval,
45 failures: FailureTracker::new(),
46 filter,
47 }
48 }
49}
50
51impl Scanner for ExecutionDeadlineScanner {
52 fn name(&self) -> &'static str {
53 "execution_deadline"
54 }
55
56 fn interval(&self) -> Duration {
57 self.interval
58 }
59
60 fn filter(&self) -> &ScannerFilter {
61 &self.filter
62 }
63
64 async fn scan_partition(
65 &self,
66 client: &ferriskey::Client,
67 partition: u16,
68 ) -> ScanResult {
69 let p = Partition {
70 family: PartitionFamily::Execution,
71 index: partition,
72 };
73 let idx = IndexKeys::new(&p);
74 let deadline_key = idx.execution_deadline();
75
76 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
77 Ok(t) => t,
78 Err(e) => {
79 tracing::warn!(partition, error = %e, "execution_deadline: failed to get server time");
80 return ScanResult { processed: 0, errors: 1 };
81 }
82 };
83
84 let expired: Vec<String> = match client
86 .cmd("ZRANGEBYSCORE")
87 .arg(&deadline_key)
88 .arg("-inf")
89 .arg(now_ms.to_string().as_str())
90 .arg("LIMIT")
91 .arg("0")
92 .arg(BATCH_SIZE.to_string().as_str())
93 .execute()
94 .await
95 {
96 Ok(ids) => ids,
97 Err(e) => {
98 tracing::warn!(partition, error = %e, "execution_deadline: ZRANGEBYSCORE failed");
99 return ScanResult { processed: 0, errors: 1 };
100 }
101 };
102
103 if partition == 0 {
104 self.failures.advance_cycle();
105 }
106
107 if expired.is_empty() {
108 return ScanResult { processed: 0, errors: 0 };
109 }
110
111 let mut processed: u32 = 0;
112 let mut errors: u32 = 0;
113
114 for eid_str in &expired {
115 if self.failures.should_skip(eid_str) {
116 continue;
117 }
118 if should_skip_candidate(client, &self.filter, partition, eid_str).await {
119 continue;
120 }
121
122 match crate::scanner::attempt_timeout::expire_execution_raw(
127 client, &p, &idx, eid_str, "execution_deadline",
128 ).await {
129 Ok(()) => {
130 self.failures.record_success(eid_str);
131 processed += 1;
132 }
133 Err(e) => {
134 tracing::warn!(
135 partition,
136 execution_id = eid_str.as_str(),
137 error = %e,
138 "execution_deadline: ff_expire_execution failed"
139 );
140 self.failures.record_failure(eid_str, "execution_deadline");
141 errors += 1;
142 }
143 }
144 }
145
146 ScanResult { processed, errors }
147 }
148}