Skip to main content

ff_engine/scanner/
suspension_timeout.rs

//! Suspension timeout scanner.
//!
//! Iterates `ff:idx:{p:N}:suspension_timeout` for each partition, finding
//! suspended executions whose `timeout_at` score is <= now. For each one, it
//! calls `FCALL ff_expire_suspension`, which re-validates the suspension and
//! applies the configured timeout behavior (fail/cancel/expire/auto_resume/escalate).
//!
//! Reference: RFC-004 §Timeout scanner, RFC-010 §6.2
9
10use std::sync::Arc;
11use std::time::Duration;
12
13use ff_core::backend::ScannerFilter;
14use ff_core::engine_backend::EngineBackend;
15use ff_core::keys::IndexKeys;
16use ff_core::partition::{Partition, PartitionFamily};
17use ff_core::types::{ExecutionId, TimestampMs};
18
19use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
20
/// Upper bound on due entries pulled from a partition's suspension-timeout
/// index in one scan pass (used as the `LIMIT 0 <n>` of the `ZRANGEBYSCORE`).
const BATCH_SIZE: u32 = 50;
22
// ─── Postgres branch (wave 6c) ──────────────────────────────────────────
/// Postgres counterpart of the suspension-timeout scan.
///
/// Runs a single reconciler tick for `partition_key`, honoring `filter`, by
/// forwarding to `ff_backend_postgres::reconcilers::suspension_timeout::scan_tick`
/// and returning its report or error unchanged.
#[cfg(feature = "postgres")]
pub async fn scan_tick_pg(
    pool: &ff_backend_postgres::PgPool,
    partition_key: i16,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_backend_postgres::reconcilers::ScanReport, ff_core::engine_error::EngineError> {
    use ff_backend_postgres::reconcilers::suspension_timeout as pg_reconciler;

    pg_reconciler::scan_tick(pool, partition_key, filter).await
}
33
34pub struct SuspensionTimeoutScanner {
35    interval: Duration,
36    failures: FailureTracker,
37    filter: ScannerFilter,
38    backend: Option<Arc<dyn EngineBackend>>,
39}
40
41impl SuspensionTimeoutScanner {
42    pub fn new(interval: Duration) -> Self {
43        Self::with_filter(interval, ScannerFilter::default())
44    }
45
46    /// Construct with a [`ScannerFilter`] applied per candidate
47    /// (issue #122).
48    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
49        Self {
50            interval,
51            failures: FailureTracker::new(),
52            filter,
53            backend: None,
54        }
55    }
56
57    /// PR-7b Cluster 1: wire an `EngineBackend` for trait-routed FCALLs.
58    pub fn with_filter_and_backend(
59        interval: Duration,
60        filter: ScannerFilter,
61        backend: Arc<dyn EngineBackend>,
62    ) -> Self {
63        Self {
64            interval,
65            failures: FailureTracker::new(),
66            filter,
67            backend: Some(backend),
68        }
69    }
70}
71
72impl Scanner for SuspensionTimeoutScanner {
73    fn name(&self) -> &'static str {
74        "suspension_timeout"
75    }
76
77    fn interval(&self) -> Duration {
78        self.interval
79    }
80
81    fn filter(&self) -> &ScannerFilter {
82        &self.filter
83    }
84
85    async fn scan_partition(
86        &self,
87        client: &ferriskey::Client,
88        partition: u16,
89    ) -> ScanResult {
90        let p = Partition {
91            family: PartitionFamily::Execution,
92            index: partition,
93        };
94        let idx = IndexKeys::new(&p);
95        let timeout_key = idx.suspension_timeout();
96        let tag = p.hash_tag();
97
98        let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
99            b.server_time_ms().await.map_err(|e| e.to_string())
100        } else {
101            crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
102        };
103        let now_ms = match now_ms_res {
104            Ok(t) => t,
105            Err(e) => {
106                tracing::warn!(partition, error = %e, "suspension_timeout: failed to get server time");
107                return ScanResult { processed: 0, errors: 1 };
108            }
109        };
110
111        // ZRANGEBYSCORE suspension_timeout -inf now LIMIT 0 batch_size
112        let timed_out: Vec<String> = match client
113            .cmd("ZRANGEBYSCORE")
114            .arg(&timeout_key)
115            .arg("-inf")
116            .arg(now_ms.to_string().as_str())
117            .arg("LIMIT")
118            .arg("0")
119            .arg(BATCH_SIZE.to_string().as_str())
120            .execute()
121            .await
122        {
123            Ok(ids) => ids,
124            Err(e) => {
125                tracing::warn!(partition, error = %e, "suspension_timeout: ZRANGEBYSCORE failed");
126                return ScanResult { processed: 0, errors: 1 };
127            }
128        };
129
130        if partition == 0 {
131            self.failures.advance_cycle();
132        }
133
134        if timed_out.is_empty() {
135            return ScanResult { processed: 0, errors: 0 };
136        }
137
138        let mut processed: u32 = 0;
139        let mut errors: u32 = 0;
140
141        for eid_str in &timed_out {
142            if self.failures.should_skip(eid_str) {
143                continue;
144            }
145            if should_skip_candidate(self.backend.as_ref(), &self.filter, partition, eid_str).await {
146                continue;
147            }
148
149            let res = if let Some(ref backend) = self.backend {
150                let Ok(eid) = ExecutionId::parse(eid_str) else { tracing::warn!(execution_id=%eid_str, "malformed eid; skipping"); continue; };
151                backend
152                    .expire_suspension(p, &eid, TimestampMs(now_ms as i64))
153                    .await
154                    .map_err(|e| e.to_string())
155            } else {
156                expire_suspension(client, &tag, &idx, eid_str)
157                    .await
158                    .map_err(|e| e.to_string())
159            };
160            match res {
161                Ok(()) => {
162                    self.failures.record_success(eid_str);
163                    processed += 1;
164                }
165                Err(e) => {
166                    tracing::warn!(
167                        partition,
168                        execution_id = eid_str.as_str(),
169                        error = %e,
170                        "suspension_timeout: expire_suspension failed"
171                    );
172                    self.failures.record_failure(eid_str, "suspension_timeout");
173                    errors += 1;
174                }
175            }
176        }
177
178        ScanResult { processed, errors }
179    }
180}
181
182/// Call ff_expire_suspension for one execution.
183///
184/// KEYS (12): exec_core, suspension_current, waitpoint_hash, wp_condition,
185///            attempt_hash, stream_meta, suspension_timeout_zset,
186///            suspended_zset, terminal_zset, eligible_zset, delayed_zset,
187///            lease_history
188/// ARGV (1): execution_id
189///
190/// NOTE: waitpoint_hash, wp_condition, attempt_hash, and stream_meta use
191/// placeholder values (attempt_index=0, waitpoint_id=placeholder).
192/// The Lua reads current_waitpoint_id and current_attempt_index from
193/// exec_core/suspension_current to find the actual keys.
194/// All keys share the same {p:N} hash tag → same cluster slot.
195async fn expire_suspension(
196    client: &ferriskey::Client,
197    tag: &str,
198    idx: &IndexKeys,
199    eid_str: &str,
200) -> Result<(), ferriskey::Error> {
201    // Entity-level keys — we need exec_core and suspension_current.
202    // For waitpoint/attempt, we need the real IDs. Read them from exec_core.
203    let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
204    let suspension_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
205
206    // Read current_waitpoint_id and current_attempt_index from exec_core
207    let wp_id: Option<String> = client
208        .cmd("HGET")
209        .arg(&exec_core)
210        .arg("current_waitpoint_id")
211        .execute()
212        .await?;
213    let att_idx: Option<String> = client
214        .cmd("HGET")
215        .arg(&exec_core)
216        .arg("current_attempt_index")
217        .execute()
218        .await?;
219
220    let wp_id = wp_id.unwrap_or_default();
221    let att_idx = att_idx.unwrap_or_else(|| "0".to_string());
222
223    let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id);
224    let wp_condition = format!("ff:wp:{}:{}:condition", tag, wp_id);
225    let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
226    let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
227
228    // Partition-level index keys
229    let suspension_timeout = idx.suspension_timeout();
230    // We need lane for suspended_zset + terminal + eligible + delayed.
231    // Read lane from exec_core.
232    let lane: Option<String> = client
233        .cmd("HGET")
234        .arg(&exec_core)
235        .arg("lane_id")
236        .execute()
237        .await?;
238    let lane_str = lane.unwrap_or_else(|| "default".to_string());
239    let lane_id = ff_core::types::LaneId::new(&lane_str);
240
241    let suspended_zset = idx.lane_suspended(&lane_id);
242    let terminal_zset = idx.lane_terminal(&lane_id);
243    let eligible_zset = idx.lane_eligible(&lane_id);
244    let delayed_zset = idx.lane_delayed(&lane_id);
245    let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
246
247    let keys: [&str; 12] = [
248        &exec_core,           // 1
249        &suspension_current,  // 2
250        &waitpoint_hash,      // 3
251        &wp_condition,        // 4
252        &attempt_hash,        // 5
253        &stream_meta,         // 6
254        &suspension_timeout,  // 7
255        &suspended_zset,      // 8
256        &terminal_zset,       // 9
257        &eligible_zset,       // 10
258        &delayed_zset,        // 11
259        &lease_history,       // 12
260    ];
261
262    let argv: [&str; 1] = [eid_str];
263
264    let _: ferriskey::Value = client
265        .fcall("ff_expire_suspension", &keys, &argv)
266        .await?;
267
268    Ok(())
269}