Skip to main content

ff_engine/scanner/
lease_expiry.rs

//! Lease expiry scanner.
//!
//! Iterates `ff:idx:{p:N}:lease_expiry` for each partition, finding leases
//! whose `expires_at` score is <= now. For each candidate it calls
//! `FCALL ff_mark_lease_expired_if_due`, which re-validates the lease and
//! atomically marks the execution as `lease_expired_reclaimable`.
//!
//! Reference: RFC-003 §Reclaim scan pattern, RFC-010 §6.1

10use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::keys::IndexKeys;
14use ff_core::partition::{Partition, PartitionFamily};
15
16use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
17
/// Upper bound on expired-lease candidates fetched from one partition per
/// scan: this is the `LIMIT 0 <count>` argument of the `ZRANGEBYSCORE` query.
const BATCH_SIZE: u32 = 50;
20
21pub struct LeaseExpiryScanner {
22    interval: Duration,
23    failures: FailureTracker,
24    filter: ScannerFilter,
25}
26
27impl LeaseExpiryScanner {
28    pub fn new(interval: Duration) -> Self {
29        Self::with_filter(interval, ScannerFilter::default())
30    }
31
32    /// Construct with a [`ScannerFilter`] applied per candidate
33    /// (issue #122). See [`ScannerFilter`] rustdoc for cost.
34    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
35        Self {
36            interval,
37            failures: FailureTracker::new(),
38            filter,
39        }
40    }
41}
42
43impl Scanner for LeaseExpiryScanner {
44    fn name(&self) -> &'static str {
45        "lease_expiry"
46    }
47
48    fn interval(&self) -> Duration {
49        self.interval
50    }
51
52    fn filter(&self) -> &ScannerFilter {
53        &self.filter
54    }
55
56    async fn scan_partition(
57        &self,
58        client: &ferriskey::Client,
59        partition: u16,
60    ) -> ScanResult {
61        let p = Partition {
62            family: PartitionFamily::Execution,
63            index: partition,
64        };
65        let idx = IndexKeys::new(&p);
66        let lease_expiry_key = idx.lease_expiry();
67
68        // Get current server time
69        let now_ms = match server_time_ms(client).await {
70            Ok(t) => t,
71            Err(e) => {
72                tracing::warn!(partition, error = %e, "lease_expiry: failed to get server time");
73                return ScanResult { processed: 0, errors: 1 };
74            }
75        };
76
77        // ZRANGEBYSCORE lease_expiry -inf now LIMIT 0 batch_size
78        let expired: Vec<String> = match client
79            .cmd("ZRANGEBYSCORE")
80            .arg(&lease_expiry_key)
81            .arg("-inf")
82            .arg(now_ms.to_string().as_str())
83            .arg("LIMIT")
84            .arg("0")
85            .arg(BATCH_SIZE.to_string().as_str())
86            .execute()
87            .await
88        {
89            Ok(ids) => ids,
90            Err(e) => {
91                tracing::warn!(partition, error = %e, "lease_expiry: ZRANGEBYSCORE failed");
92                return ScanResult { processed: 0, errors: 1 };
93            }
94        };
95
96        if partition == 0 {
97            self.failures.advance_cycle();
98        }
99
100        if expired.is_empty() {
101            return ScanResult { processed: 0, errors: 0 };
102        }
103
104        let mut processed: u32 = 0;
105        let mut errors: u32 = 0;
106
107        // For each expired execution, call ff_mark_lease_expired_if_due
108        // KEYS(4): exec_core, lease_current, lease_expiry_zset, lease_history
109        // ARGV(1): execution_id
110        for eid_str in &expired {
111            if self.failures.should_skip(eid_str) {
112                continue;
113            }
114            if should_skip_candidate(client, &self.filter, partition, eid_str).await {
115                continue;
116            }
117
118            let exec_core = format!("ff:exec:{}:{}:core", p.hash_tag(), eid_str);
119            let lease_current = format!("ff:exec:{}:{}:lease:current", p.hash_tag(), eid_str);
120            let lease_history = format!("ff:exec:{}:{}:lease:history", p.hash_tag(), eid_str);
121
122            let keys: [&str; 4] = [
123                &exec_core,
124                &lease_current,
125                &lease_expiry_key,
126                &lease_history,
127            ];
128
129            match client.fcall::<ferriskey::Value>(
130                "ff_mark_lease_expired_if_due",
131                &keys,
132                &[eid_str.as_str()],
133            ).await {
134                Ok(_) => {
135                    self.failures.record_success(eid_str);
136                    processed += 1;
137                }
138                Err(e) => {
139                    tracing::warn!(
140                        partition,
141                        execution_id = eid_str.as_str(),
142                        error = %e,
143                        "lease_expiry: ff_mark_lease_expired_if_due failed"
144                    );
145                    self.failures.record_failure(eid_str, "lease_expiry");
146                    errors += 1;
147                }
148            }
149        }
150
151        ScanResult { processed, errors }
152    }
153}
154
155/// Get server time in milliseconds via the TIME command.
156pub(crate) async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
157    let result: Vec<String> = client
158        .cmd("TIME")
159        .execute()
160        .await?;
161    if result.len() < 2 {
162        return Err(ferriskey::Error::from((
163            ferriskey::ErrorKind::ClientError,
164            "TIME returned fewer than 2 elements",
165        )));
166    }
167    let secs: u64 = result[0].parse().map_err(|_| {
168        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
169    })?;
170    let micros: u64 = result[1].parse().map_err(|_| {
171        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
172    })?;
173    Ok(secs * 1000 + micros / 1000)
174}