// ff_backend_sqlite/queries/operator.rs

//! SQL statements for Wave 9 operator-control ops (RFC-023 Phase 3.2).
//!
//! Mirrors `ff-backend-postgres/src/operator.rs`. SQLite-specific
//! translations:
//!
//! * `jsonb_set(raw_fields, '{k}', to_jsonb($::text))` →
//!   `json_set(raw_fields, '$.k', ?)`. `raw_fields` is TEXT JSON
//!   (JSON1), not JSONB.
//! * `raw_fields->>'replay_count'` → `json_extract(raw_fields,
//!   '$.replay_count')`. The extracted value is cast explicitly with
//!   `CAST(... AS INTEGER)` before it takes part in arithmetic.
//! * `FOR NO KEY UPDATE` / `FOR UPDATE` are no-ops — the enclosing
//!   `BEGIN IMMEDIATE` holds the RESERVED lock for the full
//!   read-modify-write window.
//! * `ExecutionId` UUIDs are bound as 16-byte `BLOB`s (see
//!   `backend::split_exec_id`), not stringified UUIDs as on PG.

// ── cancel_execution ───────────────────────────────────────────────

/// Pre-read for `cancel_execution`: pins the `ff_exec_core` row together
/// with the lease identity of its current attempt.
///
/// Binds: ?1 partition_key, ?2 execution_id BLOB. `attempt_index` is
/// returned by this read (not bound) and is re-bound separately on
/// the attempt-row follow-up statements.
///
/// The `LEFT JOIN` keeps the exec_core row in the result even when no
/// `ff_attempt` row matches `ec.attempt_index`; in that case
/// `worker_instance_id` and `lease_epoch` come back NULL.
pub(crate) const SELECT_CANCEL_PRE_SQL: &str = r#"
    SELECT ec.lifecycle_phase  AS lifecycle_phase,
           ec.public_state     AS public_state,
           ec.attempt_index    AS attempt_index,
           a.worker_instance_id AS worker_instance_id,
           a.lease_epoch       AS lease_epoch
      FROM ff_exec_core ec
      LEFT JOIN ff_attempt a
        ON a.partition_key = ec.partition_key
       AND a.execution_id  = ec.execution_id
       AND a.attempt_index = ec.attempt_index
     WHERE ec.partition_key = ?1 AND ec.execution_id = ?2
"#;

/// Flip `ff_exec_core` into the terminal `cancelled` state. Matches
/// `ff-backend-postgres/src/operator.rs:211-236`.
///
/// Binds: ?1 part, ?2 exec_uuid BLOB, ?3 now_ms, ?4 reason, ?5
/// source_str.
///
/// `terminal_at_ms`, `cancellation_reason` and `cancelled_by` are
/// wrapped in `COALESCE(existing, new)` so values already present on
/// the row are kept rather than overwritten. `last_mutation_at` is
/// stamped into `raw_fields` as a JSON string (`CAST(?3 AS TEXT)`).
pub(crate) const UPDATE_EXEC_CORE_CANCELLED_SQL: &str = r#"
    UPDATE ff_exec_core
       SET lifecycle_phase     = 'cancelled',
           ownership_state     = 'unowned',
           eligibility_state   = 'not_applicable',
           public_state        = 'cancelled',
           attempt_state       = 'cancelled',
           terminal_at_ms      = COALESCE(terminal_at_ms, ?3),
           cancellation_reason = COALESCE(cancellation_reason, ?4),
           cancelled_by        = COALESCE(cancelled_by, ?5),
           raw_fields          = json_set(raw_fields, '$.last_mutation_at', CAST(?3 AS TEXT))
     WHERE partition_key = ?1 AND execution_id = ?2
"#;

/// Clear the current attempt's lease fields, bump `lease_epoch`, and
/// mark `outcome = 'cancelled'`.
///
/// Binds: ?1 part, ?2 exec_uuid BLOB, ?3 attempt_index, ?4 now_ms.
///
/// Unlike the exec_core cancel statement, `terminal_at_ms` is assigned
/// unconditionally here (no `COALESCE`).
pub(crate) const UPDATE_ATTEMPT_CANCELLED_SQL: &str = r#"
    UPDATE ff_attempt
       SET worker_instance_id   = NULL,
           lease_expires_at_ms  = NULL,
           lease_epoch          = lease_epoch + 1,
           terminal_at_ms       = ?4,
           outcome              = 'cancelled'
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;

