Skip to main content

ff_engine/scanner/
lease_expiry.rs

1//! Lease expiry scanner.
2//!
3//! Iterates `ff:idx:{p:N}:lease_expiry` for each partition, finding leases
4//! whose `expires_at` score is <= now. For each, calls
5//! `FCALL ff_mark_lease_expired_if_due` which re-validates and atomically
6//! marks the execution as `lease_expired_reclaimable`.
7//!
8//! Reference: RFC-003 §Reclaim scan pattern, RFC-010 §6.1
9
10use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::keys::IndexKeys;
14use ff_core::partition::{Partition, PartitionFamily};
15
16use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
17
/// Batch size per ZRANGEBYSCORE call.
///
/// Passed as the `LIMIT` count of the range query, so one `scan_partition`
/// call looks at no more than this many due entries for its partition.
const BATCH_SIZE: u32 = 50;
20
// ─── Postgres branch (wave 6c) ──────────────────────────────────────────
/// Run one lease-expiry scan tick against the Postgres backend.
///
/// Pure forwarding shim: `pool`, `partition_key` and `filter` are handed
/// unchanged to `ff_backend_postgres::reconcilers::lease_expiry::scan_tick`,
/// and its result is returned as-is. Only compiled when the `postgres`
/// feature is enabled.
#[cfg(feature = "postgres")]
pub async fn scan_tick_pg(
    pool: &ff_backend_postgres::PgPool,
    partition_key: i16,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_backend_postgres::reconcilers::ScanReport, ff_core::engine_error::EngineError> {
    use ff_backend_postgres::reconcilers::lease_expiry as pg_lease_expiry;

    pg_lease_expiry::scan_tick(pool, partition_key, filter).await
}
30
/// Scanner that looks up due entries in a partition's lease-expiry index and
/// asks the server-side `ff_mark_lease_expired_if_due` function to mark them.
pub struct LeaseExpiryScanner {
    /// Value reported by [`Scanner::interval`].
    interval: Duration,
    /// Per-execution failure bookkeeping: consulted before each candidate is
    /// processed and updated with the outcome of every FCALL.
    failures: FailureTracker,
    /// Filter applied to each candidate before it is processed.
    filter: ScannerFilter,
}
36
37impl LeaseExpiryScanner {
38    pub fn new(interval: Duration) -> Self {
39        Self::with_filter(interval, ScannerFilter::default())
40    }
41
42    /// Construct with a [`ScannerFilter`] applied per candidate
43    /// (issue #122). See [`ScannerFilter`] rustdoc for cost.
44    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
45        Self {
46            interval,
47            failures: FailureTracker::new(),
48            filter,
49        }
50    }
51}
52
53impl Scanner for LeaseExpiryScanner {
54    fn name(&self) -> &'static str {
55        "lease_expiry"
56    }
57
58    fn interval(&self) -> Duration {
59        self.interval
60    }
61
62    fn filter(&self) -> &ScannerFilter {
63        &self.filter
64    }
65
66    async fn scan_partition(
67        &self,
68        client: &ferriskey::Client,
69        partition: u16,
70    ) -> ScanResult {
71        let p = Partition {
72            family: PartitionFamily::Execution,
73            index: partition,
74        };
75        let idx = IndexKeys::new(&p);
76        let lease_expiry_key = idx.lease_expiry();
77
78        // Get current server time
79        let now_ms = match server_time_ms(client).await {
80            Ok(t) => t,
81            Err(e) => {
82                tracing::warn!(partition, error = %e, "lease_expiry: failed to get server time");
83                return ScanResult { processed: 0, errors: 1 };
84            }
85        };
86
87        // ZRANGEBYSCORE lease_expiry -inf now LIMIT 0 batch_size
88        let expired: Vec<String> = match client
89            .cmd("ZRANGEBYSCORE")
90            .arg(&lease_expiry_key)
91            .arg("-inf")
92            .arg(now_ms.to_string().as_str())
93            .arg("LIMIT")
94            .arg("0")
95            .arg(BATCH_SIZE.to_string().as_str())
96            .execute()
97            .await
98        {
99            Ok(ids) => ids,
100            Err(e) => {
101                tracing::warn!(partition, error = %e, "lease_expiry: ZRANGEBYSCORE failed");
102                return ScanResult { processed: 0, errors: 1 };
103            }
104        };
105
106        if partition == 0 {
107            self.failures.advance_cycle();
108        }
109
110        if expired.is_empty() {
111            return ScanResult { processed: 0, errors: 0 };
112        }
113
114        let mut processed: u32 = 0;
115        let mut errors: u32 = 0;
116
117        // For each expired execution, call ff_mark_lease_expired_if_due
118        // KEYS(4): exec_core, lease_current, lease_expiry_zset, lease_history
119        // ARGV(1): execution_id
120        for eid_str in &expired {
121            if self.failures.should_skip(eid_str) {
122                continue;
123            }
124            if should_skip_candidate(client, &self.filter, partition, eid_str).await {
125                continue;
126            }
127
128            let exec_core = format!("ff:exec:{}:{}:core", p.hash_tag(), eid_str);
129            let lease_current = format!("ff:exec:{}:{}:lease:current", p.hash_tag(), eid_str);
130            let lease_history = format!("ff:exec:{}:{}:lease:history", p.hash_tag(), eid_str);
131
132            let keys: [&str; 4] = [
133                &exec_core,
134                &lease_current,
135                &lease_expiry_key,
136                &lease_history,
137            ];
138
139            match client.fcall::<ferriskey::Value>(
140                "ff_mark_lease_expired_if_due",
141                &keys,
142                &[eid_str.as_str()],
143            ).await {
144                Ok(_) => {
145                    self.failures.record_success(eid_str);
146                    processed += 1;
147                }
148                Err(e) => {
149                    tracing::warn!(
150                        partition,
151                        execution_id = eid_str.as_str(),
152                        error = %e,
153                        "lease_expiry: ff_mark_lease_expired_if_due failed"
154                    );
155                    self.failures.record_failure(eid_str, "lease_expiry");
156                    errors += 1;
157                }
158            }
159        }
160
161        ScanResult { processed, errors }
162    }
163}
164
165/// Get server time in milliseconds via the TIME command.
166pub(crate) async fn server_time_ms(client: &ferriskey::Client) -> Result<u64, ferriskey::Error> {
167    let result: Vec<String> = client
168        .cmd("TIME")
169        .execute()
170        .await?;
171    if result.len() < 2 {
172        return Err(ferriskey::Error::from((
173            ferriskey::ErrorKind::ClientError,
174            "TIME returned fewer than 2 elements",
175        )));
176    }
177    let secs: u64 = result[0].parse().map_err(|_| {
178        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid seconds"))
179    })?;
180    let micros: u64 = result[1].parse().map_err(|_| {
181        ferriskey::Error::from((ferriskey::ErrorKind::ClientError, "TIME: invalid microseconds"))
182    })?;
183    Ok(secs * 1000 + micros / 1000)
184}