ff_engine/scanner/
suspension_timeout.rs1use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::keys::IndexKeys;
14use ff_core::partition::{Partition, PartitionFamily};
15
16use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
17
/// Maximum number of due execution ids fetched from one partition per
/// `scan_partition` call (the `LIMIT 0 <count>` argument of `ZRANGEBYSCORE`).
const BATCH_SIZE: u32 = 50_u32;
19
/// Runs one suspension-timeout reconciler tick on the Postgres backend.
///
/// This is a direct delegation to
/// `ff_backend_postgres::reconcilers::suspension_timeout::scan_tick` for the given
/// `partition_key` and `filter`; its report or error is returned unchanged.
#[cfg(feature = "postgres")]
pub async fn scan_tick_pg(
    pool: &ff_backend_postgres::PgPool,
    partition_key: i16,
    filter: &ScannerFilter,
) -> Result<ff_backend_postgres::reconcilers::ScanReport, ff_core::engine_error::EngineError> {
    use ff_backend_postgres::reconcilers::suspension_timeout::scan_tick;

    scan_tick(pool, partition_key, filter).await
}
30
31pub struct SuspensionTimeoutScanner {
32 interval: Duration,
33 failures: FailureTracker,
34 filter: ScannerFilter,
35}
36
37impl SuspensionTimeoutScanner {
38 pub fn new(interval: Duration) -> Self {
39 Self::with_filter(interval, ScannerFilter::default())
40 }
41
42 pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
45 Self {
46 interval,
47 failures: FailureTracker::new(),
48 filter,
49 }
50 }
51}
52
53impl Scanner for SuspensionTimeoutScanner {
54 fn name(&self) -> &'static str {
55 "suspension_timeout"
56 }
57
58 fn interval(&self) -> Duration {
59 self.interval
60 }
61
62 fn filter(&self) -> &ScannerFilter {
63 &self.filter
64 }
65
66 async fn scan_partition(
67 &self,
68 client: &ferriskey::Client,
69 partition: u16,
70 ) -> ScanResult {
71 let p = Partition {
72 family: PartitionFamily::Execution,
73 index: partition,
74 };
75 let idx = IndexKeys::new(&p);
76 let timeout_key = idx.suspension_timeout();
77 let tag = p.hash_tag();
78
79 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
80 Ok(t) => t,
81 Err(e) => {
82 tracing::warn!(partition, error = %e, "suspension_timeout: failed to get server time");
83 return ScanResult { processed: 0, errors: 1 };
84 }
85 };
86
87 let timed_out: Vec<String> = match client
89 .cmd("ZRANGEBYSCORE")
90 .arg(&timeout_key)
91 .arg("-inf")
92 .arg(now_ms.to_string().as_str())
93 .arg("LIMIT")
94 .arg("0")
95 .arg(BATCH_SIZE.to_string().as_str())
96 .execute()
97 .await
98 {
99 Ok(ids) => ids,
100 Err(e) => {
101 tracing::warn!(partition, error = %e, "suspension_timeout: ZRANGEBYSCORE failed");
102 return ScanResult { processed: 0, errors: 1 };
103 }
104 };
105
106 if partition == 0 {
107 self.failures.advance_cycle();
108 }
109
110 if timed_out.is_empty() {
111 return ScanResult { processed: 0, errors: 0 };
112 }
113
114 let mut processed: u32 = 0;
115 let mut errors: u32 = 0;
116
117 for eid_str in &timed_out {
118 if self.failures.should_skip(eid_str) {
119 continue;
120 }
121 if should_skip_candidate(client, &self.filter, partition, eid_str).await {
122 continue;
123 }
124
125 match expire_suspension(client, &tag, &idx, eid_str).await {
126 Ok(()) => {
127 self.failures.record_success(eid_str);
128 processed += 1;
129 }
130 Err(e) => {
131 tracing::warn!(
132 partition,
133 execution_id = eid_str.as_str(),
134 error = %e,
135 "suspension_timeout: ff_expire_suspension failed"
136 );
137 self.failures.record_failure(eid_str, "suspension_timeout");
138 errors += 1;
139 }
140 }
141 }
142
143 ScanResult { processed, errors }
144 }
145}
146
147async fn expire_suspension(
161 client: &ferriskey::Client,
162 tag: &str,
163 idx: &IndexKeys,
164 eid_str: &str,
165) -> Result<(), ferriskey::Error> {
166 let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
169 let suspension_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
170
171 let wp_id: Option<String> = client
173 .cmd("HGET")
174 .arg(&exec_core)
175 .arg("current_waitpoint_id")
176 .execute()
177 .await?;
178 let att_idx: Option<String> = client
179 .cmd("HGET")
180 .arg(&exec_core)
181 .arg("current_attempt_index")
182 .execute()
183 .await?;
184
185 let wp_id = wp_id.unwrap_or_default();
186 let att_idx = att_idx.unwrap_or_else(|| "0".to_string());
187
188 let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id);
189 let wp_condition = format!("ff:wp:{}:{}:condition", tag, wp_id);
190 let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
191 let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
192
193 let suspension_timeout = idx.suspension_timeout();
195 let lane: Option<String> = client
198 .cmd("HGET")
199 .arg(&exec_core)
200 .arg("lane_id")
201 .execute()
202 .await?;
203 let lane_str = lane.unwrap_or_else(|| "default".to_string());
204 let lane_id = ff_core::types::LaneId::new(&lane_str);
205
206 let suspended_zset = idx.lane_suspended(&lane_id);
207 let terminal_zset = idx.lane_terminal(&lane_id);
208 let eligible_zset = idx.lane_eligible(&lane_id);
209 let delayed_zset = idx.lane_delayed(&lane_id);
210 let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
211
212 let keys: [&str; 12] = [
213 &exec_core, &suspension_current, &waitpoint_hash, &wp_condition, &attempt_hash, &stream_meta, &suspension_timeout, &suspended_zset, &terminal_zset, &eligible_zset, &delayed_zset, &lease_history, ];
226
227 let argv: [&str; 1] = [eid_str];
228
229 let _: ferriskey::Value = client
230 .fcall("ff_expire_suspension", &keys, &argv)
231 .await?;
232
233 Ok(())
234}