ff_engine/scanner/
lease_expiry.rs1use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::keys::IndexKeys;
14use ff_core::partition::{Partition, PartitionFamily};
15
16use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
17
/// Upper bound on how many lease-expiry index entries a single
/// `scan_partition` call fetches; it is passed as the `LIMIT` count of the
/// `ZRANGEBYSCORE` query.
const BATCH_SIZE: u32 = 50;
20
21pub struct LeaseExpiryScanner {
22 interval: Duration,
23 failures: FailureTracker,
24 filter: ScannerFilter,
25}
26
27impl LeaseExpiryScanner {
28 pub fn new(interval: Duration) -> Self {
29 Self::with_filter(interval, ScannerFilter::default())
30 }
31
32 pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
35 Self {
36 interval,
37 failures: FailureTracker::new(),
38 filter,
39 }
40 }
41}
42
43impl Scanner for LeaseExpiryScanner {
44 fn name(&self) -> &'static str {
45 "lease_expiry"
46 }
47
48 fn interval(&self) -> Duration {
49 self.interval
50 }
51
52 fn filter(&self) -> &ScannerFilter {
53 &self.filter
54 }
55
56 async fn scan_partition(
57 &self,
58 client: &ferriskey::Client,
59 partition: u16,
60 ) -> ScanResult {
61 let p = Partition {
62 family: PartitionFamily::Execution,
63 index: partition,
64 };
65 let idx = IndexKeys::new(&p);
66 let lease_expiry_key = idx.lease_expiry();
67
68 let now_ms = match server_time_ms(client).await {
70 Ok(t) => t,
71 Err(e) => {
72 tracing::warn!(partition, error = %e, "lease_expiry: failed to get server time");
73 return ScanResult { processed: 0, errors: 1 };
74 }
75 };
76
77 let expired: Vec<String> = match client
79 .cmd("ZRANGEBYSCORE")
80 .arg(&lease_expiry_key)
81 .arg("-inf")
82 .arg(now_ms.to_string().as_str())
83 .arg("LIMIT")
84 .arg("0")
85 .arg(BATCH_SIZE.to_string().as_str())
86 .execute()
87 .await
88 {
89 Ok(ids) => ids,
90 Err(e) => {
91 tracing::warn!(partition, error = %e, "lease_expiry: ZRANGEBYSCORE failed");
92 return ScanResult { processed: 0, errors: 1 };
93 }
94 };
95
96 if partition == 0 {
97 self.failures.advance_cycle();
98 }
99
100 if expired.is_empty() {
101 return ScanResult { processed: 0, errors: 0 };
102 }
103
104 let mut processed: u32 = 0;
105 let mut errors: u32 = 0;
106
107 for eid_str in &expired {
111 if self.failures.should_skip(eid_str) {
112 continue;
113 }
114 if should_skip_candidate(client, &self.filter, partition, eid_str).await {
115 continue;
116 }
117
118 let exec_core = format!("ff:exec:{}:{}:core", p.hash_tag(), eid_str);
119 let lease_current = format!("ff:exec:{}:{}:lease:current", p.hash_tag(), eid_str);
120 let lease_history = format!("ff:exec:{}:{}:lease:history", p.hash_tag(), eid_str);
121
122 let keys: [&str; 4] = [
123 &exec_core,
124 &lease_current,
125 &lease_expiry_key,
126 &lease_history,
127 ];
128
129 match client.fcall::<ferriskey::Value>(
130 "ff_mark_lease_expired_if_due",
131 &keys,
132 &[eid_str.as_str()],
133 ).await {
134 Ok(_) => {
135 self.failures.record_success(eid_str);
136 processed += 1;
137 }
138 Err(e) => {
139 tracing::warn!(
140 partition,
141 execution_id = eid_str.as_str(),
142 error = %e,
143 "lease_expiry: ff_mark_lease_expired_if_due failed"
144 );
145 self.failures.record_failure(eid_str, "lease_expiry");
146 errors += 1;
147 }
148 }
149 }
150
151 ScanResult { processed, errors }
152 }
153}
154
155pub(crate) async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
157 let result: Vec<String> = client
158 .cmd("TIME")
159 .execute()
160 .await?;
161 if result.len() < 2 {
162 return Err(ferriskey::Error::from((
163 ferriskey::ErrorKind::ClientError,
164 "TIME returned fewer than 2 elements",
165 )));
166 }
167 let secs: u64 = result[0].parse().map_err(|_| {
168 ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
169 })?;
170 let micros: u64 = result[1].parse().map_err(|_| {
171 ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
172 })?;
173 Ok(secs * 1000 + micros / 1000)
174}