ff_engine/scanner/
lease_expiry.rs1use std::time::Duration;
11
12use ff_core::keys::IndexKeys;
13use ff_core::partition::{Partition, PartitionFamily};
14
15use super::{FailureTracker, ScanResult, Scanner};
16
/// Upper bound on how many execution ids a single `scan_partition` call
/// requests from the lease-expiry index (passed as the `LIMIT` count).
const BATCH_SIZE: u32 = 50;
19
20pub struct LeaseExpiryScanner {
21 interval: Duration,
22 failures: FailureTracker,
23}
24
25impl LeaseExpiryScanner {
26 pub fn new(interval: Duration) -> Self {
27 Self { interval, failures: FailureTracker::new() }
28 }
29}
30
31impl Scanner for LeaseExpiryScanner {
32 fn name(&self) -> &'static str {
33 "lease_expiry"
34 }
35
36 fn interval(&self) -> Duration {
37 self.interval
38 }
39
40 async fn scan_partition(
41 &self,
42 client: &ferriskey::Client,
43 partition: u16,
44 ) -> ScanResult {
45 let p = Partition {
46 family: PartitionFamily::Execution,
47 index: partition,
48 };
49 let idx = IndexKeys::new(&p);
50 let lease_expiry_key = idx.lease_expiry();
51
52 let now_ms = match server_time_ms(client).await {
54 Ok(t) => t,
55 Err(e) => {
56 tracing::warn!(partition, error = %e, "lease_expiry: failed to get server time");
57 return ScanResult { processed: 0, errors: 1 };
58 }
59 };
60
61 let expired: Vec<String> = match client
63 .cmd("ZRANGEBYSCORE")
64 .arg(&lease_expiry_key)
65 .arg("-inf")
66 .arg(now_ms.to_string().as_str())
67 .arg("LIMIT")
68 .arg("0")
69 .arg(BATCH_SIZE.to_string().as_str())
70 .execute()
71 .await
72 {
73 Ok(ids) => ids,
74 Err(e) => {
75 tracing::warn!(partition, error = %e, "lease_expiry: ZRANGEBYSCORE failed");
76 return ScanResult { processed: 0, errors: 1 };
77 }
78 };
79
80 if partition == 0 {
81 self.failures.advance_cycle();
82 }
83
84 if expired.is_empty() {
85 return ScanResult { processed: 0, errors: 0 };
86 }
87
88 let mut processed: u32 = 0;
89 let mut errors: u32 = 0;
90
91 for eid_str in &expired {
95 if self.failures.should_skip(eid_str) {
96 continue;
97 }
98
99 let exec_core = format!("ff:exec:{}:{}:core", p.hash_tag(), eid_str);
100 let lease_current = format!("ff:exec:{}:{}:lease:current", p.hash_tag(), eid_str);
101 let lease_history = format!("ff:exec:{}:{}:lease:history", p.hash_tag(), eid_str);
102
103 let keys: [&str; 4] = [
104 &exec_core,
105 &lease_current,
106 &lease_expiry_key,
107 &lease_history,
108 ];
109
110 match client.fcall::<ferriskey::Value>(
111 "ff_mark_lease_expired_if_due",
112 &keys,
113 &[eid_str.as_str()],
114 ).await {
115 Ok(_) => {
116 self.failures.record_success(eid_str);
117 processed += 1;
118 }
119 Err(e) => {
120 tracing::warn!(
121 partition,
122 execution_id = eid_str.as_str(),
123 error = %e,
124 "lease_expiry: ff_mark_lease_expired_if_due failed"
125 );
126 self.failures.record_failure(eid_str, "lease_expiry");
127 errors += 1;
128 }
129 }
130 }
131
132 ScanResult { processed, errors }
133 }
134}
135
136pub(crate) async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
138 let result: Vec<String> = client
139 .cmd("TIME")
140 .execute()
141 .await?;
142 if result.len() < 2 {
143 return Err(ferriskey::Error::from((
144 ferriskey::ErrorKind::ClientError,
145 "TIME returned fewer than 2 elements",
146 )));
147 }
148 let secs: u64 = result[0].parse().map_err(|_| {
149 ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
150 })?;
151 let micros: u64 = result[1].parse().map_err(|_| {
152 ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
153 })?;
154 Ok(secs * 1000 + micros / 1000)
155}