1use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::keys::IndexKeys;
14use ff_core::partition::{Partition, PartitionFamily};
15use ff_core::types::LaneId;
16
17use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
18
/// Maximum number of due entries fetched from the attempt-timeout index per
/// partition per tick (used as the `LIMIT 0 <count>` of `ZRANGEBYSCORE`).
const BATCH_SIZE: u32 = 50;
20
/// Runs one attempt-timeout reconciliation tick against the Postgres backend.
///
/// This is a thin delegate to
/// `ff_backend_postgres::reconcilers::attempt_timeout::scan_tick`, forwarding the
/// pool, the partition key and the scanner filter unchanged. It is only compiled
/// when the `postgres` feature is enabled.
///
/// # Errors
/// Propagates whatever `EngineError` the backend reconciler returns.
#[cfg(feature = "postgres")]
pub async fn scan_tick_pg(
    pool: &ff_backend_postgres::PgPool,
    partition_key: i16,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_backend_postgres::reconcilers::ScanReport, ff_core::engine_error::EngineError> {
    use ff_backend_postgres::reconcilers::attempt_timeout as reconciler;

    reconciler::scan_tick(pool, partition_key, filter).await
}
36
37pub struct AttemptTimeoutScanner {
38 interval: Duration,
39 failures: FailureTracker,
40 filter: ScannerFilter,
41}
42
43impl AttemptTimeoutScanner {
44 pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
45 Self::with_filter(interval, lanes, ScannerFilter::default())
46 }
47
48 pub fn with_filter(
51 interval: Duration,
52 _lanes: Vec<LaneId>,
53 filter: ScannerFilter,
54 ) -> Self {
55 Self {
56 interval,
57 failures: FailureTracker::new(),
58 filter,
59 }
60 }
61}
62
63impl Scanner for AttemptTimeoutScanner {
64 fn name(&self) -> &'static str {
65 "attempt_timeout"
66 }
67
68 fn interval(&self) -> Duration {
69 self.interval
70 }
71
72 fn filter(&self) -> &ScannerFilter {
73 &self.filter
74 }
75
76 async fn scan_partition(
77 &self,
78 client: &ferriskey::Client,
79 partition: u16,
80 ) -> ScanResult {
81 let p = Partition {
82 family: PartitionFamily::Execution,
83 index: partition,
84 };
85 let idx = IndexKeys::new(&p);
86 let timeout_key = idx.attempt_timeout();
87
88 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
89 Ok(t) => t,
90 Err(e) => {
91 tracing::warn!(partition, error = %e, "attempt_timeout: failed to get server time");
92 return ScanResult { processed: 0, errors: 1 };
93 }
94 };
95
96 let timed_out: Vec<String> = match client
98 .cmd("ZRANGEBYSCORE")
99 .arg(&timeout_key)
100 .arg("-inf")
101 .arg(now_ms.to_string().as_str())
102 .arg("LIMIT")
103 .arg("0")
104 .arg(BATCH_SIZE.to_string().as_str())
105 .execute()
106 .await
107 {
108 Ok(ids) => ids,
109 Err(e) => {
110 tracing::warn!(partition, error = %e, "attempt_timeout: ZRANGEBYSCORE failed");
111 return ScanResult { processed: 0, errors: 1 };
112 }
113 };
114
115 if partition == 0 {
116 self.failures.advance_cycle();
117 }
118
119 if timed_out.is_empty() {
120 return ScanResult { processed: 0, errors: 0 };
121 }
122
123 let mut processed: u32 = 0;
124 let mut errors: u32 = 0;
125
126 for eid_str in &timed_out {
127 if self.failures.should_skip(eid_str) {
128 continue;
129 }
130 if should_skip_candidate(client, &self.filter, partition, eid_str).await {
131 continue;
132 }
133
134 match expire_execution_raw(client, &p, &idx, eid_str, "attempt_timeout").await {
135 Ok(()) => {
136 self.failures.record_success(eid_str);
137 processed += 1;
138 }
139 Err(e) => {
140 tracing::warn!(
141 partition,
142 execution_id = eid_str.as_str(),
143 error = %e,
144 "attempt_timeout: ff_expire_execution failed"
145 );
146 self.failures.record_failure(eid_str, "attempt_timeout");
147 errors += 1;
148 }
149 }
150 }
151
152 ScanResult { processed, errors }
153 }
154}
155
156pub async fn expire_execution_raw(
171 client: &ferriskey::Client,
172 partition: &Partition,
173 idx: &IndexKeys,
174 eid_str: &str,
175 reason: &str,
176) -> Result<(), ferriskey::Error> {
177 let tag = partition.hash_tag();
178
179 let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
181 let lease_current = format!("ff:exec:{}:{}:lease:current", tag, eid_str);
182 let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
183 let susp_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
184
185 let pre_fields: Vec<Option<String>> = client
189 .cmd("HMGET")
190 .arg(&exec_core)
191 .arg("lane_id")
192 .arg("current_attempt_index")
193 .execute()
194 .await?;
195 let lane = ff_core::types::LaneId::new(
196 pre_fields.first()
197 .and_then(|v| v.as_deref())
198 .unwrap_or("default"),
199 );
200 let att_idx = pre_fields.get(1)
201 .and_then(|v| v.as_deref())
202 .unwrap_or("0");
203
204 let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
205 let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
206
207 let lease_expiry = idx.lease_expiry();
209 let worker_leases = idx.worker_leases(&ff_core::types::WorkerInstanceId::new(""));
210 let active = idx.lane_active(&lane);
211 let terminal = idx.lane_terminal(&lane);
212 let attempt_timeout = idx.attempt_timeout();
213 let execution_deadline = idx.execution_deadline();
214 let suspended = idx.lane_suspended(&lane);
215 let suspension_timeout = idx.suspension_timeout();
216
217 let keys: [&str; 14] = [
219 &exec_core, &attempt_hash, &stream_meta, &lease_current, &lease_history, &lease_expiry, &worker_leases, &active, &terminal, &attempt_timeout, &execution_deadline, &suspended, &suspension_timeout, &susp_current, ];
234
235 let argv: [&str; 2] = [eid_str, reason];
236
237 let _: ferriskey::Value = client
238 .fcall("ff_expire_execution", &keys, &argv)
239 .await?;
240
241 Ok(())
242}