Skip to main content

ff_engine/scanner/
index_reconciler.rs

1//! Index consistency reconciler.
2//!
3//! SSCAN ff:idx:{p:N}:all_executions for each partition. For each
4//! execution, reads exec_core to determine its expected index membership,
5//! then verifies the execution appears in the correct scheduling sorted set.
6//!
7//! Phase 1: log-only. Does not auto-fix inconsistencies.
8//! Phase 2+: auto-fix via ff_reconcile_execution_index FCALL.
9//!
10//! Reference: RFC-010 §6.14
11
12use std::time::Duration;
13
14use ff_core::backend::ScannerFilter;
15use ff_core::keys::IndexKeys;
16use ff_core::partition::{Partition, PartitionFamily};
17use ff_core::types::LaneId;
18
19use super::{should_skip_candidate, ScanResult, Scanner};
20
21/// How many entries to pull per SSCAN iteration.
22const SCAN_COUNT: u32 = 100;
23
24pub struct IndexReconciler {
25    interval: Duration,
26    /// Lanes to check. Phase 1: just "default".
27    lanes: Vec<LaneId>,
28    filter: ScannerFilter,
29}
30
31impl IndexReconciler {
32    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
33        Self::with_filter(interval, lanes, ScannerFilter::default())
34    }
35
36    /// Construct with a [`ScannerFilter`] applied per candidate
37    /// (issue #122).
38    pub fn with_filter(interval: Duration, lanes: Vec<LaneId>, filter: ScannerFilter) -> Self {
39        Self {
40            interval,
41            lanes,
42            filter,
43        }
44    }
45}
46
47impl Scanner for IndexReconciler {
48    fn name(&self) -> &'static str {
49        "index_reconciler"
50    }
51
52    fn interval(&self) -> Duration {
53        self.interval
54    }
55
56    fn filter(&self) -> &ScannerFilter {
57        &self.filter
58    }
59
60    async fn scan_partition(
61        &self,
62        client: &ferriskey::Client,
63        partition: u16,
64    ) -> ScanResult {
65        let p = Partition {
66            family: PartitionFamily::Execution,
67            index: partition,
68        };
69        let idx = IndexKeys::new(&p);
70        let all_exec_key = idx.all_executions();
71
72        let mut cursor = "0".to_string();
73        let mut processed: u32 = 0;
74        let mut errors: u32 = 0;
75
76        loop {
77            // SSCAN all_executions cursor COUNT scan_count
78            let result: ferriskey::Value = match client
79                .cmd("SSCAN")
80                .arg(&all_exec_key)
81                .arg(cursor.as_str())
82                .arg("COUNT")
83                .arg(SCAN_COUNT.to_string().as_str())
84                .execute()
85                .await
86            {
87                Ok(v) => v,
88                Err(e) => {
89                    tracing::warn!(partition, error = %e, "index_reconciler: SSCAN failed");
90                    return ScanResult { processed, errors: errors + 1 };
91                }
92            };
93
94            // Parse SSCAN response: [next_cursor, [member1, member2, ...]]
95            let (next_cursor, members) = match parse_sscan_response(&result) {
96                Some(v) => v,
97                None => {
98                    tracing::warn!(partition, "index_reconciler: unexpected SSCAN response format");
99                    return ScanResult { processed, errors: errors + 1 };
100                }
101            };
102
103            for eid_str in &members {
104                if should_skip_candidate(client, &self.filter, partition, eid_str).await {
105                    continue;
106                }
107                match check_execution_index(client, &p, &idx, eid_str, &self.lanes).await {
108                    Ok(true) => {} // consistent
109                    Ok(false) => {
110                        // inconsistency logged inside check_execution_index
111                        processed += 1;
112                    }
113                    Err(e) => {
114                        tracing::warn!(
115                            partition,
116                            execution_id = eid_str.as_str(),
117                            error = %e,
118                            "index_reconciler: check failed"
119                        );
120                        errors += 1;
121                    }
122                }
123            }
124
125            cursor = next_cursor;
126            if cursor == "0" {
127                break;
128            }
129        }
130
131        ScanResult { processed, errors }
132    }
133}
134
135/// Check whether an execution appears in the expected index set for its state.
136/// Returns Ok(true) if consistent, Ok(false) if inconsistency detected.
137async fn check_execution_index(
138    client: &ferriskey::Client,
139    partition: &Partition,
140    idx: &IndexKeys,
141    eid_str: &str,
142    _lanes: &[LaneId],
143) -> Result<bool, ferriskey::Error> {
144    let core_key = format!("ff:exec:{}:{}:core", partition.hash_tag(), eid_str);
145
146    // Read the fields we need for index verification
147    let fields: Vec<Option<String>> = client
148        .cmd("HMGET")
149        .arg(&core_key)
150        .arg("lifecycle_phase")
151        .arg("eligibility_state")
152        .arg("ownership_state")
153        .arg("lane_id")
154        .execute()
155        .await?;
156
157    if fields.is_empty() || fields[0].is_none() {
158        tracing::warn!(
159            partition = partition.index,
160            execution_id = eid_str,
161            "index_reconciler: execution in all_executions but core hash missing"
162        );
163        return Ok(false);
164    }
165
166    let lifecycle = fields[0].as_deref().unwrap_or("");
167    let eligibility = fields[1].as_deref().unwrap_or("");
168    let ownership = fields[2].as_deref().unwrap_or("");
169    let lane_str = fields[3].as_deref().unwrap_or("default");
170
171    // Determine which index this execution should be in
172    let expected_index = match (lifecycle, eligibility, ownership) {
173        ("active", _, "leased") => "active",
174        ("runnable", "eligible_now", _) => "eligible",
175        ("runnable", "not_eligible_until_time", _) => "delayed",
176        ("runnable", "blocked_by_dependencies", _) => "blocked:dependencies",
177        ("runnable", "blocked_by_budget", _) => "blocked:budget",
178        ("runnable", "blocked_by_quota", _) => "blocked:quota",
179        ("runnable", "blocked_by_route", _) => "blocked:route",
180        ("runnable", "blocked_by_operator", _) => "blocked:operator",
181        ("suspended", _, _) => "suspended",
182        ("terminal", _, _) => "terminal",
183        _ => "unknown",
184    };
185
186    if expected_index == "unknown" {
187        // Can't determine expected index — may be a transitional state
188        return Ok(true);
189    }
190
191    // Check membership in the expected index
192    let lane = LaneId::new(lane_str);
193    let expected_key = match expected_index {
194        "active" => idx.lane_active(&lane),
195        "eligible" => idx.lane_eligible(&lane),
196        "delayed" => idx.lane_delayed(&lane),
197        "blocked:dependencies" => idx.lane_blocked_dependencies(&lane),
198        "blocked:budget" => idx.lane_blocked_budget(&lane),
199        "blocked:quota" => idx.lane_blocked_quota(&lane),
200        "blocked:route" => idx.lane_blocked_route(&lane),
201        "blocked:operator" => idx.lane_blocked_operator(&lane),
202        "suspended" => idx.lane_suspended(&lane),
203        "terminal" => idx.lane_terminal(&lane),
204        _ => return Ok(true),
205    };
206
207    // ZSCORE returns nil if not a member
208    let score: Option<String> = client
209        .cmd("ZSCORE")
210        .arg(&expected_key)
211        .arg(eid_str)
212        .execute()
213        .await?;
214
215    if score.is_none() {
216        tracing::warn!(
217            partition = partition.index,
218            execution_id = eid_str,
219            expected_index,
220            expected_key = expected_key.as_str(),
221            lifecycle,
222            eligibility,
223            ownership,
224            "index_reconciler: execution missing from expected index"
225        );
226        return Ok(false);
227    }
228
229    Ok(true)
230}
231
232/// Parse SSCAN response from raw Value.
233/// SSCAN returns: Array([cursor_string, Array([member1, member2, ...])])
234fn parse_sscan_response(val: &ferriskey::Value) -> Option<(String, Vec<String>)> {
235    let arr = match val {
236        ferriskey::Value::Array(a) => a,
237        _ => return None,
238    };
239    if arr.len() < 2 {
240        return None;
241    }
242
243    let cursor = match &arr[0] {
244        Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
245        Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
246        _ => return None,
247    };
248
249    let mut members = Vec::new();
250    match &arr[1] {
251        Ok(ferriskey::Value::Array(inner)) => {
252            for item in inner {
253                if let Ok(ferriskey::Value::BulkString(b)) = item {
254                    members.push(String::from_utf8_lossy(b).into_owned());
255                }
256            }
257        }
258        Ok(ferriskey::Value::Set(inner)) => {
259            for item in inner {
260                if let ferriskey::Value::BulkString(b) = item {
261                    members.push(String::from_utf8_lossy(b).into_owned());
262                }
263            }
264        }
265        _ => return None,
266    };
267
268    Some((cursor, members))
269}