Skip to main content

ff_backend_postgres/
flow.rs

1//! Postgres flow-family `EngineBackend` implementations (Wave 4c).
2//!
3//! Six trait methods over the Wave 3 schema:
4//!
5//! * [`describe_flow`] — single-flow snapshot (row + edge groups).
6//! * [`list_flows`] — cursor-paginated per-partition listing.
7//! * [`list_edges`] — fan-in / fan-out adjacency for an execution.
8//! * [`describe_edge`] — one edge by flow + edge id.
9//! * [`cancel_flow`] — SERIALIZABLE cascade with a 3-attempt retry
10//!   loop. Exhaustion surfaces as
11//!   [`ContentionKind::RetryExhausted`] (Q11 — one of the three
12//!   SERIALIZABLE sites).
13//! * [`set_edge_group_policy`] — RFC-016 Stage A policy write;
14//!   rejects when edges have already been staged for the downstream
15//!   group (parity with the Valkey `invalid_input` error path).
16//!
17//! Field layout note: `ff_flow_core` only hoists the columns the
18//! engine filters on (public_flow_state, graph_revision, created_at).
19//! Every other `FlowSnapshot` field — `flow_kind`, `namespace`,
20//! `node_count`, `edge_count`, `last_mutation_at_ms`, cancel meta,
21//! and namespaced tags — rides in `raw_fields jsonb`. Same story
22//! for `ff_edge`: the immutable edge record (dependency_kind,
23//! satisfaction_condition, etc.) lives in the `policy jsonb` column
24//! (the only jsonb slot on the edge row); the typed columns carry
25//! only endpoints + policy_version.
26
27use std::collections::BTreeMap;
28use std::time::Duration;
29
30use ff_core::backend::{CancelFlowPolicy, CancelFlowWait};
31use ff_core::contracts::{
32    CancelFlowResult, EdgeDependencyPolicy, EdgeDirection, EdgeGroupSnapshot, EdgeGroupState,
33    EdgeSnapshot, FlowSnapshot, FlowStatus, FlowSummary, ListFlowsPage, OnSatisfied,
34    SetEdgeGroupPolicyResult,
35};
36use ff_core::engine_error::{
37    ContentionKind, EngineError, ValidationKind,
38};
39use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
40use ff_core::types::{EdgeId, ExecutionId, FlowId, Namespace, TimestampMs};
41use serde_json::Value as JsonValue;
42use sqlx::postgres::PgRow;
43use sqlx::{PgPool, Row};
44use uuid::Uuid;
45
46use crate::error::map_sqlx_error;
47
/// Max attempts for the SERIALIZABLE retry loop on `cancel_flow`
/// (Q11). After this many 40001 / 40P01 faults the caller sees
/// [`ContentionKind::RetryExhausted`] and falls back to the
/// reconciler.
// Kept small on purpose: the reconciler is the durable fallback, so
// the inline loop only needs to absorb transient contention spikes.
const CANCEL_FLOW_MAX_ATTEMPTS: u32 = 3;
53
54/// Extract a partition-index byte (0..=255) from a [`PartitionKey`].
55/// Returns a typed validation error on malformed keys.
56fn partition_index_from_key(key: &PartitionKey) -> Result<i16, EngineError> {
57    let p = key.parse().map_err(|e| EngineError::Validation {
58        kind: ValidationKind::InvalidInput,
59        detail: format!("partition_key: {e}"),
60    })?;
61    Ok(p.index as i16)
62}
63
64/// Compute the partition byte (0..=255) for a `FlowId` using the
65/// deployment's partition config. Routing parity with the Valkey
66/// backend's `flow_partition` — same crc16-CCITT modulo.
67fn flow_partition_byte(
68    flow_id: &FlowId,
69    partition_config: &ff_core::partition::PartitionConfig,
70) -> i16 {
71    ff_core::partition::flow_partition(flow_id, partition_config).index as i16
72}
73
74/// Decode a `raw_fields jsonb` string slot, falling back to `None`
75/// for missing / null / non-string values.
76fn raw_str<'a>(raw: &'a JsonValue, key: &str) -> Option<&'a str> {
77    raw.get(key).and_then(|v| v.as_str())
78}
79
80/// Decode a numeric slot from `raw_fields jsonb` as `u64`. Accepts
81/// JSON numbers or string-encoded ints (the Lua projector emits
82/// numbers; the Wave-4 proc emits the same).
83fn raw_u64(raw: &JsonValue, key: &str) -> Option<u64> {
84    raw.get(key).and_then(|v| {
85        v.as_u64()
86            .or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
87    })
88}
89
90fn raw_u32(raw: &JsonValue, key: &str) -> Option<u32> {
91    raw.get(key).and_then(|v| {
92        v.as_u64()
93            .or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
94            .and_then(|n| u32::try_from(n).ok())
95    })
96}
97
98/// Route `raw_fields` jsonb object keys into a `(typed_fields, tags)`
99/// pair — anything matching `^[a-z][a-z0-9_]*\.` is a namespaced tag.
100fn extract_tags(raw: &JsonValue) -> BTreeMap<String, String> {
101    let mut tags = BTreeMap::new();
102    let Some(obj) = raw.as_object() else {
103        return tags;
104    };
105    for (k, v) in obj {
106        if !is_namespaced_tag_key(k) {
107            continue;
108        }
109        if let Some(s) = v.as_str() {
110            tags.insert(k.clone(), s.to_owned());
111        }
112    }
113    tags
114}
115
/// True when `k` looks like a namespaced tag key: an ASCII-lowercase
/// first char, then `[a-z0-9_]*` up to a mandatory `.` separator.
/// Characters after the first `.` are not validated.
fn is_namespaced_tag_key(k: &str) -> bool {
    let mut rest = k.chars();
    // Guard: the key must start with an ASCII lowercase letter.
    match rest.next() {
        Some(first) if first.is_ascii_lowercase() => {}
        _ => return false,
    }
    for c in rest {
        if c == '.' {
            // Found the namespace separator — key qualifies.
            return true;
        }
        let allowed = c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_';
        if !allowed {
            return false;
        }
    }
    // Ran out of characters without ever seeing a dot.
    false
}
136
137fn parse_on_satisfied(s: &str) -> OnSatisfied {
138    match s {
139        "let_run" => OnSatisfied::LetRun,
140        _ => OnSatisfied::CancelRemaining,
141    }
142}
143
144/// Decode an `EdgeDependencyPolicy` from the `ff_edge_group.policy`
145/// jsonb. Layout mirrors the ff-core `Serialize` impl:
146/// `{ "kind": "all_of" | "any_of" | "quorum", "on_satisfied": ..., "k": .. }`.
147fn decode_edge_policy(v: &JsonValue) -> EdgeDependencyPolicy {
148    let kind = v.get("kind").and_then(|k| k.as_str()).unwrap_or("all_of");
149    match kind {
150        "any_of" => {
151            let on = v
152                .get("on_satisfied")
153                .and_then(|x| x.as_str())
154                .map(parse_on_satisfied)
155                .unwrap_or(OnSatisfied::CancelRemaining);
156            EdgeDependencyPolicy::AnyOf { on_satisfied: on }
157        }
158        "quorum" => {
159            let k = v
160                .get("k")
161                .and_then(|x| x.as_u64())
162                .and_then(|n| u32::try_from(n).ok())
163                .unwrap_or(1);
164            let on = v
165                .get("on_satisfied")
166                .and_then(|x| x.as_str())
167                .map(parse_on_satisfied)
168                .unwrap_or(OnSatisfied::CancelRemaining);
169            EdgeDependencyPolicy::Quorum { k, on_satisfied: on }
170        }
171        _ => EdgeDependencyPolicy::AllOf,
172    }
173}
174
175fn encode_edge_policy(p: &EdgeDependencyPolicy) -> JsonValue {
176    match p {
177        EdgeDependencyPolicy::AllOf => serde_json::json!({ "kind": "all_of" }),
178        EdgeDependencyPolicy::AnyOf { on_satisfied } => serde_json::json!({
179            "kind": "any_of",
180            "on_satisfied": on_satisfied.variant_str(),
181        }),
182        EdgeDependencyPolicy::Quorum { k, on_satisfied } => serde_json::json!({
183            "kind": "quorum",
184            "k": k,
185            "on_satisfied": on_satisfied.variant_str(),
186        }),
187        // Forward-compat: unknown variants fall back to the Stage-A
188        // `all_of` encoding so a future `#[non_exhaustive]` addition
189        // doesn't corrupt the persisted row.
190        _ => serde_json::json!({ "kind": "all_of" }),
191    }
192}
193
194/// Decode one `ff_edge_group` row into an [`EdgeGroupSnapshot`].
195fn decode_edge_group_row(row: &PgRow) -> Result<EdgeGroupSnapshot, EngineError> {
196    let downstream_uuid: Uuid = row.get("downstream_eid");
197    let policy_raw: JsonValue = row.get("policy");
198    let k_target: i32 = row.get("k_target");
199    let success_count: i32 = row.get("success_count");
200    let fail_count: i32 = row.get("fail_count");
201    let skip_count: i32 = row.get("skip_count");
202    let running_count: i32 = row.get("running_count");
203
204    // Round-trip: exec id lives in the flow partition. Reconstruct
205    // the full `{fp:N}:<uuid>` form using the row's partition_key.
206    let part: i16 = row.get("partition_key");
207    let downstream_id = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
208        .map_err(|e| EngineError::Validation {
209            kind: ValidationKind::Corruption,
210            detail: format!("ff_edge_group.downstream_eid: {e}"),
211        })?;
212
213    let policy = decode_edge_policy(&policy_raw);
214    // Stage A: total_deps reported as success+fail+skip+running
215    // (equivalent to the RFC-007 `n` counter on the legacy hash).
216    let total: u32 = success_count.max(0) as u32
217        + fail_count.max(0) as u32
218        + skip_count.max(0) as u32
219        + running_count.max(0) as u32;
220
221    // group_state — Stage A derivation: satisfied when k_target met.
222    let state = if k_target > 0 && success_count >= k_target {
223        EdgeGroupState::Satisfied
224    } else {
225        EdgeGroupState::Pending
226    };
227
228    Ok(EdgeGroupSnapshot::new(
229        downstream_id,
230        policy,
231        total,
232        success_count.max(0) as u32,
233        fail_count.max(0) as u32,
234        skip_count.max(0) as u32,
235        running_count.max(0) as u32,
236        state,
237    ))
238}
239
240/// Decode one `ff_flow_core` row + its edge_group rows into a
241/// [`FlowSnapshot`].
242fn decode_flow_row(
243    flow_id: FlowId,
244    row: &PgRow,
245    edge_groups: Vec<EdgeGroupSnapshot>,
246) -> Result<FlowSnapshot, EngineError> {
247    let public_flow_state: String = row.get("public_flow_state");
248    let graph_revision_i: i64 = row.get("graph_revision");
249    let created_at_ms: i64 = row.get("created_at_ms");
250    let terminal_at_ms: Option<i64> = row.get("terminal_at_ms");
251    let raw_fields: JsonValue = row.get("raw_fields");
252
253    let flow_kind = raw_str(&raw_fields, "flow_kind")
254        .unwrap_or("")
255        .to_owned();
256    let namespace_str = raw_str(&raw_fields, "namespace").unwrap_or("default");
257    let namespace = Namespace::new(namespace_str.to_owned());
258    let node_count = raw_u32(&raw_fields, "node_count").unwrap_or(0);
259    let edge_count = raw_u32(&raw_fields, "edge_count").unwrap_or(0);
260    let last_mutation_at_ms =
261        raw_u64(&raw_fields, "last_mutation_at_ms").map(|n| n as i64).unwrap_or(created_at_ms);
262
263    let cancelled_at = terminal_at_ms.map(TimestampMs);
264    let cancel_reason = raw_str(&raw_fields, "cancel_reason").map(str::to_owned);
265    let cancellation_policy = raw_str(&raw_fields, "cancellation_policy").map(str::to_owned);
266
267    let tags = extract_tags(&raw_fields);
268
269    let graph_revision = u64::try_from(graph_revision_i).unwrap_or(0);
270
271    Ok(FlowSnapshot::new(
272        flow_id,
273        flow_kind,
274        namespace,
275        public_flow_state,
276        graph_revision,
277        node_count,
278        edge_count,
279        TimestampMs(created_at_ms),
280        TimestampMs(last_mutation_at_ms),
281        cancelled_at,
282        cancel_reason,
283        cancellation_policy,
284        tags,
285        edge_groups,
286    ))
287}
288
289/// Decode one `ff_edge` row into an [`EdgeSnapshot`].
290///
291/// The immutable edge record (dependency_kind, satisfaction_condition,
292/// data_passing_ref, edge_state, created_at, created_by) lives inside
293/// the `policy jsonb` column — the only jsonb slot on `ff_edge`.
294fn decode_edge_row(row: &PgRow, flow_id: &FlowId) -> Result<EdgeSnapshot, EngineError> {
295    let edge_uuid: Uuid = row.get("edge_id");
296    let upstream_uuid: Uuid = row.get("upstream_eid");
297    let downstream_uuid: Uuid = row.get("downstream_eid");
298    let part: i16 = row.get("partition_key");
299    let policy_raw: JsonValue = row.get("policy");
300
301    let upstream = ExecutionId::parse(&format!("{{fp:{part}}}:{upstream_uuid}"))
302        .map_err(|e| EngineError::Validation {
303            kind: ValidationKind::Corruption,
304            detail: format!("ff_edge.upstream_eid: {e}"),
305        })?;
306    let downstream = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
307        .map_err(|e| EngineError::Validation {
308            kind: ValidationKind::Corruption,
309            detail: format!("ff_edge.downstream_eid: {e}"),
310        })?;
311
312    let dependency_kind = raw_str(&policy_raw, "dependency_kind")
313        .unwrap_or("success_only")
314        .to_owned();
315    let satisfaction_condition = raw_str(&policy_raw, "satisfaction_condition")
316        .unwrap_or("all_required")
317        .to_owned();
318    let data_passing_ref = raw_str(&policy_raw, "data_passing_ref")
319        .filter(|s| !s.is_empty())
320        .map(str::to_owned);
321    let edge_state = raw_str(&policy_raw, "edge_state")
322        .unwrap_or("pending")
323        .to_owned();
324    let created_at_ms =
325        raw_u64(&policy_raw, "created_at_ms").map(|n| n as i64).unwrap_or(0);
326    let created_by = raw_str(&policy_raw, "created_by")
327        .unwrap_or("engine")
328        .to_owned();
329
330    Ok(EdgeSnapshot::new(
331        EdgeId::from_uuid(edge_uuid),
332        flow_id.clone(),
333        upstream,
334        downstream,
335        dependency_kind,
336        satisfaction_condition,
337        data_passing_ref,
338        edge_state,
339        TimestampMs(created_at_ms),
340        created_by,
341    ))
342}
343
344/// `describe_flow` — single flow snapshot + edge_groups.
345pub async fn describe_flow(
346    pool: &PgPool,
347    partition_config: &ff_core::partition::PartitionConfig,
348    id: &FlowId,
349) -> Result<Option<FlowSnapshot>, EngineError> {
350    let part = flow_partition_byte(id, partition_config);
351    let flow_uuid: Uuid = id.0;
352
353    let flow_row_opt = sqlx::query(
354        "SELECT partition_key, flow_id, graph_revision, public_flow_state, \
355                created_at_ms, terminal_at_ms, raw_fields \
356         FROM ff_flow_core \
357         WHERE partition_key = $1 AND flow_id = $2",
358    )
359    .bind(part)
360    .bind(flow_uuid)
361    .fetch_optional(pool)
362    .await
363    .map_err(map_sqlx_error)?;
364
365    let Some(flow_row) = flow_row_opt else {
366        return Ok(None);
367    };
368
369    let eg_rows = sqlx::query(
370        "SELECT partition_key, flow_id, downstream_eid, policy, \
371                k_target, success_count, fail_count, skip_count, running_count \
372         FROM ff_edge_group \
373         WHERE partition_key = $1 AND flow_id = $2 \
374         ORDER BY downstream_eid",
375    )
376    .bind(part)
377    .bind(flow_uuid)
378    .fetch_all(pool)
379    .await
380    .map_err(map_sqlx_error)?;
381
382    let mut edge_groups = Vec::with_capacity(eg_rows.len());
383    for row in &eg_rows {
384        edge_groups.push(decode_edge_group_row(row)?);
385    }
386
387    decode_flow_row(id.clone(), &flow_row, edge_groups).map(Some)
388}
389
390/// `list_flows` — cursor-paginated per-partition flow listing.
391pub async fn list_flows(
392    pool: &PgPool,
393    partition: PartitionKey,
394    cursor: Option<FlowId>,
395    limit: usize,
396) -> Result<ListFlowsPage, EngineError> {
397    if limit == 0 {
398        return Ok(ListFlowsPage::new(Vec::new(), None));
399    }
400    let part = partition_index_from_key(&partition)?;
401    let cursor_uuid: Option<Uuid> = cursor.as_ref().map(|f| f.0);
402    // +1 row to decide whether next_cursor should be set.
403    let fetch_limit = (limit + 1) as i64;
404
405    let rows = sqlx::query(
406        "SELECT flow_id, created_at_ms, public_flow_state \
407         FROM ff_flow_core \
408         WHERE partition_key = $1 \
409           AND ($2::uuid IS NULL OR flow_id > $2) \
410         ORDER BY flow_id \
411         LIMIT $3",
412    )
413    .bind(part)
414    .bind(cursor_uuid)
415    .bind(fetch_limit)
416    .fetch_all(pool)
417    .await
418    .map_err(map_sqlx_error)?;
419
420    let mut flows: Vec<FlowSummary> = Vec::with_capacity(rows.len().min(limit));
421    let mut next_cursor: Option<FlowId> = None;
422    for (idx, row) in rows.iter().enumerate() {
423        if idx >= limit {
424            // The (limit+1)-th row signals there is at least one more;
425            // we don't include it in the page, but the last included
426            // row's flow_id becomes the cursor.
427            if let Some(last) = flows.last() {
428                next_cursor = Some(last.flow_id.clone());
429            }
430            break;
431        }
432        let flow_uuid: Uuid = row.get("flow_id");
433        let created_at_ms: i64 = row.get("created_at_ms");
434        let public_state: String = row.get("public_flow_state");
435        let status = FlowStatus::from_public_flow_state(&public_state);
436        flows.push(FlowSummary::new(
437            FlowId::from_uuid(flow_uuid),
438            TimestampMs(created_at_ms),
439            status,
440        ));
441    }
442
443    Ok(ListFlowsPage::new(flows, next_cursor))
444}
445
446/// `list_edges` — direction-keyed adjacency listing.
447pub async fn list_edges(
448    pool: &PgPool,
449    partition_config: &ff_core::partition::PartitionConfig,
450    flow_id: &FlowId,
451    direction: EdgeDirection,
452) -> Result<Vec<EdgeSnapshot>, EngineError> {
453    let part = flow_partition_byte(flow_id, partition_config);
454    let flow_uuid: Uuid = flow_id.0;
455    let (column_filter, subject_eid) = match &direction {
456        EdgeDirection::Outgoing { from_node } => ("upstream_eid", from_node),
457        EdgeDirection::Incoming { to_node } => ("downstream_eid", to_node),
458    };
459    // Parse the UUID suffix from the `{fp:N}:<uuid>` id.
460    let subject_uuid = parse_exec_uuid(subject_eid)?;
461
462    let sql = format!(
463        "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
464         FROM ff_edge \
465         WHERE partition_key = $1 AND flow_id = $2 AND {column_filter} = $3 \
466         ORDER BY edge_id"
467    );
468    let rows = sqlx::query(&sql)
469        .bind(part)
470        .bind(flow_uuid)
471        .bind(subject_uuid)
472        .fetch_all(pool)
473        .await
474        .map_err(map_sqlx_error)?;
475
476    let mut out = Vec::with_capacity(rows.len());
477    for row in &rows {
478        out.push(decode_edge_row(row, flow_id)?);
479    }
480    Ok(out)
481}
482
483/// `describe_edge` — one edge by flow + edge id.
484pub async fn describe_edge(
485    pool: &PgPool,
486    partition_config: &ff_core::partition::PartitionConfig,
487    flow_id: &FlowId,
488    edge_id: &EdgeId,
489) -> Result<Option<EdgeSnapshot>, EngineError> {
490    let part = flow_partition_byte(flow_id, partition_config);
491    let row_opt = sqlx::query(
492        "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
493         FROM ff_edge \
494         WHERE partition_key = $1 AND flow_id = $2 AND edge_id = $3",
495    )
496    .bind(part)
497    .bind(flow_id.0)
498    .bind(edge_id.0)
499    .fetch_optional(pool)
500    .await
501    .map_err(map_sqlx_error)?;
502
503    match row_opt {
504        Some(row) => decode_edge_row(&row, flow_id).map(Some),
505        None => Ok(None),
506    }
507}
508
509/// Parse the UUID suffix out of a hash-tagged `{fp:N}:<uuid>` exec id.
510fn parse_exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
511    let s = eid.as_str();
512    let Some(colon) = s.rfind("}:") else {
513        return Err(EngineError::Validation {
514            kind: ValidationKind::InvalidInput,
515            detail: format!("execution_id missing '}}:' delimiter: {s}"),
516        });
517    };
518    let tail = &s[colon + 2..];
519    Uuid::parse_str(tail).map_err(|e| EngineError::Validation {
520        kind: ValidationKind::InvalidInput,
521        detail: format!("execution_id uuid suffix parse: {e}"),
522    })
523}
524
525fn cancel_policy_to_str(p: CancelFlowPolicy) -> &'static str {
526    match p {
527        CancelFlowPolicy::FlowOnly => "cancel_flow_only",
528        CancelFlowPolicy::CancelAll => "cancel_all",
529        CancelFlowPolicy::CancelPending => "cancel_pending",
530        // Forward-compat for future additive `CancelFlowPolicy` variants
531        // — encode as `cancel_all` (safest conservative default).
532        _ => "cancel_all",
533    }
534}
535
536/// `cancel_flow` — SERIALIZABLE cascade with a 3-attempt retry loop.
537///
538/// Transaction steps:
539///   1. SELECT + UPDATE `ff_flow_core.public_flow_state = 'cancelled'`
540///      (atomic state flip).
541///   2. For `CancelAll`/`CancelPending`: SELECT member execution ids
542///      from `ff_exec_core` filtered by flow_id and state, UPDATE
543///      each to `cancelled`, INSERT a `ff_completion_event` row per
544///      member (the trigger fires NOTIFY at COMMIT).
545///   3. For `CancelPending` under `CancelRemaining` semantics (Stage
546///      C), INSERT a `ff_pending_cancel_groups` bookkeeping row for
547///      every running sibling group that the Wave-5 dispatcher needs
548///      to follow up on. Wave 4c only writes the bookkeeping row —
549///      the actual dispatch is Wave 5.
550///
551/// On `SQLSTATE 40001` / `40P01` (serialization / deadlock) the
552/// transaction ROLLBACKs, we sleep a tiny backoff, and retry. After
553/// [`CANCEL_FLOW_MAX_ATTEMPTS`] failed attempts we return
554/// [`ContentionKind::RetryExhausted`].
555pub async fn cancel_flow(
556    pool: &PgPool,
557    partition_config: &ff_core::partition::PartitionConfig,
558    id: &FlowId,
559    policy: CancelFlowPolicy,
560    _wait: CancelFlowWait,
561) -> Result<CancelFlowResult, EngineError> {
562    let part = flow_partition_byte(id, partition_config);
563    let flow_uuid: Uuid = id.0;
564    let policy_str = cancel_policy_to_str(policy);
565
566    let mut last_transport: Option<EngineError> = None;
567    for attempt in 0..CANCEL_FLOW_MAX_ATTEMPTS {
568        match cancel_flow_once(pool, part, flow_uuid, policy, policy_str).await {
569            Ok(result) => return Ok(result),
570            Err(err) => {
571                if is_serialization_conflict(&err) {
572                    // Tiny exponential backoff: 0, 5ms, 15ms.
573                    if attempt + 1 < CANCEL_FLOW_MAX_ATTEMPTS {
574                        let ms = 5u64 * (1u64 << attempt).saturating_sub(0);
575                        tokio::time::sleep(Duration::from_millis(ms)).await;
576                    }
577                    last_transport = Some(err);
578                    continue;
579                }
580                return Err(err);
581            }
582        }
583    }
584    // Exhausted all attempts on serialization failures.
585    let _ = last_transport; // drop; retained only for trace-level diagnosis
586    Err(EngineError::Contention(ContentionKind::RetryExhausted))
587}
588
589/// Return `true` when an `EngineError` was produced by the 40001 /
590/// 40P01 sqlx mapping (Contention(LeaseConflict)). We re-map that
591/// specific kind into the retry loop; other `Contention` variants
592/// (e.g. honest lease contention from a future extension) propagate
593/// untouched.
594fn is_serialization_conflict(err: &EngineError) -> bool {
595    matches!(
596        err,
597        EngineError::Contention(ContentionKind::LeaseConflict)
598    )
599}
600
/// One attempt of the `cancel_flow` cascade inside a single
/// SERIALIZABLE transaction. Serialization / deadlock faults surface
/// through [`map_sqlx_error`] and are retried by the caller
/// ([`cancel_flow`]); any other error aborts the transaction.
async fn cancel_flow_once(
    pool: &PgPool,
    part: i16,
    flow_uuid: Uuid,
    policy: CancelFlowPolicy,
    policy_str: &str,
) -> Result<CancelFlowResult, EngineError> {
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // Upgrade this transaction to SERIALIZABLE (Q11 site); must be the
    // first statement after BEGIN for the level change to apply.
    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // Step 1 — atomically flip the flow_core state. If the flow
    // doesn't exist we return `Cancelled { members: [] }` with empty
    // members (idempotent retry after the row has already been
    // cancelled returns the same shape).
    // COALESCE keeps the first terminal timestamp; the jsonb `||`
    // merge records which cancellation policy was applied.
    let flow_found = sqlx::query(
        "UPDATE ff_flow_core \
         SET public_flow_state = 'cancelled', \
             terminal_at_ms = COALESCE(terminal_at_ms, \
                 (extract(epoch from clock_timestamp())*1000)::bigint), \
             raw_fields = raw_fields \
                 || jsonb_build_object('cancellation_policy', $3::text) \
         WHERE partition_key = $1 AND flow_id = $2 \
         RETURNING flow_id",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(policy_str)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    if flow_found.is_none() {
        // Nothing to cascade — commit the (empty) transaction so the
        // caller still observes a clean idempotent result.
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(CancelFlowResult::Cancelled {
            cancellation_policy: policy_str.to_owned(),
            member_execution_ids: Vec::new(),
        });
    }

    // Step 2 — collect + cancel member executions under the policy.
    // FlowOnly skips the member cascade entirely; the state filter
    // below decides which lifecycle phases each policy touches.
    let member_rows = if matches!(policy, CancelFlowPolicy::FlowOnly) {
        Vec::new()
    } else {
        let state_filter: &str = match policy {
            CancelFlowPolicy::CancelAll => {
                "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')"
            }
            CancelFlowPolicy::CancelPending => {
                "lifecycle_phase IN ('pending','blocked','eligible')"
            }
            // Future variants cascade like CancelAll (conservative).
            _ => "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')",
        };
        // `state_filter` is one of the fixed strings above — never
        // caller input — so this format! is injection-safe.
        let sql = format!(
            "SELECT execution_id FROM ff_exec_core \
             WHERE partition_key = $1 AND flow_id = $2 AND {state_filter}"
        );
        sqlx::query(&sql)
            .bind(part)
            .bind(flow_uuid)
            .fetch_all(&mut *tx)
            .await
            .map_err(map_sqlx_error)?
    };

    let mut member_execution_ids: Vec<String> = Vec::with_capacity(member_rows.len());
    for row in &member_rows {
        let exec_uuid: Uuid = row.get("execution_id");
        // Flip the member to `cancelled` lifecycle + eligibility.
        // COALESCE on the reason / actor fields preserves any earlier
        // cancellation metadata instead of overwriting it.
        sqlx::query(
            "UPDATE ff_exec_core \
             SET lifecycle_phase = 'cancelled', \
                 eligibility_state = 'cancelled', \
                 public_state = 'cancelled', \
                 terminal_at_ms = COALESCE(terminal_at_ms, \
                     (extract(epoch from clock_timestamp())*1000)::bigint), \
                 cancellation_reason = COALESCE(cancellation_reason, 'flow_cancelled'), \
                 cancelled_by = COALESCE(cancelled_by, 'cancel_flow') \
             WHERE partition_key = $1 AND execution_id = $2",
        )
        .bind(part)
        .bind(exec_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // Emit a completion outbox row (trigger fires NOTIFY).
        sqlx::query(
            "INSERT INTO ff_completion_event \
             (partition_key, execution_id, flow_id, outcome, occurred_at_ms) \
             VALUES ($1, $2, $3, 'cancelled', \
                     (extract(epoch from clock_timestamp())*1000)::bigint)",
        )
        .bind(part)
        .bind(exec_uuid)
        .bind(flow_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // Report members in their canonical `{fp:N}:<uuid>` form.
        member_execution_ids.push(format!("{{fp:{part}}}:{exec_uuid}"));
    }

    // Step 3 — for CancelPending, record any running sibling groups
    // into ff_pending_cancel_groups so the Wave-5 dispatcher can
    // follow up. Wave 4c only does the bookkeeping write.
    // ON CONFLICT DO NOTHING keeps retries of this transaction (and
    // repeated cancels) idempotent on the bookkeeping table.
    if matches!(policy, CancelFlowPolicy::CancelPending) {
        sqlx::query(
            "INSERT INTO ff_pending_cancel_groups \
             (partition_key, flow_id, downstream_eid, enqueued_at_ms) \
             SELECT partition_key, flow_id, downstream_eid, \
                    (extract(epoch from clock_timestamp())*1000)::bigint \
             FROM ff_edge_group \
             WHERE partition_key = $1 AND flow_id = $2 AND running_count > 0 \
             ON CONFLICT DO NOTHING",
        )
        .bind(part)
        .bind(flow_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    }

    tx.commit().await.map_err(map_sqlx_error)?;

    Ok(CancelFlowResult::Cancelled {
        cancellation_policy: policy_str.to_owned(),
        member_execution_ids,
    })
}
734
/// `set_edge_group_policy` — RFC-016 Stage A.
///
/// Reject when at least one edge has already been staged for
/// `downstream_execution_id` (the ordering rule: policy must be set
/// BEFORE the first `add_dependency`). Parity with the Valkey Lua
/// path, which returns `invalid_input` → `EngineError::Validation`
/// in the same situation (not `Conflict`, per in-tree behavior).
///
/// Stage A supports `AllOf` fully; `AnyOf` / `Quorum` flow through
/// the same write path (Stage B's resolver is backend-specific —
/// Valkey's Lua honours them, Postgres Wave 4c stores them and Wave
/// 5 wires the resolver). Callers that need the Stage-A ordering
/// check can rely on this method regardless of the variant.
pub async fn set_edge_group_policy(
    pool: &PgPool,
    partition_config: &ff_core::partition::PartitionConfig,
    flow_id: &FlowId,
    downstream_execution_id: &ExecutionId,
    policy: EdgeDependencyPolicy,
) -> Result<SetEdgeGroupPolicyResult, EngineError> {
    // Light input validation mirroring the Valkey Stage B impl.
    // Done before opening a transaction so bad input costs no round
    // trip to the database.
    match &policy {
        EdgeDependencyPolicy::AllOf => {}
        EdgeDependencyPolicy::AnyOf { .. } => {}
        EdgeDependencyPolicy::Quorum { k, .. } => {
            // A zero quorum would be trivially satisfied — reject.
            if *k == 0 {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "quorum k must be >= 1".to_string(),
                });
            }
        }
        // Forward-compat guard: a variant this backend doesn't know
        // how to encode must not be silently persisted.
        _ => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "unknown EdgeDependencyPolicy variant".to_string(),
            });
        }
    }

    let part = flow_partition_byte(flow_id, partition_config);
    let flow_uuid: Uuid = flow_id.0;
    let downstream_uuid = parse_exec_uuid(downstream_execution_id)?;

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // Ordering guard: edges already staged for this group?
    let already_staged: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM ff_edge \
         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(downstream_uuid)
    .fetch_one(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    if already_staged > 0 {
        // Best-effort rollback — the validation error is what the
        // caller needs to see, not a secondary rollback failure.
        let _ = tx.rollback().await;
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "edge_group_policy_already_fixed: dependencies already staged".to_string(),
        });
    }

    // Idempotent restate: if an identical policy is already stored,
    // return AlreadySet without touching the row.
    let existing: Option<JsonValue> = sqlx::query_scalar(
        "SELECT policy FROM ff_edge_group \
         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(downstream_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // serde_json::Value object equality is key-order-insensitive, so
    // this comparison is stable across encoder layouts.
    let encoded = encode_edge_policy(&policy);
    if let Some(existing_policy) = existing
        && existing_policy == encoded
    {
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(SetEdgeGroupPolicyResult::AlreadySet);
    }

    // Upsert: overwrite any differing policy (the ordering guard above
    // already proved no edges depend on the old one yet).
    sqlx::query(
        "INSERT INTO ff_edge_group \
         (partition_key, flow_id, downstream_eid, policy) \
         VALUES ($1, $2, $3, $4) \
         ON CONFLICT (partition_key, flow_id, downstream_eid) \
         DO UPDATE SET policy = EXCLUDED.policy",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(downstream_uuid)
    .bind(&encoded)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(SetEdgeGroupPolicyResult::Set)
}
840
// Keep the `Partition` / `PartitionFamily` imports referenced so the
// unused-import lint stays quiet if later trimming removes their
// other uses. Never called at runtime.
#[allow(dead_code)]
fn _unused_imports_anchor(_p: Partition, _f: PartitionFamily) {}