1use std::sync::Arc;
11use std::time::Duration;
12
13use ff_core::backend::ScannerFilter;
14use ff_core::engine_backend::{EngineBackend, ExpirePhase};
15use ff_core::keys::IndexKeys;
16use ff_core::partition::{Partition, PartitionFamily};
17use ff_core::types::{ExecutionId, LaneId, TimestampMs};
18
19use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
20
21const BATCH_SIZE: u32 = 50;
22
/// Runs one attempt-timeout scan tick against the Postgres backend.
///
/// This is a thin forwarding wrapper: all of the work happens in
/// `ff_backend_postgres::reconcilers::attempt_timeout::scan_tick`, which
/// receives `pool`, `partition_key` and `filter` unchanged.
///
/// # Errors
///
/// Returns whatever `EngineError` the delegated `scan_tick` call produces.
#[cfg(feature = "postgres")]
pub async fn scan_tick_pg(
    pool: &ff_backend_postgres::PgPool,
    partition_key: i16,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_backend_postgres::reconcilers::ScanReport, ff_core::engine_error::EngineError> {
    use ff_backend_postgres::reconcilers::attempt_timeout;

    let tick = attempt_timeout::scan_tick(pool, partition_key, filter);
    tick.await
}
38
39pub struct AttemptTimeoutScanner {
40 interval: Duration,
41 failures: FailureTracker,
42 filter: ScannerFilter,
43 backend: Option<Arc<dyn EngineBackend>>,
44}
45
46impl AttemptTimeoutScanner {
47 pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
48 Self::with_filter(interval, lanes, ScannerFilter::default())
49 }
50
51 pub fn with_filter(
54 interval: Duration,
55 _lanes: Vec<LaneId>,
56 filter: ScannerFilter,
57 ) -> Self {
58 Self {
59 interval,
60 failures: FailureTracker::new(),
61 filter,
62 backend: None,
63 }
64 }
65
66 pub fn with_filter_and_backend(
68 interval: Duration,
69 _lanes: Vec<LaneId>,
70 filter: ScannerFilter,
71 backend: Arc<dyn EngineBackend>,
72 ) -> Self {
73 Self {
74 interval,
75 failures: FailureTracker::new(),
76 filter,
77 backend: Some(backend),
78 }
79 }
80}
81
82impl Scanner for AttemptTimeoutScanner {
83 fn name(&self) -> &'static str {
84 "attempt_timeout"
85 }
86
87 fn interval(&self) -> Duration {
88 self.interval
89 }
90
91 fn filter(&self) -> &ScannerFilter {
92 &self.filter
93 }
94
95 async fn scan_partition(
96 &self,
97 client: &ferriskey::Client,
98 partition: u16,
99 ) -> ScanResult {
100 let p = Partition {
101 family: PartitionFamily::Execution,
102 index: partition,
103 };
104 let idx = IndexKeys::new(&p);
105 let timeout_key = idx.attempt_timeout();
106
107 let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
108 b.server_time_ms().await.map_err(|e| e.to_string())
109 } else {
110 crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
111 };
112 let now_ms = match now_ms_res {
113 Ok(t) => t,
114 Err(e) => {
115 tracing::warn!(partition, error = %e, "attempt_timeout: failed to get server time");
116 return ScanResult { processed: 0, errors: 1 };
117 }
118 };
119
120 let timed_out: Vec<String> = match client
122 .cmd("ZRANGEBYSCORE")
123 .arg(&timeout_key)
124 .arg("-inf")
125 .arg(now_ms.to_string().as_str())
126 .arg("LIMIT")
127 .arg("0")
128 .arg(BATCH_SIZE.to_string().as_str())
129 .execute()
130 .await
131 {
132 Ok(ids) => ids,
133 Err(e) => {
134 tracing::warn!(partition, error = %e, "attempt_timeout: ZRANGEBYSCORE failed");
135 return ScanResult { processed: 0, errors: 1 };
136 }
137 };
138
139 if partition == 0 {
140 self.failures.advance_cycle();
141 }
142
143 if timed_out.is_empty() {
144 return ScanResult { processed: 0, errors: 0 };
145 }
146
147 let mut processed: u32 = 0;
148 let mut errors: u32 = 0;
149
150 for eid_str in &timed_out {
151 if self.failures.should_skip(eid_str) {
152 continue;
153 }
154 if should_skip_candidate(self.backend.as_ref(), &self.filter, partition, eid_str).await {
155 continue;
156 }
157
158 let res = if let Some(ref backend) = self.backend {
159 let Ok(eid) = ExecutionId::parse(eid_str) else { tracing::warn!(execution_id=%eid_str, "malformed eid; skipping"); continue; };
160 backend
161 .expire_execution(p, &eid, ExpirePhase::AttemptTimeout, TimestampMs(now_ms as i64))
162 .await
163 .map_err(|e| e.to_string())
164 } else {
165 expire_execution_raw(client, &p, &idx, eid_str, "attempt_timeout")
166 .await
167 .map_err(|e| e.to_string())
168 };
169 match res {
170 Ok(()) => {
171 self.failures.record_success(eid_str);
172 processed += 1;
173 }
174 Err(e) => {
175 tracing::warn!(
176 partition,
177 execution_id = eid_str.as_str(),
178 error = %e,
179 "attempt_timeout: expire_execution failed"
180 );
181 self.failures.record_failure(eid_str, "attempt_timeout");
182 errors += 1;
183 }
184 }
185 }
186
187 ScanResult { processed, errors }
188 }
189}
190
191pub async fn expire_execution_raw(
206 client: &ferriskey::Client,
207 partition: &Partition,
208 idx: &IndexKeys,
209 eid_str: &str,
210 reason: &str,
211) -> Result<(), ferriskey::Error> {
212 let tag = partition.hash_tag();
213
214 let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
216 let lease_current = format!("ff:exec:{}:{}:lease:current", tag, eid_str);
217 let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
218 let susp_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
219
220 let pre_fields: Vec<Option<String>> = client
224 .cmd("HMGET")
225 .arg(&exec_core)
226 .arg("lane_id")
227 .arg("current_attempt_index")
228 .execute()
229 .await?;
230 let lane = ff_core::types::LaneId::new(
231 pre_fields.first()
232 .and_then(|v| v.as_deref())
233 .unwrap_or("default"),
234 );
235 let att_idx = pre_fields.get(1)
236 .and_then(|v| v.as_deref())
237 .unwrap_or("0");
238
239 let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
240 let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
241
242 let lease_expiry = idx.lease_expiry();
244 let worker_leases = idx.worker_leases(&ff_core::types::WorkerInstanceId::new(""));
245 let active = idx.lane_active(&lane);
246 let terminal = idx.lane_terminal(&lane);
247 let attempt_timeout = idx.attempt_timeout();
248 let execution_deadline = idx.execution_deadline();
249 let suspended = idx.lane_suspended(&lane);
250 let suspension_timeout = idx.suspension_timeout();
251
252 let keys: [&str; 14] = [
254 &exec_core, &attempt_hash, &stream_meta, &lease_current, &lease_history, &lease_expiry, &worker_leases, &active, &terminal, &attempt_timeout, &execution_deadline, &suspended, &suspension_timeout, &susp_current, ];
269
270 let argv: [&str; 2] = [eid_str, reason];
271
272 let _: ferriskey::Value = client
273 .fcall("ff_expire_execution", &keys, &argv)
274 .await?;
275
276 Ok(())
277}