// ff_engine/scanner/attempt_timeout.rs
use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::keys::IndexKeys;
14use ff_core::partition::{Partition, PartitionFamily};
15use ff_core::types::LaneId;
16
17use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
18
/// Count passed after `LIMIT 0` to the `ZRANGEBYSCORE` call in
/// `scan_partition`, i.e. the most candidate ids requested from one
/// partition per scan.
const BATCH_SIZE: u32 = 50;
20
/// Scanner registered under the name `"attempt_timeout"`.
///
/// For each partition it reads the attempt-timeout index for members scored
/// at or below the current time and calls [`expire_execution_raw`] on each
/// one with the reason `"attempt_timeout"`.
pub struct AttemptTimeoutScanner {
    /// Value returned by `Scanner::interval`.
    interval: Duration,
    /// Per-id success/failure bookkeeping; consulted via `should_skip`
    /// before an id is processed and updated after every expire attempt.
    failures: FailureTracker,
    /// Candidate filter returned by `Scanner::filter` and handed to
    /// `should_skip_candidate` for every id.
    filter: ScannerFilter,
}
26
27impl AttemptTimeoutScanner {
28 pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
29 Self::with_filter(interval, lanes, ScannerFilter::default())
30 }
31
32 pub fn with_filter(
35 interval: Duration,
36 _lanes: Vec<LaneId>,
37 filter: ScannerFilter,
38 ) -> Self {
39 Self {
40 interval,
41 failures: FailureTracker::new(),
42 filter,
43 }
44 }
45}
46
47impl Scanner for AttemptTimeoutScanner {
48 fn name(&self) -> &'static str {
49 "attempt_timeout"
50 }
51
52 fn interval(&self) -> Duration {
53 self.interval
54 }
55
56 fn filter(&self) -> &ScannerFilter {
57 &self.filter
58 }
59
60 async fn scan_partition(
61 &self,
62 client: &ferriskey::Client,
63 partition: u16,
64 ) -> ScanResult {
65 let p = Partition {
66 family: PartitionFamily::Execution,
67 index: partition,
68 };
69 let idx = IndexKeys::new(&p);
70 let timeout_key = idx.attempt_timeout();
71
72 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
73 Ok(t) => t,
74 Err(e) => {
75 tracing::warn!(partition, error = %e, "attempt_timeout: failed to get server time");
76 return ScanResult { processed: 0, errors: 1 };
77 }
78 };
79
80 let timed_out: Vec<String> = match client
82 .cmd("ZRANGEBYSCORE")
83 .arg(&timeout_key)
84 .arg("-inf")
85 .arg(now_ms.to_string().as_str())
86 .arg("LIMIT")
87 .arg("0")
88 .arg(BATCH_SIZE.to_string().as_str())
89 .execute()
90 .await
91 {
92 Ok(ids) => ids,
93 Err(e) => {
94 tracing::warn!(partition, error = %e, "attempt_timeout: ZRANGEBYSCORE failed");
95 return ScanResult { processed: 0, errors: 1 };
96 }
97 };
98
99 if partition == 0 {
100 self.failures.advance_cycle();
101 }
102
103 if timed_out.is_empty() {
104 return ScanResult { processed: 0, errors: 0 };
105 }
106
107 let mut processed: u32 = 0;
108 let mut errors: u32 = 0;
109
110 for eid_str in &timed_out {
111 if self.failures.should_skip(eid_str) {
112 continue;
113 }
114 if should_skip_candidate(client, &self.filter, partition, eid_str).await {
115 continue;
116 }
117
118 match expire_execution_raw(client, &p, &idx, eid_str, "attempt_timeout").await {
119 Ok(()) => {
120 self.failures.record_success(eid_str);
121 processed += 1;
122 }
123 Err(e) => {
124 tracing::warn!(
125 partition,
126 execution_id = eid_str.as_str(),
127 error = %e,
128 "attempt_timeout: ff_expire_execution failed"
129 );
130 self.failures.record_failure(eid_str, "attempt_timeout");
131 errors += 1;
132 }
133 }
134 }
135
136 ScanResult { processed, errors }
137 }
138}
139
140pub async fn expire_execution_raw(
155 client: &ferriskey::Client,
156 partition: &Partition,
157 idx: &IndexKeys,
158 eid_str: &str,
159 reason: &str,
160) -> Result<(), ferriskey::Error> {
161 let tag = partition.hash_tag();
162
163 let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
165 let lease_current = format!("ff:exec:{}:{}:lease:current", tag, eid_str);
166 let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
167 let susp_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
168
169 let pre_fields: Vec<Option<String>> = client
173 .cmd("HMGET")
174 .arg(&exec_core)
175 .arg("lane_id")
176 .arg("current_attempt_index")
177 .execute()
178 .await?;
179 let lane = ff_core::types::LaneId::new(
180 pre_fields.first()
181 .and_then(|v| v.as_deref())
182 .unwrap_or("default"),
183 );
184 let att_idx = pre_fields.get(1)
185 .and_then(|v| v.as_deref())
186 .unwrap_or("0");
187
188 let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
189 let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
190
191 let lease_expiry = idx.lease_expiry();
193 let worker_leases = idx.worker_leases(&ff_core::types::WorkerInstanceId::new(""));
194 let active = idx.lane_active(&lane);
195 let terminal = idx.lane_terminal(&lane);
196 let attempt_timeout = idx.attempt_timeout();
197 let execution_deadline = idx.execution_deadline();
198 let suspended = idx.lane_suspended(&lane);
199 let suspension_timeout = idx.suspension_timeout();
200
201 let keys: [&str; 14] = [
203 &exec_core, &attempt_hash, &stream_meta, &lease_current, &lease_history, &lease_expiry, &worker_leases, &active, &terminal, &attempt_timeout, &execution_deadline, &suspended, &suspension_timeout, &susp_current, ];
218
219 let argv: [&str; 2] = [eid_str, reason];
220
221 let _: ferriskey::Value = client
222 .fcall("ff_expire_execution", &keys, &argv)
223 .await?;
224
225 Ok(())
226}