ff_engine/scanner/
pending_wp_expiry.rs1use std::time::Duration;
10
11use ff_core::keys::IndexKeys;
12use ff_core::partition::{Partition, PartitionFamily};
13
14use super::{FailureTracker, ScanResult, Scanner};
15
/// Upper bound on how many waitpoint IDs a single partition scan fetches;
/// passed as the `LIMIT` count of the `ZRANGEBYSCORE` query.
const BATCH_SIZE: u32 = 100;
17
18pub struct PendingWaitpointExpiryScanner {
19 interval: Duration,
20 failures: FailureTracker,
21}
22
23impl PendingWaitpointExpiryScanner {
24 pub fn new(interval: Duration) -> Self {
25 Self { interval, failures: FailureTracker::new() }
26 }
27}
28
29impl Scanner for PendingWaitpointExpiryScanner {
30 fn name(&self) -> &'static str {
31 "pending_wp_expiry"
32 }
33
34 fn interval(&self) -> Duration {
35 self.interval
36 }
37
38 async fn scan_partition(
39 &self,
40 client: &ferriskey::Client,
41 partition: u16,
42 ) -> ScanResult {
43 let p = Partition {
44 family: PartitionFamily::Execution,
45 index: partition,
46 };
47 let idx = IndexKeys::new(&p);
48 let expiry_key = idx.pending_waitpoint_expiry();
49 let tag = p.hash_tag();
50
51 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
52 Ok(t) => t,
53 Err(e) => {
54 tracing::warn!(partition, error = %e, "pending_wp_expiry: failed to get server time");
55 return ScanResult { processed: 0, errors: 1 };
56 }
57 };
58
59 let expired: Vec<String> = match client
61 .cmd("ZRANGEBYSCORE")
62 .arg(&expiry_key)
63 .arg("-inf")
64 .arg(now_ms.to_string().as_str())
65 .arg("LIMIT")
66 .arg("0")
67 .arg(BATCH_SIZE.to_string().as_str())
68 .execute()
69 .await
70 {
71 Ok(ids) => ids,
72 Err(e) => {
73 tracing::warn!(partition, error = %e, "pending_wp_expiry: ZRANGEBYSCORE failed");
74 return ScanResult { processed: 0, errors: 1 };
75 }
76 };
77
78 if partition == 0 {
79 self.failures.advance_cycle();
80 }
81
82 if expired.is_empty() {
83 return ScanResult { processed: 0, errors: 0 };
84 }
85
86 let mut processed: u32 = 0;
87 let mut errors: u32 = 0;
88
89 for wp_id_str in &expired {
90 if self.failures.should_skip(wp_id_str) {
91 continue;
92 }
93
94 match close_expired_waitpoint(client, &tag, &idx, wp_id_str).await {
95 Ok(()) => {
96 self.failures.record_success(wp_id_str);
97 processed += 1;
98 }
99 Err(e) => {
100 tracing::warn!(
101 partition,
102 waitpoint_id = wp_id_str.as_str(),
103 error = %e,
104 "pending_wp_expiry: ff_close_waitpoint failed"
105 );
106 self.failures.record_failure(wp_id_str, "pending_wp_expiry");
107 errors += 1;
108 }
109 }
110 }
111
112 ScanResult { processed, errors }
113 }
114}
115
116async fn close_expired_waitpoint(
126 client: &ferriskey::Client,
127 tag: &str,
128 idx: &IndexKeys,
129 wp_id_str: &str,
130) -> Result<(), ferriskey::Error> {
131 let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id_str);
132
133 let eid: Option<String> = client
135 .cmd("HGET")
136 .arg(&waitpoint_hash)
137 .arg("execution_id")
138 .execute()
139 .await?;
140
141 let eid = eid.unwrap_or_default();
142 let exec_core = format!("ff:exec:{}:{}:core", tag, eid);
143 let pending_wp_expiry = idx.pending_waitpoint_expiry();
144
145 let keys: [&str; 3] = [
146 &exec_core,
147 &waitpoint_hash,
148 &pending_wp_expiry,
149 ];
150
151 let argv: [&str; 2] = [wp_id_str, "never_committed"];
152
153 let _: ferriskey::Value = client
154 .fcall("ff_close_waitpoint", &keys, &argv)
155 .await?;
156
157 Ok(())
158}