Skip to main content

ff_engine/scanner/
delayed_promoter.rs

1//! Delayed execution promoter scanner.
2//!
3//! Iterates `ff:idx:{p:N}:lane:<lane_id>:delayed` for each partition,
4//! finding executions whose `delay_until` score is <= now. For each,
5//! calls `FCALL ff_promote_delayed` to move them from delayed to eligible.
6//!
//! Note: ff_promote_delayed is Phase 2 Lua. Promotion is routed through the
//! wired `EngineBackend` when one is configured, and otherwise through a
//! direct `FCALL ff_promote_delayed` on the client. Promotion requires
//! knowing the lane_id (embedded in the delayed ZSET key), which in turn
//! requires knowing which lanes exist; the scanner iterates the lanes
//! supplied at construction. Phase 1 uses a single "default" lane.
11//!
//! Reference: RFC-010 §6, function #27
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use ff_core::backend::ScannerFilter;
18use ff_core::engine_backend::EngineBackend;
19use ff_core::keys::IndexKeys;
20use ff_core::partition::{Partition, PartitionFamily};
21use ff_core::types::{ExecutionId, LaneId, TimestampMs};
22
23use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
24
/// Maximum number of due members fetched from one lane's delayed ZSET per
/// partition scan (the `LIMIT 0 BATCH_SIZE` of the `ZRANGEBYSCORE` query).
const BATCH_SIZE: u32 = 50;
26
27pub struct DelayedPromoter {
28    interval: Duration,
29    /// Lanes to scan. Phase 1: just "default".
30    lanes: Vec<LaneId>,
31    failures: FailureTracker,
32    filter: ScannerFilter,
33    /// PR-7b Cluster 1: trait-dispatch target. See
34    /// [`super::lease_expiry::LeaseExpiryScanner::backend`] rustdoc.
35    backend: Option<Arc<dyn EngineBackend>>,
36}
37
38impl DelayedPromoter {
39    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
40        Self::with_filter(interval, lanes, ScannerFilter::default())
41    }
42
43    /// Construct with a [`ScannerFilter`] applied per candidate
44    /// (issue #122).
45    pub fn with_filter(interval: Duration, lanes: Vec<LaneId>, filter: ScannerFilter) -> Self {
46        Self {
47            interval,
48            lanes,
49            failures: FailureTracker::new(),
50            filter,
51            backend: None,
52        }
53    }
54
55    /// PR-7b Cluster 1: wire an `EngineBackend` for trait-routed FCALLs.
56    pub fn with_filter_and_backend(
57        interval: Duration,
58        lanes: Vec<LaneId>,
59        filter: ScannerFilter,
60        backend: Arc<dyn EngineBackend>,
61    ) -> Self {
62        Self {
63            interval,
64            lanes,
65            failures: FailureTracker::new(),
66            filter,
67            backend: Some(backend),
68        }
69    }
70}
71
72impl Scanner for DelayedPromoter {
73    fn name(&self) -> &'static str {
74        "delayed_promoter"
75    }
76
77    fn interval(&self) -> Duration {
78        self.interval
79    }
80
81    fn filter(&self) -> &ScannerFilter {
82        &self.filter
83    }
84
85    async fn scan_partition(
86        &self,
87        client: &ferriskey::Client,
88        partition: u16,
89    ) -> ScanResult {
90        let p = Partition {
91            family: PartitionFamily::Execution,
92            index: partition,
93        };
94        let idx = IndexKeys::new(&p);
95
96        let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
97            b.server_time_ms().await.map_err(|e| e.to_string())
98        } else {
99            crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
100        };
101        let now_ms = match now_ms_res {
102            Ok(t) => t,
103            Err(e) => {
104                tracing::warn!(partition, error = %e, "delayed_promoter: failed to get server time");
105                return ScanResult { processed: 0, errors: 1 };
106            }
107        };
108
109        if partition == 0 {
110            self.failures.advance_cycle();
111        }
112
113        let mut total_processed: u32 = 0;
114        let mut total_errors: u32 = 0;
115
116        for lane in &self.lanes {
117            let delayed_key = idx.lane_delayed(lane);
118            let eligible_key = idx.lane_eligible(lane);
119
120            // ZRANGEBYSCORE delayed -inf now LIMIT 0 batch_size
121            let due: Vec<String> = match client
122                .cmd("ZRANGEBYSCORE")
123                .arg(&delayed_key)
124                .arg("-inf")
125                .arg(now_ms.to_string().as_str())
126                .arg("LIMIT")
127                .arg("0")
128                .arg(BATCH_SIZE.to_string().as_str())
129                .execute()
130                .await
131            {
132                Ok(ids) => ids,
133                Err(e) => {
134                    tracing::warn!(
135                        partition, lane = %lane, error = %e,
136                        "delayed_promoter: ZRANGEBYSCORE failed"
137                    );
138                    total_errors += 1;
139                    continue;
140                }
141            };
142
143            if due.is_empty() {
144                continue;
145            }
146
147            // For each due execution, call ff_promote_delayed
148            // KEYS(3): exec_core, delayed_zset, eligible_zset
149            // ARGV(2): execution_id, now_ms
150            for eid_str in &due {
151                if self.failures.should_skip(eid_str) {
152                    continue;
153                }
154                if should_skip_candidate(self.backend.as_ref(), &self.filter, partition, eid_str).await {
155                    continue;
156                }
157
158                let res = if let Some(ref backend) = self.backend {
159                    let Ok(eid) = ExecutionId::parse(eid_str) else { tracing::warn!(execution_id=%eid_str, "malformed eid; skipping"); continue; };
160                    backend
161                        .promote_delayed(p, lane, &eid, TimestampMs(now_ms as i64))
162                        .await
163                        .map_err(|e| e.to_string())
164                } else {
165                    let exec_core = format!("ff:exec:{}:{}:core", p.hash_tag(), eid_str);
166                    let keys: [&str; 3] = [&exec_core, &delayed_key, &eligible_key];
167                    let now_str = now_ms.to_string();
168                    let argv: [&str; 2] = [eid_str.as_str(), &now_str];
169                    client
170                        .fcall::<ferriskey::Value>("ff_promote_delayed", &keys, &argv)
171                        .await
172                        .map(|_: ferriskey::Value| ())
173                        .map_err(|e| e.to_string())
174                };
175                match res {
176                    Ok(()) => {
177                        self.failures.record_success(eid_str);
178                        total_processed += 1;
179                    }
180                    Err(e) => {
181                        tracing::warn!(
182                            partition,
183                            execution_id = eid_str.as_str(),
184                            lane = %lane,
185                            error = %e,
186                            "delayed_promoter: promote_delayed failed"
187                        );
188                        self.failures.record_failure(eid_str, "delayed_promoter");
189                        total_errors += 1;
190                    }
191                }
192            }
193        }
194
195        ScanResult {
196            processed: total_processed,
197            errors: total_errors,
198        }
199    }
200}
201