// ff_backend_postgres/dispatch.rs
//! Post-completion dependency-resolution cascade (Wave 5a).
//!
//! When an execution reaches a terminal outcome the Wave 4b writer
//! (`attempt::complete` / `attempt::fail`) / Wave 4c (`cancel_flow`)
//! emits an `ff_completion_event` row. The engine's dispatch loop
//! polls the outbox by `event_id` and, for each event, calls
//! [`dispatch_completion`]. This module is the Postgres twin of the
//! Valkey `ff_resolve_dependency` FCALL cascade in
//! `ff-engine::partition_router::dispatch_dependency_resolution`.
//!
//! # Per-hop-tx (K-2 adjudication)
//!
//! The round-2 RFC debate locked the cascade to per-hop transactions
//! — NOT one mega-transaction spanning transitive descendants. A
//! fanout of N downstream edges that each fan out to M further
//! edges would otherwise hold `ff_edge_group` row locks across the
//! entire subgraph for the full cascade duration, which is
//! user-visible on large flows.
//!
//! The layout here:
//!
//! 1. Outer function [`dispatch_completion`] runs a short claim-tx
//!    that atomically flips `ff_completion_event.dispatched_at_ms`
//!    from NULL to `now_ms()` via `UPDATE ... RETURNING` — a
//!    concurrent dispatcher (retry, reconciler) observes the
//!    already-claimed row and short-circuits with
//!    [`DispatchOutcome::NoOp`].
//! 2. For each downstream edge, [`advance_edge_group`] runs its own
//!    SERIALIZABLE tx with `SELECT ... FOR UPDATE` on the
//!    `ff_edge_group` row. Counter bump + policy eval + downstream
//!    state flip + sibling-cancel bookkeeping all commit together
//!    at hop boundary, then release the row lock.
//! 3. If a hop's tx exhausts its serialization retries we return
//!    [`EngineError::Contention(RetryExhausted)`]; the
//!    Wave 6 reconciler picks up the uncleared `dispatched_at_ms`
//!    via the partial index added in migration 0003.
//!
//! Failures mid-cascade leave the outbox row marked dispatched
//! (short-circuited) but partial downstream state; the reconciler
//! catches orphaned work on its next scan.
42use std::time::Duration;
43
44use ff_core::contracts::{EdgeDependencyPolicy, OnSatisfied};
45use ff_core::engine_error::{ContentionKind, EngineError};
46use serde_json::Value as JsonValue;
47use sqlx::{PgPool, Row};
48use uuid::Uuid;
49
50use crate::error::map_sqlx_error;
51
52/// Max serialization-retry attempts per hop before we declare
53/// contention and hand the event back to the reconciler.
54const ADVANCE_MAX_ATTEMPTS: u32 = 3;
55
/// Result of one dispatch invocation — enough for callers (the
/// dispatcher loop, tests) to tell a lost claim race apart from real
/// work.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DispatchOutcome {
    /// The event was claimed earlier, either by a concurrent
    /// dispatcher or by a previous invocation; nothing was done.
    /// Replays are idempotent by design.
    NoOp,
    /// The dispatch ran; the payload is the number of downstream
    /// edges whose groups were advanced. `0` is valid — a terminal
    /// exec with no outgoing edges (a leaf flow node) advances
    /// nothing.
    Advanced(usize),
}
69
70/// Public entry point — poll one outbox event and cascade.
71///
72/// The caller owns polling cadence + ordering. Typical usage is a
73/// tokio task that subscribes to
74/// [`super::completion::subscribe`] and invokes this function for
75/// each payload's `event_id`.
76#[tracing::instrument(name = "pg.dispatch_completion", skip(pool))]
77pub async fn dispatch_completion(
78    pool: &PgPool,
79    event_id: i64,
80) -> Result<DispatchOutcome, EngineError> {
81    // Claim step — atomic flip of `dispatched_at_ms` from NULL.
82    // Returns the event row when we won the race; `None` otherwise.
83    let now = now_ms();
84    let row = sqlx::query(
85        r#"
86        UPDATE ff_completion_event
87           SET dispatched_at_ms = $2
88         WHERE event_id = $1
89           AND dispatched_at_ms IS NULL
90         RETURNING partition_key, execution_id, flow_id, outcome
91        "#,
92    )
93    .bind(event_id)
94    .bind(now)
95    .fetch_optional(pool)
96    .await
97    .map_err(map_sqlx_error)?;
98
99    let Some(row) = row else {
100        return Ok(DispatchOutcome::NoOp);
101    };
102
103    let partition_key: i16 = row.get("partition_key");
104    let execution_id: Uuid = row.get("execution_id");
105    let flow_id: Option<Uuid> = row.get("flow_id");
106    let outcome: String = row.get("outcome");
107
108    let Some(flow_id) = flow_id else {
109        // Standalone exec: no edges to cascade. Claim stays set so a
110        // replay short-circuits.
111        return Ok(DispatchOutcome::Advanced(0));
112    };
113
114    // Outgoing edges: every edge whose upstream is this exec, under
115    // this flow. RFC-011 co-locates flow + exec under the same
116    // partition_key, so the query is partition-local.
117    let edges = sqlx::query(
118        r#"
119        SELECT edge_id, downstream_eid
120          FROM ff_edge
121         WHERE partition_key = $1 AND flow_id = $2 AND upstream_eid = $3
122        "#,
123    )
124    .bind(partition_key)
125    .bind(flow_id)
126    .bind(execution_id)
127    .fetch_all(pool)
128    .await
129    .map_err(map_sqlx_error)?;
130
131    if edges.is_empty() {
132        return Ok(DispatchOutcome::Advanced(0));
133    }
134
135    let outcome_kind = OutcomeKind::from_str(&outcome);
136
137    // Each edge advances in its own tx (K-2 per-hop-tx rule).
138    let mut advanced: usize = 0;
139    for edge in &edges {
140        let downstream_eid: Uuid = edge.get("downstream_eid");
141        advance_edge_group_with_retry(
142            pool,
143            partition_key,
144            flow_id,
145            execution_id,
146            downstream_eid,
147            outcome_kind,
148        )
149        .await?;
150        advanced += 1;
151    }
152
153    Ok(DispatchOutcome::Advanced(advanced))
154}
155
/// Counter bucket derived from `ff_completion_event.outcome`.
///
/// Outbox outcome strings are free-form; the engine collapses them
/// into the three counters tracked on `ff_edge_group`. A `skipped`
/// outcome bumps the skip counter; every other terminal outcome that
/// is not literally "success" counts as a fail from the dependency
/// resolver's point of view — mirroring the Valkey
/// `ff_resolve_dependency` Lua, which tests
/// `upstream_outcome == "success"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OutcomeKind {
    Success,
    Fail,
    Skip,
}

