ff_engine/scanner/
suspension_timeout.rs1use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::keys::IndexKeys;
14use ff_core::partition::{Partition, PartitionFamily};
15
16use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
17
/// Upper bound on how many timed-out execution ids are fetched (via the
/// `LIMIT` of `ZRANGEBYSCORE`) from one partition's suspension-timeout index
/// per `scan_partition` call.
const BATCH_SIZE: u32 = 50;
19
20pub struct SuspensionTimeoutScanner {
21 interval: Duration,
22 failures: FailureTracker,
23 filter: ScannerFilter,
24}
25
26impl SuspensionTimeoutScanner {
27 pub fn new(interval: Duration) -> Self {
28 Self::with_filter(interval, ScannerFilter::default())
29 }
30
31 pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
34 Self {
35 interval,
36 failures: FailureTracker::new(),
37 filter,
38 }
39 }
40}
41
42impl Scanner for SuspensionTimeoutScanner {
43 fn name(&self) -> &'static str {
44 "suspension_timeout"
45 }
46
47 fn interval(&self) -> Duration {
48 self.interval
49 }
50
51 fn filter(&self) -> &ScannerFilter {
52 &self.filter
53 }
54
55 async fn scan_partition(
56 &self,
57 client: &ferriskey::Client,
58 partition: u16,
59 ) -> ScanResult {
60 let p = Partition {
61 family: PartitionFamily::Execution,
62 index: partition,
63 };
64 let idx = IndexKeys::new(&p);
65 let timeout_key = idx.suspension_timeout();
66 let tag = p.hash_tag();
67
68 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
69 Ok(t) => t,
70 Err(e) => {
71 tracing::warn!(partition, error = %e, "suspension_timeout: failed to get server time");
72 return ScanResult { processed: 0, errors: 1 };
73 }
74 };
75
76 let timed_out: Vec<String> = match client
78 .cmd("ZRANGEBYSCORE")
79 .arg(&timeout_key)
80 .arg("-inf")
81 .arg(now_ms.to_string().as_str())
82 .arg("LIMIT")
83 .arg("0")
84 .arg(BATCH_SIZE.to_string().as_str())
85 .execute()
86 .await
87 {
88 Ok(ids) => ids,
89 Err(e) => {
90 tracing::warn!(partition, error = %e, "suspension_timeout: ZRANGEBYSCORE failed");
91 return ScanResult { processed: 0, errors: 1 };
92 }
93 };
94
95 if partition == 0 {
96 self.failures.advance_cycle();
97 }
98
99 if timed_out.is_empty() {
100 return ScanResult { processed: 0, errors: 0 };
101 }
102
103 let mut processed: u32 = 0;
104 let mut errors: u32 = 0;
105
106 for eid_str in &timed_out {
107 if self.failures.should_skip(eid_str) {
108 continue;
109 }
110 if should_skip_candidate(client, &self.filter, partition, eid_str).await {
111 continue;
112 }
113
114 match expire_suspension(client, &tag, &idx, eid_str).await {
115 Ok(()) => {
116 self.failures.record_success(eid_str);
117 processed += 1;
118 }
119 Err(e) => {
120 tracing::warn!(
121 partition,
122 execution_id = eid_str.as_str(),
123 error = %e,
124 "suspension_timeout: ff_expire_suspension failed"
125 );
126 self.failures.record_failure(eid_str, "suspension_timeout");
127 errors += 1;
128 }
129 }
130 }
131
132 ScanResult { processed, errors }
133 }
134}
135
136async fn expire_suspension(
150 client: &ferriskey::Client,
151 tag: &str,
152 idx: &IndexKeys,
153 eid_str: &str,
154) -> Result<(), ferriskey::Error> {
155 let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
158 let suspension_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
159
160 let wp_id: Option<String> = client
162 .cmd("HGET")
163 .arg(&exec_core)
164 .arg("current_waitpoint_id")
165 .execute()
166 .await?;
167 let att_idx: Option<String> = client
168 .cmd("HGET")
169 .arg(&exec_core)
170 .arg("current_attempt_index")
171 .execute()
172 .await?;
173
174 let wp_id = wp_id.unwrap_or_default();
175 let att_idx = att_idx.unwrap_or_else(|| "0".to_string());
176
177 let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id);
178 let wp_condition = format!("ff:wp:{}:{}:condition", tag, wp_id);
179 let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
180 let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
181
182 let suspension_timeout = idx.suspension_timeout();
184 let lane: Option<String> = client
187 .cmd("HGET")
188 .arg(&exec_core)
189 .arg("lane_id")
190 .execute()
191 .await?;
192 let lane_str = lane.unwrap_or_else(|| "default".to_string());
193 let lane_id = ff_core::types::LaneId::new(&lane_str);
194
195 let suspended_zset = idx.lane_suspended(&lane_id);
196 let terminal_zset = idx.lane_terminal(&lane_id);
197 let eligible_zset = idx.lane_eligible(&lane_id);
198 let delayed_zset = idx.lane_delayed(&lane_id);
199 let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
200
201 let keys: [&str; 12] = [
202 &exec_core, &suspension_current, &waitpoint_hash, &wp_condition, &attempt_hash, &stream_meta, &suspension_timeout, &suspended_zset, &terminal_zset, &eligible_zset, &delayed_zset, &lease_history, ];
215
216 let argv: [&str; 1] = [eid_str];
217
218 let _: ferriskey::Value = client
219 .fcall("ff_expire_suspension", &keys, &argv)
220 .await?;
221
222 Ok(())
223}