// ff_engine/scanner/pending_wp_expiry.rs

//! Pending waitpoint expiry scanner.
//!
//! Iterates `ff:idx:{p:N}:pending_waitpoint_expiry` for each partition,
//! finding pending waitpoints whose `expires_at` score is at or before the
//! current server time (in milliseconds). For each one, it calls
//! `FCALL ff_close_waitpoint` to close the expired pending waitpoint.
//!
//! Reference: RFC-004 §Pending waitpoint expiry scanner, RFC-010 §6.3
8
9use std::time::Duration;
10
11use ff_core::backend::ScannerFilter;
12use ff_core::keys::IndexKeys;
13use ff_core::partition::{Partition, PartitionFamily};
14
15use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
16
/// Maximum number of expired waitpoint IDs read from one partition's
/// expiry index per scan pass (the `LIMIT` count sent with `ZRANGEBYSCORE`).
const BATCH_SIZE: u32 = 100;
18
19pub struct PendingWaitpointExpiryScanner {
20    interval: Duration,
21    failures: FailureTracker,
22    filter: ScannerFilter,
23}
24
25impl PendingWaitpointExpiryScanner {
26    pub fn new(interval: Duration) -> Self {
27        Self::with_filter(interval, ScannerFilter::default())
28    }
29
30    /// Construct with a [`ScannerFilter`] applied per candidate
31    /// (issue #122). The filter is resolved to the waitpoint's
32    /// owning execution via the waitpoint hash's `execution_id`
33    /// field, then applied via [`should_skip_candidate`].
34    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
35        Self {
36            interval,
37            failures: FailureTracker::new(),
38            filter,
39        }
40    }
41}
42
43impl Scanner for PendingWaitpointExpiryScanner {
44    fn name(&self) -> &'static str {
45        "pending_wp_expiry"
46    }
47
48    fn interval(&self) -> Duration {
49        self.interval
50    }
51
52    fn filter(&self) -> &ScannerFilter {
53        &self.filter
54    }
55
56    async fn scan_partition(
57        &self,
58        client: &ferriskey::Client,
59        partition: u16,
60    ) -> ScanResult {
61        let p = Partition {
62            family: PartitionFamily::Execution,
63            index: partition,
64        };
65        let idx = IndexKeys::new(&p);
66        let expiry_key = idx.pending_waitpoint_expiry();
67        let tag = p.hash_tag();
68
69        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
70            Ok(t) => t,
71            Err(e) => {
72                tracing::warn!(partition, error = %e, "pending_wp_expiry: failed to get server time");
73                return ScanResult { processed: 0, errors: 1 };
74            }
75        };
76
77        // ZRANGEBYSCORE pending_waitpoint_expiry -inf now LIMIT 0 batch_size
78        let expired: Vec<String> = match client
79            .cmd("ZRANGEBYSCORE")
80            .arg(&expiry_key)
81            .arg("-inf")
82            .arg(now_ms.to_string().as_str())
83            .arg("LIMIT")
84            .arg("0")
85            .arg(BATCH_SIZE.to_string().as_str())
86            .execute()
87            .await
88        {
89            Ok(ids) => ids,
90            Err(e) => {
91                tracing::warn!(partition, error = %e, "pending_wp_expiry: ZRANGEBYSCORE failed");
92                return ScanResult { processed: 0, errors: 1 };
93            }
94        };
95
96        if partition == 0 {
97            self.failures.advance_cycle();
98        }
99
100        if expired.is_empty() {
101            return ScanResult { processed: 0, errors: 0 };
102        }
103
104        let mut processed: u32 = 0;
105        let mut errors: u32 = 0;
106
107        for wp_id_str in &expired {
108            if self.failures.should_skip(wp_id_str) {
109                continue;
110            }
111            // Issue #122: this scanner iterates waitpoint IDs, not
112            // exec IDs. To apply the filter we resolve the owning
113            // execution via the waitpoint hash's `execution_id`
114            // field (one HGET), then call the shared helper. When
115            // the filter is a no-op this entire block is skipped.
116            if !self.filter.is_noop() {
117                let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id_str);
118                let eid: Option<String> = match client
119                    .cmd("HGET")
120                    .arg(&waitpoint_hash)
121                    .arg("execution_id")
122                    .execute()
123                    .await
124                {
125                    Ok(v) => v,
126                    Err(_) => {
127                        // Conservative skip on transport error.
128                        continue;
129                    }
130                };
131                let Some(eid) = eid else { continue };
132                if should_skip_candidate(client, &self.filter, partition, &eid).await {
133                    continue;
134                }
135            }
136
137            match close_expired_waitpoint(client, &tag, &idx, wp_id_str).await {
138                Ok(()) => {
139                    self.failures.record_success(wp_id_str);
140                    processed += 1;
141                }
142                Err(e) => {
143                    tracing::warn!(
144                        partition,
145                        waitpoint_id = wp_id_str.as_str(),
146                        error = %e,
147                        "pending_wp_expiry: ff_close_waitpoint failed"
148                    );
149                    self.failures.record_failure(wp_id_str, "pending_wp_expiry");
150                    errors += 1;
151                }
152            }
153        }
154
155        ScanResult { processed, errors }
156    }
157}
158
159/// Call ff_close_waitpoint for one expired pending waitpoint.
160///
161/// KEYS (3): exec_core, waitpoint_hash, pending_wp_expiry_zset
162/// ARGV (2): waitpoint_id, reason
163///
164/// NOTE: exec_core is required by the function signature but the waitpoint
165/// record contains the execution_id. We read it from the waitpoint to
166/// construct the exec_core key. If the waitpoint doesn't exist, the Lua
167/// function handles it gracefully.
168async fn close_expired_waitpoint(
169    client: &ferriskey::Client,
170    tag: &str,
171    idx: &IndexKeys,
172    wp_id_str: &str,
173) -> Result<(), ferriskey::Error> {
174    let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id_str);
175
176    // Read execution_id from waitpoint to construct exec_core key
177    let eid: Option<String> = client
178        .cmd("HGET")
179        .arg(&waitpoint_hash)
180        .arg("execution_id")
181        .execute()
182        .await?;
183
184    let eid = eid.unwrap_or_default();
185    let exec_core = format!("ff:exec:{}:{}:core", tag, eid);
186    let pending_wp_expiry = idx.pending_waitpoint_expiry();
187
188    let keys: [&str; 3] = [
189        &exec_core,
190        &waitpoint_hash,
191        &pending_wp_expiry,
192    ];
193
194    let argv: [&str; 2] = [wp_id_str, "never_committed"];
195
196    let _: ferriskey::Value = client
197        .fcall("ff_close_waitpoint", &keys, &argv)
198        .await?;
199
200    Ok(())
201}