impl OutcomeKind {
    fn from_str(s: &str) -> Self {
        if s == "success" {
            Self::Success
        } else if s == "skipped" {
            Self::Skip
        } else {
            Self::Fail
        }
    }
}
180
181/// Per-hop tx wrapper with SERIALIZABLE retry handling. Exhaustion
182/// returns `Contention(RetryExhausted)` per Q11.
183async fn advance_edge_group_with_retry(
184    pool: &PgPool,
185    partition_key: i16,
186    flow_id: Uuid,
187    upstream_eid: Uuid,
188    downstream_eid: Uuid,
189    outcome: OutcomeKind,
190) -> Result<(), EngineError> {
191    for attempt in 0..ADVANCE_MAX_ATTEMPTS {
192        match advance_edge_group(
193            pool,
194            partition_key,
195            flow_id,
196            upstream_eid,
197            downstream_eid,
198            outcome,
199        )
200        .await
201        {
202            Ok(()) => return Ok(()),
203            Err(err) if is_serialization_conflict(&err) => {
204                if attempt + 1 < ADVANCE_MAX_ATTEMPTS {
205                    let ms = 5u64 * (1u64 << attempt);
206                    tokio::time::sleep(Duration::from_millis(ms)).await;
207                    continue;
208                }
209                return Err(EngineError::Contention(ContentionKind::RetryExhausted));
210            }
211            Err(err) => return Err(err),
212        }
213    }
214    Err(EngineError::Contention(ContentionKind::RetryExhausted))
215}
216
217fn is_serialization_conflict(err: &EngineError) -> bool {
218    // 40001 / 40P01 are mapped to Contention(LeaseConflict) up in
219    // `error::map_sqlx_error`. Lock-timeout (55P03) is a legitimate
220    // contention signal on `FOR UPDATE` + `lock_timeout`; the shared
221    // error mapper routes it through `Transport`, so we probe the
222    // SQLSTATE off the boxed sqlx error here before treating it as
223    // a retryable contention fault.
224    if matches!(err, EngineError::Contention(ContentionKind::LeaseConflict)) {
225        return true;
226    }
227    if let EngineError::Transport { source, .. } = err
228        && let Some(sqlx_err) = source.downcast_ref::<sqlx::Error>()
229        && let Some(db) = sqlx_err.as_database_error()
230        && let Some(code) = db.code()
231        && code.as_ref() == "55P03"
232    {
233        // 55P03 = lock_not_available (lock_timeout variant hit
234        // while waiting on a row lock).
235        return true;
236    }
237    false
238}
239
240/// Per-hop transaction: bump counters on one `ff_edge_group` row,
241/// evaluate the policy, apply the decision to the downstream exec,
242/// optionally enqueue sibling-cancel bookkeeping.
243#[tracing::instrument(
244    name = "pg.advance_edge_group",
245    skip(pool),
246    fields(
247        part = partition_key,
248        flow = %flow_id,
249        downstream = %downstream_eid,
250    )
251)]
252async fn advance_edge_group(
253    pool: &PgPool,
254    partition_key: i16,
255    flow_id: Uuid,
256    upstream_eid: Uuid,
257    downstream_eid: Uuid,
258    outcome: OutcomeKind,
259) -> Result<(), EngineError> {
260    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
261
262    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
263        .execute(&mut *tx)
264        .await
265        .map_err(map_sqlx_error)?;
266
267    // FOR UPDATE pins the group row against concurrent advances.
268    let row = sqlx::query(
269        r#"
270        SELECT policy, success_count, fail_count, skip_count, running_count,
271               cancel_siblings_pending_flag
272          FROM ff_edge_group
273         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3
274         FOR UPDATE
275        "#,
276    )
277    .bind(partition_key)
278    .bind(flow_id)
279    .bind(downstream_eid)
280    .fetch_optional(&mut *tx)
281    .await
282    .map_err(map_sqlx_error)?;
283
284    let Some(row) = row else {
285        // No group row for this downstream — nothing to advance.
286        // Legal when a flow has edges without a staged group (should
287        // not happen post-Wave 4c, but we treat as a no-op for
288        // forward-compat instead of failing the cascade.)
289        tx.commit().await.map_err(map_sqlx_error)?;
290        return Ok(());
291    };
292
293    let policy_raw: JsonValue = row.get("policy");
294    let mut success: i32 = row.get("success_count");
295    let mut fail: i32 = row.get("fail_count");
296    let mut skip: i32 = row.get("skip_count");
297    let mut running: i32 = row.get("running_count");
298    let already_flagged: bool = row.get("cancel_siblings_pending_flag");
299
300    // Any terminal outcome migrates one upstream out of the running
301    // bucket — keeping `total = success + fail + skip + running`
302    // invariant so the impossibility check works on the remaining
303    // headroom.
304    if running > 0 {
305        running -= 1;
306    }
307    match outcome {
308        OutcomeKind::Success => success += 1,
309        OutcomeKind::Fail => fail += 1,
310        OutcomeKind::Skip => skip += 1,
311    }
312
313    let policy = decode_policy(&policy_raw);
314    let total = success + fail + skip + running.max(0);
315    let decision = evaluate(&policy, success, fail, skip, total);
316
317    // Writeback the counter state first. (If the decision flips
318    // downstream or enqueues sibling cancels, those writes ride the
319    // same tx below.)
320    sqlx::query(
321        r#"
322        UPDATE ff_edge_group
323           SET success_count = $4,
324               fail_count    = $5,
325               skip_count    = $6,
326               running_count = $7
327         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3
328        "#,
329    )
330    .bind(partition_key)
331    .bind(flow_id)
332    .bind(downstream_eid)
333    .bind(success)
334    .bind(fail)
335    .bind(skip)
336    .bind(running.max(0))
337    .execute(&mut *tx)
338    .await
339    .map_err(map_sqlx_error)?;
340
341    let now = now_ms();
342
343    match decision {
344        Decision::Pending => { /* counters already persisted */ }
345        Decision::Satisfied { cancel_siblings } => {
346            // Mark downstream eligible (the scheduler claims it next).
347            sqlx::query(
348                r#"
349                UPDATE ff_exec_core
350                   SET eligibility_state = 'eligible_now',
351                       lifecycle_phase   = CASE
352                           WHEN lifecycle_phase = 'blocked' THEN 'runnable'
353                           ELSE lifecycle_phase
354                       END
355                 WHERE partition_key = $1 AND execution_id = $2
356                   AND lifecycle_phase NOT IN ('terminal','cancelled')
357                "#,
358            )
359            .bind(partition_key)
360            .bind(downstream_eid)
361            .execute(&mut *tx)
362            .await
363            .map_err(map_sqlx_error)?;
364
365            if cancel_siblings && !already_flagged {
366                // Stage-C bookkeeping: add one row to
367                // `ff_pending_cancel_groups` per still-running sibling
368                // group, compute the sibling-members set, and flip the
369                // flag so a replay doesn't double-enqueue. The
370                // reconciler (Wave 6b) consumes `members` to drive
371                // per-sibling cancel; the set excludes the winner.
372                let sibling_rows = sqlx::query(
373                    r#"
374                    SELECT ff_exec_core.execution_id
375                      FROM ff_exec_core
376                      JOIN ff_edge ON ff_edge.upstream_eid = ff_exec_core.execution_id
377                     WHERE ff_exec_core.partition_key = $1
378                       AND ff_edge.partition_key      = $1
379                       AND ff_edge.flow_id            = $2
380                       AND ff_edge.downstream_eid     = $3
381                       AND ff_exec_core.lifecycle_phase NOT IN ('terminal','cancelled')
382                       AND ff_exec_core.public_state  <> 'skipped'
383                       AND ff_exec_core.execution_id <> $4
384                    "#,
385                )
386                .bind(partition_key)
387                .bind(flow_id)
388                .bind(downstream_eid)
389                .bind(upstream_eid)
390                .fetch_all(&mut *tx)
391                .await
392                .map_err(map_sqlx_error)?;
393                let members: Vec<String> = sibling_rows
394                    .iter()
395                    .map(|r| {
396                        let u: Uuid = r.get("execution_id");
397                        u.to_string()
398                    })
399                    .collect();
400
401                sqlx::query(
402                    r#"
403                    INSERT INTO ff_pending_cancel_groups
404                        (partition_key, flow_id, downstream_eid, enqueued_at_ms)
405                    VALUES ($1, $2, $3, $4)
406                    ON CONFLICT DO NOTHING
407                    "#,
408                )
409                .bind(partition_key)
410                .bind(flow_id)
411                .bind(downstream_eid)
412                .bind(now)
413                .execute(&mut *tx)
414                .await
415                .map_err(map_sqlx_error)?;
416
417                sqlx::query(
418                    r#"
419                    UPDATE ff_edge_group
420                       SET cancel_siblings_pending_flag    = TRUE,
421                           cancel_siblings_pending_members = $4
422                     WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3
423                    "#,
424                )
425                .bind(partition_key)
426                .bind(flow_id)
427                .bind(downstream_eid)
428                .bind(&members)
429                .execute(&mut *tx)
430                .await
431                .map_err(map_sqlx_error)?;
432            }
433        }
434        Decision::Impossible => {
435            // Downstream can never satisfy → mark it skipped +
436            // cascade by emitting its own completion event. The
437            // Wave-5 dispatcher loop picks the new event up on its
438            // next poll.
439            let updated = sqlx::query(
440                r#"
441                UPDATE ff_exec_core
442                   SET lifecycle_phase   = 'terminal',
443                       eligibility_state = 'not_applicable',
444                       public_state      = 'skipped',
445                       attempt_state     = 'attempt_terminal',
446                       terminal_at_ms    = COALESCE(terminal_at_ms, $3)
447                 WHERE partition_key = $1 AND execution_id = $2
448                   AND lifecycle_phase NOT IN ('terminal','cancelled')
449                 RETURNING execution_id
450                "#,
451            )
452            .bind(partition_key)
453            .bind(downstream_eid)
454            .bind(now)
455            .fetch_optional(&mut *tx)
456            .await
457            .map_err(map_sqlx_error)?;
458
459            if updated.is_some() {
460                sqlx::query(
461                    r#"
462                    INSERT INTO ff_completion_event
463                        (partition_key, execution_id, flow_id, outcome, occurred_at_ms)
464                    VALUES ($1, $2, $3, 'skipped', $4)
465                    "#,
466                )
467                .bind(partition_key)
468                .bind(downstream_eid)
469                .bind(flow_id)
470                .bind(now)
471                .execute(&mut *tx)
472                .await
473                .map_err(map_sqlx_error)?;
474            }
475        }
476    }
477
478    tx.commit().await.map_err(map_sqlx_error)?;
479    Ok(())
480}
481
/// Decision surface returned by [`evaluate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Decision {
    /// Neither satisfied nor provably impossible yet.
    Pending,
    /// Downstream should transition to eligible; `cancel_siblings`
    /// triggers the Stage-C bookkeeping write.
    Satisfied { cancel_siblings: bool },
    /// The policy can never be satisfied — skip the downstream.
    Impossible,
}
493
494fn evaluate(
495    policy: &EdgeDependencyPolicy,
496    success: i32,
497    fail: i32,
498    skip: i32,
499    total: i32,
500) -> Decision {
501    let nonsuccess = fail + skip;
502    match policy {
503        EdgeDependencyPolicy::AllOf => {
504            if success == total && total > 0 {
505                Decision::Satisfied { cancel_siblings: false }
506            } else if nonsuccess > 0 {
507                // Any upstream non-success under all-of → impossible.
508                Decision::Impossible
509            } else {
510                Decision::Pending
511            }
512        }
513        EdgeDependencyPolicy::AnyOf { on_satisfied } => {
514            if success >= 1 {
515                Decision::Satisfied {
516                    cancel_siblings: matches!(on_satisfied, OnSatisfied::CancelRemaining),
517                }
518            } else if nonsuccess >= total && total > 0 {
519                Decision::Impossible
520            } else {
521                Decision::Pending
522            }
523        }
524        EdgeDependencyPolicy::Quorum { k, on_satisfied } => {
525            let k = *k as i32;
526            if success >= k {
527                Decision::Satisfied {
528                    cancel_siblings: matches!(on_satisfied, OnSatisfied::CancelRemaining),
529                }
530            } else if total - nonsuccess < k {
531                // Remaining upstream headroom cannot reach k.
532                Decision::Impossible
533            } else {
534                Decision::Pending
535            }
536        }
537        // `#[non_exhaustive]` forward-compat: unknown policy variants
538        // stay pending (dependency_reconciler eventually unjams).
539        _ => Decision::Pending,
540    }
541}
542
543fn decode_policy(v: &JsonValue) -> EdgeDependencyPolicy {
544    let kind = v.get("kind").and_then(|k| k.as_str()).unwrap_or("all_of");
545    match kind {
546        "any_of" => EdgeDependencyPolicy::AnyOf {
547            on_satisfied: parse_on_satisfied(v),
548        },
549        "quorum" => {
550            let k = v
551                .get("k")
552                .and_then(|x| x.as_u64())
553                .and_then(|n| u32::try_from(n).ok())
554                .unwrap_or(1);
555            EdgeDependencyPolicy::Quorum {
556                k,
557                on_satisfied: parse_on_satisfied(v),
558            }
559        }
560        _ => EdgeDependencyPolicy::AllOf,
561    }
562}
563
564fn parse_on_satisfied(v: &JsonValue) -> OnSatisfied {
565    match v.get("on_satisfied").and_then(|x| x.as_str()) {
566        Some("let_run") => OnSatisfied::LetRun,
567        _ => OnSatisfied::CancelRemaining,
568    }
569}
570
571fn now_ms() -> i64 {
572    i64::try_from(
573        std::time::SystemTime::now()
574            .duration_since(std::time::UNIX_EPOCH)
575            .map(|d| d.as_millis())
576            .unwrap_or(0),
577    )
578    .unwrap_or(i64::MAX)
579}
580
// ── unit tests ───────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand: evaluate an any-of policy with the given mode.
    fn any_of(mode: OnSatisfied, s: i32, f: i32, k: i32, total: i32) -> Decision {
        evaluate(
            &EdgeDependencyPolicy::AnyOf { on_satisfied: mode },
            s,
            f,
            k,
            total,
        )
    }

    #[test]
    fn evaluate_all_of_satisfied() {
        assert_eq!(
            evaluate(&EdgeDependencyPolicy::AllOf, 3, 0, 0, 3),
            Decision::Satisfied { cancel_siblings: false },
        );
    }

    #[test]
    fn evaluate_all_of_impossible_on_fail() {
        assert_eq!(
            evaluate(&EdgeDependencyPolicy::AllOf, 2, 1, 0, 3),
            Decision::Impossible,
        );
    }

    #[test]
    fn evaluate_all_of_pending() {
        assert_eq!(
            evaluate(&EdgeDependencyPolicy::AllOf, 1, 0, 0, 3),
            Decision::Pending,
        );
    }

    #[test]
    fn evaluate_any_of_cancels_siblings() {
        assert_eq!(
            any_of(OnSatisfied::CancelRemaining, 1, 0, 0, 3),
            Decision::Satisfied { cancel_siblings: true },
        );
    }

    #[test]
    fn evaluate_any_of_let_run() {
        assert_eq!(
            any_of(OnSatisfied::LetRun, 1, 0, 0, 3),
            Decision::Satisfied { cancel_siblings: false },
        );
    }

    #[test]
    fn evaluate_any_of_impossible_when_all_fail() {
        assert_eq!(
            any_of(OnSatisfied::CancelRemaining, 0, 3, 0, 3),
            Decision::Impossible,
        );
    }

    #[test]
    fn evaluate_quorum_satisfied_at_k() {
        let policy = EdgeDependencyPolicy::Quorum {
            k: 2,
            on_satisfied: OnSatisfied::LetRun,
        };
        assert_eq!(
            evaluate(&policy, 2, 0, 1, 3),
            Decision::Satisfied { cancel_siblings: false },
        );
    }

    #[test]
    fn evaluate_quorum_impossible_when_headroom_exhausted() {
        // 5 upstream, k = 3, 3 failed → at most 2 successes remain.
        let policy = EdgeDependencyPolicy::Quorum {
            k: 3,
            on_satisfied: OnSatisfied::CancelRemaining,
        };
        assert_eq!(evaluate(&policy, 0, 3, 0, 5), Decision::Impossible);
    }

    #[test]
    fn outcome_kind_mapping() {
        assert_eq!(OutcomeKind::from_str("success"), OutcomeKind::Success);
        assert_eq!(OutcomeKind::from_str("skipped"), OutcomeKind::Skip);
        assert_eq!(OutcomeKind::from_str("failed"), OutcomeKind::Fail);
        assert_eq!(OutcomeKind::from_str("cancelled"), OutcomeKind::Fail);
    }
}