// ff_engine/scanner/delayed_promoter.rs
use std::time::Duration;
15
16use ff_core::backend::ScannerFilter;
17use ff_core::keys::IndexKeys;
18use ff_core::partition::{Partition, PartitionFamily};
19use ff_core::types::LaneId;
20
21use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};
22
/// Maximum number of due entries fetched from one lane's delayed index per
/// `ZRANGEBYSCORE` call (passed as the `LIMIT` count).
const BATCH_SIZE: u32 = 50;
24
25pub struct DelayedPromoter {
26 interval: Duration,
27 lanes: Vec<LaneId>,
29 failures: FailureTracker,
30 filter: ScannerFilter,
31}
32
33impl DelayedPromoter {
34 pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
35 Self::with_filter(interval, lanes, ScannerFilter::default())
36 }
37
38 pub fn with_filter(interval: Duration, lanes: Vec<LaneId>, filter: ScannerFilter) -> Self {
41 Self {
42 interval,
43 lanes,
44 failures: FailureTracker::new(),
45 filter,
46 }
47 }
48}
49
50impl Scanner for DelayedPromoter {
51 fn name(&self) -> &'static str {
52 "delayed_promoter"
53 }
54
55 fn interval(&self) -> Duration {
56 self.interval
57 }
58
59 fn filter(&self) -> &ScannerFilter {
60 &self.filter
61 }
62
63 async fn scan_partition(
64 &self,
65 client: &ferriskey::Client,
66 partition: u16,
67 ) -> ScanResult {
68 let p = Partition {
69 family: PartitionFamily::Execution,
70 index: partition,
71 };
72 let idx = IndexKeys::new(&p);
73
74 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
75 Ok(t) => t,
76 Err(e) => {
77 tracing::warn!(partition, error = %e, "delayed_promoter: failed to get server time");
78 return ScanResult { processed: 0, errors: 1 };
79 }
80 };
81
82 if partition == 0 {
83 self.failures.advance_cycle();
84 }
85
86 let mut total_processed: u32 = 0;
87 let mut total_errors: u32 = 0;
88
89 for lane in &self.lanes {
90 let delayed_key = idx.lane_delayed(lane);
91 let eligible_key = idx.lane_eligible(lane);
92
93 let due: Vec<String> = match client
95 .cmd("ZRANGEBYSCORE")
96 .arg(&delayed_key)
97 .arg("-inf")
98 .arg(now_ms.to_string().as_str())
99 .arg("LIMIT")
100 .arg("0")
101 .arg(BATCH_SIZE.to_string().as_str())
102 .execute()
103 .await
104 {
105 Ok(ids) => ids,
106 Err(e) => {
107 tracing::warn!(
108 partition, lane = %lane, error = %e,
109 "delayed_promoter: ZRANGEBYSCORE failed"
110 );
111 total_errors += 1;
112 continue;
113 }
114 };
115
116 if due.is_empty() {
117 continue;
118 }
119
120 for eid_str in &due {
124 if self.failures.should_skip(eid_str) {
125 continue;
126 }
127 if should_skip_candidate(client, &self.filter, partition, eid_str).await {
128 continue;
129 }
130
131 let exec_core = format!("ff:exec:{}:{}:core", p.hash_tag(), eid_str);
132 let keys: [&str; 3] = [&exec_core, &delayed_key, &eligible_key];
133 let now_str = now_ms.to_string();
134 let argv: [&str; 2] = [eid_str.as_str(), &now_str];
135
136 match client.fcall::<ferriskey::Value>(
137 "ff_promote_delayed",
138 &keys,
139 &argv,
140 ).await {
141 Ok(_) => {
142 self.failures.record_success(eid_str);
143 total_processed += 1;
144 }
145 Err(e) => {
146 tracing::warn!(
147 partition,
148 execution_id = eid_str.as_str(),
149 lane = %lane,
150 error = %e,
151 "delayed_promoter: ff_promote_delayed failed"
152 );
153 self.failures.record_failure(eid_str, "delayed_promoter");
154 total_errors += 1;
155 }
156 }
157 }
158 }
159
160 ScanResult {
161 processed: total_processed,
162 errors: total_errors,
163 }
164 }
165}
166