// ff_engine/scanner/index_reconciler.rs

//! Index consistency reconciler.
//!
//! Runs `SSCAN` over `ff:idx:{p:N}:all_executions` for each partition. For
//! each execution, reads `exec_core` to determine its expected index
//! membership, then verifies the execution appears in the corresponding
//! scheduling sorted set.
//!
//! Phase 1: log-only; inconsistencies are reported but not auto-fixed.
//! Phase 2+: auto-fix via the `ff_reconcile_execution_index` FCALL.
//!
//! Reference: RFC-010 §6.14
11
12use std::time::Duration;
13
14use ff_core::keys::IndexKeys;
15use ff_core::partition::{Partition, PartitionFamily};
16use ff_core::types::LaneId;
17
18use super::{ScanResult, Scanner};
19
20/// How many entries to pull per SSCAN iteration.
21const SCAN_COUNT: u32 = 100;
22
23pub struct IndexReconciler {
24    interval: Duration,
25    /// Lanes to check. Phase 1: just "default".
26    lanes: Vec<LaneId>,
27}
28
29impl IndexReconciler {
30    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
31        Self { interval, lanes }
32    }
33}
34
35impl Scanner for IndexReconciler {
36    fn name(&self) -> &'static str {
37        "index_reconciler"
38    }
39
40    fn interval(&self) -> Duration {
41        self.interval
42    }
43
44    async fn scan_partition(
45        &self,
46        client: &ferriskey::Client,
47        partition: u16,
48    ) -> ScanResult {
49        let p = Partition {
50            family: PartitionFamily::Execution,
51            index: partition,
52        };
53        let idx = IndexKeys::new(&p);
54        let all_exec_key = idx.all_executions();
55
56        let mut cursor = "0".to_string();
57        let mut processed: u32 = 0;
58        let mut errors: u32 = 0;
59
60        loop {
61            // SSCAN all_executions cursor COUNT scan_count
62            let result: ferriskey::Value = match client
63                .cmd("SSCAN")
64                .arg(&all_exec_key)
65                .arg(cursor.as_str())
66                .arg("COUNT")
67                .arg(SCAN_COUNT.to_string().as_str())
68                .execute()
69                .await
70            {
71                Ok(v) => v,
72                Err(e) => {
73                    tracing::warn!(partition, error = %e, "index_reconciler: SSCAN failed");
74                    return ScanResult { processed, errors: errors + 1 };
75                }
76            };
77
78            // Parse SSCAN response: [next_cursor, [member1, member2, ...]]
79            let (next_cursor, members) = match parse_sscan_response(&result) {
80                Some(v) => v,
81                None => {
82                    tracing::warn!(partition, "index_reconciler: unexpected SSCAN response format");
83                    return ScanResult { processed, errors: errors + 1 };
84                }
85            };
86
87            for eid_str in &members {
88                match check_execution_index(client, &p, &idx, eid_str, &self.lanes).await {
89                    Ok(true) => {} // consistent
90                    Ok(false) => {
91                        // inconsistency logged inside check_execution_index
92                        processed += 1;
93                    }
94                    Err(e) => {
95                        tracing::warn!(
96                            partition,
97                            execution_id = eid_str.as_str(),
98                            error = %e,
99                            "index_reconciler: check failed"
100                        );
101                        errors += 1;
102                    }
103                }
104            }
105
106            cursor = next_cursor;
107            if cursor == "0" {
108                break;
109            }
110        }
111
112        ScanResult { processed, errors }
113    }
114}
115
116/// Check whether an execution appears in the expected index set for its state.
117/// Returns Ok(true) if consistent, Ok(false) if inconsistency detected.
118async fn check_execution_index(
119    client: &ferriskey::Client,
120    partition: &Partition,
121    idx: &IndexKeys,
122    eid_str: &str,
123    _lanes: &[LaneId],
124) -> Result<bool, ferriskey::Error> {
125    let core_key = format!("ff:exec:{}:{}:core", partition.hash_tag(), eid_str);
126
127    // Read the fields we need for index verification
128    let fields: Vec<Option<String>> = client
129        .cmd("HMGET")
130        .arg(&core_key)
131        .arg("lifecycle_phase")
132        .arg("eligibility_state")
133        .arg("ownership_state")
134        .arg("lane_id")
135        .execute()
136        .await?;
137
138    if fields.is_empty() || fields[0].is_none() {
139        tracing::warn!(
140            partition = partition.index,
141            execution_id = eid_str,
142            "index_reconciler: execution in all_executions but core hash missing"
143        );
144        return Ok(false);
145    }
146
147    let lifecycle = fields[0].as_deref().unwrap_or("");
148    let eligibility = fields[1].as_deref().unwrap_or("");
149    let ownership = fields[2].as_deref().unwrap_or("");
150    let lane_str = fields[3].as_deref().unwrap_or("default");
151
152    // Determine which index this execution should be in
153    let expected_index = match (lifecycle, eligibility, ownership) {
154        ("active", _, "leased") => "active",
155        ("runnable", "eligible_now", _) => "eligible",
156        ("runnable", "not_eligible_until_time", _) => "delayed",
157        ("runnable", "blocked_by_dependencies", _) => "blocked:dependencies",
158        ("runnable", "blocked_by_budget", _) => "blocked:budget",
159        ("runnable", "blocked_by_quota", _) => "blocked:quota",
160        ("runnable", "blocked_by_route", _) => "blocked:route",
161        ("runnable", "blocked_by_operator", _) => "blocked:operator",
162        ("suspended", _, _) => "suspended",
163        ("terminal", _, _) => "terminal",
164        _ => "unknown",
165    };
166
167    if expected_index == "unknown" {
168        // Can't determine expected index — may be a transitional state
169        return Ok(true);
170    }
171
172    // Check membership in the expected index
173    let lane = LaneId::new(lane_str);
174    let expected_key = match expected_index {
175        "active" => idx.lane_active(&lane),
176        "eligible" => idx.lane_eligible(&lane),
177        "delayed" => idx.lane_delayed(&lane),
178        "blocked:dependencies" => idx.lane_blocked_dependencies(&lane),
179        "blocked:budget" => idx.lane_blocked_budget(&lane),
180        "blocked:quota" => idx.lane_blocked_quota(&lane),
181        "blocked:route" => idx.lane_blocked_route(&lane),
182        "blocked:operator" => idx.lane_blocked_operator(&lane),
183        "suspended" => idx.lane_suspended(&lane),
184        "terminal" => idx.lane_terminal(&lane),
185        _ => return Ok(true),
186    };
187
188    // ZSCORE returns nil if not a member
189    let score: Option<String> = client
190        .cmd("ZSCORE")
191        .arg(&expected_key)
192        .arg(eid_str)
193        .execute()
194        .await?;
195
196    if score.is_none() {
197        tracing::warn!(
198            partition = partition.index,
199            execution_id = eid_str,
200            expected_index,
201            expected_key = expected_key.as_str(),
202            lifecycle,
203            eligibility,
204            ownership,
205            "index_reconciler: execution missing from expected index"
206        );
207        return Ok(false);
208    }
209
210    Ok(true)
211}
212
213/// Parse SSCAN response from raw Value.
214/// SSCAN returns: Array([cursor_string, Array([member1, member2, ...])])
215fn parse_sscan_response(val: &ferriskey::Value) -> Option<(String, Vec<String>)> {
216    let arr = match val {
217        ferriskey::Value::Array(a) => a,
218        _ => return None,
219    };
220    if arr.len() < 2 {
221        return None;
222    }
223
224    let cursor = match &arr[0] {
225        Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
226        Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
227        _ => return None,
228    };
229
230    let mut members = Vec::new();
231    match &arr[1] {
232        Ok(ferriskey::Value::Array(inner)) => {
233            for item in inner {
234                if let Ok(ferriskey::Value::BulkString(b)) = item {
235                    members.push(String::from_utf8_lossy(b).into_owned());
236                }
237            }
238        }
239        Ok(ferriskey::Value::Set(inner)) => {
240            for item in inner {
241                if let ferriskey::Value::BulkString(b) = item {
242                    members.push(String::from_utf8_lossy(b).into_owned());
243                }
244            }
245        }
246        _ => return None,
247    };
248
249    Some((cursor, members))
250}