// ── revoke_lease ───────────────────────────────────────────────────

/// Pre-read for `revoke_lease`: fetches `ff_exec_core.attempt_index`
/// for one execution.
///
/// Binds: ?1 partition_key, ?2 execution_id BLOB.
pub(crate) const SELECT_EXEC_ATTEMPT_INDEX_SQL: &str = r#"
    SELECT attempt_index
      FROM ff_exec_core
     WHERE partition_key = ?1 AND execution_id = ?2
"#;

/// Pre-read for `revoke_lease`: the attempt's current owner
/// (`worker_instance_id`) and `lease_epoch`.
///
/// Binds: ?1 partition_key, ?2 execution_id BLOB, ?3 attempt_index.
pub(crate) const SELECT_ATTEMPT_OWNER_SQL: &str = r#"
    SELECT worker_instance_id, lease_epoch
      FROM ff_attempt
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;

/// Compare-and-swap on `lease_epoch`: clears the lease and bumps the
/// epoch, but only if the row still carries the epoch observed by the
/// pre-read.
///
/// Row-count = 0 → concurrent revoker won, surface `AlreadySatisfied
/// (epoch_moved)`.
///
/// Binds: ?1 part, ?2 exec_uuid, ?3 attempt_index, ?4 prior_epoch.
pub(crate) const UPDATE_ATTEMPT_REVOKE_CAS_SQL: &str = r#"
    UPDATE ff_attempt
       SET worker_instance_id   = NULL,
           lease_expires_at_ms  = NULL,
           lease_epoch          = lease_epoch + 1
     WHERE partition_key = ?1
       AND execution_id  = ?2
       AND attempt_index = ?3
       AND lease_epoch   = ?4
"#;

/// Flip exec_core back to runnable (reclaimable) — gated on
/// `lifecycle_phase = 'active'` so a concurrent cancel/complete is not
/// overwritten.
///
/// Binds: ?1 part, ?2 exec_uuid, ?3 now_ms.
///
/// `last_mutation_at` is stamped into `raw_fields` as a JSON string
/// (`CAST(?3 AS TEXT)`).
pub(crate) const UPDATE_EXEC_CORE_RECLAIMABLE_SQL: &str = r#"
    UPDATE ff_exec_core
       SET lifecycle_phase   = 'runnable',
           ownership_state   = 'unowned',
           eligibility_state = 'eligible_now',
           attempt_state     = 'attempt_interrupted',
           raw_fields        = json_set(raw_fields, '$.last_mutation_at', CAST(?3 AS TEXT))
     WHERE partition_key = ?1 AND execution_id = ?2
       AND lifecycle_phase = 'active'
"#;

// ── change_priority ────────────────────────────────────────────────

/// Pre-read for `change_priority`: the gate fields
/// (`lifecycle_phase`, `eligibility_state`) plus the current
/// `priority`.
///
/// Binds: ?1 part, ?2 exec_uuid BLOB.
pub(crate) const SELECT_CHANGE_PRIORITY_PRE_SQL: &str = r#"
    SELECT lifecycle_phase, eligibility_state, priority
      FROM ff_exec_core
     WHERE partition_key = ?1 AND execution_id = ?2
"#;

/// Update priority — gated on `lifecycle_phase = 'runnable'` AND
/// `eligibility_state = 'eligible_now'` (Valkey-canonical gate from
/// `flowfabric.lua:3683-3688`). Row-count = 0 on concurrent transition
/// surfaces `ExecutionNotEligible`.
///
/// Binds: ?1 part, ?2 exec_uuid, ?3 new_priority, ?4 now_ms.
///
/// `last_mutation_at` is stamped into `raw_fields` as a JSON string
/// (`CAST(?4 AS TEXT)`).
pub(crate) const UPDATE_EXEC_CORE_PRIORITY_SQL: &str = r#"
    UPDATE ff_exec_core
       SET priority   = ?3,
           raw_fields = json_set(raw_fields, '$.last_mutation_at', CAST(?4 AS TEXT))
     WHERE partition_key = ?1 AND execution_id = ?2
       AND lifecycle_phase   = 'runnable'
       AND eligibility_state = 'eligible_now'
