Skip to main content

ff_engine/scanner/
edge_cancel_reconciler.rs

//! RFC-016 Stage D — sibling-cancel reconciler (Invariant Q6 safety net).
//!
//! Stage C's `edge_cancel_dispatcher` populates
//! `ff:idx:{fp:N}:pending_cancel_groups` atomically inside
//! `ff_resolve_dependency` and drains each tuple via
//! `ff_drain_sibling_cancel_group` after the per-sibling cancels land. An
//! engine crash between the SADD and the drain leaves stale tuples in
//! the SET pointing at groups whose cancels have already completed.
//!
//! This scanner is a safety net that runs at a slower cadence than the
//! dispatcher (default 10s vs 1s). For each flow partition it reads
//! `pending_cancel_groups` directly (no full keyspace scan) and calls
//! `ff_reconcile_sibling_cancel_group` once per tuple. The Lua function
//! decides atomically:
//!   - `sremmed_stale` — flag false / edgegroup missing → SREM only.
//!   - `completed_drain` — flag true + all siblings terminal (drain
//!     interrupted after the cancels) → HDEL + SREM.
//!   - `no_op` — flag true + some sibling still running; the
//!     dispatcher will handle it on its next tick.
//!
//! The reconciler MUST NOT fight the dispatcher — `no_op` leaves state
//! untouched so the 1s dispatcher cadence retains ownership of the
//! happy path.
24
25use std::time::Duration;
26
27use ff_core::backend::ScannerFilter;
28use ff_core::keys::{FlowIndexKeys, FlowKeyContext};
29use ff_core::partition::{Partition, PartitionFamily};
30use ff_core::types::{ExecutionId, FlowId};
31
32use super::{ScanResult, Scanner};
33
/// Upper bound on `pending_cancel_groups` members sampled (via
/// `SRANDMEMBER <key> <count>`) per partition per cycle.
///
/// Kept equal to the dispatcher's batch size, so one reconciler pass never
/// takes on more work per partition than a single dispatcher tick.
const BATCH_SIZE: usize = 50;
38
39pub struct EdgeCancelReconciler {
40    interval: Duration,
41    filter: ScannerFilter,
42    metrics: std::sync::Arc<ff_observability::Metrics>,
43}
44
45impl EdgeCancelReconciler {
46    pub fn new(interval: Duration) -> Self {
47        Self::with_filter(interval, ScannerFilter::default())
48    }
49
50    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
51        Self::with_filter_and_metrics(
52            interval,
53            filter,
54            std::sync::Arc::new(ff_observability::Metrics::new()),
55        )
56    }
57
58    pub fn with_filter_and_metrics(
59        interval: Duration,
60        filter: ScannerFilter,
61        metrics: std::sync::Arc<ff_observability::Metrics>,
62    ) -> Self {
63        Self {
64            interval,
65            filter,
66            metrics,
67        }
68    }
69}
70
71impl Scanner for EdgeCancelReconciler {
72    fn name(&self) -> &'static str {
73        "edge_cancel_reconciler"
74    }
75
76    fn interval(&self) -> Duration {
77        self.interval
78    }
79
80    fn filter(&self) -> &ScannerFilter {
81        &self.filter
82    }
83
84    async fn scan_partition(
85        &self,
86        client: &ferriskey::Client,
87        partition: u16,
88    ) -> ScanResult {
89        let p = Partition {
90            family: PartitionFamily::Flow,
91            index: partition,
92        };
93        let fidx = FlowIndexKeys::new(&p);
94        let pending_key = fidx.pending_cancel_groups();
95
96        // SRANDMEMBER with count so a pathologically-large SET doesn't
97        // starve other partitions. The Lua reconcile call handles the
98        // SREM atomically; re-observing the same tuple next cycle is
99        // idempotent (`no_op "not_in_set"`).
100        let members: Vec<String> = match client
101            .cmd("SRANDMEMBER")
102            .arg(&pending_key)
103            .arg(BATCH_SIZE.to_string().as_str())
104            .execute()
105            .await
106        {
107            Ok(m) => m,
108            Err(e) => {
109                tracing::warn!(
110                    partition,
111                    error = %e,
112                    "edge_cancel_reconciler: SRANDMEMBER pending_cancel_groups failed"
113                );
114                return ScanResult { processed: 0, errors: 1 };
115            }
116        };
117
118        if members.is_empty() {
119            return ScanResult { processed: 0, errors: 0 };
120        }
121
122        let mut processed: u32 = 0;
123        let mut errors: u32 = 0;
124
125        for member in &members {
126            match self
127                .reconcile_one_group(client, &p, &pending_key, member)
128                .await
129            {
130                ReconcileOutcome::Acted => processed += 1,
131                ReconcileOutcome::NoOp => { /* dispatcher owns it */ }
132                ReconcileOutcome::Error => errors += 1,
133            }
134        }
135
136        ScanResult { processed, errors }
137    }
138}
139
/// Result of reconciling a single `pending_cancel_groups` member.
///
/// Derives `Debug`/`PartialEq` so outcomes can be logged and compared, and
/// `Copy` because the enum carries no data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReconcileOutcome {
    /// The Lua reconcile SREM'd a stale entry or completed an interrupted
    /// drain; counted as processed.
    Acted,
    /// The dispatcher still owns this tuple (siblings non-terminal); left
    /// alone and counted neither as processed nor as an error.
    NoOp,
    /// Malformed tuple, FCALL failure, or unparsable FCALL reply; counted
    /// as an error.
    Error,
}
149
150impl EdgeCancelReconciler {
151    async fn reconcile_one_group(
152        &self,
153        client: &ferriskey::Client,
154        flow_p: &Partition,
155        pending_key: &str,
156        member: &str,
157    ) -> ReconcileOutcome {
158        let (flow_id_str, downstream_eid_str) = match member.split_once('|') {
159            Some((f, d)) if !f.is_empty() && !d.is_empty() => (f, d),
160            _ => {
161                tracing::warn!(
162                    raw = member,
163                    "edge_cancel_reconciler: malformed pending_cancel_groups \
164                     member; SREM-ing to avoid poison"
165                );
166                let _: Result<i64, _> = client
167                    .cmd("SREM")
168                    .arg(pending_key)
169                    .arg(member)
170                    .execute()
171                    .await;
172                return ReconcileOutcome::Error;
173            }
174        };
175
176        let flow_id = match FlowId::parse(flow_id_str) {
177            Ok(id) => id,
178            Err(_) => {
179                let _: Result<i64, _> = client
180                    .cmd("SREM")
181                    .arg(pending_key)
182                    .arg(member)
183                    .execute()
184                    .await;
185                return ReconcileOutcome::Error;
186            }
187        };
188
189        let downstream_eid = match ExecutionId::parse(downstream_eid_str) {
190            Ok(id) => id,
191            Err(_) => {
192                let _: Result<i64, _> = client
193                    .cmd("SREM")
194                    .arg(pending_key)
195                    .arg(member)
196                    .execute()
197                    .await;
198                return ReconcileOutcome::Error;
199            }
200        };
201
202        let fctx = FlowKeyContext::new(flow_p, &flow_id);
203        let edgegroup_key = fctx.edgegroup(&downstream_eid);
204        let flow_id_s = flow_id.to_string();
205        let downstream_s = downstream_eid.to_string();
206        let keys = [pending_key, edgegroup_key.as_str()];
207        let argv = [flow_id_s.as_str(), downstream_s.as_str()];
208
209        let reply: Result<ferriskey::Value, _> = client
210            .fcall(
211                "ff_reconcile_sibling_cancel_group",
212                &keys,
213                &argv,
214            )
215            .await;
216
217        match reply {
218            Ok(val) => match extract_action(&val) {
219                Some(action) => {
220                    self.metrics.inc_sibling_cancel_reconcile(action);
221                    match action {
222                        "sremmed_stale" | "completed_drain" => {
223                            tracing::debug!(
224                                flow_id = %flow_id,
225                                downstream = %downstream_eid,
226                                action,
227                                "edge_cancel_reconciler: action applied"
228                            );
229                            ReconcileOutcome::Acted
230                        }
231                        // "no_op"
232                        _ => ReconcileOutcome::NoOp,
233                    }
234                }
235                None => {
236                    tracing::warn!(
237                        flow_id = %flow_id,
238                        downstream = %downstream_eid,
239                        "edge_cancel_reconciler: unparsable FCALL reply"
240                    );
241                    ReconcileOutcome::Error
242                }
243            },
244            Err(e) => {
245                tracing::warn!(
246                    flow_id = %flow_id,
247                    downstream = %downstream_eid,
248                    error = %e,
249                    "edge_cancel_reconciler: FCALL failed; retry next cycle"
250                );
251                ReconcileOutcome::Error
252            }
253        }
254    }
255}
256
257/// Lua returns `{1, "OK", <action>, <detail>}` for success. Pull the
258/// action string and coerce to a fixed `&'static str` so the metric
259/// cardinality is bounded to 3.
260fn extract_action(val: &ferriskey::Value) -> Option<&'static str> {
261    let arr = match val {
262        ferriskey::Value::Array(a) => a,
263        _ => return None,
264    };
265    let action_result = arr.get(2)?;
266    let action_val = action_result.as_ref().ok()?;
267    let action = match action_val {
268        ferriskey::Value::BulkString(b) => String::from_utf8_lossy(b).into_owned(),
269        ferriskey::Value::SimpleString(s) => s.clone(),
270        _ => return None,
271    };
272    match action.as_str() {
273        "sremmed_stale" => Some("sremmed_stale"),
274        "completed_drain" => Some("completed_drain"),
275        "no_op" => Some("no_op"),
276        _ => None,
277    }
278}