// ff_engine/scanner/suspension_timeout.rs

//! Suspension timeout scanner.
//!
//! Iterates `ff:idx:{p:N}:suspension_timeout` for each partition, finding
//! suspended executions whose `timeout_at` score is <= now. For each, calls
//! `FCALL ff_expire_suspension` which re-validates and applies the configured
//! timeout behavior (fail/cancel/expire/auto_resume/escalate).
//!
//! Reference: RFC-004 §Timeout scanner, RFC-010 §6.2

use std::time::Duration;

use ff_core::backend::ScannerFilter;
use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionFamily};

use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
17
/// Maximum number of timed-out candidates pulled per partition per scan pass
/// (the `LIMIT` count of the ZRANGEBYSCORE below).
const BATCH_SIZE: u32 = 50;
19
/// Scanner that expires suspended executions whose timeout deadline passed.
pub struct SuspensionTimeoutScanner {
    // How often the scheduler should run this scanner.
    interval: Duration,
    // Per-candidate failure/backoff bookkeeping, cycled once per sweep.
    failures: FailureTracker,
    // Candidate filter applied before each expiry (issue #122).
    filter: ScannerFilter,
}
25
26impl SuspensionTimeoutScanner {
27    pub fn new(interval: Duration) -> Self {
28        Self::with_filter(interval, ScannerFilter::default())
29    }
30
31    /// Construct with a [`ScannerFilter`] applied per candidate
32    /// (issue #122).
33    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
34        Self {
35            interval,
36            failures: FailureTracker::new(),
37            filter,
38        }
39    }
40}
41
impl Scanner for SuspensionTimeoutScanner {
    /// Stable scanner name, used in log lines below.
    fn name(&self) -> &'static str {
        "suspension_timeout"
    }

    /// How often the scheduler should invoke this scanner.
    fn interval(&self) -> Duration {
        self.interval
    }

    /// Candidate filter checked via `should_skip_candidate` per execution.
    fn filter(&self) -> &ScannerFilter {
        &self.filter
    }

    /// Scan one partition for timed-out suspensions.
    ///
    /// Pulls up to `BATCH_SIZE` execution ids from the partition's
    /// `suspension_timeout` zset whose score is <= the server's current time,
    /// then calls `ff_expire_suspension` for each surviving candidate.
    /// Returns counts of processed executions and errors (a failure to get
    /// server time or to read the zset counts as one error).
    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Execution,
            index: partition,
        };
        let idx = IndexKeys::new(&p);
        let timeout_key = idx.suspension_timeout();
        let tag = p.hash_tag();

        // Use server time (helper shared with the lease_expiry scanner) so
        // the score comparison uses the same clock that wrote the scores.
        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t,
            Err(e) => {
                tracing::warn!(partition, error = %e, "suspension_timeout: failed to get server time");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        // ZRANGEBYSCORE suspension_timeout -inf now LIMIT 0 batch_size
        let timed_out: Vec<String> = match client
            .cmd("ZRANGEBYSCORE")
            .arg(&timeout_key)
            .arg("-inf")
            .arg(now_ms.to_string().as_str())
            .arg("LIMIT")
            .arg("0")
            .arg(BATCH_SIZE.to_string().as_str())
            .execute()
            .await
        {
            Ok(ids) => ids,
            Err(e) => {
                tracing::warn!(partition, error = %e, "suspension_timeout: ZRANGEBYSCORE failed");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        // Advance the failure-tracker cycle once per full sweep, keyed to
        // partition 0. NOTE(review): the early returns above fire before this
        // point, so a partition-0 error means the cycle does not advance this
        // sweep — confirm that is intended.
        if partition == 0 {
            self.failures.advance_cycle();
        }

        if timed_out.is_empty() {
            return ScanResult { processed: 0, errors: 0 };
        }

        let mut processed: u32 = 0;
        let mut errors: u32 = 0;

        for eid_str in &timed_out {
            // Skip candidates currently backed off after repeated failures.
            if self.failures.should_skip(eid_str) {
                continue;
            }
            // Skip candidates excluded by the configured filter (issue #122).
            if should_skip_candidate(client, &self.filter, partition, eid_str).await {
                continue;
            }

            match expire_suspension(client, &tag, &idx, eid_str).await {
                Ok(()) => {
                    self.failures.record_success(eid_str);
                    processed += 1;
                }
                Err(e) => {
                    // Record the failure so this candidate is backed off on
                    // subsequent sweeps rather than retried immediately.
                    tracing::warn!(
                        partition,
                        execution_id = eid_str.as_str(),
                        error = %e,
                        "suspension_timeout: ff_expire_suspension failed"
                    );
                    self.failures.record_failure(eid_str, "suspension_timeout");
                    errors += 1;
                }
            }
        }

        ScanResult { processed, errors }
    }
}
135
136/// Call ff_expire_suspension for one execution.
137///
138/// KEYS (12): exec_core, suspension_current, waitpoint_hash, wp_condition,
139///            attempt_hash, stream_meta, suspension_timeout_zset,
140///            suspended_zset, terminal_zset, eligible_zset, delayed_zset,
141///            lease_history
142/// ARGV (1): execution_id
143///
144/// NOTE: waitpoint_hash, wp_condition, attempt_hash, and stream_meta use
145/// placeholder values (attempt_index=0, waitpoint_id=placeholder).
146/// The Lua reads current_waitpoint_id and current_attempt_index from
147/// exec_core/suspension_current to find the actual keys.
148/// All keys share the same {p:N} hash tag → same cluster slot.
149async fn expire_suspension(
150    client: &ferriskey::Client,
151    tag: &str,
152    idx: &IndexKeys,
153    eid_str: &str,
154) -> Result<(), ferriskey::Error> {
155    // Entity-level keys — we need exec_core and suspension_current.
156    // For waitpoint/attempt, we need the real IDs. Read them from exec_core.
157    let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
158    let suspension_current = format!("ff:exec:{}:{}:suspension:current", tag, eid_str);
159
160    // Read current_waitpoint_id and current_attempt_index from exec_core
161    let wp_id: Option<String> = client
162        .cmd("HGET")
163        .arg(&exec_core)
164        .arg("current_waitpoint_id")
165        .execute()
166        .await?;
167    let att_idx: Option<String> = client
168        .cmd("HGET")
169        .arg(&exec_core)
170        .arg("current_attempt_index")
171        .execute()
172        .await?;
173
174    let wp_id = wp_id.unwrap_or_default();
175    let att_idx = att_idx.unwrap_or_else(|| "0".to_string());
176
177    let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id);
178    let wp_condition = format!("ff:wp:{}:{}:condition", tag, wp_id);
179    let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
180    let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
181
182    // Partition-level index keys
183    let suspension_timeout = idx.suspension_timeout();
184    // We need lane for suspended_zset + terminal + eligible + delayed.
185    // Read lane from exec_core.
186    let lane: Option<String> = client
187        .cmd("HGET")
188        .arg(&exec_core)
189        .arg("lane_id")
190        .execute()
191        .await?;
192    let lane_str = lane.unwrap_or_else(|| "default".to_string());
193    let lane_id = ff_core::types::LaneId::new(&lane_str);
194
195    let suspended_zset = idx.lane_suspended(&lane_id);
196    let terminal_zset = idx.lane_terminal(&lane_id);
197    let eligible_zset = idx.lane_eligible(&lane_id);
198    let delayed_zset = idx.lane_delayed(&lane_id);
199    let lease_history = format!("ff:exec:{}:{}:lease:history", tag, eid_str);
200
201    let keys: [&str; 12] = [
202        &exec_core,           // 1
203        &suspension_current,  // 2
204        &waitpoint_hash,      // 3
205        &wp_condition,        // 4
206        &attempt_hash,        // 5
207        &stream_meta,         // 6
208        &suspension_timeout,  // 7
209        &suspended_zset,      // 8
210        &terminal_zset,       // 9
211        &eligible_zset,       // 10
212        &delayed_zset,        // 11
213        &lease_history,       // 12
214    ];
215
216    let argv: [&str; 1] = [eid_str];
217
218    let _: ferriskey::Value = client
219        .fcall("ff_expire_suspension", &keys, &argv)
220        .await?;
221
222    Ok(())
223}