ff_backend_sqlite/queries/flow.rs
1//! SQLite dialect-forked queries for flow-header create / cancel.
2//!
3//! Populated in Phase 2b.1 per RFC-023 §4.1. Mirrors the PG reference
4//! at `ff-backend-postgres/src/flow.rs` + `ff-backend-postgres/src/flow_staging.rs`
5//! statement-by-statement; the only dialect changes are `jsonb` → TEXT
6//! JSON (JSON1), `$N` → `?N`, and the removal of `FOR UPDATE` /
7//! partition-aware casts since SQLite runs single-writer under
8//! `BEGIN IMMEDIATE` (§4.1 A3).
9
10// ── create_flow ─────────────────────────────────────────────────────────
11
12/// Idempotent flow-header insert. `ON CONFLICT DO NOTHING` — caller
13/// detects duplicate via post-insert changes() == 0.
14///
15/// Binds:
16/// 1. partition_key (i64)
17/// 2. flow_id (Uuid)
18/// 3. created_at_ms (i64)
19/// 4. raw_fields (TEXT JSON: flow_kind / namespace / node_count=0 /
20/// edge_count=0 / last_mutation_at_ms)
21pub(crate) const INSERT_FLOW_CORE_SQL: &str = r#"
22 INSERT INTO ff_flow_core
23 (partition_key, flow_id, graph_revision, public_flow_state,
24 created_at_ms, raw_fields)
25 VALUES (?1, ?2, 0, 'open', ?3, ?4)
26 ON CONFLICT (partition_key, flow_id) DO NOTHING
27"#;
28
29// Lifecycle-phase literal sets used by cancel_flow member selection.
30// Kept as SQL-inline literals (rather than Rust constants bound at
31// query time) because SQLite does not support IN-list parameter
32// arrays; each literal is a deployment-wide invariant string that
33// `ff_exec_core.lifecycle_phase` can take, never user input.
34// Centralizing them in this module so any future lifecycle-phase
35// vocabulary change touches exactly one file.
36//
37// `TERMINAL_PHASES` — an exec row in any of these is already finished,
38// so cancel_flow skips it. Mirrors PG's `NOT IN (...)` filter at
39// `ff-backend-postgres/src/flow.rs:648-649` plus the RFC-023 SQLite
40// 'terminal' literal (see `queries/exec_core.rs::UPDATE_EXEC_CORE_COMPLETE_SQL`
41// which writes 'terminal' rather than PG's multiple terminal literals).
42//
43// `PRE_RUNNABLE_PHASES` — the `CancelPending` policy only flips rows
44// whose execution hasn't started yet.
45
46// ── cancel_flow ─────────────────────────────────────────────────────────
47
48/// Atomic flip of flow_core to cancelled, recording the requested
49/// cancellation policy in `raw_fields`. The PG path uses a `RETURNING`
50/// to detect flow-not-found; SQLite uses `changes()` after execute
51/// (caller reads `ExecuteResult::rows_affected`).
52///
53/// Binds:
54/// 1. partition_key (i64)
55/// 2. flow_id (Uuid)
56/// 3. now_ms (i64) — consumed by the COALESCE(terminal_at_ms, ?3)
57/// 4. policy_str (TEXT)
58pub(crate) const UPDATE_FLOW_CORE_CANCEL_SQL: &str = r#"
59 UPDATE ff_flow_core
60 SET public_flow_state = 'cancelled',
61 terminal_at_ms = COALESCE(terminal_at_ms, ?3),
62 raw_fields = json_set(raw_fields, '$.cancellation_policy', ?4)
63 WHERE partition_key = ?1 AND flow_id = ?2
64"#;
65
66/// Enumerate member executions for cancel_flow. Returns rows filtered
67/// by the policy-specific `lifecycle_phase` set.
68///
69/// NOTE: the state filter is embedded at format-time (not bound) because
70/// SQLite prepares the statement by string shape and the NOT-IN literal
71/// list is the simplest dialect-portable shape. The three literals are
72/// hard-coded constants in `backend.rs::cancel_flow_impl`, so there is no
73/// user-controlled string concatenation.
74pub(crate) const SELECT_FLOW_MEMBERS_CANCEL_ALL_SQL: &str = r#"
75 SELECT execution_id
76 FROM ff_exec_core
77 WHERE partition_key = ?1
78 AND flow_id = ?2
79 AND lifecycle_phase NOT IN ('completed', 'failed', 'cancelled', 'expired', 'terminal')
80"#;
81
82pub(crate) const SELECT_FLOW_MEMBERS_CANCEL_PENDING_SQL: &str = r#"
83 SELECT execution_id
84 FROM ff_exec_core
85 WHERE partition_key = ?1
86 AND flow_id = ?2
87 AND lifecycle_phase IN ('pending', 'blocked', 'eligible', 'runnable', 'submitted')
88"#;
89
90/// Flip one member exec_core row to cancelled. Mirror of PG at
91/// `ff-backend-postgres/src/flow.rs:672-687`.
92///
93/// Binds:
94/// 1. partition_key (i64)
95/// 2. execution_id (Uuid)
96/// 3. now_ms (i64)
97pub(crate) const UPDATE_EXEC_CORE_CANCEL_MEMBER_SQL: &str = r#"
98 UPDATE ff_exec_core
99 SET lifecycle_phase = 'cancelled',
100 eligibility_state = 'cancelled',
101 public_state = 'cancelled',
102 attempt_state = 'cancelled',
103 terminal_at_ms = COALESCE(terminal_at_ms, ?3),
104 cancellation_reason = COALESCE(cancellation_reason, 'flow_cancelled'),
105 cancelled_by = COALESCE(cancelled_by, 'cancel_flow')
106 WHERE partition_key = ?1 AND execution_id = ?2
107"#;
108
109/// Clear the current attempt's `outcome` on cancel-member so a later
110/// `read_execution_info` doesn't surface a stale `retry`/`interrupted`
111/// terminal-outcome on a cancelled row (#355). Mirror of the PG
112/// companion statement added on the cancel-member loop in
113/// `ff-backend-postgres/src/flow.rs`.
114///
115/// Binds:
116/// 1. partition_key (i64)
117/// 2. execution_id (Uuid)
118pub(crate) const UPDATE_ATTEMPT_CLEAR_OUTCOME_FOR_CURRENT_SQL: &str = r#"
119 UPDATE ff_attempt
120 SET outcome = NULL
121 WHERE partition_key = ?1
122 AND execution_id = ?2
123 AND attempt_index = (SELECT attempt_index FROM ff_exec_core
124 WHERE partition_key = ?1 AND execution_id = ?2)
125"#;
126
127/// RFC-016 Stage C bookkeeping: enqueue a pending-cancel row for
128/// every edge_group with `running_count > 0` on the cancelled flow
129/// (the Wave-5 dispatcher reads this).
130pub(crate) const INSERT_PENDING_CANCEL_GROUPS_SQL: &str = r#"
131 INSERT OR IGNORE INTO ff_pending_cancel_groups
132 (partition_key, flow_id, downstream_eid, enqueued_at_ms)
133 SELECT partition_key, flow_id, downstream_eid, ?3
134 FROM ff_edge_group
135 WHERE partition_key = ?1 AND flow_id = ?2 AND running_count > 0
136"#;