Skip to main content

ff_engine/scanner/
lease_expiry.rs

//! Lease expiry scanner.
//!
//! Iterates `ff:idx:{p:N}:lease_expiry` for each partition, finding leases
//! whose `expires_at` score is <= now. For each, calls
//! `FCALL ff_mark_lease_expired_if_due` which re-validates and atomically
//! marks the execution as `lease_expired_reclaimable`.
//!
//! Reference: RFC-003 §Reclaim scan pattern, RFC-010 §6.1

10use std::time::Duration;
11
12use ff_core::keys::IndexKeys;
13use ff_core::partition::{Partition, PartitionFamily};
14
15use super::{FailureTracker, ScanResult, Scanner};
16
17/// Batch size per ZRANGEBYSCORE call.
18const BATCH_SIZE: u32 = 50;
19
20pub struct LeaseExpiryScanner {
21    interval: Duration,
22    failures: FailureTracker,
23}
24
25impl LeaseExpiryScanner {
26    pub fn new(interval: Duration) -> Self {
27        Self { interval, failures: FailureTracker::new() }
28    }
29}
30
31impl Scanner for LeaseExpiryScanner {
32    fn name(&self) -> &'static str {
33        "lease_expiry"
34    }
35
36    fn interval(&self) -> Duration {
37        self.interval
38    }
39
40    async fn scan_partition(
41        &self,
42        client: &ferriskey::Client,
43        partition: u16,
44    ) -> ScanResult {
45        let p = Partition {
46            family: PartitionFamily::Execution,
47            index: partition,
48        };
49        let idx = IndexKeys::new(&p);
50        let lease_expiry_key = idx.lease_expiry();
51
52        // Get current server time
53        let now_ms = match server_time_ms(client).await {
54            Ok(t) => t,
55            Err(e) => {
56                tracing::warn!(partition, error = %e, "lease_expiry: failed to get server time");
57                return ScanResult { processed: 0, errors: 1 };
58            }
59        };
60
61        // ZRANGEBYSCORE lease_expiry -inf now LIMIT 0 batch_size
62        let expired: Vec<String> = match client
63            .cmd("ZRANGEBYSCORE")
64            .arg(&lease_expiry_key)
65            .arg("-inf")
66            .arg(now_ms.to_string().as_str())
67            .arg("LIMIT")
68            .arg("0")
69            .arg(BATCH_SIZE.to_string().as_str())
70            .execute()
71            .await
72        {
73            Ok(ids) => ids,
74            Err(e) => {
75                tracing::warn!(partition, error = %e, "lease_expiry: ZRANGEBYSCORE failed");
76                return ScanResult { processed: 0, errors: 1 };
77            }
78        };
79
80        if partition == 0 {
81            self.failures.advance_cycle();
82        }
83
84        if expired.is_empty() {
85            return ScanResult { processed: 0, errors: 0 };
86        }
87
88        let mut processed: u32 = 0;
89        let mut errors: u32 = 0;
90
91        // For each expired execution, call ff_mark_lease_expired_if_due
92        // KEYS(4): exec_core, lease_current, lease_expiry_zset, lease_history
93        // ARGV(1): execution_id
94        for eid_str in &expired {
95            if self.failures.should_skip(eid_str) {
96                continue;
97            }
98
99            let exec_core = format!("ff:exec:{}:{}:core", p.hash_tag(), eid_str);
100            let lease_current = format!("ff:exec:{}:{}:lease:current", p.hash_tag(), eid_str);
101            let lease_history = format!("ff:exec:{}:{}:lease:history", p.hash_tag(), eid_str);
102
103            let keys: [&str; 4] = [
104                &exec_core,
105                &lease_current,
106                &lease_expiry_key,
107                &lease_history,
108            ];
109
110            match client.fcall::<ferriskey::Value>(
111                "ff_mark_lease_expired_if_due",
112                &keys,
113                &[eid_str.as_str()],
114            ).await {
115                Ok(_) => {
116                    self.failures.record_success(eid_str);
117                    processed += 1;
118                }
119                Err(e) => {
120                    tracing::warn!(
121                        partition,
122                        execution_id = eid_str.as_str(),
123                        error = %e,
124                        "lease_expiry: ff_mark_lease_expired_if_due failed"
125                    );
126                    self.failures.record_failure(eid_str, "lease_expiry");
127                    errors += 1;
128                }
129            }
130        }
131
132        ScanResult { processed, errors }
133    }
134}
135
136/// Get server time in milliseconds via the TIME command.
137pub(crate) async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
138    let result: Vec<String> = client
139        .cmd("TIME")
140        .execute()
141        .await?;
142    if result.len() < 2 {
143        return Err(ferriskey::Error::from((
144            ferriskey::ErrorKind::ClientError,
145            "TIME returned fewer than 2 elements",
146        )));
147    }
148    let secs: u64 = result[0].parse().map_err(|_| {
149        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
150    })?;
151    let micros: u64 = result[1].parse().map_err(|_| {
152        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
153    })?;
154    Ok(secs * 1000 + micros / 1000)
155}