ff_engine/scanner/edge_cancel_dispatcher.rs

//! RFC-016 Stage C — sibling-cancel dispatcher.
//!
//! Drains the per-flow-partition `ff:idx:{fp:N}:pending_cancel_groups`
//! SET, populated atomically by `ff_resolve_dependency` whenever the
//! AnyOf/Quorum resolver flips an edge group to `satisfied | impossible`
//! under `OnSatisfied::CancelRemaining`. Each SET member is a
//! `<flow_id>|<downstream_eid>` tuple pointing at the edge-group hash
//! whose `cancel_siblings_pending_members` field carries the pipe-
//! delimited list of still-running sibling execution ids captured at
//! resolve time.
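//!
//! Illustrative shapes (ids and values hypothetical; key pattern and
//! field names as used below):
//!
//! ```text
//! SMEMBERS ff:idx:{fp:N}:pending_cancel_groups
//!   "flow_123|exec_456"                      -- <flow_id>|<downstream_eid>
//! HGETALL  <edgegroup hash for exec_456>
//!   cancel_siblings_pending_flag     1
//!   cancel_siblings_reason           sibling_quorum_satisfied
//!   cancel_siblings_pending_members  exec_a|exec_b|exec_c
//! ```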
//!
//! For each group: read the sibling list + the cancel reason from the
//! edgegroup hash, issue `ff_cancel_execution` per sibling (ARGV source
//! = `operator_override`; see `cancel_sibling`) carrying
//! `FailureReason::sibling_quorum_{satisfied, impossible}`, track
//! per-id disposition (cancelled | already_terminal | not_found), and
//! finally atomically SREM the tuple + HDEL the flag + members fields
//! via `ff_drain_sibling_cancel_group` — a single Lua unit so a
//! dispatcher crash mid-drain leaves either the pre-drain or post-drain
//! state, never a torn in-between.
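//!
//! Shape of the drain call (argument order as issued by `drain_group`):
//!
//! ```text
//! FCALL ff_drain_sibling_cancel_group 2 <pending SET> <edgegroup hash>
//!       <flow_id> <downstream_eid>
//!   -> SREM <pending SET> "<flow_id>|<downstream_eid>"
//!   -> HDEL <edgegroup hash> cancel_siblings_pending_flag
//!           cancel_siblings_pending_members
//!   -- one atomic unit: all-or-nothing under a crash
//! ```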
//!
//! Partition-batched, lightweight: the scanner processes at most
//! `BATCH_SIZE` groups per partition per cycle. `LetRun` policies are
//! structurally excluded — `ff_resolve_dependency` NEVER flags or
//! indexes `LetRun` groups (RFC-016 §5, pure-LetRun adjudication
//! 2026-04-23) so the dispatcher cannot see them. The scanner therefore
//! does not re-check policy; if a tuple is in the index SET, its group
//! asked for a cancel.
//!
//! Stage C scope per RFC-016 §11: the dispatcher + the
//! `pending_cancel_groups` SET + basic metrics. Stage D adds the
//! `LetRun` late-terminal metric; Stage E adds the full reconciler +
//! observability polish. Crash-mid-cancel recovery is Stage D.

use std::time::Duration;

use ff_core::backend::ScannerFilter;
use ff_core::keys::{
    ExecKeyContext, FlowIndexKeys, FlowKeyContext, IndexKeys,
};
use ff_core::partition::{
    execution_partition, Partition, PartitionConfig, PartitionFamily,
};
use ff_core::types::{
    AttemptIndex, ExecutionId, FlowId, LaneId, WaitpointId,
    WorkerInstanceId,
};

use super::{ScanResult, Scanner};

/// Max groups processed per partition per cycle. Bounds worst-case
/// per-cycle Lua work at `BATCH_SIZE` × (one cancel FCALL per sibling
/// + one drain FCALL). 50 matches the `cancel_reconciler` batch.
const BATCH_SIZE: u32 = 50;

/// Structural cap on sibling-list size honoured per group per cycle.
/// Protects against a pathological edgegroup whose members string grew
/// beyond the §4.1 128-soft-cap. Siblings past the cap are skipped,
/// not deferred: the drain FCALL fires once the Rust loop has issued
/// every cancel it enumerated, so an over-cap list drains with the
/// excess left uncancelled (later-stage reconciliation is the
/// backstop).
const MAX_SIBLINGS_PER_GROUP: usize = 1024;

pub struct EdgeCancelDispatcher {
    interval: Duration,
    partition_config: PartitionConfig,
    filter: ScannerFilter,
    metrics: std::sync::Arc<ff_observability::Metrics>,
}

impl EdgeCancelDispatcher {
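    /// Default `ScannerFilter` and a fresh metrics registry. Call-site
    /// sketch (interval value illustrative, not prescribed):
    /// `EdgeCancelDispatcher::new(Duration::from_secs(1), partition_config)`.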
    pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
        Self::with_filter(interval, partition_config, ScannerFilter::default())
    }

    pub fn with_filter(
        interval: Duration,
        partition_config: PartitionConfig,
        filter: ScannerFilter,
    ) -> Self {
        Self::with_filter_and_metrics(
            interval,
            partition_config,
            filter,
            std::sync::Arc::new(ff_observability::Metrics::new()),
        )
    }

    pub fn with_filter_and_metrics(
        interval: Duration,
        partition_config: PartitionConfig,
        filter: ScannerFilter,
        metrics: std::sync::Arc<ff_observability::Metrics>,
    ) -> Self {
        Self {
            interval,
            partition_config,
            filter,
            metrics,
        }
    }
}

impl Scanner for EdgeCancelDispatcher {
    fn name(&self) -> &'static str {
        "edge_cancel_dispatcher"
    }

    fn interval(&self) -> Duration {
        self.interval
    }

    fn filter(&self) -> &ScannerFilter {
        &self.filter
    }

    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Flow,
            index: partition,
        };
        let fidx = FlowIndexKeys::new(&p);
        let pending_key = fidx.pending_cancel_groups();

        // SRANDMEMBER with count: pull up to BATCH_SIZE members without
        // disturbing the SET (the drain FCALL removes successful
        // entries atomically after cancels land). A failure here is
        // transient — log + skip; the next cycle retries.
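        // A member may be re-picked on later cycles until its drain
        // SREMs it; that is safe: re-issued cancels land as
        // `already_terminal` and the drain itself is idempotent.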
        let members: Vec<String> = match client
            .cmd("SRANDMEMBER")
            .arg(&pending_key)
            .arg(BATCH_SIZE.to_string().as_str())
            .execute()
            .await
        {
            Ok(m) => m,
            Err(e) => {
                tracing::warn!(
                    partition,
                    error = %e,
                    "edge_cancel_dispatcher: SRANDMEMBER pending_cancel_groups failed"
                );
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        if members.is_empty() {
            return ScanResult { processed: 0, errors: 0 };
        }

        let mut processed: u32 = 0;
        let mut errors: u32 = 0;

        for member in &members {
            match self
                .dispatch_one_group(client, &p, &pending_key, member)
                .await
            {
                GroupOutcome::Drained => processed += 1,
                GroupOutcome::SkippedRetry => { /* next cycle */ }
                GroupOutcome::Error => errors += 1,
            }
        }

        ScanResult { processed, errors }
    }
}

enum GroupOutcome {
    /// Fully drained: cancels issued + drain FCALL acked.
    Drained,
    /// Transient failure (transport / missing sibling metadata);
    /// leave the SET entry + flag for next cycle.
    SkippedRetry,
    /// Malformed / unrecoverable; counted as an error but moved on.
    Error,
}
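
// Outcome → `ScanResult` mapping (see `scan_partition`): `Drained`
// counts toward `processed`, `Error` toward `errors`, `SkippedRetry`
// toward neither; the tuple stays in the SET and is re-picked on a
// later cycle.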

/// Postgres-backend parallel to the Valkey scan loop.
///
/// Wave-6b (RFC-v0.7): delegates to
/// [`ff_backend_postgres::reconcilers::edge_cancel_dispatcher::dispatcher_tick`],
/// which mirrors RFC-016 Stage-C semantics against `ff_edge_group` +
/// `ff_pending_cancel_groups` under a per-group transaction with
/// `FOR UPDATE SKIP LOCKED` coalescing.
#[cfg(feature = "postgres")]
pub async fn dispatch_via_postgres(
    pool: &ff_backend_postgres::PgPool,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<
    ff_backend_postgres::reconcilers::edge_cancel_dispatcher::DispatchReport,
    ff_core::engine_error::EngineError,
> {
    ff_backend_postgres::reconcilers::edge_cancel_dispatcher::dispatcher_tick(pool, filter).await
}
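
// Call-site sketch (hypothetical wiring; pool/filter construction
// elided):
//
//   let report = dispatch_via_postgres(&pool, &filter).await?;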

impl EdgeCancelDispatcher {
    async fn dispatch_one_group(
        &self,
        client: &ferriskey::Client,
        flow_p: &Partition,
        pending_key: &str,
        member: &str,
    ) -> GroupOutcome {
        // Parse `<flow_id>|<downstream_eid>`
        let (flow_id_str, downstream_eid_str) = match member.split_once('|') {
            Some((f, d)) if !f.is_empty() && !d.is_empty() => (f, d),
            _ => {
                tracing::warn!(
                    raw = member,
                    "edge_cancel_dispatcher: malformed pending_cancel_groups \
                     member; SREM-ing to avoid poison"
                );
                let _: Result<i64, _> = client
                    .cmd("SREM")
                    .arg(pending_key)
                    .arg(member)
                    .execute()
                    .await;
                return GroupOutcome::Error;
            }
        };

        let flow_id = match FlowId::parse(flow_id_str) {
            Ok(id) => id,
            Err(_) => {
                let _: Result<i64, _> = client
                    .cmd("SREM")
                    .arg(pending_key)
                    .arg(member)
                    .execute()
                    .await;
                return GroupOutcome::Error;
            }
        };

        let downstream_eid = match ExecutionId::parse(downstream_eid_str) {
            Ok(id) => id,
            Err(_) => {
                let _: Result<i64, _> = client
                    .cmd("SREM")
                    .arg(pending_key)
                    .arg(member)
                    .execute()
                    .await;
                return GroupOutcome::Error;
            }
        };
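
        // The poison-removal SREMs above are best-effort (their Results
        // are ignored); if one fails transiently the malformed member
        // simply resurfaces next cycle and is SREM-ed again.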

        let fctx = FlowKeyContext::new(flow_p, &flow_id);
        let edgegroup_key = fctx.edgegroup(&downstream_eid);

        // Read reason + members list from the edgegroup hash. A missing
        // hash (retention-deleted) is still drainable — ff_drain acks
        // with `drained_sans_group`.
        let fields: Vec<Option<String>> = match client
            .cmd("HMGET")
            .arg(&edgegroup_key)
            .arg("cancel_siblings_reason")
            .arg("cancel_siblings_pending_members")
            .arg("cancel_siblings_pending_flag")
            .execute()
            .await
        {
            Ok(v) => v,
            Err(e) => {
                tracing::debug!(
                    flow_id = %flow_id,
                    downstream = %downstream_eid,
                    error = %e,
                    "edge_cancel_dispatcher: HMGET edgegroup failed; retry next cycle"
                );
                return GroupOutcome::SkippedRetry;
            }
        };

        let reason = fields.first().and_then(|v| v.clone()).unwrap_or_default();
        let members_raw = fields.get(1).and_then(|v| v.clone()).unwrap_or_default();
        let flag = fields.get(2).and_then(|v| v.clone()).unwrap_or_default();

        // Inconsistency: tuple present in the SET but the hash carries
        // neither the pending flag nor a members list. Most likely the
        // group was already drained or retention raced us; attempt the
        // drain, which idempotently cleans up the tuple either way.
        if flag.is_empty() && members_raw.is_empty() {
            tracing::debug!(
                flow_id = %flow_id,
                downstream = %downstream_eid,
                "edge_cancel_dispatcher: group has no pending flag / members; \
                 draining tuple (likely already drained or racing retention)"
            );
            return self
                .drain_group(client, pending_key, &edgegroup_key, &flow_id, &downstream_eid)
                .await;
        }

        // Determine the reason code. Default to `sibling_quorum_satisfied`
        // if absent — the group was indexed in a CancelRemaining state,
        // so SOME reason was written; be defensive rather than emit an
        // empty reason to downstream cancels.
        let reason_str = if reason.is_empty() {
            "sibling_quorum_satisfied"
        } else {
            reason.as_str()
        };

        // Enumerate siblings + issue cancels.
        let sibling_eids: Vec<&str> = members_raw
            .split('|')
            .filter(|s| !s.is_empty())
            .take(MAX_SIBLINGS_PER_GROUP)
            .collect();

        // Normalise reason to a &'static str for the fixed-cardinality
        // metric label. Any unrecognised string is coerced to
        // `sibling_quorum_satisfied` (the default path) rather than
        // exploding cardinality.
        let static_reason: &'static str = match reason_str {
            "sibling_quorum_impossible" => "sibling_quorum_impossible",
            _ => "sibling_quorum_satisfied",
        };

        let mut cancel_dispositions: [u64; 3] = [0, 0, 0]; // cancelled, already_terminal, not_found
        for sib_str in &sibling_eids {
            let sib_eid = match ExecutionId::parse(sib_str) {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(
                        flow_id = %flow_id,
                        raw = %sib_str,
                        error = %e,
                        "edge_cancel_dispatcher: malformed sibling eid; counting as not_found"
                    );
                    cancel_dispositions[2] += 1;
                    continue;
                }
            };

            self.metrics.inc_sibling_cancel_dispatched(static_reason);
            match cancel_sibling(
                client,
                &self.partition_config,
                &sib_eid,
                reason_str,
            )
            .await
            {
                SiblingDisposition::Cancelled => {
                    cancel_dispositions[0] += 1;
                    self.metrics.inc_sibling_cancel_disposition("cancelled");
                }
                SiblingDisposition::AlreadyTerminal => {
                    cancel_dispositions[1] += 1;
                    self.metrics
                        .inc_sibling_cancel_disposition("already_terminal");
                }
                SiblingDisposition::NotFound => {
                    cancel_dispositions[2] += 1;
                    self.metrics.inc_sibling_cancel_disposition("not_found");
                }
                SiblingDisposition::TransientError => {
                    // One sibling flake pauses drain for this group;
                    // others will be retried after the next dispatch
                    // tick sees the same members list. Don't half-drain.
                    tracing::debug!(
                        flow_id = %flow_id,
                        sibling = %sib_eid,
                        "edge_cancel_dispatcher: transient cancel error; retry group next cycle"
                    );
                    return GroupOutcome::SkippedRetry;
                }
            }
        }
        for (label, count) in ["cancelled", "already_terminal", "not_found"]
            .into_iter()
            .zip(cancel_dispositions)
        {
            if count > 0 {
                tracing::debug!(
                    flow_id = %flow_id,
                    downstream = %downstream_eid,
                    reason = %static_reason,
                    disposition = label,
                    count,
                    "edge_cancel_dispatcher: sibling cancel disposition"
                );
            }
        }

        // Drain: atomic SREM + HDEL.
        self.drain_group(client, pending_key, &edgegroup_key, &flow_id, &downstream_eid)
            .await
    }

    async fn drain_group(
        &self,
        client: &ferriskey::Client,
        pending_key: &str,
        edgegroup_key: &str,
        flow_id: &FlowId,
        downstream_eid: &ExecutionId,
    ) -> GroupOutcome {
        let flow_id_str = flow_id.to_string();
        let downstream_eid_str = downstream_eid.to_string();
        let keys = [pending_key, edgegroup_key];
        let argv = [flow_id_str.as_str(), downstream_eid_str.as_str()];
        match client
            .fcall::<ferriskey::Value>(
                "ff_drain_sibling_cancel_group",
                &keys,
                &argv,
            )
            .await
        {
            Ok(_) => GroupOutcome::Drained,
            Err(e) => {
                tracing::warn!(
                    flow_id = %flow_id,
                    downstream = %downstream_eid,
                    error = %e,
                    "edge_cancel_dispatcher: drain FCALL failed; retry next cycle"
                );
                GroupOutcome::SkippedRetry
            }
        }
    }
}

#[derive(Debug, Clone, Copy)]
enum SiblingDisposition {
    Cancelled,
    AlreadyTerminal,
    NotFound,
    TransientError,
}
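
// Mapping from `ff_cancel_execution` acks (see the match in
// `cancel_sibling`): Int(1) → Cancelled; Int(0) + "execution_not_active"
// → AlreadyTerminal; Int(0) + "execution_not_found" → NotFound; any
// other shape, code, or transport error → TransientError.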

/// Issue `ff_cancel_execution` against a sibling. Mirrors
/// `cancel_reconciler::cancel_member` KEYS layout but with
/// `source = "operator_override"` — sibling-quorum cancels bypass
/// lease-fence checks (no worker holds a lease stake here; the engine
/// itself is pulling the plug). The `reason` is carried through as
/// ARGV[2] and lands verbatim in exec_core's `cancellation_reason`.
async fn cancel_sibling(
    client: &ferriskey::Client,
    partition_config: &PartitionConfig,
    sib_eid: &ExecutionId,
    reason: &str,
) -> SiblingDisposition {
    let partition = execution_partition(sib_eid, partition_config);
    let ctx = ExecKeyContext::new(&partition, sib_eid);
    let idx = IndexKeys::new(&partition);

    // Pre-read lane + dynamic fields so the 21 KEYS target the correct
    // lane/worker indexes. Matches `cancel_reconciler::cancel_member`.
    let lane_str: Option<String> = match client.hget(&ctx.core(), "lane_id").await {
        Ok(v) => v,
        Err(_) => return SiblingDisposition::TransientError,
    };
    let lane = LaneId::new(lane_str.as_deref().unwrap_or("default"));

    let dyn_fields: Vec<Option<String>> = match client
        .cmd("HMGET")
        .arg(ctx.core())
        .arg("current_attempt_index")
        .arg("current_waitpoint_id")
        .arg("current_worker_instance_id")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(_) => return SiblingDisposition::TransientError,
    };

    let att_idx_val = dyn_fields
        .first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(0);
    let att_idx = AttemptIndex::new(att_idx_val);
    let wp_id_str = dyn_fields
        .get(1)
        .and_then(|v| v.as_ref())
        .cloned()
        .unwrap_or_default();
    let wp_id = if wp_id_str.is_empty() {
        WaitpointId::new()
    } else {
        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
    };
    let wiid_str = dyn_fields
        .get(2)
        .and_then(|v| v.as_ref())
        .cloned()
        .unwrap_or_default();
    let wiid = WorkerInstanceId::new(&wiid_str);

    let keys: Vec<String> = vec![
        ctx.core(),
        ctx.attempt_hash(att_idx),
        ctx.stream_meta(att_idx),
        ctx.lease_current(),
        ctx.lease_history(),
        idx.lease_expiry(),
        idx.worker_leases(&wiid),
        ctx.suspension_current(),
        ctx.waitpoint(&wp_id),
        ctx.waitpoint_condition(&wp_id),
        idx.suspension_timeout(),
        idx.lane_terminal(&lane),
        idx.attempt_timeout(),
        idx.execution_deadline(),
        idx.lane_eligible(&lane),
        idx.lane_delayed(&lane),
        idx.lane_blocked_dependencies(&lane),
        idx.lane_blocked_budget(&lane),
        idx.lane_blocked_quota(&lane),
        idx.lane_blocked_route(&lane),
        idx.lane_blocked_operator(&lane),
    ];
    let argv: Vec<String> = vec![
        sib_eid.to_string(),
        reason.to_owned(),
        "operator_override".to_owned(),
        String::new(),
        String::new(),
    ];
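    // ARGV layout as issued here: [1] execution id, [2] reason,
    // [3] source, plus two trailing slots passed empty (unused by this
    // caller).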
    let kr: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
    let ar: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();

    match client
        .fcall::<ferriskey::Value>("ff_cancel_execution", &kr, &ar)
        .await
    {
        Ok(ferriskey::Value::Array(arr)) => match arr.first() {
            Some(Ok(ferriskey::Value::Int(1))) => SiblingDisposition::Cancelled,
            Some(Ok(ferriskey::Value::Int(0))) => {
                let code = arr
                    .get(1)
                    .and_then(|r| match r {
                        Ok(ferriskey::Value::BulkString(b)) => {
                            Some(String::from_utf8_lossy(b).into_owned())
                        }
                        Ok(ferriskey::Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default();
                match code.as_str() {
                    "execution_not_active" => SiblingDisposition::AlreadyTerminal,
                    "execution_not_found" => SiblingDisposition::NotFound,
                    _ => SiblingDisposition::TransientError,
                }
            }
            _ => SiblingDisposition::TransientError,
        },
        Ok(_) => SiblingDisposition::TransientError,
        Err(_) => SiblingDisposition::TransientError,
    }
}
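
#[cfg(test)]
mod tests {
    // Documents the `<flow_id>|<downstream_eid>` member contract that
    // `dispatch_one_group` parses inline (this mirrors, not calls, that
    // logic; ids are illustrative).
    #[test]
    fn member_tuple_splits_on_first_pipe() {
        let (f, d) = "flow_a|exec_b".split_once('|').expect("well-formed");
        assert_eq!((f, d), ("flow_a", "exec_b"));

        // Shapes the dispatcher SREMs as poison: no pipe at all, or an
        // empty half on either side of the first pipe.
        assert!("flow_a".split_once('|').is_none());
        assert!(matches!("flow_a|".split_once('|'), Some((_, ""))));
        assert!(matches!("|exec_b".split_once('|'), Some(("", _))));
    }
}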