Skip to main content

ff_engine/scanner/
delayed_promoter.rs

1//! Delayed execution promoter scanner.
2//!
3//! Iterates `ff:idx:{p:N}:lane:<lane_id>:delayed` for each partition,
4//! finding executions whose `delay_until` score is <= now. For each,
5//! calls `FCALL ff_promote_delayed` to move them from delayed to eligible.
6//!
7//! Note: ff_promote_delayed is Phase 2 Lua. For Phase 1, this scanner
8//! discovers due executions and logs them. The actual promotion requires
9//! knowing the lane_id (embedded in the delayed ZSET key), which in turn
10//! requires knowing which lanes exist. Phase 1 uses a single "default" lane.
11//!
12//! Reference: RFC-010 §6, function #27
13
14use std::time::Duration;
15
16use ff_core::keys::IndexKeys;
17use ff_core::partition::{Partition, PartitionFamily};
18use ff_core::types::LaneId;
19
20use super::{FailureTracker, ScanResult, Scanner};
21
/// Upper bound on how many due execution ids are fetched from one lane's
/// delayed ZSET per scan (passed as the `LIMIT` count to `ZRANGEBYSCORE`).
const BATCH_SIZE: u32 = 50;
23
24pub struct DelayedPromoter {
25    interval: Duration,
26    /// Lanes to scan. Phase 1: just "default".
27    lanes: Vec<LaneId>,
28    failures: FailureTracker,
29}
30
31impl DelayedPromoter {
32    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
33        Self { interval, lanes, failures: FailureTracker::new() }
34    }
35}
36
37impl Scanner for DelayedPromoter {
38    fn name(&self) -> &'static str {
39        "delayed_promoter"
40    }
41
42    fn interval(&self) -> Duration {
43        self.interval
44    }
45
46    async fn scan_partition(
47        &self,
48        client: &ferriskey::Client,
49        partition: u16,
50    ) -> ScanResult {
51        let p = Partition {
52            family: PartitionFamily::Execution,
53            index: partition,
54        };
55        let idx = IndexKeys::new(&p);
56
57        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
58            Ok(t) => t,
59            Err(e) => {
60                tracing::warn!(partition, error = %e, "delayed_promoter: failed to get server time");
61                return ScanResult { processed: 0, errors: 1 };
62            }
63        };
64
65        if partition == 0 {
66            self.failures.advance_cycle();
67        }
68
69        let mut total_processed: u32 = 0;
70        let mut total_errors: u32 = 0;
71
72        for lane in &self.lanes {
73            let delayed_key = idx.lane_delayed(lane);
74            let eligible_key = idx.lane_eligible(lane);
75
76            // ZRANGEBYSCORE delayed -inf now LIMIT 0 batch_size
77            let due: Vec<String> = match client
78                .cmd("ZRANGEBYSCORE")
79                .arg(&delayed_key)
80                .arg("-inf")
81                .arg(now_ms.to_string().as_str())
82                .arg("LIMIT")
83                .arg("0")
84                .arg(BATCH_SIZE.to_string().as_str())
85                .execute()
86                .await
87            {
88                Ok(ids) => ids,
89                Err(e) => {
90                    tracing::warn!(
91                        partition, lane = %lane, error = %e,
92                        "delayed_promoter: ZRANGEBYSCORE failed"
93                    );
94                    total_errors += 1;
95                    continue;
96                }
97            };
98
99            if due.is_empty() {
100                continue;
101            }
102
103            // For each due execution, call ff_promote_delayed
104            // KEYS(3): exec_core, delayed_zset, eligible_zset
105            // ARGV(2): execution_id, now_ms
106            for eid_str in &due {
107                if self.failures.should_skip(eid_str) {
108                    continue;
109                }
110
111                let exec_core = format!("ff:exec:{}:{}:core", p.hash_tag(), eid_str);
112                let keys: [&str; 3] = [&exec_core, &delayed_key, &eligible_key];
113                let now_str = now_ms.to_string();
114                let argv: [&str; 2] = [eid_str.as_str(), &now_str];
115
116                match client.fcall::<ferriskey::Value>(
117                    "ff_promote_delayed",
118                    &keys,
119                    &argv,
120                ).await {
121                    Ok(_) => {
122                        self.failures.record_success(eid_str);
123                        total_processed += 1;
124                    }
125                    Err(e) => {
126                        tracing::warn!(
127                            partition,
128                            execution_id = eid_str.as_str(),
129                            lane = %lane,
130                            error = %e,
131                            "delayed_promoter: ff_promote_delayed failed"
132                        );
133                        self.failures.record_failure(eid_str, "delayed_promoter");
134                        total_errors += 1;
135                    }
136                }
137            }
138        }
139
140        ScanResult {
141            processed: total_processed,
142            errors: total_errors,
143        }
144    }
145}
146