ff_engine/scanner/edge_cancel_dispatcher.rs

//! RFC-016 Stage C — sibling-cancel dispatcher.
//!
//! Drains the per-flow-partition `ff:idx:{fp:N}:pending_cancel_groups`
//! SET, populated atomically by `ff_resolve_dependency` whenever the
//! AnyOf/Quorum resolver flips an edge group to `satisfied | impossible`
//! under `OnSatisfied::CancelRemaining`. Each SET member is a
//! `<flow_id>|<downstream_eid>` tuple pointing at the edge-group hash
//! whose `cancel_siblings_pending_members` field carries the pipe-
//! delimited list of still-running sibling execution ids captured at
//! resolve time.
//!
//! For each group: read the sibling list + the cancel reason from the
//! edge-group hash, issue one `ff_cancel_execution` per sibling carrying
//! `FailureReason::sibling_quorum_{satisfied, impossible}` (dispatched
//! with source `operator_override`; see `cancel_sibling`), track per-id
//! disposition (cancelled | already_terminal | not_found), and finally
//! atomically SREM the tuple + HDEL the flag + members fields via
//! `ff_drain_sibling_cancel_group` — a single Lua unit so a dispatcher
//! crash mid-drain leaves either the pre-drain or post-drain state,
//! never a torn in-between.
//!
//! Partition-batched, lightweight: the scanner processes at most
//! `BATCH_SIZE` groups per partition per cycle. `LetRun` policies are
//! structurally excluded — `ff_resolve_dependency` NEVER flags or
//! indexes `LetRun` groups (RFC-016 §5, pure-LetRun adjudication
//! 2026-04-23) so the dispatcher cannot see them. The scanner therefore
//! does not re-check policy; if a tuple is in the index SET, its group
//! asked for a cancel.
//!
//! Stage C scope per RFC-016 §11: the dispatcher + the
//! `pending_cancel_groups` SET + basic metrics. Stage D adds the
//! `LetRun` late-terminal metric; Stage E adds the full reconciler +
//! observability polish. Crash-mid-cancel recovery is Stage D.
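//!
//! Illustrative shapes of the data this scanner touches (values are
//! placeholders; real keys come from `FlowIndexKeys` / `FlowKeyContext`):
//!
//! ```text
//! SRANDMEMBER ff:idx:{fp:N}:pending_cancel_groups 50
//! 1) "<flow_id>|<downstream_eid>"
//!
//! HMGET <edge-group hash for that tuple> \
//!       cancel_siblings_reason \
//!       cancel_siblings_pending_members \
//!       cancel_siblings_pending_flag
//! 1) "sibling_quorum_satisfied"
//! 2) "<sibling_eid_1>|<sibling_eid_2>"
//! 3) "<pending flag>"
//! ```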

use std::time::Duration;

use ff_core::backend::ScannerFilter;
use ff_core::keys::{
    ExecKeyContext, FlowIndexKeys, FlowKeyContext, IndexKeys,
};
use ff_core::partition::{
    execution_partition, Partition, PartitionConfig, PartitionFamily,
};
use ff_core::types::{
    AttemptIndex, ExecutionId, FlowId, LaneId, WaitpointId,
    WorkerInstanceId,
};

use super::{ScanResult, Scanner};

/// Max groups processed per partition per cycle. Bounds worst-case
/// per-cycle Lua work at BATCH_SIZE × (sibling-list size + 1 drain)
/// calls. 50 matches the `cancel_reconciler` batch.
const BATCH_SIZE: u32 = 50;

/// Structural cap on sibling-list size honoured per group per cycle.
/// Protects against a pathological edgegroup whose members string grew
/// beyond the §4.1 128-soft-cap. The dispatcher still drains the full
/// list — just in additional cycles — because the drain FCALL only
/// fires after the Rust loop has issued every cancel it enumerated.
const MAX_SIBLINGS_PER_GROUP: usize = 1024;

pub struct EdgeCancelDispatcher {
    interval: Duration,
    partition_config: PartitionConfig,
    filter: ScannerFilter,
    metrics: std::sync::Arc<ff_observability::Metrics>,
}

impl EdgeCancelDispatcher {
    pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
        Self::with_filter(interval, partition_config, ScannerFilter::default())
    }

    pub fn with_filter(
        interval: Duration,
        partition_config: PartitionConfig,
        filter: ScannerFilter,
    ) -> Self {
        Self::with_filter_and_metrics(
            interval,
            partition_config,
            filter,
            std::sync::Arc::new(ff_observability::Metrics::new()),
        )
    }

    pub fn with_filter_and_metrics(
        interval: Duration,
        partition_config: PartitionConfig,
        filter: ScannerFilter,
        metrics: std::sync::Arc<ff_observability::Metrics>,
    ) -> Self {
        Self {
            interval,
            partition_config,
            filter,
            metrics,
        }
    }
}

impl Scanner for EdgeCancelDispatcher {
    fn name(&self) -> &'static str {
        "edge_cancel_dispatcher"
    }

    fn interval(&self) -> Duration {
        self.interval
    }

    fn filter(&self) -> &ScannerFilter {
        &self.filter
    }

    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Flow,
            index: partition,
        };
        let fidx = FlowIndexKeys::new(&p);
        let pending_key = fidx.pending_cancel_groups();

        // SRANDMEMBER with count: pull up to BATCH_SIZE members without
        // disturbing the SET (the drain FCALL removes successful
        // entries atomically after cancels land). A failure here is
        // transient — log + skip; the next cycle retries.
        let members: Vec<String> = match client
            .cmd("SRANDMEMBER")
            .arg(&pending_key)
            .arg(BATCH_SIZE.to_string().as_str())
            .execute()
            .await
        {
            Ok(m) => m,
            Err(e) => {
                tracing::warn!(
                    partition,
                    error = %e,
                    "edge_cancel_dispatcher: SRANDMEMBER pending_cancel_groups failed"
                );
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        if members.is_empty() {
            return ScanResult { processed: 0, errors: 0 };
        }

        let mut processed: u32 = 0;
        let mut errors: u32 = 0;

        for member in &members {
            match self
                .dispatch_one_group(client, &p, &pending_key, member)
                .await
            {
                GroupOutcome::Drained => processed += 1,
                GroupOutcome::SkippedRetry => { /* next cycle */ }
                GroupOutcome::Error => errors += 1,
            }
        }

        ScanResult { processed, errors }
    }
}

enum GroupOutcome {
    /// Fully drained: cancels issued + drain FCALL acked.
    Drained,
    /// Transient failure (transport / missing sibling metadata);
    /// leave the SET entry + flag for next cycle.
    SkippedRetry,
    /// Malformed / unrecoverable; counted as an error but moved on.
    Error,
}

impl EdgeCancelDispatcher {
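    /// Handle a single `<flow_id>|<downstream_eid>` tuple: parse it, read
    /// the cancel metadata from the edge-group hash, cancel every listed
    /// sibling, and only then drain the tuple via
    /// `ff_drain_sibling_cancel_group`.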
    async fn dispatch_one_group(
        &self,
        client: &ferriskey::Client,
        flow_p: &Partition,
        pending_key: &str,
        member: &str,
    ) -> GroupOutcome {
        // Parse `<flow_id>|<downstream_eid>`
        let (flow_id_str, downstream_eid_str) = match member.split_once('|') {
            Some((f, d)) if !f.is_empty() && !d.is_empty() => (f, d),
            _ => {
                tracing::warn!(
                    raw = member,
                    "edge_cancel_dispatcher: malformed pending_cancel_groups \
                     member; SREM-ing to avoid poison"
                );
                let _: Result<i64, _> = client
                    .cmd("SREM")
                    .arg(pending_key)
                    .arg(member)
                    .execute()
                    .await;
                return GroupOutcome::Error;
            }
        };

        let flow_id = match FlowId::parse(flow_id_str) {
            Ok(id) => id,
            Err(_) => {
                let _: Result<i64, _> = client
                    .cmd("SREM")
                    .arg(pending_key)
                    .arg(member)
                    .execute()
                    .await;
                return GroupOutcome::Error;
            }
        };

        let downstream_eid = match ExecutionId::parse(downstream_eid_str) {
            Ok(id) => id,
            Err(_) => {
                let _: Result<i64, _> = client
                    .cmd("SREM")
                    .arg(pending_key)
                    .arg(member)
                    .execute()
                    .await;
                return GroupOutcome::Error;
            }
        };

        let fctx = FlowKeyContext::new(flow_p, &flow_id);
        let edgegroup_key = fctx.edgegroup(&downstream_eid);

        // Read reason + members list from the edgegroup hash. A missing
        // hash (retention-deleted) is still drainable — ff_drain acks
        // with `drained_sans_group`.
        let fields: Vec<Option<String>> = match client
            .cmd("HMGET")
            .arg(&edgegroup_key)
            .arg("cancel_siblings_reason")
            .arg("cancel_siblings_pending_members")
            .arg("cancel_siblings_pending_flag")
            .execute()
            .await
        {
            Ok(v) => v,
            Err(e) => {
                tracing::debug!(
                    flow_id = %flow_id,
                    downstream = %downstream_eid,
                    error = %e,
                    "edge_cancel_dispatcher: HMGET edgegroup failed; retry next cycle"
                );
                return GroupOutcome::SkippedRetry;
            }
        };

        let reason = fields.first().and_then(|v| v.clone()).unwrap_or_default();
        let members_raw = fields.get(1).and_then(|v| v.clone()).unwrap_or_default();
        let flag = fields.get(2).and_then(|v| v.clone()).unwrap_or_default();

        // Inconsistency: the tuple is still in the SET but the hash
        // carries neither the pending flag nor a members list. Log +
        // attempt drain; the drain call idempotently cleans up.
        if flag.is_empty() && members_raw.is_empty() {
            tracing::debug!(
                flow_id = %flow_id,
                downstream = %downstream_eid,
                "edge_cancel_dispatcher: group has no pending flag / members; \
                 draining tuple (likely already drained or racing retention)"
            );
            return self
                .drain_group(client, pending_key, &edgegroup_key, &flow_id, &downstream_eid)
                .await;
        }

        // Determine reason code. Default to `sibling_quorum_satisfied`
        // if absent — the group was indexed in a CancelRemaining state
        // so SOME reason was written; be defensive rather than emit an
        // empty reason to downstream cancels.
        let reason_str = if reason.is_empty() {
            "sibling_quorum_satisfied"
        } else {
            reason.as_str()
        };

        // Enumerate siblings + issue cancels.
        let sibling_eids: Vec<&str> = members_raw
            .split('|')
            .filter(|s| !s.is_empty())
            .take(MAX_SIBLINGS_PER_GROUP)
            .collect();

        // Normalise reason to a &'static str for the fixed-cardinality
        // metric label. Any unrecognised string is coerced to
        // `sibling_quorum_satisfied` (the default path) rather than
        // exploding cardinality.
        let static_reason: &'static str = match reason_str {
            "sibling_quorum_impossible" => "sibling_quorum_impossible",
            _ => "sibling_quorum_satisfied",
        };

        let mut cancel_dispositions: [u64; 3] = [0, 0, 0]; // cancelled, already_terminal, not_found
        for sib_str in &sibling_eids {
            let sib_eid = match ExecutionId::parse(sib_str) {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(
                        flow_id = %flow_id,
                        raw = %sib_str,
                        error = %e,
                        "edge_cancel_dispatcher: malformed sibling eid; counting as not_found"
                    );
                    cancel_dispositions[2] += 1;
                    continue;
                }
            };

            self.metrics.inc_sibling_cancel_dispatched(static_reason);
            match cancel_sibling(
                client,
                &self.partition_config,
                &sib_eid,
                reason_str,
            )
            .await
            {
                SiblingDisposition::Cancelled => {
                    cancel_dispositions[0] += 1;
                    self.metrics.inc_sibling_cancel_disposition("cancelled");
                }
                SiblingDisposition::AlreadyTerminal => {
                    cancel_dispositions[1] += 1;
                    self.metrics
                        .inc_sibling_cancel_disposition("already_terminal");
                }
                SiblingDisposition::NotFound => {
                    cancel_dispositions[2] += 1;
                    self.metrics.inc_sibling_cancel_disposition("not_found");
                }
                SiblingDisposition::TransientError => {
                    // One sibling flake pauses drain for this group;
                    // others will be retried after the next dispatch
                    // tick sees the same members list. Don't half-drain.
                    tracing::debug!(
                        flow_id = %flow_id,
                        sibling = %sib_eid,
                        "edge_cancel_dispatcher: transient cancel error; retry group next cycle"
                    );
                    return GroupOutcome::SkippedRetry;
                }
            }
        }

        for (i, label) in ["cancelled", "already_terminal", "not_found"].iter().enumerate() {
            if cancel_dispositions[i] > 0 {
                tracing::debug!(
                    flow_id = %flow_id,
                    downstream = %downstream_eid,
                    reason = %static_reason,
                    disposition = label,
                    count = cancel_dispositions[i],
                    "edge_cancel_dispatcher: sibling cancel disposition"
                );
            }
        }

        // Drain: atomic SREM + HDEL.
        self.drain_group(client, pending_key, &edgegroup_key, &flow_id, &downstream_eid)
            .await
    }

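    /// Atomic drain: a single `ff_drain_sibling_cancel_group` FCALL that
    /// SREMs the pending tuple and HDELs the `cancel_siblings_*` fields,
    /// so a crash leaves either the pre-drain or post-drain state.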
    async fn drain_group(
        &self,
        client: &ferriskey::Client,
        pending_key: &str,
        edgegroup_key: &str,
        flow_id: &FlowId,
        downstream_eid: &ExecutionId,
    ) -> GroupOutcome {
        let flow_id_str = flow_id.to_string();
        let downstream_eid_str = downstream_eid.to_string();
        let keys = [pending_key, edgegroup_key];
        let argv = [flow_id_str.as_str(), downstream_eid_str.as_str()];
        match client
            .fcall::<ferriskey::Value>(
                "ff_drain_sibling_cancel_group",
                &keys,
                &argv,
            )
            .await
        {
            Ok(_) => GroupOutcome::Drained,
            Err(e) => {
                tracing::warn!(
                    flow_id = %flow_id,
                    downstream = %downstream_eid,
                    error = %e,
                    "edge_cancel_dispatcher: drain FCALL failed; retry next cycle"
                );
                GroupOutcome::SkippedRetry
            }
        }
    }
}

#[derive(Debug, Clone, Copy)]
enum SiblingDisposition {
    Cancelled,
    AlreadyTerminal,
    NotFound,
    TransientError,
}

/// Issue `ff_cancel_execution` against a sibling. Mirrors
/// `cancel_reconciler::cancel_member` KEYS layout but with
/// `source = "operator_override"` — sibling-quorum cancels bypass
/// lease-fence checks (no worker holds a lease stake here; the engine
/// itself is pulling the plug). The `reason` is carried through as
/// ARGV[2] and lands verbatim in exec_core's `cancellation_reason`.
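///
/// Illustrative ARGV as assembled below (values are placeholders):
/// `[<sibling_eid>, <reason>, "operator_override", "", ""]`.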
async fn cancel_sibling(
    client: &ferriskey::Client,
    partition_config: &PartitionConfig,
    sib_eid: &ExecutionId,
    reason: &str,
) -> SiblingDisposition {
    let partition = execution_partition(sib_eid, partition_config);
    let ctx = ExecKeyContext::new(&partition, sib_eid);
    let idx = IndexKeys::new(&partition);

    // Pre-read lane + dynamic fields so the 21 KEYS target the correct
    // lane/worker indexes. Matches `cancel_reconciler::cancel_member`.
    let lane_str: Option<String> = match client.hget(&ctx.core(), "lane_id").await {
        Ok(v) => v,
        Err(_) => return SiblingDisposition::TransientError,
    };
    let lane = LaneId::new(lane_str.as_deref().unwrap_or("default"));

    let dyn_fields: Vec<Option<String>> = match client
        .cmd("HMGET")
        .arg(ctx.core())
        .arg("current_attempt_index")
        .arg("current_waitpoint_id")
        .arg("current_worker_instance_id")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(_) => return SiblingDisposition::TransientError,
    };

    let att_idx_val = dyn_fields
        .first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(0);
    let att_idx = AttemptIndex::new(att_idx_val);
    let wp_id_str = dyn_fields
        .get(1)
        .and_then(|v| v.as_ref())
        .cloned()
        .unwrap_or_default();
    let wp_id = if wp_id_str.is_empty() {
        WaitpointId::new()
    } else {
        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
    };
    let wiid_str = dyn_fields
        .get(2)
        .and_then(|v| v.as_ref())
        .cloned()
        .unwrap_or_default();
    let wiid = WorkerInstanceId::new(&wiid_str);

    let keys: Vec<String> = vec![
        ctx.core(),
        ctx.attempt_hash(att_idx),
        ctx.stream_meta(att_idx),
        ctx.lease_current(),
        ctx.lease_history(),
        idx.lease_expiry(),
        idx.worker_leases(&wiid),
        ctx.suspension_current(),
        ctx.waitpoint(&wp_id),
        ctx.waitpoint_condition(&wp_id),
        idx.suspension_timeout(),
        idx.lane_terminal(&lane),
        idx.attempt_timeout(),
        idx.execution_deadline(),
        idx.lane_eligible(&lane),
        idx.lane_delayed(&lane),
        idx.lane_blocked_dependencies(&lane),
        idx.lane_blocked_budget(&lane),
        idx.lane_blocked_quota(&lane),
        idx.lane_blocked_route(&lane),
        idx.lane_blocked_operator(&lane),
    ];
    let argv: Vec<String> = vec![
        sib_eid.to_string(),
        reason.to_owned(),
        "operator_override".to_owned(),
        String::new(),
        String::new(),
    ];
    let kr: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
    let ar: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();

    match client
        .fcall::<ferriskey::Value>("ff_cancel_execution", &kr, &ar)
        .await
    {
        Ok(ferriskey::Value::Array(arr)) => match arr.first() {
            Some(Ok(ferriskey::Value::Int(1))) => SiblingDisposition::Cancelled,
            Some(Ok(ferriskey::Value::Int(0))) => {
                let code = arr
                    .get(1)
                    .and_then(|r| match r {
                        Ok(ferriskey::Value::BulkString(b)) => {
                            Some(String::from_utf8_lossy(b).into_owned())
                        }
                        Ok(ferriskey::Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default();
                match code.as_str() {
                    "execution_not_active" => SiblingDisposition::AlreadyTerminal,
                    "execution_not_found" => SiblingDisposition::NotFound,
                    _ => SiblingDisposition::TransientError,
                }
            }
            _ => SiblingDisposition::TransientError,
        },
        Ok(_) => SiblingDisposition::TransientError,
        Err(_) => SiblingDisposition::TransientError,
    }
}