ff_engine/scanner/
delayed_promoter.rs1use std::sync::Arc;
15use std::time::Duration;
16
17use ff_core::backend::ScannerFilter;
18use ff_core::engine_backend::EngineBackend;
19use ff_core::keys::IndexKeys;
20use ff_core::partition::{Partition, PartitionFamily};
21use ff_core::types::{ExecutionId, LaneId, TimestampMs};
22
23use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
24
/// Maximum number of due ids read from one lane's delayed set per partition
/// scan (the `LIMIT 0 <BATCH_SIZE>` window of the `ZRANGEBYSCORE` below).
const BATCH_SIZE: u32 = 50;
26
/// Scanner that promotes due entries of each configured lane's delayed set
/// (presumably into the lane's eligible set — the legacy path hands both keys
/// to the `ff_promote_delayed` server function).
pub struct DelayedPromoter {
    /// Scan period reported through `Scanner::interval`.
    interval: Duration,
    /// Lanes whose delayed sets are scanned on every execution partition.
    lanes: Vec<LaneId>,
    /// Per-execution-id failure bookkeeping: consulted with `should_skip`
    /// before each attempt, updated with `record_success` / `record_failure`.
    failures: FailureTracker,
    /// Candidate filter passed to `should_skip_candidate` and exposed through
    /// `Scanner::filter`.
    filter: ScannerFilter,
    /// Optional engine backend. When `None`, scans fall back to the raw
    /// client (server-time helper plus the `ff_promote_delayed` FCALL).
    backend: Option<Arc<dyn EngineBackend>>,
}
37
38impl DelayedPromoter {
39 pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
40 Self::with_filter(interval, lanes, ScannerFilter::default())
41 }
42
43 pub fn with_filter(interval: Duration, lanes: Vec<LaneId>, filter: ScannerFilter) -> Self {
46 Self {
47 interval,
48 lanes,
49 failures: FailureTracker::new(),
50 filter,
51 backend: None,
52 }
53 }
54
55 pub fn with_filter_and_backend(
57 interval: Duration,
58 lanes: Vec<LaneId>,
59 filter: ScannerFilter,
60 backend: Arc<dyn EngineBackend>,
61 ) -> Self {
62 Self {
63 interval,
64 lanes,
65 failures: FailureTracker::new(),
66 filter,
67 backend: Some(backend),
68 }
69 }
70}
71
72impl Scanner for DelayedPromoter {
73 fn name(&self) -> &'static str {
74 "delayed_promoter"
75 }
76
77 fn interval(&self) -> Duration {
78 self.interval
79 }
80
81 fn filter(&self) -> &ScannerFilter {
82 &self.filter
83 }
84
85 async fn scan_partition(
86 &self,
87 client: &ferriskey::Client,
88 partition: u16,
89 ) -> ScanResult {
90 let p = Partition {
91 family: PartitionFamily::Execution,
92 index: partition,
93 };
94 let idx = IndexKeys::new(&p);
95
96 let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
97 b.server_time_ms().await.map_err(|e| e.to_string())
98 } else {
99 crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
100 };
101 let now_ms = match now_ms_res {
102 Ok(t) => t,
103 Err(e) => {
104 tracing::warn!(partition, error = %e, "delayed_promoter: failed to get server time");
105 return ScanResult { processed: 0, errors: 1 };
106 }
107 };
108
109 if partition == 0 {
110 self.failures.advance_cycle();
111 }
112
113 let mut total_processed: u32 = 0;
114 let mut total_errors: u32 = 0;
115
116 for lane in &self.lanes {
117 let delayed_key = idx.lane_delayed(lane);
118 let eligible_key = idx.lane_eligible(lane);
119
120 let due: Vec<String> = match client
122 .cmd("ZRANGEBYSCORE")
123 .arg(&delayed_key)
124 .arg("-inf")
125 .arg(now_ms.to_string().as_str())
126 .arg("LIMIT")
127 .arg("0")
128 .arg(BATCH_SIZE.to_string().as_str())
129 .execute()
130 .await
131 {
132 Ok(ids) => ids,
133 Err(e) => {
134 tracing::warn!(
135 partition, lane = %lane, error = %e,
136 "delayed_promoter: ZRANGEBYSCORE failed"
137 );
138 total_errors += 1;
139 continue;
140 }
141 };
142
143 if due.is_empty() {
144 continue;
145 }
146
147 for eid_str in &due {
151 if self.failures.should_skip(eid_str) {
152 continue;
153 }
154 if should_skip_candidate(self.backend.as_ref(), &self.filter, partition, eid_str).await {
155 continue;
156 }
157
158 let res = if let Some(ref backend) = self.backend {
159 let Ok(eid) = ExecutionId::parse(eid_str) else { tracing::warn!(execution_id=%eid_str, "malformed eid; skipping"); continue; };
160 backend
161 .promote_delayed(p, lane, &eid, TimestampMs(now_ms as i64))
162 .await
163 .map_err(|e| e.to_string())
164 } else {
165 let exec_core = format!("ff:exec:{}:{}:core", p.hash_tag(), eid_str);
166 let keys: [&str; 3] = [&exec_core, &delayed_key, &eligible_key];
167 let now_str = now_ms.to_string();
168 let argv: [&str; 2] = [eid_str.as_str(), &now_str];
169 client
170 .fcall::<ferriskey::Value>("ff_promote_delayed", &keys, &argv)
171 .await
172 .map(|_: ferriskey::Value| ())
173 .map_err(|e| e.to_string())
174 };
175 match res {
176 Ok(()) => {
177 self.failures.record_success(eid_str);
178 total_processed += 1;
179 }
180 Err(e) => {
181 tracing::warn!(
182 partition,
183 execution_id = eid_str.as_str(),
184 lane = %lane,
185 error = %e,
186 "delayed_promoter: promote_delayed failed"
187 );
188 self.failures.record_failure(eid_str, "delayed_promoter");
189 total_errors += 1;
190 }
191 }
192 }
193 }
194
195 ScanResult {
196 processed: total_processed,
197 errors: total_errors,
198 }
199 }
200}
201