1use std::time::Duration;
13
14use ff_core::keys::IndexKeys;
15use ff_core::partition::{Partition, PartitionFamily};
16use ff_core::types::LaneId;
17
18use super::{ScanResult, Scanner};
19
/// Value sent as the `COUNT` argument of every `SSCAN` call issued while
/// scanning a partition.
const SCAN_COUNT: u32 = 100;
22
/// Scanner that walks the all-executions set of an execution partition and
/// checks that every member is present in the index its core hash implies.
///
/// The visible scan path only issues `SSCAN`, `HMGET` and `ZSCORE`;
/// inconsistencies are logged, not repaired.
pub struct IndexReconciler {
    /// Value returned from `Scanner::interval`.
    interval: Duration,
    /// Lanes passed to the per-execution check on every scan. The check
    /// currently ignores this list and uses the `lane_id` stored on the
    /// execution instead.
    lanes: Vec<LaneId>,
}
28
29impl IndexReconciler {
30 pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
31 Self { interval, lanes }
32 }
33}
34
35impl Scanner for IndexReconciler {
36 fn name(&self) -> &'static str {
37 "index_reconciler"
38 }
39
40 fn interval(&self) -> Duration {
41 self.interval
42 }
43
44 async fn scan_partition(
45 &self,
46 client: &ferriskey::Client,
47 partition: u16,
48 ) -> ScanResult {
49 let p = Partition {
50 family: PartitionFamily::Execution,
51 index: partition,
52 };
53 let idx = IndexKeys::new(&p);
54 let all_exec_key = idx.all_executions();
55
56 let mut cursor = "0".to_string();
57 let mut processed: u32 = 0;
58 let mut errors: u32 = 0;
59
60 loop {
61 let result: ferriskey::Value = match client
63 .cmd("SSCAN")
64 .arg(&all_exec_key)
65 .arg(cursor.as_str())
66 .arg("COUNT")
67 .arg(SCAN_COUNT.to_string().as_str())
68 .execute()
69 .await
70 {
71 Ok(v) => v,
72 Err(e) => {
73 tracing::warn!(partition, error = %e, "index_reconciler: SSCAN failed");
74 return ScanResult { processed, errors: errors + 1 };
75 }
76 };
77
78 let (next_cursor, members) = match parse_sscan_response(&result) {
80 Some(v) => v,
81 None => {
82 tracing::warn!(partition, "index_reconciler: unexpected SSCAN response format");
83 return ScanResult { processed, errors: errors + 1 };
84 }
85 };
86
87 for eid_str in &members {
88 match check_execution_index(client, &p, &idx, eid_str, &self.lanes).await {
89 Ok(true) => {} Ok(false) => {
91 processed += 1;
93 }
94 Err(e) => {
95 tracing::warn!(
96 partition,
97 execution_id = eid_str.as_str(),
98 error = %e,
99 "index_reconciler: check failed"
100 );
101 errors += 1;
102 }
103 }
104 }
105
106 cursor = next_cursor;
107 if cursor == "0" {
108 break;
109 }
110 }
111
112 ScanResult { processed, errors }
113 }
114}
115
116async fn check_execution_index(
119 client: &ferriskey::Client,
120 partition: &Partition,
121 idx: &IndexKeys,
122 eid_str: &str,
123 _lanes: &[LaneId],
124) -> Result<bool, ferriskey::Error> {
125 let core_key = format!("ff:exec:{}:{}:core", partition.hash_tag(), eid_str);
126
127 let fields: Vec<Option<String>> = client
129 .cmd("HMGET")
130 .arg(&core_key)
131 .arg("lifecycle_phase")
132 .arg("eligibility_state")
133 .arg("ownership_state")
134 .arg("lane_id")
135 .execute()
136 .await?;
137
138 if fields.is_empty() || fields[0].is_none() {
139 tracing::warn!(
140 partition = partition.index,
141 execution_id = eid_str,
142 "index_reconciler: execution in all_executions but core hash missing"
143 );
144 return Ok(false);
145 }
146
147 let lifecycle = fields[0].as_deref().unwrap_or("");
148 let eligibility = fields[1].as_deref().unwrap_or("");
149 let ownership = fields[2].as_deref().unwrap_or("");
150 let lane_str = fields[3].as_deref().unwrap_or("default");
151
152 let expected_index = match (lifecycle, eligibility, ownership) {
154 ("active", _, "leased") => "active",
155 ("runnable", "eligible_now", _) => "eligible",
156 ("runnable", "not_eligible_until_time", _) => "delayed",
157 ("runnable", "blocked_by_dependencies", _) => "blocked:dependencies",
158 ("runnable", "blocked_by_budget", _) => "blocked:budget",
159 ("runnable", "blocked_by_quota", _) => "blocked:quota",
160 ("runnable", "blocked_by_route", _) => "blocked:route",
161 ("runnable", "blocked_by_operator", _) => "blocked:operator",
162 ("suspended", _, _) => "suspended",
163 ("terminal", _, _) => "terminal",
164 _ => "unknown",
165 };
166
167 if expected_index == "unknown" {
168 return Ok(true);
170 }
171
172 let lane = LaneId::new(lane_str);
174 let expected_key = match expected_index {
175 "active" => idx.lane_active(&lane),
176 "eligible" => idx.lane_eligible(&lane),
177 "delayed" => idx.lane_delayed(&lane),
178 "blocked:dependencies" => idx.lane_blocked_dependencies(&lane),
179 "blocked:budget" => idx.lane_blocked_budget(&lane),
180 "blocked:quota" => idx.lane_blocked_quota(&lane),
181 "blocked:route" => idx.lane_blocked_route(&lane),
182 "blocked:operator" => idx.lane_blocked_operator(&lane),
183 "suspended" => idx.lane_suspended(&lane),
184 "terminal" => idx.lane_terminal(&lane),
185 _ => return Ok(true),
186 };
187
188 let score: Option<String> = client
190 .cmd("ZSCORE")
191 .arg(&expected_key)
192 .arg(eid_str)
193 .execute()
194 .await?;
195
196 if score.is_none() {
197 tracing::warn!(
198 partition = partition.index,
199 execution_id = eid_str,
200 expected_index,
201 expected_key = expected_key.as_str(),
202 lifecycle,
203 eligibility,
204 ownership,
205 "index_reconciler: execution missing from expected index"
206 );
207 return Ok(false);
208 }
209
210 Ok(true)
211}
212
213fn parse_sscan_response(val: &ferriskey::Value) -> Option<(String, Vec<String>)> {
216 let arr = match val {
217 ferriskey::Value::Array(a) => a,
218 _ => return None,
219 };
220 if arr.len() < 2 {
221 return None;
222 }
223
224 let cursor = match &arr[0] {
225 Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
226 Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
227 _ => return None,
228 };
229
230 let mut members = Vec::new();
231 match &arr[1] {
232 Ok(ferriskey::Value::Array(inner)) => {
233 for item in inner {
234 if let Ok(ferriskey::Value::BulkString(b)) = item {
235 members.push(String::from_utf8_lossy(b).into_owned());
236 }
237 }
238 }
239 Ok(ferriskey::Value::Set(inner)) => {
240 for item in inner {
241 if let ferriskey::Value::BulkString(b) = item {
242 members.push(String::from_utf8_lossy(b).into_owned());
243 }
244 }
245 }
246 _ => return None,
247 };
248
249 Some((cursor, members))
250}