Skip to main content

ff_engine/scanner/
pending_wp_expiry.rs

1//! Pending waitpoint expiry scanner.
2//!
3//! Iterates `ff:idx:{p:N}:pending_waitpoint_expiry` for each partition,
4//! finding pending waitpoints whose `expires_at` score is <= now. For each,
5//! calls `FCALL ff_close_waitpoint` to close the expired pending waitpoint.
6//!
7//! Reference: RFC-004 §Pending waitpoint expiry scanner, RFC-010 §6.3
8
9use std::time::Duration;
10
11use ff_core::keys::IndexKeys;
12use ff_core::partition::{Partition, PartitionFamily};
13
14use super::{FailureTracker, ScanResult, Scanner};
15
/// Upper bound on expired waitpoints fetched per partition per scan
/// (passed as the `count` of `ZRANGEBYSCORE ... LIMIT 0 <count>`).
const BATCH_SIZE: u32 = 100;
17
18pub struct PendingWaitpointExpiryScanner {
19    interval: Duration,
20    failures: FailureTracker,
21}
22
23impl PendingWaitpointExpiryScanner {
24    pub fn new(interval: Duration) -> Self {
25        Self { interval, failures: FailureTracker::new() }
26    }
27}
28
29impl Scanner for PendingWaitpointExpiryScanner {
30    fn name(&self) -> &'static str {
31        "pending_wp_expiry"
32    }
33
34    fn interval(&self) -> Duration {
35        self.interval
36    }
37
38    async fn scan_partition(
39        &self,
40        client: &ferriskey::Client,
41        partition: u16,
42    ) -> ScanResult {
43        let p = Partition {
44            family: PartitionFamily::Execution,
45            index: partition,
46        };
47        let idx = IndexKeys::new(&p);
48        let expiry_key = idx.pending_waitpoint_expiry();
49        let tag = p.hash_tag();
50
51        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
52            Ok(t) => t,
53            Err(e) => {
54                tracing::warn!(partition, error = %e, "pending_wp_expiry: failed to get server time");
55                return ScanResult { processed: 0, errors: 1 };
56            }
57        };
58
59        // ZRANGEBYSCORE pending_waitpoint_expiry -inf now LIMIT 0 batch_size
60        let expired: Vec<String> = match client
61            .cmd("ZRANGEBYSCORE")
62            .arg(&expiry_key)
63            .arg("-inf")
64            .arg(now_ms.to_string().as_str())
65            .arg("LIMIT")
66            .arg("0")
67            .arg(BATCH_SIZE.to_string().as_str())
68            .execute()
69            .await
70        {
71            Ok(ids) => ids,
72            Err(e) => {
73                tracing::warn!(partition, error = %e, "pending_wp_expiry: ZRANGEBYSCORE failed");
74                return ScanResult { processed: 0, errors: 1 };
75            }
76        };
77
78        if partition == 0 {
79            self.failures.advance_cycle();
80        }
81
82        if expired.is_empty() {
83            return ScanResult { processed: 0, errors: 0 };
84        }
85
86        let mut processed: u32 = 0;
87        let mut errors: u32 = 0;
88
89        for wp_id_str in &expired {
90            if self.failures.should_skip(wp_id_str) {
91                continue;
92            }
93
94            match close_expired_waitpoint(client, &tag, &idx, wp_id_str).await {
95                Ok(()) => {
96                    self.failures.record_success(wp_id_str);
97                    processed += 1;
98                }
99                Err(e) => {
100                    tracing::warn!(
101                        partition,
102                        waitpoint_id = wp_id_str.as_str(),
103                        error = %e,
104                        "pending_wp_expiry: ff_close_waitpoint failed"
105                    );
106                    self.failures.record_failure(wp_id_str, "pending_wp_expiry");
107                    errors += 1;
108                }
109            }
110        }
111
112        ScanResult { processed, errors }
113    }
114}
115
116/// Call ff_close_waitpoint for one expired pending waitpoint.
117///
118/// KEYS (3): exec_core, waitpoint_hash, pending_wp_expiry_zset
119/// ARGV (2): waitpoint_id, reason
120///
121/// NOTE: exec_core is required by the function signature but the waitpoint
122/// record contains the execution_id. We read it from the waitpoint to
123/// construct the exec_core key. If the waitpoint doesn't exist, the Lua
124/// function handles it gracefully.
125async fn close_expired_waitpoint(
126    client: &ferriskey::Client,
127    tag: &str,
128    idx: &IndexKeys,
129    wp_id_str: &str,
130) -> Result<(), ferriskey::Error> {
131    let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id_str);
132
133    // Read execution_id from waitpoint to construct exec_core key
134    let eid: Option<String> = client
135        .cmd("HGET")
136        .arg(&waitpoint_hash)
137        .arg("execution_id")
138        .execute()
139        .await?;
140
141    let eid = eid.unwrap_or_default();
142    let exec_core = format!("ff:exec:{}:{}:core", tag, eid);
143    let pending_wp_expiry = idx.pending_waitpoint_expiry();
144
145    let keys: [&str; 3] = [
146        &exec_core,
147        &waitpoint_hash,
148        &pending_wp_expiry,
149    ];
150
151    let argv: [&str; 2] = [wp_id_str, "never_committed"];
152
153    let _: ferriskey::Value = client
154        .fcall("ff_close_waitpoint", &keys, &argv)
155        .await?;
156
157    Ok(())
158}