ff_engine/scanner/
lease_expiry.rs1use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::keys::IndexKeys;
14use ff_core::partition::{Partition, PartitionFamily};
15
16use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
17
18const BATCH_SIZE: u32 = 50;
20
/// Runs one lease-expiry reconciliation tick for a single partition against
/// the Postgres backend.
///
/// This is a pass-through to
/// `ff_backend_postgres::reconcilers::lease_expiry::scan_tick`. The
/// `ScanReport` on success, or the `EngineError` on failure, is returned
/// unchanged.
#[cfg(feature = "postgres")]
pub async fn scan_tick_pg(
    pool: &ff_backend_postgres::PgPool,
    partition_key: i16,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_backend_postgres::reconcilers::ScanReport, ff_core::engine_error::EngineError> {
    use ff_backend_postgres::reconcilers::lease_expiry as pg_lease_expiry;

    pg_lease_expiry::scan_tick(pool, partition_key, filter).await
}
30
31pub struct LeaseExpiryScanner {
32 interval: Duration,
33 failures: FailureTracker,
34 filter: ScannerFilter,
35}
36
37impl LeaseExpiryScanner {
38 pub fn new(interval: Duration) -> Self {
39 Self::with_filter(interval, ScannerFilter::default())
40 }
41
42 pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
45 Self {
46 interval,
47 failures: FailureTracker::new(),
48 filter,
49 }
50 }
51}
52
53impl Scanner for LeaseExpiryScanner {
54 fn name(&self) -> &'static str {
55 "lease_expiry"
56 }
57
58 fn interval(&self) -> Duration {
59 self.interval
60 }
61
62 fn filter(&self) -> &ScannerFilter {
63 &self.filter
64 }
65
66 async fn scan_partition(
67 &self,
68 client: &ferriskey::Client,
69 partition: u16,
70 ) -> ScanResult {
71 let p = Partition {
72 family: PartitionFamily::Execution,
73 index: partition,
74 };
75 let idx = IndexKeys::new(&p);
76 let lease_expiry_key = idx.lease_expiry();
77
78 let now_ms = match server_time_ms(client).await {
80 Ok(t) => t,
81 Err(e) => {
82 tracing::warn!(partition, error = %e, "lease_expiry: failed to get server time");
83 return ScanResult { processed: 0, errors: 1 };
84 }
85 };
86
87 let expired: Vec<String> = match client
89 .cmd("ZRANGEBYSCORE")
90 .arg(&lease_expiry_key)
91 .arg("-inf")
92 .arg(now_ms.to_string().as_str())
93 .arg("LIMIT")
94 .arg("0")
95 .arg(BATCH_SIZE.to_string().as_str())
96 .execute()
97 .await
98 {
99 Ok(ids) => ids,
100 Err(e) => {
101 tracing::warn!(partition, error = %e, "lease_expiry: ZRANGEBYSCORE failed");
102 return ScanResult { processed: 0, errors: 1 };
103 }
104 };
105
106 if partition == 0 {
107 self.failures.advance_cycle();
108 }
109
110 if expired.is_empty() {
111 return ScanResult { processed: 0, errors: 0 };
112 }
113
114 let mut processed: u32 = 0;
115 let mut errors: u32 = 0;
116
117 for eid_str in &expired {
121 if self.failures.should_skip(eid_str) {
122 continue;
123 }
124 if should_skip_candidate(client, &self.filter, partition, eid_str).await {
125 continue;
126 }
127
128 let exec_core = format!("ff:exec:{}:{}:core", p.hash_tag(), eid_str);
129 let lease_current = format!("ff:exec:{}:{}:lease:current", p.hash_tag(), eid_str);
130 let lease_history = format!("ff:exec:{}:{}:lease:history", p.hash_tag(), eid_str);
131
132 let keys: [&str; 4] = [
133 &exec_core,
134 &lease_current,
135 &lease_expiry_key,
136 &lease_history,
137 ];
138
139 match client.fcall::<ferriskey::Value>(
140 "ff_mark_lease_expired_if_due",
141 &keys,
142 &[eid_str.as_str()],
143 ).await {
144 Ok(_) => {
145 self.failures.record_success(eid_str);
146 processed += 1;
147 }
148 Err(e) => {
149 tracing::warn!(
150 partition,
151 execution_id = eid_str.as_str(),
152 error = %e,
153 "lease_expiry: ff_mark_lease_expired_if_due failed"
154 );
155 self.failures.record_failure(eid_str, "lease_expiry");
156 errors += 1;
157 }
158 }
159 }
160
161 ScanResult { processed, errors }
162 }
163}
164
165pub(crate) async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
167 let result: Vec<String> = client
168 .cmd("TIME")
169 .execute()
170 .await?;
171 if result.len() < 2 {
172 return Err(ferriskey::Error::from((
173 ferriskey::ErrorKind::ClientError,
174 "TIME returned fewer than 2 elements",
175 )));
176 }
177 let secs: u64 = result[0].parse().map_err(|_| {
178 ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
179 })?;
180 let micros: u64 = result[1].parse().map_err(|_| {
181 ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
182 })?;
183 Ok(secs * 1000 + micros / 1000)
184}