ff_engine/scanner/
pending_wp_expiry.rs1use std::time::Duration;
10
11use ff_core::backend::ScannerFilter;
12use ff_core::keys::IndexKeys;
13use ff_core::partition::{Partition, PartitionFamily};
14
15use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
16
/// Upper bound on how many due waitpoint ids one `scan_partition` call reads
/// from a partition's expiry index (used as the `ZRANGEBYSCORE ... LIMIT 0 N`
/// count).
const BATCH_SIZE: u32 = 100;
18
19pub struct PendingWaitpointExpiryScanner {
20 interval: Duration,
21 failures: FailureTracker,
22 filter: ScannerFilter,
23}
24
25impl PendingWaitpointExpiryScanner {
26 pub fn new(interval: Duration) -> Self {
27 Self::with_filter(interval, ScannerFilter::default())
28 }
29
30 pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
35 Self {
36 interval,
37 failures: FailureTracker::new(),
38 filter,
39 }
40 }
41}
42
43impl Scanner for PendingWaitpointExpiryScanner {
44 fn name(&self) -> &'static str {
45 "pending_wp_expiry"
46 }
47
48 fn interval(&self) -> Duration {
49 self.interval
50 }
51
52 fn filter(&self) -> &ScannerFilter {
53 &self.filter
54 }
55
56 async fn scan_partition(
57 &self,
58 client: &ferriskey::Client,
59 partition: u16,
60 ) -> ScanResult {
61 let p = Partition {
62 family: PartitionFamily::Execution,
63 index: partition,
64 };
65 let idx = IndexKeys::new(&p);
66 let expiry_key = idx.pending_waitpoint_expiry();
67 let tag = p.hash_tag();
68
69 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
70 Ok(t) => t,
71 Err(e) => {
72 tracing::warn!(partition, error = %e, "pending_wp_expiry: failed to get server time");
73 return ScanResult { processed: 0, errors: 1 };
74 }
75 };
76
77 let expired: Vec<String> = match client
79 .cmd("ZRANGEBYSCORE")
80 .arg(&expiry_key)
81 .arg("-inf")
82 .arg(now_ms.to_string().as_str())
83 .arg("LIMIT")
84 .arg("0")
85 .arg(BATCH_SIZE.to_string().as_str())
86 .execute()
87 .await
88 {
89 Ok(ids) => ids,
90 Err(e) => {
91 tracing::warn!(partition, error = %e, "pending_wp_expiry: ZRANGEBYSCORE failed");
92 return ScanResult { processed: 0, errors: 1 };
93 }
94 };
95
96 if partition == 0 {
97 self.failures.advance_cycle();
98 }
99
100 if expired.is_empty() {
101 return ScanResult { processed: 0, errors: 0 };
102 }
103
104 let mut processed: u32 = 0;
105 let mut errors: u32 = 0;
106
107 for wp_id_str in &expired {
108 if self.failures.should_skip(wp_id_str) {
109 continue;
110 }
111 if !self.filter.is_noop() {
117 let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id_str);
118 let eid: Option<String> = match client
119 .cmd("HGET")
120 .arg(&waitpoint_hash)
121 .arg("execution_id")
122 .execute()
123 .await
124 {
125 Ok(v) => v,
126 Err(_) => {
127 continue;
129 }
130 };
131 let Some(eid) = eid else { continue };
132 if should_skip_candidate(client, &self.filter, partition, &eid).await {
133 continue;
134 }
135 }
136
137 match close_expired_waitpoint(client, &tag, &idx, wp_id_str).await {
138 Ok(()) => {
139 self.failures.record_success(wp_id_str);
140 processed += 1;
141 }
142 Err(e) => {
143 tracing::warn!(
144 partition,
145 waitpoint_id = wp_id_str.as_str(),
146 error = %e,
147 "pending_wp_expiry: ff_close_waitpoint failed"
148 );
149 self.failures.record_failure(wp_id_str, "pending_wp_expiry");
150 errors += 1;
151 }
152 }
153 }
154
155 ScanResult { processed, errors }
156 }
157}
158
159async fn close_expired_waitpoint(
169 client: &ferriskey::Client,
170 tag: &str,
171 idx: &IndexKeys,
172 wp_id_str: &str,
173) -> Result<(), ferriskey::Error> {
174 let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id_str);
175
176 let eid: Option<String> = client
178 .cmd("HGET")
179 .arg(&waitpoint_hash)
180 .arg("execution_id")
181 .execute()
182 .await?;
183
184 let eid = eid.unwrap_or_default();
185 let exec_core = format!("ff:exec:{}:{}:core", tag, eid);
186 let pending_wp_expiry = idx.pending_waitpoint_expiry();
187
188 let keys: [&str; 3] = [
189 &exec_core,
190 &waitpoint_hash,
191 &pending_wp_expiry,
192 ];
193
194 let argv: [&str; 2] = [wp_id_str, "never_committed"];
195
196 let _: ferriskey::Value = client
197 .fcall("ff_close_waitpoint", &keys, &argv)
198 .await?;
199
200 Ok(())
201}