"#;

// ── replay_execution ───────────────────────────────────────────────

/// Pre-read for `replay_execution`: lifecycle gate + flow membership +
/// attempt index, used for deriving the skipped-flow-member branch.
///
/// Binds: ?1 partition_key, ?2 execution_id BLOB.
pub(crate) const SELECT_REPLAY_PRE_SQL: &str = r#"
    SELECT lifecycle_phase, flow_id, attempt_index
      FROM ff_exec_core
     WHERE partition_key = ?1 AND execution_id = ?2
"#;

/// Read the current attempt's `outcome` for the skipped-branch
/// selector of `replay_execution`.
///
/// Binds: ?1 partition_key, ?2 execution_id BLOB, ?3 attempt_index.
pub(crate) const SELECT_ATTEMPT_OUTCOME_SQL: &str = r#"
    SELECT outcome
      FROM ff_attempt
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;

/// Reset downstream edge-group counters for the skipped-flow-member
/// replay branch. skip/fail/running → 0; `success_count` is not
/// touched, preserving it per Valkey ground-truth at
/// `flowfabric.lua:8580`.
///
/// Binds: ?1 part, ?2 exec_uuid_blob (downstream_eid).
///
/// The target groups are located through `ff_edge` using a row-value
/// `IN` over `(partition_key, flow_id, downstream_eid)`, so every edge
/// group that has the execution as its downstream is reset.
pub(crate) const RESET_EDGE_GROUP_COUNTERS_SQL: &str = r#"
    UPDATE ff_edge_group
       SET skip_count    = 0,
           fail_count    = 0,
           running_count = 0
     WHERE (partition_key, flow_id, downstream_eid) IN (
       SELECT DISTINCT e.partition_key, e.flow_id, e.downstream_eid
         FROM ff_edge e
        WHERE e.partition_key   = ?1
          AND e.downstream_eid  = ?2
     )
"#;

/// Flip exec_core back to runnable + bump `replay_count`, clearing the
/// terminal/cancellation columns (`terminal_at_ms`, `result`,
/// `cancellation_reason`, `cancelled_by`).
///
/// Binds: ?1 part, ?2 exec_uuid BLOB, ?3 eligibility_state, ?4
/// public_state, ?5 now_ms.
///
/// SQLite's `json_set` doesn't support JSON-pointer arithmetic in a
/// single call the way PG's `jsonb_set(..., to_jsonb(... + 1))` does.
/// We nest two `json_set` calls: the inner one bumps
/// `replay_count`, the outer stamps `last_mutation_at`. The bump
/// reads the current value via `json_extract`, casts it with
/// `CAST(... AS INTEGER)`, and falls back through `COALESCE(..., 0)`
/// so the first replay initializes it from 0 → 1.
///
/// `replay_count` is stored as a JSON **number** (not a string): the
/// SQLite integer result of the arithmetic flows directly into
/// `json_set(...)` without a `CAST ... AS TEXT` wrapper, matching the
/// PG reference's `to_jsonb(... + 1)` shape. `last_mutation_at` stays
/// a JSON string to match how it is written elsewhere (e.g.
/// `build_create_execution_raw_fields`).
pub(crate) const UPDATE_EXEC_CORE_REPLAY_SQL: &str = r#"
    UPDATE ff_exec_core
       SET lifecycle_phase      = 'runnable',
           ownership_state      = 'unowned',
           eligibility_state    = ?3,
           public_state         = ?4,
           attempt_state        = 'pending_replay_attempt',
           terminal_at_ms       = NULL,
           result               = NULL,
           cancellation_reason  = NULL,
           cancelled_by         = NULL,
           raw_fields           = json_set(
               json_set(
                   raw_fields,
                   '$.replay_count',
                   COALESCE(CAST(json_extract(raw_fields, '$.replay_count') AS INTEGER), 0) + 1
               ),
               '$.last_mutation_at',
               CAST(?5 AS TEXT)
           )
     WHERE partition_key = ?1 AND execution_id = ?2
