Skip to main content

ff_engine/scanner/
suspension_timeout.rs

1//! Suspension timeout scanner.
2//!
3//! Iterates `ff:idx:{p:N}:suspension_timeout` for each partition, finding
4//! suspended executions whose `timeout_at` score is <= now. For each, calls
5//! `FCALL ff_expire_suspension` which re-validates and applies the configured
6//! timeout behavior (fail/cancel/expire/auto_resume/escalate).
7//!
8//! Reference: RFC-004 §Timeout scanner, RFC-010 §6.2
9
10use std::time::Duration;
11
12use ff_core::keys::IndexKeys;
13use ff_core::partition::{Partition, PartitionFamily};
14
15use super::{FailureTracker, ScanResult, Scanner};
16
/// Maximum number of execution ids fetched from a partition's
/// suspension-timeout index per `scan_partition` call; passed as the
/// `LIMIT` count of the `ZRANGEBYSCORE` query.
const BATCH_SIZE: u32 = 50;
18
19pub struct SuspensionTimeoutScanner {
20    interval: Duration,
21    failures: FailureTracker,
22}
23
24impl SuspensionTimeoutScanner {
25    pub fn new(interval: Duration) -> Self {
26        Self { interval, failures: FailureTracker::new() }
27    }
28}
29
30impl Scanner for SuspensionTimeoutScanner {
31    fn name(&self) -> &'static str {
32        "suspension_timeout"
33    }
34
35    fn interval(&self) -> Duration {
36        self.interval
37    }
38
39    async fn scan_partition(
40        &self,
41        client: &ferriskey::Client,
42        partition: u16,
43    ) -> ScanResult {
44        let p = Partition {
45            family: PartitionFamily::Execution,
46            index: partition,
47        };
48        let idx = IndexKeys::new(&p);
49        let timeout_key = idx.suspension_timeout();
50        let tag = p.hash_tag();
51
52        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
53            Ok(t) => t,
54            Err(e) => {
55                tracing::warn!(partition, error = %e, "suspension_timeout: failed to get server time");
56                return ScanResult { processed: 0, errors: 1 };
57            }
58        };
59
60        // ZRANGEBYSCORE suspension_timeout -inf now LIMIT 0 batch_size
61        let timed_out: Vec<String> = match client
62            .cmd("ZRANGEBYSCORE")
63            .arg(&timeout_key)
64            .arg("-inf")
65            .arg(now_ms.to_string().as_str())
66            .arg("LIMIT")
67            .arg("0")
68            .arg(BATCH_SIZE.to_string().as_str())
69            .execute()
70            .await
71        {
72            Ok(ids) => ids,
73            Err(e) => {
74                tracing::warn!(partition, error = %e, "suspension_timeout: ZRANGEBYSCORE failed");
75                return ScanResult { processed: 0, errors: 1 };
76            }
77        };
78
79        if partition == 0 {
80            self.failures.advance_cycle();
81        }
82
83        if timed_out.is_empty() {
84            return ScanResult { processed: 0, errors: 0 };
85        }
86
87        let mut processed: u32 = 0;
88        let mut errors: u32 = 0;
89
90        for eid_str in &timed_out {
91            if self.failures.should_skip(eid_str) {
92                continue;
93            }
94
95            match expire_suspension(client, &tag, &idx, eid_str).await {
96                Ok(()) => {
97                    self.failures.record_success(eid_str);
98                    processed += 1;
99                }
100                Err(e) => {
101                    tracing::warn!(
102                        partition,
103                        execution_id = eid_str.as_str(),
104                        error = %e,
105                        "suspension_timeout: ff_expire_suspension failed"
106                    );
107                    self.failures.record_failure(eid_str, "suspension_timeout");
108                    errors += 1;
109                }
110            }
111        }
112
113        ScanResult { processed, errors }
114    }
115}
116
117/// Call ff_expire_suspension for one execution.
118///
119/// KEYS (12): exec_core, suspension_current, waitpoint_hash, wp_condition,
120///            attempt_hash, stream_meta, suspension_timeout_zset,
121///            suspended_zset, terminal_zset, eligible_zset, delayed_zset,
122///            lease_history
123/// ARGV (1): execution_id
124///
125/// NOTE: waitpoint_hash, wp_condition, attempt_hash, and stream_meta use
126/// placeholder values (attempt_index=0, waitpoint_id=placeholder).
127/// The Lua reads current_waitpoint_id and current_attempt_index from
128/// exec_core/suspension_current to find the actual keys.
129/// All keys share the same {p:N} hash tag → same cluster slot.
130async fn expire_suspension(
131    client: &ferriskey::Client,
132    tag: &str,
133    idx: &IndexKeys,
134    eid_str: &str,
135) -> Result<(), ferriskey::Error> {
136    // Entity-level keys — we need exec_core and suspension_current.
137    // For waitpoint/attempt, we need the real IDs. Read them from exec_core.
138    let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
139    let suspension_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
140
141    // Read current_waitpoint_id and current_attempt_index from exec_core
142    let wp_id: Option<String> = client
143        .cmd("HGET")
144        .arg(&exec_core)
145        .arg("current_waitpoint_id")
146        .execute()
147        .await?;
148    let att_idx: Option<String> = client
149        .cmd("HGET")
150        .arg(&exec_core)
151        .arg("current_attempt_index")
152        .execute()
153        .await?;
154
155    let wp_id = wp_id.unwrap_or_default();
156    let att_idx = att_idx.unwrap_or_else(|| "0".to_string());
157
158    let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id);
159    let wp_condition = format!("ff:wp:{}:{}:condition", tag, wp_id);
160    let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
161    let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
162
163    // Partition-level index keys
164    let suspension_timeout = idx.suspension_timeout();
165    // We need lane for suspended_zset + terminal + eligible + delayed.
166    // Read lane from exec_core.
167    let lane: Option<String> = client
168        .cmd("HGET")
169        .arg(&exec_core)
170        .arg("lane_id")
171        .execute()
172        .await?;
173    let lane_str = lane.unwrap_or_else(|| "default".to_string());
174    let lane_id = ff_core::types::LaneId::new(&lane_str);
175
176    let suspended_zset = idx.lane_suspended(&lane_id);
177    let terminal_zset = idx.lane_terminal(&lane_id);
178    let eligible_zset = idx.lane_eligible(&lane_id);
179    let delayed_zset = idx.lane_delayed(&lane_id);
180    let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
181
182    let keys: [&str; 12] = [
183        &exec_core,           // 1
184        &suspension_current,  // 2
185        &waitpoint_hash,      // 3
186        &wp_condition,        // 4
187        &attempt_hash,        // 5
188        &stream_meta,         // 6
189        &suspension_timeout,  // 7
190        &suspended_zset,      // 8
191        &terminal_zset,       // 9
192        &eligible_zset,       // 10
193        &delayed_zset,        // 11
194        &lease_history,       // 12
195    ];
196
197    let argv: [&str; 1] = [eid_str];
198
199    let _: ferriskey::Value = client
200        .fcall("ff_expire_suspension", &keys, &argv)
201        .await?;
202
203    Ok(())
204}