1use std::time::Duration;
13
14use ff_core::backend::ScannerFilter;
15use ff_core::keys::IndexKeys;
16use ff_core::partition::{Partition, PartitionFamily};
17use ff_core::types::LaneId;
18
19use super::{should_skip_candidate, ScanResult, Scanner};
20
21const SCAN_COUNT: u32 = 100;
23
24pub struct IndexReconciler {
25 interval: Duration,
26 lanes: Vec<LaneId>,
28 filter: ScannerFilter,
29}
30
31impl IndexReconciler {
32 pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
33 Self::with_filter(interval, lanes, ScannerFilter::default())
34 }
35
36 pub fn with_filter(interval: Duration, lanes: Vec<LaneId>, filter: ScannerFilter) -> Self {
39 Self {
40 interval,
41 lanes,
42 filter,
43 }
44 }
45}
46
47impl Scanner for IndexReconciler {
48 fn name(&self) -> &'static str {
49 "index_reconciler"
50 }
51
52 fn interval(&self) -> Duration {
53 self.interval
54 }
55
56 fn filter(&self) -> &ScannerFilter {
57 &self.filter
58 }
59
60 async fn scan_partition(
61 &self,
62 client: &ferriskey::Client,
63 partition: u16,
64 ) -> ScanResult {
65 let p = Partition {
66 family: PartitionFamily::Execution,
67 index: partition,
68 };
69 let idx = IndexKeys::new(&p);
70 let all_exec_key = idx.all_executions();
71
72 let mut cursor = "0".to_string();
73 let mut processed: u32 = 0;
74 let mut errors: u32 = 0;
75
76 loop {
77 let result: ferriskey::Value = match client
79 .cmd("SSCAN")
80 .arg(&all_exec_key)
81 .arg(cursor.as_str())
82 .arg("COUNT")
83 .arg(SCAN_COUNT.to_string().as_str())
84 .execute()
85 .await
86 {
87 Ok(v) => v,
88 Err(e) => {
89 tracing::warn!(partition, error = %e, "index_reconciler: SSCAN failed");
90 return ScanResult { processed, errors: errors + 1 };
91 }
92 };
93
94 let (next_cursor, members) = match parse_sscan_response(&result) {
96 Some(v) => v,
97 None => {
98 tracing::warn!(partition, "index_reconciler: unexpected SSCAN response format");
99 return ScanResult { processed, errors: errors + 1 };
100 }
101 };
102
103 for eid_str in &members {
104 if should_skip_candidate(client, &self.filter, partition, eid_str).await {
105 continue;
106 }
107 match check_execution_index(client, &p, &idx, eid_str, &self.lanes).await {
108 Ok(true) => {} Ok(false) => {
110 processed += 1;
112 }
113 Err(e) => {
114 tracing::warn!(
115 partition,
116 execution_id = eid_str.as_str(),
117 error = %e,
118 "index_reconciler: check failed"
119 );
120 errors += 1;
121 }
122 }
123 }
124
125 cursor = next_cursor;
126 if cursor == "0" {
127 break;
128 }
129 }
130
131 ScanResult { processed, errors }
132 }
133}
134
135async fn check_execution_index(
138 client: &ferriskey::Client,
139 partition: &Partition,
140 idx: &IndexKeys,
141 eid_str: &str,
142 _lanes: &[LaneId],
143) -> Result<bool, ferriskey::Error> {
144 let core_key = format!("ff:exec:{}:{}:core", partition.hash_tag(), eid_str);
145
146 let fields: Vec<Option<String>> = client
148 .cmd("HMGET")
149 .arg(&core_key)
150 .arg("lifecycle_phase")
151 .arg("eligibility_state")
152 .arg("ownership_state")
153 .arg("lane_id")
154 .execute()
155 .await?;
156
157 if fields.is_empty() || fields[0].is_none() {
158 tracing::warn!(
159 partition = partition.index,
160 execution_id = eid_str,
161 "index_reconciler: execution in all_executions but core hash missing"
162 );
163 return Ok(false);
164 }
165
166 let lifecycle = fields[0].as_deref().unwrap_or("");
167 let eligibility = fields[1].as_deref().unwrap_or("");
168 let ownership = fields[2].as_deref().unwrap_or("");
169 let lane_str = fields[3].as_deref().unwrap_or("default");
170
171 let expected_index = match (lifecycle, eligibility, ownership) {
173 ("active", _, "leased") => "active",
174 ("runnable", "eligible_now", _) => "eligible",
175 ("runnable", "not_eligible_until_time", _) => "delayed",
176 ("runnable", "blocked_by_dependencies", _) => "blocked:dependencies",
177 ("runnable", "blocked_by_budget", _) => "blocked:budget",
178 ("runnable", "blocked_by_quota", _) => "blocked:quota",
179 ("runnable", "blocked_by_route", _) => "blocked:route",
180 ("runnable", "blocked_by_operator", _) => "blocked:operator",
181 ("suspended", _, _) => "suspended",
182 ("terminal", _, _) => "terminal",
183 _ => "unknown",
184 };
185
186 if expected_index == "unknown" {
187 return Ok(true);
189 }
190
191 let lane = LaneId::new(lane_str);
193 let expected_key = match expected_index {
194 "active" => idx.lane_active(&lane),
195 "eligible" => idx.lane_eligible(&lane),
196 "delayed" => idx.lane_delayed(&lane),
197 "blocked:dependencies" => idx.lane_blocked_dependencies(&lane),
198 "blocked:budget" => idx.lane_blocked_budget(&lane),
199 "blocked:quota" => idx.lane_blocked_quota(&lane),
200 "blocked:route" => idx.lane_blocked_route(&lane),
201 "blocked:operator" => idx.lane_blocked_operator(&lane),
202 "suspended" => idx.lane_suspended(&lane),
203 "terminal" => idx.lane_terminal(&lane),
204 _ => return Ok(true),
205 };
206
207 let score: Option<String> = client
209 .cmd("ZSCORE")
210 .arg(&expected_key)
211 .arg(eid_str)
212 .execute()
213 .await?;
214
215 if score.is_none() {
216 tracing::warn!(
217 partition = partition.index,
218 execution_id = eid_str,
219 expected_index,
220 expected_key = expected_key.as_str(),
221 lifecycle,
222 eligibility,
223 ownership,
224 "index_reconciler: execution missing from expected index"
225 );
226 return Ok(false);
227 }
228
229 Ok(true)
230}
231
232fn parse_sscan_response(val: &ferriskey::Value) -> Option<(String, Vec<String>)> {
235 let arr = match val {
236 ferriskey::Value::Array(a) => a,
237 _ => return None,
238 };
239 if arr.len() < 2 {
240 return None;
241 }
242
243 let cursor = match &arr[0] {
244 Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
245 Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
246 _ => return None,
247 };
248
249 let mut members = Vec::new();
250 match &arr[1] {
251 Ok(ferriskey::Value::Array(inner)) => {
252 for item in inner {
253 if let Ok(ferriskey::Value::BulkString(b)) = item {
254 members.push(String::from_utf8_lossy(b).into_owned());
255 }
256 }
257 }
258 Ok(ferriskey::Value::Set(inner)) => {
259 for item in inner {
260 if let ferriskey::Value::BulkString(b) = item {
261 members.push(String::from_utf8_lossy(b).into_owned());
262 }
263 }
264 }
265 _ => return None,
266 };
267
268 Some((cursor, members))
269}