"#;

/// Reset the current attempt row in-place (Rev 7 Fork 2 Option A — no
/// new attempt row): clears outcome, terminal timestamp, worker
/// identity and lease expiry, and bumps `lease_epoch`.
///
/// Binds: ?1 part, ?2 exec_uuid, ?3 attempt_index.
pub(crate) const UPDATE_ATTEMPT_REPLAY_RESET_SQL: &str = r#"
    UPDATE ff_attempt
       SET outcome              = NULL,
           terminal_at_ms       = NULL,
           worker_id            = NULL,
           worker_instance_id   = NULL,
           lease_expires_at_ms  = NULL,
           lease_epoch          = lease_epoch + 1
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;

// ── cancel_flow_header (§4.2.3, Phase 3.3) ─────────────────────────

/// Pre-read the flow core row (`public_flow_state`, `raw_fields`)
/// under the write lock (`BEGIN IMMEDIATE`).
///
/// Binds: ?1 partition_key, ?2 flow_id BLOB.
pub(crate) const SELECT_FLOW_CORE_FOR_CANCEL_SQL: &str = r#"
    SELECT public_flow_state, raw_fields
      FROM ff_flow_core
     WHERE partition_key = ?1 AND flow_id = ?2
"#;

/// Flip `ff_flow_core` to cancelled + merge `cancellation_policy` +
/// `cancel_reason` into `raw_fields`. SQLite has no `jsonb ||` merge;
/// we nest two `json_set` calls (inner sets `cancellation_policy`,
/// outer sets `cancel_reason`).
///
/// Binds: ?1 part, ?2 flow_id, ?3 now, ?4 cancellation_policy, ?5
/// reason.
///
/// `terminal_at_ms` uses `COALESCE(existing, ?3)` so a timestamp
/// already on the row is kept.
pub(crate) const UPDATE_FLOW_CORE_CANCEL_WITH_REASON_SQL: &str = r#"
    UPDATE ff_flow_core
       SET public_flow_state = 'cancelled',
           terminal_at_ms    = COALESCE(terminal_at_ms, ?3),
           raw_fields        = json_set(
                                 json_set(raw_fields,
                                          '$.cancellation_policy', ?4),
                                 '$.cancel_reason', ?5)
     WHERE partition_key = ?1 AND flow_id = ?2
"#;

/// Idempotent insert of the backlog header: a second insert for the
/// same `(partition_key, flow_id)` is a no-op via `ON CONFLICT ... DO
/// NOTHING`.
///
/// Binds: ?1 part, ?2 flow_id, ?3 requested_at_ms, ?4 reason, ?5
/// cancellation_policy.
///
/// `requester` is written as the empty string and `status` as
/// `'pending'`; neither is bound.
pub(crate) const INSERT_CANCEL_BACKLOG_SQL: &str = r#"
    INSERT INTO ff_cancel_backlog
        (partition_key, flow_id, requested_at_ms, requester, reason,
         cancellation_policy, status)
    VALUES (?1, ?2, ?3, '', ?4, ?5, 'pending')
    ON CONFLICT (partition_key, flow_id) DO NOTHING
"#;

/// Enumerate in-flight member executions for a flow, i.e. members
/// whose `lifecycle_phase` is neither `'terminal'` nor `'cancelled'`.
///
/// Binds: ?1 part, ?2 flow_id BLOB.
pub(crate) const SELECT_FLOW_INFLIGHT_MEMBERS_SQL: &str = r#"
    SELECT execution_id
      FROM ff_exec_core
     WHERE partition_key = ?1 AND flow_id = ?2
       AND lifecycle_phase NOT IN ('terminal','cancelled')
"#;

