Skip to main content

ff_engine/scanner/
suspension_timeout.rs

1//! Suspension timeout scanner.
2//!
3//! Iterates `ff:idx:{p:N}:suspension_timeout` for each partition, finding
4//! suspended executions whose `timeout_at` score is <= now. For each, calls
5//! `FCALL ff_expire_suspension` which re-validates and applies the configured
6//! timeout behavior (fail/cancel/expire/auto_resume/escalate).
7//!
8//! Reference: RFC-004 §Timeout scanner, RFC-010 §6.2
9
10use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::keys::IndexKeys;
14use ff_core::partition::{Partition, PartitionFamily};
15
16use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
17
/// Maximum number of overdue execution ids pulled from a partition's
/// `suspension_timeout` index (the `LIMIT` count of the `ZRANGEBYSCORE`)
/// in a single scan tick.
const BATCH_SIZE: u32 = 50;
19
// ─── Postgres branch (wave 6c) ──────────────────────────────────────────

/// Runs one suspension-timeout scan tick for `partition_key` against the
/// Postgres backend.
///
/// Thin pass-through to
/// `ff_backend_postgres::reconcilers::suspension_timeout::scan_tick`. The
/// `filter` is forwarded unchanged, and the backend's report or error is
/// returned as-is.
#[cfg(feature = "postgres")]
pub async fn scan_tick_pg(
    pool: &ff_backend_postgres::PgPool,
    partition_key: i16,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_backend_postgres::reconcilers::ScanReport, ff_core::engine_error::EngineError> {
    use ff_backend_postgres::reconcilers::suspension_timeout as pg_reconciler;

    pg_reconciler::scan_tick(pool, partition_key, filter).await
}
30
31pub struct SuspensionTimeoutScanner {
32    interval: Duration,
33    failures: FailureTracker,
34    filter: ScannerFilter,
35}
36
37impl SuspensionTimeoutScanner {
38    pub fn new(interval: Duration) -> Self {
39        Self::with_filter(interval, ScannerFilter::default())
40    }
41
42    /// Construct with a [`ScannerFilter`] applied per candidate
43    /// (issue #122).
44    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
45        Self {
46            interval,
47            failures: FailureTracker::new(),
48            filter,
49        }
50    }
51}
52
53impl Scanner for SuspensionTimeoutScanner {
54    fn name(&self) -> &'static str {
55        "suspension_timeout"
56    }
57
58    fn interval(&self) -> Duration {
59        self.interval
60    }
61
62    fn filter(&self) -> &ScannerFilter {
63        &self.filter
64    }
65
66    async fn scan_partition(
67        &self,
68        client: &ferriskey::Client,
69        partition: u16,
70    ) -> ScanResult {
71        let p = Partition {
72            family: PartitionFamily::Execution,
73            index: partition,
74        };
75        let idx = IndexKeys::new(&p);
76        let timeout_key = idx.suspension_timeout();
77        let tag = p.hash_tag();
78
79        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
80            Ok(t) => t,
81            Err(e) => {
82                tracing::warn!(partition, error = %e, "suspension_timeout: failed to get server time");
83                return ScanResult { processed: 0, errors: 1 };
84            }
85        };
86
87        // ZRANGEBYSCORE suspension_timeout -inf now LIMIT 0 batch_size
88        let timed_out: Vec<String> = match client
89            .cmd("ZRANGEBYSCORE")
90            .arg(&timeout_key)
91            .arg("-inf")
92            .arg(now_ms.to_string().as_str())
93            .arg("LIMIT")
94            .arg("0")
95            .arg(BATCH_SIZE.to_string().as_str())
96            .execute()
97            .await
98        {
99            Ok(ids) => ids,
100            Err(e) => {
101                tracing::warn!(partition, error = %e, "suspension_timeout: ZRANGEBYSCORE failed");
102                return ScanResult { processed: 0, errors: 1 };
103            }
104        };
105
106        if partition == 0 {
107            self.failures.advance_cycle();
108        }
109
110        if timed_out.is_empty() {
111            return ScanResult { processed: 0, errors: 0 };
112        }
113
114        let mut processed: u32 = 0;
115        let mut errors: u32 = 0;
116
117        for eid_str in &timed_out {
118            if self.failures.should_skip(eid_str) {
119                continue;
120            }
121            if should_skip_candidate(client, &self.filter, partition, eid_str).await {
122                continue;
123            }
124
125            match expire_suspension(client, &tag, &idx, eid_str).await {
126                Ok(()) => {
127                    self.failures.record_success(eid_str);
128                    processed += 1;
129                }
130                Err(e) => {
131                    tracing::warn!(
132                        partition,
133                        execution_id = eid_str.as_str(),
134                        error = %e,
135                        "suspension_timeout: ff_expire_suspension failed"
136                    );
137                    self.failures.record_failure(eid_str, "suspension_timeout");
138                    errors += 1;
139                }
140            }
141        }
142
143        ScanResult { processed, errors }
144    }
145}
146
147/// Call ff_expire_suspension for one execution.
148///
149/// KEYS (12): exec_core, suspension_current, waitpoint_hash, wp_condition,
150///            attempt_hash, stream_meta, suspension_timeout_zset,
151///            suspended_zset, terminal_zset, eligible_zset, delayed_zset,
152///            lease_history
153/// ARGV (1): execution_id
154///
155/// NOTE: waitpoint_hash, wp_condition, attempt_hash, and stream_meta use
156/// placeholder values (attempt_index=0, waitpoint_id=placeholder).
157/// The Lua reads current_waitpoint_id and current_attempt_index from
158/// exec_core/suspension_current to find the actual keys.
159/// All keys share the same {p:N} hash tag → same cluster slot.
160async fn expire_suspension(
161    client: &ferriskey::Client,
162    tag: &str,
163    idx: &IndexKeys,
164    eid_str: &str,
165) -> Result<(), ferriskey::Error> {
166    // Entity-level keys — we need exec_core and suspension_current.
167    // For waitpoint/attempt, we need the real IDs. Read them from exec_core.
168    let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
169    let suspension_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
170
171    // Read current_waitpoint_id and current_attempt_index from exec_core
172    let wp_id: Option<String> = client
173        .cmd("HGET")
174        .arg(&exec_core)
175        .arg("current_waitpoint_id")
176        .execute()
177        .await?;
178    let att_idx: Option<String> = client
179        .cmd("HGET")
180        .arg(&exec_core)
181        .arg("current_attempt_index")
182        .execute()
183        .await?;
184
185    let wp_id = wp_id.unwrap_or_default();
186    let att_idx = att_idx.unwrap_or_else(|| "0".to_string());
187
188    let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id);
189    let wp_condition = format!("ff:wp:{}:{}:condition", tag, wp_id);
190    let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
191    let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
192
193    // Partition-level index keys
194    let suspension_timeout = idx.suspension_timeout();
195    // We need lane for suspended_zset + terminal + eligible + delayed.
196    // Read lane from exec_core.
197    let lane: Option<String> = client
198        .cmd("HGET")
199        .arg(&exec_core)
200        .arg("lane_id")
201        .execute()
202        .await?;
203    let lane_str = lane.unwrap_or_else(|| "default".to_string());
204    let lane_id = ff_core::types::LaneId::new(&lane_str);
205
206    let suspended_zset = idx.lane_suspended(&lane_id);
207    let terminal_zset = idx.lane_terminal(&lane_id);
208    let eligible_zset = idx.lane_eligible(&lane_id);
209    let delayed_zset = idx.lane_delayed(&lane_id);
210    let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
211
212    let keys: [&str; 12] = [
213        &exec_core,           // 1
214        &suspension_current,  // 2
215        &waitpoint_hash,      // 3
216        &wp_condition,        // 4
217        &attempt_hash,        // 5
218        &stream_meta,         // 6
219        &suspension_timeout,  // 7
220        &suspended_zset,      // 8
221        &terminal_zset,       // 9
222        &eligible_zset,       // 10
223        &delayed_zset,        // 11
224        &lease_history,       // 12
225    ];
226
227    let argv: [&str; 1] = [eid_str];
228
229    let _: ferriskey::Value = client
230        .fcall("ff_expire_suspension", &keys, &argv)
231        .await?;
232
233    Ok(())
234}