ff_engine/scanner/
delayed_promoter.rs1use std::time::Duration;
15
16use ff_core::keys::IndexKeys;
17use ff_core::partition::{Partition, PartitionFamily};
18use ff_core::types::LaneId;
19
20use super::{FailureTracker, ScanResult, Scanner};
21
22const BATCH_SIZE: u32 = 50;
23
24pub struct DelayedPromoter {
25 interval: Duration,
26 lanes: Vec<LaneId>,
28 failures: FailureTracker,
29}
30
31impl DelayedPromoter {
32 pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
33 Self { interval, lanes, failures: FailureTracker::new() }
34 }
35}
36
37impl Scanner for DelayedPromoter {
38 fn name(&self) -> &'static str {
39 "delayed_promoter"
40 }
41
42 fn interval(&self) -> Duration {
43 self.interval
44 }
45
46 async fn scan_partition(
47 &self,
48 client: &ferriskey::Client,
49 partition: u16,
50 ) -> ScanResult {
51 let p = Partition {
52 family: PartitionFamily::Execution,
53 index: partition,
54 };
55 let idx = IndexKeys::new(&p);
56
57 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
58 Ok(t) => t,
59 Err(e) => {
60 tracing::warn!(partition, error = %e, "delayed_promoter: failed to get server time");
61 return ScanResult { processed: 0, errors: 1 };
62 }
63 };
64
65 if partition == 0 {
66 self.failures.advance_cycle();
67 }
68
69 let mut total_processed: u32 = 0;
70 let mut total_errors: u32 = 0;
71
72 for lane in &self.lanes {
73 let delayed_key = idx.lane_delayed(lane);
74 let eligible_key = idx.lane_eligible(lane);
75
76 let due: Vec<String> = match client
78 .cmd("ZRANGEBYSCORE")
79 .arg(&delayed_key)
80 .arg("-inf")
81 .arg(now_ms.to_string().as_str())
82 .arg("LIMIT")
83 .arg("0")
84 .arg(BATCH_SIZE.to_string().as_str())
85 .execute()
86 .await
87 {
88 Ok(ids) => ids,
89 Err(e) => {
90 tracing::warn!(
91 partition, lane = %lane, error = %e,
92 "delayed_promoter: ZRANGEBYSCORE failed"
93 );
94 total_errors += 1;
95 continue;
96 }
97 };
98
99 if due.is_empty() {
100 continue;
101 }
102
103 for eid_str in &due {
107 if self.failures.should_skip(eid_str) {
108 continue;
109 }
110
111 let exec_core = format!("ff:exec:{}:{}:core", p.hash_tag(), eid_str);
112 let keys: [&str; 3] = [&exec_core, &delayed_key, &eligible_key];
113 let now_str = now_ms.to_string();
114 let argv: [&str; 2] = [eid_str.as_str(), &now_str];
115
116 match client.fcall::<ferriskey::Value>(
117 "ff_promote_delayed",
118 &keys,
119 &argv,
120 ).await {
121 Ok(_) => {
122 self.failures.record_success(eid_str);
123 total_processed += 1;
124 }
125 Err(e) => {
126 tracing::warn!(
127 partition,
128 execution_id = eid_str.as_str(),
129 lane = %lane,
130 error = %e,
131 "delayed_promoter: ff_promote_delayed failed"
132 );
133 self.failures.record_failure(eid_str, "delayed_promoter");
134 total_errors += 1;
135 }
136 }
137 }
138 }
139
140 ScanResult {
141 processed: total_processed,
142 errors: total_errors,
143 }
144 }
145}
146