/// Enumerate all members of a flow regardless of lifecycle phase
/// (used for idempotent-replay when a backlog row doesn't exist yet).
///
/// Binds: ?1 part, ?2 flow_id.
pub(crate) const SELECT_FLOW_ALL_MEMBERS_SQL: &str = r#"
    SELECT execution_id
      FROM ff_exec_core
     WHERE partition_key = ?1 AND flow_id = ?2
"#;

/// Enumerate the members already recorded in
/// `ff_cancel_backlog_member` for a flow (idempotent-replay path).
///
/// Binds: ?1 part, ?2 flow_id.
pub(crate) const SELECT_CANCEL_BACKLOG_MEMBERS_SQL: &str = r#"
    SELECT execution_id
      FROM ff_cancel_backlog_member
     WHERE partition_key = ?1 AND flow_id = ?2
"#;

/// Insert one backlog member row; re-inserting an existing
/// `(partition_key, flow_id, execution_id)` is a no-op.
///
/// Binds: ?1 part, ?2 flow_id, ?3 execution_id wire-string.
///
/// SQLite has no bulk UNNEST — the caller loops; under single-writer
/// with BEGIN IMMEDIATE each statement is cheap and the membership
/// cardinality is bounded.
pub(crate) const INSERT_CANCEL_BACKLOG_MEMBER_SQL: &str = r#"
    INSERT INTO ff_cancel_backlog_member
        (partition_key, flow_id, execution_id)
    VALUES (?1, ?2, ?3)
    ON CONFLICT (partition_key, flow_id, execution_id) DO NOTHING
"#;

/// Flip one member exec_core row to cancelled for the `cancel_flow_header`
/// fan-out. Binds: ?1 part, ?2 execution_id BLOB, ?3 now, ?4 reason.
///
/// Also clears `ownership_state` + `attempt_state` to their terminal
/// forms so the `StateVector` assembled by `read_execution_info` is
/// internally consistent (cursor bugbot: a live-running member would
/// otherwise retain `ownership_state='leased'` + `attempt_state=
/// 'running_attempt'` alongside `lifecycle_phase='cancelled'`).
///
/// `cancelled_by` defaults to the literal `'cancel_flow_header'` when
/// the row has none; existing `terminal_at_ms` / `cancellation_reason`
/// / `cancelled_by` values are kept via `COALESCE`.
///
/// NOTE(review): this statement writes `eligibility_state =
/// 'cancelled'`, whereas `UPDATE_EXEC_CORE_CANCELLED_SQL` writes
/// `'not_applicable'`, and it does not stamp
/// `raw_fields.last_mutation_at`. Confirm against the PG reference
/// and the state reader that both differences are intentional.
pub(crate) const UPDATE_EXEC_CORE_CANCEL_FROM_HEADER_SQL: &str = r#"
    UPDATE ff_exec_core
       SET lifecycle_phase     = 'cancelled',
           ownership_state     = 'unowned',
           eligibility_state   = 'cancelled',
           public_state        = 'cancelled',
           attempt_state       = 'cancelled',
           terminal_at_ms      = COALESCE(terminal_at_ms, ?3),
           cancellation_reason = COALESCE(cancellation_reason, ?4),
           cancelled_by        = COALESCE(cancelled_by, 'cancel_flow_header')
     WHERE partition_key = ?1 AND execution_id = ?2
"#;

// ── ack_cancel_member (§4.2.3, Phase 3.3) ──────────────────────────

/// Delete one backlog member row, identified by the full
/// `(partition_key, flow_id, execution_id)` key.
///
/// Binds: ?1 part, ?2 flow_id, ?3 execution_id wire-string.
pub(crate) const DELETE_CANCEL_BACKLOG_MEMBER_SQL: &str = r#"
    DELETE FROM ff_cancel_backlog_member
     WHERE partition_key = ?1
       AND flow_id       = ?2
       AND execution_id  = ?3
"#;

