ff_engine/scanner/
suspension_timeout.rs1use std::time::Duration;
11
12use ff_core::keys::IndexKeys;
13use ff_core::partition::{Partition, PartitionFamily};
14
15use super::{FailureTracker, ScanResult, Scanner};
16
/// Maximum number of timed-out executions fetched (via `ZRANGEBYSCORE ... LIMIT 0 n`)
/// and processed for a single partition in one scan.
const BATCH_SIZE: u32 = 50;
18
19pub struct SuspensionTimeoutScanner {
20 interval: Duration,
21 failures: FailureTracker,
22}
23
24impl SuspensionTimeoutScanner {
25 pub fn new(interval: Duration) -> Self {
26 Self { interval, failures: FailureTracker::new() }
27 }
28}
29
30impl Scanner for SuspensionTimeoutScanner {
31 fn name(&self) -> &'static str {
32 "suspension_timeout"
33 }
34
35 fn interval(&self) -> Duration {
36 self.interval
37 }
38
39 async fn scan_partition(
40 &self,
41 client: &ferriskey::Client,
42 partition: u16,
43 ) -> ScanResult {
44 let p = Partition {
45 family: PartitionFamily::Execution,
46 index: partition,
47 };
48 let idx = IndexKeys::new(&p);
49 let timeout_key = idx.suspension_timeout();
50 let tag = p.hash_tag();
51
52 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
53 Ok(t) => t,
54 Err(e) => {
55 tracing::warn!(partition, error = %e, "suspension_timeout: failed to get server time");
56 return ScanResult { processed: 0, errors: 1 };
57 }
58 };
59
60 let timed_out: Vec<String> = match client
62 .cmd("ZRANGEBYSCORE")
63 .arg(&timeout_key)
64 .arg("-inf")
65 .arg(now_ms.to_string().as_str())
66 .arg("LIMIT")
67 .arg("0")
68 .arg(BATCH_SIZE.to_string().as_str())
69 .execute()
70 .await
71 {
72 Ok(ids) => ids,
73 Err(e) => {
74 tracing::warn!(partition, error = %e, "suspension_timeout: ZRANGEBYSCORE failed");
75 return ScanResult { processed: 0, errors: 1 };
76 }
77 };
78
79 if partition == 0 {
80 self.failures.advance_cycle();
81 }
82
83 if timed_out.is_empty() {
84 return ScanResult { processed: 0, errors: 0 };
85 }
86
87 let mut processed: u32 = 0;
88 let mut errors: u32 = 0;
89
90 for eid_str in &timed_out {
91 if self.failures.should_skip(eid_str) {
92 continue;
93 }
94
95 match expire_suspension(client, &tag, &idx, eid_str).await {
96 Ok(()) => {
97 self.failures.record_success(eid_str);
98 processed += 1;
99 }
100 Err(e) => {
101 tracing::warn!(
102 partition,
103 execution_id = eid_str.as_str(),
104 error = %e,
105 "suspension_timeout: ff_expire_suspension failed"
106 );
107 self.failures.record_failure(eid_str, "suspension_timeout");
108 errors += 1;
109 }
110 }
111 }
112
113 ScanResult { processed, errors }
114 }
115}
116
117async fn expire_suspension(
131 client: &ferriskey::Client,
132 tag: &str,
133 idx: &IndexKeys,
134 eid_str: &str,
135) -> Result<(), ferriskey::Error> {
136 let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
139 let suspension_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
140
141 let wp_id: Option<String> = client
143 .cmd("HGET")
144 .arg(&exec_core)
145 .arg("current_waitpoint_id")
146 .execute()
147 .await?;
148 let att_idx: Option<String> = client
149 .cmd("HGET")
150 .arg(&exec_core)
151 .arg("current_attempt_index")
152 .execute()
153 .await?;
154
155 let wp_id = wp_id.unwrap_or_default();
156 let att_idx = att_idx.unwrap_or_else(|| "0".to_string());
157
158 let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id);
159 let wp_condition = format!("ff:wp:{}:{}:condition", tag, wp_id);
160 let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
161 let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
162
163 let suspension_timeout = idx.suspension_timeout();
165 let lane: Option<String> = client
168 .cmd("HGET")
169 .arg(&exec_core)
170 .arg("lane_id")
171 .execute()
172 .await?;
173 let lane_str = lane.unwrap_or_else(|| "default".to_string());
174 let lane_id = ff_core::types::LaneId::new(&lane_str);
175
176 let suspended_zset = idx.lane_suspended(&lane_id);
177 let terminal_zset = idx.lane_terminal(&lane_id);
178 let eligible_zset = idx.lane_eligible(&lane_id);
179 let delayed_zset = idx.lane_delayed(&lane_id);
180 let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
181
182 let keys: [&str; 12] = [
183 &exec_core, &suspension_current, &waitpoint_hash, &wp_condition, &attempt_hash, &stream_meta, &suspension_timeout, &suspended_zset, &terminal_zset, &eligible_zset, &delayed_zset, &lease_history, ];
196
197 let argv: [&str; 1] = [eid_str];
198
199 let _: ferriskey::Value = client
200 .fcall("ff_expire_suspension", &keys, &argv)
201 .await?;
202
203 Ok(())
204}