ff_engine/scanner/
execution_deadline.rs1use std::time::Duration;
16
17use ff_core::keys::IndexKeys;
18use ff_core::partition::{Partition, PartitionFamily};
19
20use super::{FailureTracker, ScanResult, Scanner};
21
/// Maximum number of members requested from the execution-deadline index in a
/// single `scan_partition` call (passed as the `LIMIT` count of the range query).
const BATCH_SIZE: u32 = 50;
23
/// Scanner that looks up executions in the per-partition execution-deadline
/// index whose score is at or below the current server time and expires them.
pub struct ExecutionDeadlineScanner {
    /// Value handed back from `Scanner::interval`.
    interval: Duration,
    /// Per-execution failure bookkeeping; consulted before each expiry attempt
    /// and updated with the outcome afterwards.
    failures: FailureTracker,
}
28
29impl ExecutionDeadlineScanner {
30 pub fn new(interval: Duration, _lanes: Vec<ff_core::types::LaneId>) -> Self {
31 Self { interval, failures: FailureTracker::new() }
32 }
33}
34
35impl Scanner for ExecutionDeadlineScanner {
36 fn name(&self) -> &'static str {
37 "execution_deadline"
38 }
39
40 fn interval(&self) -> Duration {
41 self.interval
42 }
43
44 async fn scan_partition(
45 &self,
46 client: &ferriskey::Client,
47 partition: u16,
48 ) -> ScanResult {
49 let p = Partition {
50 family: PartitionFamily::Execution,
51 index: partition,
52 };
53 let idx = IndexKeys::new(&p);
54 let deadline_key = idx.execution_deadline();
55
56 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
57 Ok(t) => t,
58 Err(e) => {
59 tracing::warn!(partition, error = %e, "execution_deadline: failed to get server time");
60 return ScanResult { processed: 0, errors: 1 };
61 }
62 };
63
64 let expired: Vec<String> = match client
66 .cmd("ZRANGEBYSCORE")
67 .arg(&deadline_key)
68 .arg("-inf")
69 .arg(now_ms.to_string().as_str())
70 .arg("LIMIT")
71 .arg("0")
72 .arg(BATCH_SIZE.to_string().as_str())
73 .execute()
74 .await
75 {
76 Ok(ids) => ids,
77 Err(e) => {
78 tracing::warn!(partition, error = %e, "execution_deadline: ZRANGEBYSCORE failed");
79 return ScanResult { processed: 0, errors: 1 };
80 }
81 };
82
83 if partition == 0 {
84 self.failures.advance_cycle();
85 }
86
87 if expired.is_empty() {
88 return ScanResult { processed: 0, errors: 0 };
89 }
90
91 let mut processed: u32 = 0;
92 let mut errors: u32 = 0;
93
94 for eid_str in &expired {
95 if self.failures.should_skip(eid_str) {
96 continue;
97 }
98
99 match crate::scanner::attempt_timeout::expire_execution_raw(
104 client, &p, &idx, eid_str, "execution_deadline",
105 ).await {
106 Ok(()) => {
107 self.failures.record_success(eid_str);
108 processed += 1;
109 }
110 Err(e) => {
111 tracing::warn!(
112 partition,
113 execution_id = eid_str.as_str(),
114 error = %e,
115 "execution_deadline: ff_expire_execution failed"
116 );
117 self.failures.record_failure(eid_str, "execution_deadline");
118 errors += 1;
119 }
120 }
121 }
122
123 ScanResult { processed, errors }
124 }
125}