// ff_engine/scanner/cancel_reconciler.rs
//! Cancel-flow dispatch reconciler.
//!
//! Safety net for async `cancel_flow` member dispatch. `ff-server`'s
//! `cancel_flow_inner` spawns a background task that cancels each
//! member with bounded transport retries and SREMs from `pending_cancels`
//! on success. If the process crashes between `CancellationScheduled`
//! returning and the dispatch finishing — or a member's cancel hits a
//! permanent error the bounded retry can't recover — the member would
//! otherwise escape cancellation because exec_core lives on `{p:N}`
//! and can't atomically consult `flow_core` on `{fp:N}` during claim.
//!
//! This scanner drains the per-partition `cancel_backlog` ZSET: for
//! each flow whose grace window (score) has elapsed, read
//! `pending_cancels`, fire `ff_cancel_execution` per member, and ack
//! via `ff_ack_cancel_member` to SREM. When the set empties, the ack
//! ZREMs the flow from the backlog.
//!
//! The complete close of the cross-slot claim race (a worker claiming
//! a member between the flow's cancel and the member's own cancel)
//! still requires cross-slot flow-state consultation, which is out of
//! scope. This reconciler shrinks the escape window from "forever" to
//! "one reconciler interval plus grace_ms."

24use std::time::Duration;
25
26use ff_core::backend::ScannerFilter;
27use ff_core::keys::{ExecKeyContext, FlowIndexKeys, FlowKeyContext, IndexKeys};
28use ff_core::partition::{
29    execution_partition, Partition, PartitionConfig, PartitionFamily,
30};
31use ff_core::types::{AttemptIndex, ExecutionId, FlowId, LaneId, WaitpointId, WorkerInstanceId};
32
33use super::{should_skip_candidate, ScanResult, Scanner};
34
/// Max flows read from the backlog ZSET per partition per cycle
/// (used as the ZRANGEBYSCORE LIMIT count in `scan_partition`).
const BATCH_SIZE: u32 = 50;
/// Cap on member cancels dispatched for a single flow in one cycle
/// (the SRANDMEMBER count); any remainder stays in `pending_cancels`
/// and is retried on the next cycle.
const MAX_MEMBERS_PER_FLOW_PER_CYCLE: usize = 500;
37
/// Scanner that drains the per-partition `cancel_backlog` ZSET,
/// re-issuing member cancels that the live async dispatch did not
/// complete. See the module docs for the crash/permanent-error
/// scenarios this covers.
pub struct CancelReconciler {
    // Scan cadence, returned verbatim by `Scanner::interval`.
    interval: Duration,
    // Needed to derive each member's exec partition from its
    // `ExecutionId` (members may live on a different partition than
    // the flow).
    partition_config: PartitionConfig,
    // Per-member candidate filter (issue #122), consulted via
    // `should_skip_candidate` before dispatching each cancel.
    filter: ScannerFilter,
}
43
44impl CancelReconciler {
45    pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
46        Self::with_filter(interval, partition_config, ScannerFilter::default())
47    }
48
49    /// Construct with a [`ScannerFilter`] applied per member
50    /// execution (issue #122). Each member's exec partition is
51    /// derived from its `ExecutionId` before the filter HGETs.
52    pub fn with_filter(
53        interval: Duration,
54        partition_config: PartitionConfig,
55        filter: ScannerFilter,
56    ) -> Self {
57        Self {
58            interval,
59            partition_config,
60            filter,
61        }
62    }
63}
64
impl Scanner for CancelReconciler {
    /// Stable identifier used in scanner logs/metrics.
    fn name(&self) -> &'static str {
        "cancel_reconciler"
    }

    /// Delay between scan cycles, as configured at construction.
    fn interval(&self) -> Duration {
        self.interval
    }

    /// The per-member candidate filter supplied to `with_filter`.
    fn filter(&self) -> &ScannerFilter {
        &self.filter
    }

    /// PR-94: `ff_cancel_backlog_depth` gauge sample — ZCARD of the
    /// per-partition cancel_backlog ZSET. Runs once per partition
    /// per cycle. On error returns `None`; the scanner runner
    /// treats any `None` during a cycle where other partitions
    /// sampled as an invalidation and skips the gauge write
    /// (leaves the prior scrape value in place) so a transient
    /// ZCARD failure on one partition does not cause the gauge to
    /// under-report as an apparent drain.
    async fn sample_backlog_depth(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> Option<u64> {
        // The backlog ZSET lives on the Flow partition family.
        let p = Partition {
            family: PartitionFamily::Flow,
            index: partition,
        };
        let fidx = FlowIndexKeys::new(&p);
        let backlog_key = fidx.cancel_backlog();
        let card: Result<Option<u64>, _> = client
            .cmd("ZCARD")
            .arg(&backlog_key)
            .execute()
            .await;
        // Collapse transport errors (and a nil reply) to None per the
        // contract documented above.
        card.ok().flatten()
    }

    /// One reconciliation pass over a single flow partition: read up
    /// to `BATCH_SIZE` grace-elapsed flows from `cancel_backlog`,
    /// dispatch up to `MAX_MEMBERS_PER_FLOW_PER_CYCLE` member cancels
    /// per flow, and ack each success (SREM + conditional backlog
    /// ZREM). Only acked members count toward `processed`; every
    /// failed step increments `errors`.
    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Flow,
            index: partition,
        };
        let fidx = FlowIndexKeys::new(&p);
        let backlog_key = fidx.cancel_backlog();

        // Server-side clock (shared with the lease_expiry scanner) so
        // the score comparison below isn't skewed by local clock drift.
        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t,
            Err(e) => {
                tracing::warn!(partition, error = %e, "cancel_reconciler: TIME failed");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        // ZRANGEBYSCORE -inf now: flows whose grace has elapsed AND that
        // still have owed cancels. Live dispatch SREMs entries as it
        // succeeds, so if the live path finishes before grace expiry
        // the ack removes the flow from the backlog and this scanner
        // sees nothing to do. We only pick up the tail.
        let flow_ids: Vec<String> = match client
            .cmd("ZRANGEBYSCORE")
            .arg(&backlog_key)
            .arg("-inf")
            .arg(now_ms.to_string().as_str())
            .arg("LIMIT")
            .arg("0")
            .arg(BATCH_SIZE.to_string().as_str())
            .execute()
            .await
        {
            Ok(ids) => ids,
            Err(e) => {
                tracing::warn!(
                    partition,
                    error = %e,
                    "cancel_reconciler: ZRANGEBYSCORE cancel_backlog failed"
                );
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        if flow_ids.is_empty() {
            return ScanResult { processed: 0, errors: 0 };
        }

        let mut processed: u32 = 0;
        let mut errors: u32 = 0;

        for flow_id_str in flow_ids {
            let flow_id = match FlowId::parse(&flow_id_str) {
                Ok(id) => id,
                Err(e) => {
                    // Unparseable entry can never be acted on: drop it
                    // from the backlog so it doesn't wedge the scanner,
                    // but record the anomaly.
                    tracing::warn!(
                        partition,
                        raw = %flow_id_str,
                        error = %e,
                        "cancel_reconciler: malformed flow_id in cancel_backlog; ZREM"
                    );
                    let _: Result<i64, _> = client
                        .cmd("ZREM")
                        .arg(&backlog_key)
                        .arg(flow_id_str.as_str())
                        .execute()
                        .await;
                    errors += 1;
                    continue;
                }
            };
            // Per-flow key context on this flow partition.
            let fctx = FlowKeyContext::new(&p, &flow_id);
            let pending_key = fctx.pending_cancels();

            // Defensive: if flow_core is gone (retention, operator-
            // triggered DEL), there's nothing authoritative to ack
            // against. Nuke the backlog entry and move on.
            let core_exists: bool = match client
                .cmd("EXISTS")
                .arg(fctx.core().as_str())
                .execute()
                .await
            {
                Ok(v) => v,
                Err(e) => {
                    tracing::warn!(
                        flow_id = %flow_id,
                        error = %e,
                        "cancel_reconciler: EXISTS flow_core failed"
                    );
                    errors += 1;
                    continue;
                }
            };
            if !core_exists {
                // Best-effort cleanup; errors here are ignored — the
                // next cycle will retry the same cleanup.
                let _: Result<i64, _> = client
                    .cmd("DEL")
                    .arg(pending_key.as_str())
                    .execute()
                    .await;
                let _: Result<i64, _> = client
                    .cmd("ZREM")
                    .arg(&backlog_key)
                    .arg(flow_id.to_string().as_str())
                    .execute()
                    .await;
                continue;
            }

            // SRANDMEMBER with a count bounds per-flow work per cycle;
            // successfully acked members are SREMed so the next cycle
            // draws from the remainder.
            let member_strs: Vec<String> = match client
                .cmd("SRANDMEMBER")
                .arg(pending_key.as_str())
                .arg(MAX_MEMBERS_PER_FLOW_PER_CYCLE.to_string().as_str())
                .execute()
                .await
            {
                Ok(m) => m,
                Err(e) => {
                    tracing::warn!(
                        flow_id = %flow_id,
                        error = %e,
                        "cancel_reconciler: SRANDMEMBER pending_cancels failed"
                    );
                    errors += 1;
                    continue;
                }
            };

            if member_strs.is_empty() {
                // Live dispatch drained it before the grace elapsed.
                // The ack path should have ZREMed, but double-check
                // to avoid a stale entry pinning the scanner.
                let _: Result<i64, _> = client
                    .cmd("ZREM")
                    .arg(&backlog_key)
                    .arg(flow_id.to_string().as_str())
                    .execute()
                    .await;
                continue;
            }

            // Re-read the reason from flow_core so the reconciler-
            // initiated cancel carries the original operator intent.
            // On transport error: retry this flow next cycle (don't
            // issue member cancels with a fabricated reason).
            let reason: String = match client
                .cmd("HGET")
                .arg(fctx.core().as_str())
                .arg("cancel_reason")
                .execute::<Option<String>>()
                .await
            {
                Ok(Some(s)) => s,
                // Missing field (but core exists): fall back to a
                // generic reason rather than stalling the drain.
                Ok(None) => "flow_cancelled".to_owned(),
                Err(e) => {
                    tracing::warn!(
                        flow_id = %flow_id,
                        error = %e,
                        "cancel_reconciler: HGET cancel_reason failed; retry next cycle"
                    );
                    errors += 1;
                    continue;
                }
            };

            for eid_str in &member_strs {
                let execution_id = match ExecutionId::parse(eid_str) {
                    Ok(id) => id,
                    Err(e) => {
                        // Same poison-avoidance as the flow_id case:
                        // SREM the unusable entry and record it.
                        tracing::warn!(
                            flow_id = %flow_id,
                            raw = %eid_str,
                            error = %e,
                            "cancel_reconciler: malformed eid in pending_cancels; SREM"
                        );
                        let _: Result<i64, _> = client
                            .cmd("SREM")
                            .arg(pending_key.as_str())
                            .arg(eid_str.as_str())
                            .execute()
                            .await;
                        errors += 1;
                        continue;
                    }
                };

                // Issue #122: skip members that don't match our filter.
                // Member exec_core may live on a different exec partition
                // than this flow's partition — derive it from the eid.
                let member_part = execution_partition(
                    &execution_id,
                    &self.partition_config,
                ).index;
                if should_skip_candidate(
                    client,
                    &self.filter,
                    member_part,
                    eid_str,
                )
                .await
                {
                    // Skipped members are intentionally neither
                    // processed nor errors; they stay in
                    // pending_cancels for a matching scanner.
                    continue;
                }

                // `cancel_member` returns true for "ack-worthy"
                // outcomes (success, already-terminal, permanent
                // error) and false for transient errors (retry).
                if cancel_member(
                    client,
                    &self.partition_config,
                    &execution_id,
                    &reason,
                )
                .await
                {
                    // ACK via FCALL so SREM + conditional backlog ZREM
                    // are one atomic unit on the flow partition. If the
                    // ack itself fails transiently the member stays in
                    // pending_cancels and the next cycle retries; count
                    // it as an error so scanner stats reflect real
                    // progress (Copilot feedback).
                    let flow_id_str = flow_id.to_string();
                    let ack_keys = [pending_key.as_str(), backlog_key.as_str()];
                    let ack_args = [eid_str.as_str(), flow_id_str.as_str()];
                    match client
                        .fcall::<ferriskey::Value>("ff_ack_cancel_member", &ack_keys, &ack_args)
                        .await
                    {
                        Ok(_) => processed += 1,
                        Err(e) => {
                            tracing::debug!(
                                flow_id = %flow_id,
                                execution_id = %eid_str,
                                error = %e,
                                "cancel_reconciler: ack failed; retry next cycle"
                            );
                            errors += 1;
                        }
                    }
                } else {
                    errors += 1;
                }
            }
        }

        ScanResult { processed, errors }
    }
}
353
/// Issue a single operator-override cancel on the member's partition.
/// Returns true on "ack-worthy" (success OR already-terminal OR
/// non-retryable transport error we don't want to poison the queue
/// with). Returns false on transient transport errors; the next
/// reconciler cycle will retry.
///
/// Mirrors ff-server's `build_cancel_execution_fcall` exactly:
/// pre-reads `lane_id`, `current_attempt_index`, `current_waitpoint_id`,
/// `current_worker_instance_id` from exec_core so the 21-KEY list
/// touches the REAL lane/worker indexes. Hardcoding those (first
/// draft) would leave stale entries in lane_eligible / worker_leases /
/// etc for any non-default-lane member — flagged by Copilot + Cursor
/// Bugbot on PR #56.
async fn cancel_member(
    client: &ferriskey::Client,
    partition_config: &PartitionConfig,
    execution_id: &ExecutionId,
    reason: &str,
) -> bool {
    // Key contexts are built on the MEMBER's exec partition, derived
    // from the execution id — not the flow's partition.
    let partition = execution_partition(execution_id, partition_config);
    let ctx = ExecKeyContext::new(&partition, execution_id);
    let idx = IndexKeys::new(&partition);

    // Pre-read dynamic fields. A transient HGET/HMGET failure here
    // surfaces as a retry on the next reconciler pass (return false);
    // we do NOT fall back to defaults, since that would produce the
    // wrong KEYS layout.
    let lane_str: Option<String> = match client.hget(&ctx.core(), "lane_id").await {
        Ok(v) => v,
        Err(e) => {
            let kind = e.kind();
            let retryable = is_retryable_kind(kind);
            if !retryable {
                tracing::warn!(
                    execution_id = %execution_id,
                    error = %e,
                    "cancel_reconciler: permanent HGET lane_id error; ack to avoid poison"
                );
                return true;
            }
            tracing::debug!(
                execution_id = %execution_id,
                error = %e,
                "cancel_reconciler: transient HGET lane_id; retry next cycle"
            );
            return false;
        }
    };
    // A missing lane_id field (successful read, nil value) is distinct
    // from a transport failure: here "default" is the correct lane.
    let lane = LaneId::new(lane_str.as_deref().unwrap_or("default"));

    // HMGET order matters: index 0 = attempt index, 1 = waitpoint id,
    // 2 = worker instance id (consumed positionally below).
    let dyn_fields: Vec<Option<String>> = match client
        .cmd("HMGET")
        .arg(ctx.core())
        .arg("current_attempt_index")
        .arg("current_waitpoint_id")
        .arg("current_worker_instance_id")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            let kind = e.kind();
            let retryable = is_retryable_kind(kind);
            if !retryable {
                tracing::warn!(
                    execution_id = %execution_id,
                    error = %e,
                    "cancel_reconciler: permanent HMGET error; ack to avoid poison"
                );
                return true;
            }
            tracing::debug!(
                execution_id = %execution_id,
                error = %e,
                "cancel_reconciler: transient HMGET; retry next cycle"
            );
            return false;
        }
    };

    // Missing/unparseable attempt index falls back to 0.
    let att_idx_val = dyn_fields
        .first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(0);
    let att_idx = AttemptIndex::new(att_idx_val);
    let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
    // Empty/unparseable waitpoint id → fresh WaitpointId, which yields
    // a waitpoint key that simply won't exist. NOTE(review): assumes
    // ff_cancel_execution tolerates a non-existent waitpoint key —
    // confirm against the script.
    let wp_id = if wp_id_str.is_empty() {
        WaitpointId::new()
    } else {
        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
    };
    let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
    let wiid = WorkerInstanceId::new(&wiid_str);

    // The 21-key layout below must stay in lockstep with ff-server's
    // `build_cancel_execution_fcall` (see function doc above).
    let keys: Vec<String> = vec![
        ctx.core(),
        ctx.attempt_hash(att_idx),
        ctx.stream_meta(att_idx),
        ctx.lease_current(),
        ctx.lease_history(),
        idx.lease_expiry(),
        idx.worker_leases(&wiid),
        ctx.suspension_current(),
        ctx.waitpoint(&wp_id),
        ctx.waitpoint_condition(&wp_id),
        idx.suspension_timeout(),
        idx.lane_terminal(&lane),
        idx.attempt_timeout(),
        idx.execution_deadline(),
        idx.lane_eligible(&lane),
        idx.lane_delayed(&lane),
        idx.lane_blocked_dependencies(&lane),
        idx.lane_blocked_budget(&lane),
        idx.lane_blocked_quota(&lane),
        idx.lane_blocked_route(&lane),
        idx.lane_blocked_operator(&lane),
    ];
    // ARGV: eid, reason, source tag, plus two empty trailing args
    // (mirrors ff-server's call shape — presumably reserved fields;
    // TODO confirm against the script's ARGV contract).
    let argv: Vec<String> = vec![
        execution_id.to_string(),
        reason.to_owned(),
        "operator_override".to_owned(),
        String::new(),
        String::new(),
    ];
    let kr: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
    let ar: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();

    // Reply shape: [1] => cancelled; [0, code] => rejected, where only
    // "already gone" codes are ack-worthy; anything else => retry.
    match client.fcall::<ferriskey::Value>("ff_cancel_execution", &kr, &ar).await {
        Ok(ferriskey::Value::Array(arr)) => match arr.first() {
            Some(Ok(ferriskey::Value::Int(1))) => true,
            Some(Ok(ferriskey::Value::Int(0))) => {
                let code = arr
                    .get(1)
                    .and_then(|r| match r {
                        Ok(ferriskey::Value::BulkString(b)) => {
                            Some(String::from_utf8_lossy(b).into_owned())
                        }
                        Ok(ferriskey::Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default();
                matches!(code.as_str(), "execution_not_active" | "execution_not_found")
            }
            _ => false,
        },
        // Unexpected reply shape: treat as transient and retry.
        Ok(_) => false,
        Err(e) => {
            let retryable = is_retryable_kind(e.kind());
            if !retryable {
                tracing::warn!(
                    execution_id = %execution_id,
                    error = %e,
                    "cancel_reconciler: permanent error on FCALL; ack to avoid poison"
                );
                return true;
            }
            tracing::debug!(
                execution_id = %execution_id,
                error = %e,
                "cancel_reconciler: transient FCALL error; retry next cycle"
            );
            false
        }
    }
}
520
521/// Centralised retry classification so this scanner's policy tracks
522/// `ff_script::retry::is_retryable_kind`. Inlined (not imported) because
523/// ff-engine doesn't depend on ff-script to keep the scanner crate light;
524/// the variants are exhaustively matched with a `_ => false` fallback so
525/// a new upstream `ErrorKind` can't silently become retryable.
526fn is_retryable_kind(kind: ferriskey::ErrorKind) -> bool {
527    use ferriskey::ErrorKind::*;
528    matches!(
529        kind,
530        IoError | FatalSendError | TryAgain | BusyLoadingError | ClusterDown
531    )
532}