/// Delete backlog header IFF no members remain. Binds: ?1 part, ?2
/// flow_id.
///
/// Note: the `NOT EXISTS` subquery re-checks
/// `ff_cancel_backlog_member` post-member-delete in the same
/// statement window, so a last-member ack drops both in a single
/// logical step. Concurrent acks race at commit time and the loser
/// retries through `retry_serializable`.
pub(crate) const DELETE_CANCEL_BACKLOG_IF_EMPTY_SQL: &str = r#"
    DELETE FROM ff_cancel_backlog
     WHERE partition_key = ?1
       AND flow_id       = ?2
       AND NOT EXISTS (
         SELECT 1 FROM ff_cancel_backlog_member
          WHERE partition_key = ?1 AND flow_id = ?2
       )
"#;

// ── operator_event outbox (co-transactional) ───────────────────────

/// Insert one operator-event outbox row, back-filling `namespace` +
/// `instance_tag` from the co-transactional `ff_exec_core.raw_fields`
/// row (Phase 3.2 fix — mirrors the lease_event / completion_event
/// back-fill).
///
/// The statement is an `INSERT ... SELECT` with two arms joined by
/// `UNION ALL`: the first arm produces a row when the exec_core row
/// exists, the second (guarded by `NOT EXISTS`) produces a row with
/// NULL `namespace` / `instance_tag` when it does not, so exactly one
/// outbox row is written per matching-or-missing exec_core row.
///
/// Binds:
///
///   1. execution_id TEXT — emitted on the outbox row.
///   2. event_type TEXT.
///   3. details TEXT (nullable JSON).
///   4. occurred_at_ms (i64).
///   5. partition_key (i64) — used on both the outbox row and the
///      co-transactional exec_core lookup.
///   6. execution_id BLOB — `ff_exec_core.execution_id` is BLOB.
pub(crate) const INSERT_OPERATOR_EVENT_SQL: &str = r#"
    INSERT INTO ff_operator_event
        (execution_id, event_type, details, occurred_at_ms, partition_key,
         namespace, instance_tag)
    SELECT ?1, ?2, ?3, ?4, ?5,
           json_extract(raw_fields, '$.namespace'),
           json_extract(raw_fields, '$.tags."cairn.instance_id"')
      FROM ff_exec_core
     WHERE partition_key = ?5 AND execution_id = ?6
    UNION ALL
    SELECT ?1, ?2, ?3, ?4, ?5, NULL, NULL
     WHERE NOT EXISTS (
         SELECT 1 FROM ff_exec_core
          WHERE partition_key = ?5 AND execution_id = ?6
     )
"#;

/// Insert one operator-event outbox row, back-filling `namespace`
/// from the co-transactional `ff_flow_core.raw_fields` row. Used by
/// `flow_cancel_requested` where the `execution_id` column on the
/// outbox carries the FLOW id (Phase 3.3 parity with PG reference
/// `ff-backend-postgres/src/operator.rs:1118-1132`). `instance_tag`
/// is left NULL — flows don't carry `cairn.instance_id` tags today.
///
/// As with the execution variant, a `UNION ALL` fallback arm guarded
/// by `NOT EXISTS` still writes the outbox row (with NULL `namespace`)
/// when no matching `ff_flow_core` row is found.
///
/// Binds:
///   1. flow_id TEXT (stringified UUID) — written on outbox row.
///   2. event_type TEXT ('flow_cancel_requested').
///   3. details TEXT (nullable JSON).
///   4. occurred_at_ms (i64).
///   5. partition_key (i64).
///   6. flow_id BLOB — for the co-transactional flow_core lookup.
pub(crate) const INSERT_OPERATOR_EVENT_FLOW_SQL: &str = r#"
    INSERT INTO ff_operator_event
        (execution_id, event_type, details, occurred_at_ms, partition_key,
         namespace, instance_tag)
    SELECT ?1, ?2, ?3, ?4, ?5,
           json_extract(raw_fields, '$.namespace'),
           NULL
      FROM ff_flow_core
     WHERE partition_key = ?5 AND flow_id = ?6
    UNION ALL
    SELECT ?1, ?2, ?3, ?4, ?5, NULL, NULL
     WHERE NOT EXISTS (
         SELECT 1 FROM ff_flow_core
          WHERE partition_key = ?5 AND flow_id = ?6
     )
"#;