Skip to main content

ff_engine/scanner/
delayed_promoter.rs

1//! Delayed execution promoter scanner.
2//!
3//! Iterates `ff:idx:{p:N}:lane:<lane_id>:delayed` for each partition,
4//! finding executions whose `delay_until` score is <= now. For each,
5//! calls `FCALL ff_promote_delayed` to move them from delayed to eligible.
6//!
7//! Note: ff_promote_delayed is Phase 2 Lua. For Phase 1, this scanner
8//! discovers due executions and logs them. The actual promotion requires
9//! knowing the lane_id (embedded in the delayed ZSET key), which in turn
10//! requires knowing which lanes exist. Phase 1 uses a single "default" lane.
11//!
12//! Reference: RFC-010 §6, function #27
13
14use std::time::Duration;
15
16use ff_core::backend::ScannerFilter;
17use ff_core::keys::IndexKeys;
18use ff_core::partition::{Partition, PartitionFamily};
19use ff_core::types::LaneId;
20
21use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
22
/// Upper bound on the number of due execution ids fetched from one lane's
/// delayed ZSET per `ZRANGEBYSCORE` call (passed as the `LIMIT` count).
const BATCH_SIZE: u32 = 50;
24
25pub struct DelayedPromoter {
26    interval: Duration,
27    /// Lanes to scan. Phase 1: just "default".
28    lanes: Vec<LaneId>,
29    failures: FailureTracker,
30    filter: ScannerFilter,
31}
32
33impl DelayedPromoter {
34    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
35        Self::with_filter(interval, lanes, ScannerFilter::default())
36    }
37
38    /// Construct with a [`ScannerFilter`] applied per candidate
39    /// (issue #122).
40    pub fn with_filter(interval: Duration, lanes: Vec<LaneId>, filter: ScannerFilter) -> Self {
41        Self {
42            interval,
43            lanes,
44            failures: FailureTracker::new(),
45            filter,
46        }
47    }
48}
49
50impl Scanner for DelayedPromoter {
51    fn name(&self) -> &'static str {
52        "delayed_promoter"
53    }
54
55    fn interval(&self) -> Duration {
56        self.interval
57    }
58
59    fn filter(&self) -> &ScannerFilter {
60        &self.filter
61    }
62
63    async fn scan_partition(
64        &self,
65        client: &ferriskey::Client,
66        partition: u16,
67    ) -> ScanResult {
68        let p = Partition {
69            family: PartitionFamily::Execution,
70            index: partition,
71        };
72        let idx = IndexKeys::new(&p);
73
74        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
75            Ok(t) => t,
76            Err(e) => {
77                tracing::warn!(partition, error = %e, "delayed_promoter: failed to get server time");
78                return ScanResult { processed: 0, errors: 1 };
79            }
80        };
81
82        if partition == 0 {
83            self.failures.advance_cycle();
84        }
85
86        let mut total_processed: u32 = 0;
87        let mut total_errors: u32 = 0;
88
89        for lane in &self.lanes {
90            let delayed_key = idx.lane_delayed(lane);
91            let eligible_key = idx.lane_eligible(lane);
92
93            // ZRANGEBYSCORE delayed -inf now LIMIT 0 batch_size
94            let due: Vec<String> = match client
95                .cmd("ZRANGEBYSCORE")
96                .arg(&delayed_key)
97                .arg("-inf")
98                .arg(now_ms.to_string().as_str())
99                .arg("LIMIT")
100                .arg("0")
101                .arg(BATCH_SIZE.to_string().as_str())
102                .execute()
103                .await
104            {
105                Ok(ids) => ids,
106                Err(e) => {
107                    tracing::warn!(
108                        partition, lane = %lane, error = %e,
109                        "delayed_promoter: ZRANGEBYSCORE failed"
110                    );
111                    total_errors += 1;
112                    continue;
113                }
114            };
115
116            if due.is_empty() {
117                continue;
118            }
119
120            // For each due execution, call ff_promote_delayed
121            // KEYS(3): exec_core, delayed_zset, eligible_zset
122            // ARGV(2): execution_id, now_ms
123            for eid_str in &due {
124                if self.failures.should_skip(eid_str) {
125                    continue;
126                }
127                if should_skip_candidate(client, &self.filter, partition, eid_str).await {
128                    continue;
129                }
130
131                let exec_core = format!("ff:exec:{}:{}:core", p.hash_tag(), eid_str);
132                let keys: [&str; 3] = [&exec_core, &delayed_key, &eligible_key];
133                let now_str = now_ms.to_string();
134                let argv: [&str; 2] = [eid_str.as_str(), &now_str];
135
136                match client.fcall::<ferriskey::Value>(
137                    "ff_promote_delayed",
138                    &keys,
139                    &argv,
140                ).await {
141                    Ok(_) => {
142                        self.failures.record_success(eid_str);
143                        total_processed += 1;
144                    }
145                    Err(e) => {
146                        tracing::warn!(
147                            partition,
148                            execution_id = eid_str.as_str(),
149                            lane = %lane,
150                            error = %e,
151                            "delayed_promoter: ff_promote_delayed failed"
152                        );
153                        self.failures.record_failure(eid_str, "delayed_promoter");
154                        total_errors += 1;
155                    }
156                }
157            }
158        }
159
160        ScanResult {
161            processed: total_processed,
162            errors: total_errors,
163        }
164    }
165}
166