1use std::sync::Arc;
11use std::time::Duration;
12
13use ff_core::backend::ScannerFilter;
14use ff_core::engine_backend::EngineBackend;
15use ff_core::keys::IndexKeys;
16use ff_core::partition::{Partition, PartitionFamily};
17use ff_core::types::{ExecutionId, TimestampMs};
18
19use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
20
/// Maximum number of timed-out execution ids read from a partition's
/// suspension-timeout index per scan (the `LIMIT` count given to
/// `ZRANGEBYSCORE` in `scan_partition`).
const BATCH_SIZE: u32 = 50;
22
/// Postgres variant of the suspension-timeout scan: runs one reconciliation
/// tick for `partition_key` by delegating to
/// `ff_backend_postgres::reconcilers::suspension_timeout::scan_tick`.
///
/// # Errors
/// Returns whatever `EngineError` the Postgres reconciler reports.
#[cfg(feature = "postgres")]
pub async fn scan_tick_pg(
    pool: &ff_backend_postgres::PgPool,
    partition_key: i16,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_backend_postgres::reconcilers::ScanReport, ff_core::engine_error::EngineError> {
    use ff_backend_postgres::reconcilers::suspension_timeout as reconciler;

    let report = reconciler::scan_tick(pool, partition_key, filter).await?;
    Ok(report)
}
33
34pub struct SuspensionTimeoutScanner {
35 interval: Duration,
36 failures: FailureTracker,
37 filter: ScannerFilter,
38 backend: Option<Arc<dyn EngineBackend>>,
39}
40
41impl SuspensionTimeoutScanner {
42 pub fn new(interval: Duration) -> Self {
43 Self::with_filter(interval, ScannerFilter::default())
44 }
45
46 pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
49 Self {
50 interval,
51 failures: FailureTracker::new(),
52 filter,
53 backend: None,
54 }
55 }
56
57 pub fn with_filter_and_backend(
59 interval: Duration,
60 filter: ScannerFilter,
61 backend: Arc<dyn EngineBackend>,
62 ) -> Self {
63 Self {
64 interval,
65 failures: FailureTracker::new(),
66 filter,
67 backend: Some(backend),
68 }
69 }
70}
71
72impl Scanner for SuspensionTimeoutScanner {
73 fn name(&self) -> &'static str {
74 "suspension_timeout"
75 }
76
77 fn interval(&self) -> Duration {
78 self.interval
79 }
80
81 fn filter(&self) -> &ScannerFilter {
82 &self.filter
83 }
84
85 async fn scan_partition(
86 &self,
87 client: &ferriskey::Client,
88 partition: u16,
89 ) -> ScanResult {
90 let p = Partition {
91 family: PartitionFamily::Execution,
92 index: partition,
93 };
94 let idx = IndexKeys::new(&p);
95 let timeout_key = idx.suspension_timeout();
96 let tag = p.hash_tag();
97
98 let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
99 b.server_time_ms().await.map_err(|e| e.to_string())
100 } else {
101 crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
102 };
103 let now_ms = match now_ms_res {
104 Ok(t) => t,
105 Err(e) => {
106 tracing::warn!(partition, error = %e, "suspension_timeout: failed to get server time");
107 return ScanResult { processed: 0, errors: 1 };
108 }
109 };
110
111 let timed_out: Vec<String> = match client
113 .cmd("ZRANGEBYSCORE")
114 .arg(&timeout_key)
115 .arg("-inf")
116 .arg(now_ms.to_string().as_str())
117 .arg("LIMIT")
118 .arg("0")
119 .arg(BATCH_SIZE.to_string().as_str())
120 .execute()
121 .await
122 {
123 Ok(ids) => ids,
124 Err(e) => {
125 tracing::warn!(partition, error = %e, "suspension_timeout: ZRANGEBYSCORE failed");
126 return ScanResult { processed: 0, errors: 1 };
127 }
128 };
129
130 if partition == 0 {
131 self.failures.advance_cycle();
132 }
133
134 if timed_out.is_empty() {
135 return ScanResult { processed: 0, errors: 0 };
136 }
137
138 let mut processed: u32 = 0;
139 let mut errors: u32 = 0;
140
141 for eid_str in &timed_out {
142 if self.failures.should_skip(eid_str) {
143 continue;
144 }
145 if should_skip_candidate(self.backend.as_ref(), &self.filter, partition, eid_str).await {
146 continue;
147 }
148
149 let res = if let Some(ref backend) = self.backend {
150 let Ok(eid) = ExecutionId::parse(eid_str) else { tracing::warn!(execution_id=%eid_str, "malformed eid; skipping"); continue; };
151 backend
152 .expire_suspension(p, &eid, TimestampMs(now_ms as i64))
153 .await
154 .map_err(|e| e.to_string())
155 } else {
156 expire_suspension(client, &tag, &idx, eid_str)
157 .await
158 .map_err(|e| e.to_string())
159 };
160 match res {
161 Ok(()) => {
162 self.failures.record_success(eid_str);
163 processed += 1;
164 }
165 Err(e) => {
166 tracing::warn!(
167 partition,
168 execution_id = eid_str.as_str(),
169 error = %e,
170 "suspension_timeout: expire_suspension failed"
171 );
172 self.failures.record_failure(eid_str, "suspension_timeout");
173 errors += 1;
174 }
175 }
176 }
177
178 ScanResult { processed, errors }
179 }
180}
181
182async fn expire_suspension(
196 client: &ferriskey::Client,
197 tag: &str,
198 idx: &IndexKeys,
199 eid_str: &str,
200) -> Result<(), ferriskey::Error> {
201 let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
204 let suspension_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
205
206 let wp_id: Option<String> = client
208 .cmd("HGET")
209 .arg(&exec_core)
210 .arg("current_waitpoint_id")
211 .execute()
212 .await?;
213 let att_idx: Option<String> = client
214 .cmd("HGET")
215 .arg(&exec_core)
216 .arg("current_attempt_index")
217 .execute()
218 .await?;
219
220 let wp_id = wp_id.unwrap_or_default();
221 let att_idx = att_idx.unwrap_or_else(|| "0".to_string());
222
223 let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id);
224 let wp_condition = format!("ff:wp:{}:{}:condition", tag, wp_id);
225 let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
226 let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
227
228 let suspension_timeout = idx.suspension_timeout();
230 let lane: Option<String> = client
233 .cmd("HGET")
234 .arg(&exec_core)
235 .arg("lane_id")
236 .execute()
237 .await?;
238 let lane_str = lane.unwrap_or_else(|| "default".to_string());
239 let lane_id = ff_core::types::LaneId::new(&lane_str);
240
241 let suspended_zset = idx.lane_suspended(&lane_id);
242 let terminal_zset = idx.lane_terminal(&lane_id);
243 let eligible_zset = idx.lane_eligible(&lane_id);
244 let delayed_zset = idx.lane_delayed(&lane_id);
245 let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
246
247 let keys: [&str; 12] = [
248 &exec_core, &suspension_current, &waitpoint_hash, &wp_condition, &attempt_hash, &stream_meta, &suspension_timeout, &suspended_zset, &terminal_zset, &eligible_zset, &delayed_zset, &lease_history, ];
261
262 let argv: [&str; 1] = [eid_str];
263
264 let _: ferriskey::Value = client
265 .fcall("ff_expire_suspension", &keys, &argv)
266 .await?;
267
268 Ok(())
269}