ff_engine/scanner/
attempt_timeout.rs1use std::time::Duration;
11
12use ff_core::keys::IndexKeys;
13use ff_core::partition::{Partition, PartitionFamily};
14use ff_core::types::LaneId;
15
16use super::{FailureTracker, ScanResult, Scanner};
17
18const BATCH_SIZE: u32 = 50;
19
20pub struct AttemptTimeoutScanner {
21 interval: Duration,
22 failures: FailureTracker,
23}
24
25impl AttemptTimeoutScanner {
26 pub fn new(interval: Duration, _lanes: Vec<LaneId>) -> Self {
27 Self { interval, failures: FailureTracker::new() }
28 }
29}
30
31impl Scanner for AttemptTimeoutScanner {
32 fn name(&self) -> &'static str {
33 "attempt_timeout"
34 }
35
36 fn interval(&self) -> Duration {
37 self.interval
38 }
39
40 async fn scan_partition(
41 &self,
42 client: &ferriskey::Client,
43 partition: u16,
44 ) -> ScanResult {
45 let p = Partition {
46 family: PartitionFamily::Execution,
47 index: partition,
48 };
49 let idx = IndexKeys::new(&p);
50 let timeout_key = idx.attempt_timeout();
51
52 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
53 Ok(t) => t,
54 Err(e) => {
55 tracing::warn!(partition, error = %e, "attempt_timeout: failed to get server time");
56 return ScanResult { processed: 0, errors: 1 };
57 }
58 };
59
60 let timed_out: Vec<String> = match client
62 .cmd("ZRANGEBYSCORE")
63 .arg(&timeout_key)
64 .arg("-inf")
65 .arg(now_ms.to_string().as_str())
66 .arg("LIMIT")
67 .arg("0")
68 .arg(BATCH_SIZE.to_string().as_str())
69 .execute()
70 .await
71 {
72 Ok(ids) => ids,
73 Err(e) => {
74 tracing::warn!(partition, error = %e, "attempt_timeout: ZRANGEBYSCORE failed");
75 return ScanResult { processed: 0, errors: 1 };
76 }
77 };
78
79 if partition == 0 {
80 self.failures.advance_cycle();
81 }
82
83 if timed_out.is_empty() {
84 return ScanResult { processed: 0, errors: 0 };
85 }
86
87 let mut processed: u32 = 0;
88 let mut errors: u32 = 0;
89
90 for eid_str in &timed_out {
91 if self.failures.should_skip(eid_str) {
92 continue;
93 }
94
95 match expire_execution_raw(client, &p, &idx, eid_str, "attempt_timeout").await {
96 Ok(()) => {
97 self.failures.record_success(eid_str);
98 processed += 1;
99 }
100 Err(e) => {
101 tracing::warn!(
102 partition,
103 execution_id = eid_str.as_str(),
104 error = %e,
105 "attempt_timeout: ff_expire_execution failed"
106 );
107 self.failures.record_failure(eid_str, "attempt_timeout");
108 errors += 1;
109 }
110 }
111 }
112
113 ScanResult { processed, errors }
114 }
115}
116
117pub async fn expire_execution_raw(
132 client: &ferriskey::Client,
133 partition: &Partition,
134 idx: &IndexKeys,
135 eid_str: &str,
136 reason: &str,
137) -> Result<(), ferriskey::Error> {
138 let tag = partition.hash_tag();
139
140 let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
142 let lease_current = format!("ff:exec:{}:{}:lease:current", tag, eid_str);
143 let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
144 let susp_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
145
146 let pre_fields: Vec<Option<String>> = client
150 .cmd("HMGET")
151 .arg(&exec_core)
152 .arg("lane_id")
153 .arg("current_attempt_index")
154 .execute()
155 .await?;
156 let lane = ff_core::types::LaneId::new(
157 pre_fields.first()
158 .and_then(|v| v.as_deref())
159 .unwrap_or("default"),
160 );
161 let att_idx = pre_fields.get(1)
162 .and_then(|v| v.as_deref())
163 .unwrap_or("0");
164
165 let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
166 let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
167
168 let lease_expiry = idx.lease_expiry();
170 let worker_leases = idx.worker_leases(&ff_core::types::WorkerInstanceId::new(""));
171 let active = idx.lane_active(&lane);
172 let terminal = idx.lane_terminal(&lane);
173 let attempt_timeout = idx.attempt_timeout();
174 let execution_deadline = idx.execution_deadline();
175 let suspended = idx.lane_suspended(&lane);
176 let suspension_timeout = idx.suspension_timeout();
177
178 let keys: [&str; 14] = [
180 &exec_core, &attempt_hash, &stream_meta, &lease_current, &lease_history, &lease_expiry, &worker_leases, &active, &terminal, &attempt_timeout, &execution_deadline, &suspended, &suspension_timeout, &susp_current, ];
195
196 let argv: [&str; 2] = [eid_str, reason];
197
198 let _: ferriskey::Value = client
199 .fcall("ff_expire_execution", &keys, &argv)
200 .await?;
201
202 Ok(())
203}