Skip to main content

ff_engine/scanner/
edge_cancel_reconciler.rs

1//! RFC-016 Stage D — sibling-cancel reconciler (Invariant Q6 safety net).
2//!
3//! Stage C's `edge_cancel_dispatcher` populates
4//! `ff:idx:{fp:N}:pending_cancel_groups` atomically inside
5//! `ff_resolve_dependency` and drains each tuple via
6//! `ff_drain_sibling_cancel_group` after per-sibling cancels land. An
7//! engine crash between the SADD and the drain leaves stale tuples in
8//! the SET pointing at groups whose cancels already completed.
9//!
10//! This scanner is a safety net that runs at a slower cadence than the
11//! dispatcher (default 10s vs 1s). Per flow partition it enumerates
12//! `pending_cancel_groups` directly (no full scan) and calls
13//! `ff_reconcile_sibling_cancel_group` per tuple. The Lua function
14//! decides atomically:
15//!   - `sremmed_stale` — flag false / edgegroup missing → SREM only.
16//!   - `completed_drain` — flag true + all siblings terminal (drain
17//!     interrupted post-cancels) → HDEL + SREM.
//!   - `no_op` — flag true + some sibling still running; the
//!     dispatcher will handle it on its next tick.
20//!
21//! The reconciler MUST NOT fight the dispatcher — `no_op` leaves state
22//! untouched so the 1s dispatcher cadence retains ownership of the
23//! happy path.
24
25use std::time::Duration;
26
27use ff_core::backend::ScannerFilter;
28use ff_core::keys::{FlowIndexKeys, FlowKeyContext};
29use ff_core::partition::{Partition, PartitionFamily};
30use ff_core::types::{ExecutionId, FlowId};
31
32use super::{ScanResult, Scanner};
33
/// Max tuples processed per partition per cycle. Matches the dispatcher
/// batch size — the reconciler is strictly bounded by the dispatcher's
/// steady-state backlog plus any crash-recovery residue.
///
/// Sent (stringified) as the `count` argument of the `SRANDMEMBER` call
/// in `scan_partition`, so it caps how many set members are sampled per
/// partition on each tick.
const BATCH_SIZE: usize = 50;
38
/// Scanner that samples each flow partition's `pending_cancel_groups`
/// set and runs `ff_reconcile_sibling_cancel_group` on every sampled
/// member.
pub struct EdgeCancelReconciler {
    /// Value handed back by `Scanner::interval`.
    interval: Duration,
    /// Filter handed back (by reference) from `Scanner::filter`.
    filter: ScannerFilter,
    /// Receives one `inc_sibling_cancel_reconcile(action)` call for each
    /// FCALL reply whose action could be parsed.
    metrics: std::sync::Arc<ff_observability::Metrics>,
}
44
45impl EdgeCancelReconciler {
46    pub fn new(interval: Duration) -> Self {
47        Self::with_filter(interval, ScannerFilter::default())
48    }
49
50    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
51        Self::with_filter_and_metrics(
52            interval,
53            filter,
54            std::sync::Arc::new(ff_observability::Metrics::new()),
55        )
56    }
57
58    pub fn with_filter_and_metrics(
59        interval: Duration,
60        filter: ScannerFilter,
61        metrics: std::sync::Arc<ff_observability::Metrics>,
62    ) -> Self {
63        Self {
64            interval,
65            filter,
66            metrics,
67        }
68    }
69}
70
71impl Scanner for EdgeCancelReconciler {
72    fn name(&self) -> &'static str {
73        "edge_cancel_reconciler"
74    }
75
76    fn interval(&self) -> Duration {
77        self.interval
78    }
79
80    fn filter(&self) -> &ScannerFilter {
81        &self.filter
82    }
83
84    async fn scan_partition(
85        &self,
86        client: &ferriskey::Client,
87        partition: u16,
88    ) -> ScanResult {
89        let p = Partition {
90            family: PartitionFamily::Flow,
91            index: partition,
92        };
93        let fidx = FlowIndexKeys::new(&p);
94        let pending_key = fidx.pending_cancel_groups();
95
96        // SRANDMEMBER with count so a pathologically-large SET doesn't
97        // starve other partitions. The Lua reconcile call handles the
98        // SREM atomically; re-observing the same tuple next cycle is
99        // idempotent (`no_op "not_in_set"`).
100        let members: Vec<String> = match client
101            .cmd("SRANDMEMBER")
102            .arg(&pending_key)
103            .arg(BATCH_SIZE.to_string().as_str())
104            .execute()
105            .await
106        {
107            Ok(m) => m,
108            Err(e) => {
109                tracing::warn!(
110                    partition,
111                    error = %e,
112                    "edge_cancel_reconciler: SRANDMEMBER pending_cancel_groups failed"
113                );
114                return ScanResult { processed: 0, errors: 1 };
115            }
116        };
117
118        if members.is_empty() {
119            return ScanResult { processed: 0, errors: 0 };
120        }
121
122        let mut processed: u32 = 0;
123        let mut errors: u32 = 0;
124
125        for member in &members {
126            match self
127                .reconcile_one_group(client, &p, &pending_key, member)
128                .await
129            {
130                ReconcileOutcome::Acted => processed += 1,
131                ReconcileOutcome::NoOp => { /* dispatcher owns it */ }
132                ReconcileOutcome::Error => errors += 1,
133            }
134        }
135
136        ScanResult { processed, errors }
137    }
138}
139
/// Result of reconciling a single `pending_cancel_groups` member; tells
/// `scan_partition` which counter (if any) to bump.
///
/// The variants carry no data, so the type is `Copy` and can be compared
/// and debug-printed freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReconcileOutcome {
    /// SREM'd stale entry, or completed interrupted drain.
    Acted,
    /// Dispatcher still owns this tuple (siblings non-terminal) — left
    /// alone; not counted as processed.
    NoOp,
    /// Malformed tuple or transient FCALL error.
    Error,
}
149
/// Postgres-backend counterpart of the Valkey scan loop.
///
/// Wave-6b (RFC-v0.7): a pass-through to
/// [`ff_backend_postgres::reconcilers::edge_cancel_reconciler::reconciler_tick`],
/// which mirrors RFC-016 Stage-D semantics (`sremmed_stale` /
/// `completed_drain` / `no_op`) against `ff_pending_cancel_groups`.
///
/// # Errors
///
/// Returns, unchanged, whatever error `reconciler_tick` reports.
#[cfg(feature = "postgres")]
pub async fn reconcile_via_postgres(
    pool: &ff_backend_postgres::PgPool,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<
    ff_backend_postgres::reconcilers::edge_cancel_reconciler::ReconcileReport,
    ff_core::engine_error::EngineError,
> {
    use ff_backend_postgres::reconcilers::edge_cancel_reconciler as pg_reconciler;

    pg_reconciler::reconciler_tick(pool, filter).await
}
166
167impl EdgeCancelReconciler {
168    async fn reconcile_one_group(
169        &self,
170        client: &ferriskey::Client,
171        flow_p: &Partition,
172        pending_key: &str,
173        member: &str,
174    ) -> ReconcileOutcome {
175        let (flow_id_str, downstream_eid_str) = match member.split_once('|') {
176            Some((f, d)) if !f.is_empty() && !d.is_empty() => (f, d),
177            _ => {
178                tracing::warn!(
179                    raw = member,
180                    "edge_cancel_reconciler: malformed pending_cancel_groups \
181                     member; SREM-ing to avoid poison"
182                );
183                let _: Result<i64, _> = client
184                    .cmd("SREM")
185                    .arg(pending_key)
186                    .arg(member)
187                    .execute()
188                    .await;
189                return ReconcileOutcome::Error;
190            }
191        };
192
193        let flow_id = match FlowId::parse(flow_id_str) {
194            Ok(id) => id,
195            Err(_) => {
196                let _: Result<i64, _> = client
197                    .cmd("SREM")
198                    .arg(pending_key)
199                    .arg(member)
200                    .execute()
201                    .await;
202                return ReconcileOutcome::Error;
203            }
204        };
205
206        let downstream_eid = match ExecutionId::parse(downstream_eid_str) {
207            Ok(id) => id,
208            Err(_) => {
209                let _: Result<i64, _> = client
210                    .cmd("SREM")
211                    .arg(pending_key)
212                    .arg(member)
213                    .execute()
214                    .await;
215                return ReconcileOutcome::Error;
216            }
217        };
218
219        let fctx = FlowKeyContext::new(flow_p, &flow_id);
220        let edgegroup_key = fctx.edgegroup(&downstream_eid);
221        let flow_id_s = flow_id.to_string();
222        let downstream_s = downstream_eid.to_string();
223        let keys = [pending_key, edgegroup_key.as_str()];
224        let argv = [flow_id_s.as_str(), downstream_s.as_str()];
225
226        let reply: Result<ferriskey::Value, _> = client
227            .fcall(
228                "ff_reconcile_sibling_cancel_group",
229                &keys,
230                &argv,
231            )
232            .await;
233
234        match reply {
235            Ok(val) => match extract_action(&val) {
236                Some(action) => {
237                    self.metrics.inc_sibling_cancel_reconcile(action);
238                    match action {
239                        "sremmed_stale" | "completed_drain" => {
240                            tracing::debug!(
241                                flow_id = %flow_id,
242                                downstream = %downstream_eid,
243                                action,
244                                "edge_cancel_reconciler: action applied"
245                            );
246                            ReconcileOutcome::Acted
247                        }
248                        // "no_op"
249                        _ => ReconcileOutcome::NoOp,
250                    }
251                }
252                None => {
253                    tracing::warn!(
254                        flow_id = %flow_id,
255                        downstream = %downstream_eid,
256                        "edge_cancel_reconciler: unparsable FCALL reply"
257                    );
258                    ReconcileOutcome::Error
259                }
260            },
261            Err(e) => {
262                tracing::warn!(
263                    flow_id = %flow_id,
264                    downstream = %downstream_eid,
265                    error = %e,
266                    "edge_cancel_reconciler: FCALL failed; retry next cycle"
267                );
268                ReconcileOutcome::Error
269            }
270        }
271    }
272}
273
274/// Lua returns `{1, "OK", <action>, <detail>}` for success. Pull the
275/// action string and coerce to a fixed `&'static str` so the metric
276/// cardinality is bounded to 3.
277fn extract_action(val: &ferriskey::Value) -> Option<&'static str> {
278    let arr = match val {
279        ferriskey::Value::Array(a) => a,
280        _ => return None,
281    };
282    let action_result = arr.get(2)?;
283    let action_val = action_result.as_ref().ok()?;
284    let action = match action_val {
285        ferriskey::Value::BulkString(b) => String::from_utf8_lossy(b).into_owned(),
286        ferriskey::Value::SimpleString(s) => s.clone(),
287        _ => return None,
288    };
289    match action.as_str() {
290        "sremmed_stale" => Some("sremmed_stale"),
291        "completed_drain" => Some("completed_drain"),
292        "no_op" => Some("no_op"),
293        _ => None,
294    }
295}