ff_backend_sqlite/queries/operator.rs
//! SQL statements for Wave 9 operator-control ops (RFC-023 Phase 3.2).
//!
//! Mirrors `ff-backend-postgres/src/operator.rs`. SQLite-specific
//! translations:
//!
//! * `jsonb_set(raw_fields, '{k}', to_jsonb($::text))` →
//!   `json_set(raw_fields, '$.k', ?)`. `raw_fields` is TEXT JSON
//!   (JSON1), not JSONB.
//! * `raw_fields->>'replay_count'` → `json_extract(raw_fields,
//!   '$.replay_count')`. The extracted value is converted with an
//!   explicit `CAST(... AS INTEGER)` before it is incremented.
//! * `FOR NO KEY UPDATE` / `FOR UPDATE` are no-ops — the enclosing
//!   `BEGIN IMMEDIATE` holds the RESERVED lock for the full
//!   read-modify-write window.
//! * `ExecutionId` UUIDs are bound as 16-byte `BLOB`s (see
//!   `backend::split_exec_id`), not stringified UUIDs as on PG.
17
18// ── cancel_execution ───────────────────────────────────────────────
19
/// `cancel_execution` pre-read: loads the `ff_exec_core` phase/state
/// columns together with the lease identity of the current attempt.
///
/// `ff_attempt` is `LEFT JOIN`ed on
/// `(partition_key, execution_id, attempt_index)`, so the exec row is
/// still returned — with NULL `worker_instance_id` / `lease_epoch` —
/// when no attempt row matches `ec.attempt_index`.
///
/// Binds: ?1 partition_key, ?2 execution_id BLOB. `attempt_index` is an
/// output column of this read, not a bind; it is re-bound separately on
/// the follow-up attempt-row statements.
pub(crate) const SELECT_CANCEL_PRE_SQL: &str = r#"
    SELECT ec.lifecycle_phase AS lifecycle_phase,
           ec.public_state AS public_state,
           ec.attempt_index AS attempt_index,
           a.worker_instance_id AS worker_instance_id,
           a.lease_epoch AS lease_epoch
      FROM ff_exec_core ec
      LEFT JOIN ff_attempt a
        ON a.partition_key = ec.partition_key
       AND a.execution_id = ec.execution_id
       AND a.attempt_index = ec.attempt_index
     WHERE ec.partition_key = ?1 AND ec.execution_id = ?2
"#;
37
/// Moves one `ff_exec_core` row into the terminal `cancelled` state
/// for `cancel_execution`. Matches
/// `ff-backend-postgres/src/operator.rs:211-236`.
///
/// `terminal_at_ms`, `cancellation_reason` and `cancelled_by` are
/// wrapped in `COALESCE`, so a value already present on the row is
/// kept. `raw_fields.last_mutation_at` is overwritten with `now_ms`
/// rendered as text.
///
/// Binds: ?1 partition_key, ?2 execution_id BLOB, ?3 now_ms,
/// ?4 reason, ?5 source_str.
pub(crate) const UPDATE_EXEC_CORE_CANCELLED_SQL: &str = r#"
    UPDATE ff_exec_core
       SET lifecycle_phase = 'cancelled',
           ownership_state = 'unowned',
           eligibility_state = 'not_applicable',
           public_state = 'cancelled',
           attempt_state = 'cancelled',
           terminal_at_ms = COALESCE(terminal_at_ms, ?3),
           cancellation_reason = COALESCE(cancellation_reason, ?4),
           cancelled_by = COALESCE(cancelled_by, ?5),
           raw_fields = json_set(raw_fields, '$.last_mutation_at', CAST(?3 AS TEXT))
     WHERE partition_key = ?1 AND execution_id = ?2
"#;
54
/// Releases the lease on the attempt being cancelled: NULLs
/// `worker_instance_id` and `lease_expires_at_ms`, increments
/// `lease_epoch`, and records `terminal_at_ms` with
/// `outcome = 'cancelled'`.
///
/// Binds: ?1 partition_key, ?2 execution_id BLOB, ?3 attempt_index,
/// ?4 now_ms.
pub(crate) const UPDATE_ATTEMPT_CANCELLED_SQL: &str = r#"
    UPDATE ff_attempt
       SET worker_instance_id = NULL,
           lease_expires_at_ms = NULL,
           lease_epoch = lease_epoch + 1,
           terminal_at_ms = ?4,
           outcome = 'cancelled'
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;
67
68// ── revoke_lease ───────────────────────────────────────────────────
69
/// `revoke_lease` pre-read: returns the `attempt_index` currently
/// recorded on the execution's `ff_exec_core` row.
///
/// Binds: ?1 partition_key, ?2 execution_id.
pub(crate) const SELECT_EXEC_ATTEMPT_INDEX_SQL: &str = r#"
    SELECT attempt_index
      FROM ff_exec_core
     WHERE partition_key = ?1 AND execution_id = ?2
"#;
76
/// `revoke_lease` pre-read: returns the attempt's current owner
/// (`worker_instance_id`) and its `lease_epoch`.
///
/// Binds: ?1 partition_key, ?2 execution_id, ?3 attempt_index.
pub(crate) const SELECT_ATTEMPT_OWNER_SQL: &str = r#"
    SELECT worker_instance_id, lease_epoch
      FROM ff_attempt
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;
83
/// Compare-and-swap revoke keyed on `lease_epoch`: the lease columns
/// are cleared and the epoch incremented only while the row still
/// carries the epoch observed by the pre-read.
///
/// A row-count of 0 means a concurrent revoker won; callers surface
/// that as `AlreadySatisfied(epoch_moved)`.
///
/// Binds: ?1 partition_key, ?2 execution_id, ?3 attempt_index,
/// ?4 prior_epoch.
pub(crate) const UPDATE_ATTEMPT_REVOKE_CAS_SQL: &str = r#"
    UPDATE ff_attempt
       SET worker_instance_id = NULL,
           lease_expires_at_ms = NULL,
           lease_epoch = lease_epoch + 1
     WHERE partition_key = ?1
       AND execution_id = ?2
       AND attempt_index = ?3
       AND lease_epoch = ?4
"#;
98
/// Returns an execution to the runnable (reclaimable) state after its
/// lease was revoked, and stamps `raw_fields.last_mutation_at`.
///
/// The `lifecycle_phase = 'active'` predicate keeps this statement from
/// overwriting a concurrent cancel/complete.
///
/// Binds: ?1 partition_key, ?2 execution_id, ?3 now_ms.
pub(crate) const UPDATE_EXEC_CORE_RECLAIMABLE_SQL: &str = r#"
    UPDATE ff_exec_core
       SET lifecycle_phase = 'runnable',
           ownership_state = 'unowned',
           eligibility_state = 'eligible_now',
           attempt_state = 'attempt_interrupted',
           raw_fields = json_set(raw_fields, '$.last_mutation_at', CAST(?3 AS TEXT))
     WHERE partition_key = ?1 AND execution_id = ?2
       AND lifecycle_phase = 'active'
"#;
112
113// ── change_priority ────────────────────────────────────────────────
114
/// `change_priority` pre-read: returns the two gate columns
/// (`lifecycle_phase`, `eligibility_state`) and the current `priority`.
///
/// Binds: ?1 partition_key, ?2 execution_id BLOB.
pub(crate) const SELECT_CHANGE_PRIORITY_PRE_SQL: &str = r#"
    SELECT lifecycle_phase, eligibility_state, priority
      FROM ff_exec_core
     WHERE partition_key = ?1 AND execution_id = ?2
"#;
122
/// Writes a new `priority` and stamps `raw_fields.last_mutation_at`.
///
/// The update applies only while `lifecycle_phase = 'runnable'` AND
/// `eligibility_state = 'eligible_now'` — the Valkey-canonical gate
/// from `flowfabric.lua:3683-3688`. A row-count of 0 (the row moved
/// concurrently) is surfaced as `ExecutionNotEligible`.
///
/// Binds: ?1 partition_key, ?2 execution_id, ?3 new_priority,
/// ?4 now_ms.
pub(crate) const UPDATE_EXEC_CORE_PRIORITY_SQL: &str = r#"
    UPDATE ff_exec_core
       SET priority = ?3,
           raw_fields = json_set(raw_fields, '$.last_mutation_at', CAST(?4 AS TEXT))
     WHERE partition_key = ?1 AND execution_id = ?2
       AND lifecycle_phase = 'runnable'
       AND eligibility_state = 'eligible_now'
"#;
136
137// ── replay_execution ───────────────────────────────────────────────
138
/// `replay_execution` pre-read: returns the lifecycle gate column, the
/// flow membership (`flow_id`) and the `attempt_index` used to derive
/// the skipped-flow-member branch.
///
/// Binds: ?1 partition_key, ?2 execution_id.
pub(crate) const SELECT_REPLAY_PRE_SQL: &str = r#"
    SELECT lifecycle_phase, flow_id, attempt_index
      FROM ff_exec_core
     WHERE partition_key = ?1 AND execution_id = ?2
"#;
146
/// Returns the `outcome` of one attempt row; the replay path feeds it
/// to the skipped-branch selector.
///
/// Binds: ?1 partition_key, ?2 execution_id, ?3 attempt_index.
pub(crate) const SELECT_ATTEMPT_OUTCOME_SQL: &str = r#"
    SELECT outcome
      FROM ff_attempt
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;
153
/// Skipped-flow-member replay branch: zeroes `skip_count`,
/// `fail_count` and `running_count` on every `ff_edge_group` row whose
/// `(partition_key, flow_id, downstream_eid)` appears among the
/// `ff_edge` rows pointing at the replayed execution.
///
/// `success_count` is deliberately left untouched, per the Valkey
/// ground-truth at `flowfabric.lua:8580`.
///
/// Binds: ?1 partition_key, ?2 exec_uuid_blob (the `downstream_eid`).
pub(crate) const RESET_EDGE_GROUP_COUNTERS_SQL: &str = r#"
    UPDATE ff_edge_group
       SET skip_count = 0,
           fail_count = 0,
           running_count = 0
     WHERE (partition_key, flow_id, downstream_eid) IN (
           SELECT DISTINCT e.partition_key, e.flow_id, e.downstream_eid
             FROM ff_edge e
            WHERE e.partition_key = ?1
              AND e.downstream_eid = ?2
     )
"#;
170
/// Puts an execution back into `runnable` for a replay, clears its
/// terminal/cancellation columns and increments
/// `raw_fields.replay_count`.
///
/// Binds: ?1 partition_key, ?2 execution_id BLOB, ?3 eligibility_state,
/// ?4 public_state, ?5 now_ms.
///
/// PG does the counter bump with one
/// `jsonb_set(..., to_jsonb(... + 1))`; here two `json_set` calls are
/// nested instead. The inner call writes the incremented
/// `replay_count`, the outer call stamps `last_mutation_at`. The
/// current counter is read with `json_extract`, cast to INTEGER and
/// defaulted through `COALESCE(..., 0)`, so the first replay goes
/// 0 → 1.
///
/// Storage shapes:
/// * `replay_count` is a JSON **number** — the integer result of the
///   arithmetic is handed to `json_set` with no `CAST ... AS TEXT`,
///   matching the PG reference's `to_jsonb(... + 1)`.
/// * `last_mutation_at` stays a JSON string, the same shape other
///   writers use (e.g. `build_create_execution_raw_fields`).
pub(crate) const UPDATE_EXEC_CORE_REPLAY_SQL: &str = r#"
    UPDATE ff_exec_core
       SET lifecycle_phase = 'runnable',
           ownership_state = 'unowned',
           eligibility_state = ?3,
           public_state = ?4,
           attempt_state = 'pending_replay_attempt',
           terminal_at_ms = NULL,
           result = NULL,
           cancellation_reason = NULL,
           cancelled_by = NULL,
           raw_fields = json_set(
               json_set(
                   raw_fields,
                   '$.replay_count',
                   COALESCE(CAST(json_extract(raw_fields, '$.replay_count') AS INTEGER), 0) + 1
               ),
               '$.last_mutation_at',
               CAST(?5 AS TEXT)
           )
     WHERE partition_key = ?1 AND execution_id = ?2
"#;
210
/// Resets the current attempt row in place for a replay (Rev 7 Fork 2
/// Option A — no new attempt row is inserted): outcome, terminal
/// timestamp, worker identity and lease expiry are NULLed and
/// `lease_epoch` is incremented.
///
/// Binds: ?1 partition_key, ?2 execution_id, ?3 attempt_index.
pub(crate) const UPDATE_ATTEMPT_REPLAY_RESET_SQL: &str = r#"
    UPDATE ff_attempt
       SET outcome = NULL,
           terminal_at_ms = NULL,
           worker_id = NULL,
           worker_instance_id = NULL,
           lease_expires_at_ms = NULL,
           lease_epoch = lease_epoch + 1
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;
223
224// ── cancel_flow_header (§4.2.3, Phase 3.3) ─────────────────────────
225
/// `cancel_flow_header` pre-read: returns `public_flow_state` and
/// `raw_fields` of the flow core row. Issued under the write lock
/// (`BEGIN IMMEDIATE`).
///
/// Binds: ?1 partition_key, ?2 flow_id BLOB.
pub(crate) const SELECT_FLOW_CORE_FOR_CANCEL_SQL: &str = r#"
    SELECT public_flow_state, raw_fields
      FROM ff_flow_core
     WHERE partition_key = ?1 AND flow_id = ?2
"#;
233
/// Marks a flow as cancelled and records `cancellation_policy` and
/// `cancel_reason` inside `raw_fields`.
///
/// SQLite has no `jsonb ||` merge operator, so the two keys are
/// written with a pair of nested `json_set` calls. `terminal_at_ms`
/// keeps any value already stored (`COALESCE`).
///
/// Binds: ?1 partition_key, ?2 flow_id, ?3 now, ?4 cancellation_policy,
/// ?5 reason.
pub(crate) const UPDATE_FLOW_CORE_CANCEL_WITH_REASON_SQL: &str = r#"
    UPDATE ff_flow_core
       SET public_flow_state = 'cancelled',
           terminal_at_ms = COALESCE(terminal_at_ms, ?3),
           raw_fields = json_set(
               json_set(raw_fields,
                   '$.cancellation_policy', ?4),
               '$.cancel_reason', ?5)
     WHERE partition_key = ?1 AND flow_id = ?2
"#;
248
/// Inserts the cancel-backlog header for a flow with status
/// `'pending'` and an empty `requester`. Idempotent: a conflict on
/// `(partition_key, flow_id)` leaves the existing row untouched.
///
/// Binds: ?1 partition_key, ?2 flow_id, ?3 requested_at_ms, ?4 reason,
/// ?5 cancellation_policy.
pub(crate) const INSERT_CANCEL_BACKLOG_SQL: &str = r#"
    INSERT INTO ff_cancel_backlog
        (partition_key, flow_id, requested_at_ms, requester, reason,
         cancellation_policy, status)
    VALUES (?1, ?2, ?3, '', ?4, ?5, 'pending')
    ON CONFLICT (partition_key, flow_id) DO NOTHING
"#;
258
/// Lists the member executions of a flow that are still in flight,
/// i.e. whose `lifecycle_phase` is neither `'terminal'` nor
/// `'cancelled'`.
///
/// Binds: ?1 partition_key, ?2 flow_id BLOB.
pub(crate) const SELECT_FLOW_INFLIGHT_MEMBERS_SQL: &str = r#"
    SELECT execution_id
      FROM ff_exec_core
     WHERE partition_key = ?1 AND flow_id = ?2
       AND lifecycle_phase NOT IN ('terminal','cancelled')
"#;
267
/// Lists every member execution of a flow, regardless of phase. Used
/// for idempotent replay when no backlog row exists yet.
///
/// Binds: ?1 partition_key, ?2 flow_id.
pub(crate) const SELECT_FLOW_ALL_MEMBERS_SQL: &str = r#"
    SELECT execution_id
      FROM ff_exec_core
     WHERE partition_key = ?1 AND flow_id = ?2
"#;
275
/// Lists the members already recorded in the cancel backlog for a
/// flow (idempotent-replay path).
///
/// Binds: ?1 partition_key, ?2 flow_id.
pub(crate) const SELECT_CANCEL_BACKLOG_MEMBERS_SQL: &str = r#"
    SELECT execution_id
      FROM ff_cancel_backlog_member
     WHERE partition_key = ?1 AND flow_id = ?2
"#;
283
/// Inserts a single cancel-backlog member row; a conflict on
/// `(partition_key, flow_id, execution_id)` is ignored.
///
/// SQLite offers no bulk `UNNEST`, so the caller issues this once per
/// member. Under the single-writer `BEGIN IMMEDIATE` transaction each
/// statement is cheap and the membership cardinality is bounded.
///
/// Binds: ?1 partition_key, ?2 flow_id, ?3 execution_id wire-string.
pub(crate) const INSERT_CANCEL_BACKLOG_MEMBER_SQL: &str = r#"
    INSERT INTO ff_cancel_backlog_member
        (partition_key, flow_id, execution_id)
    VALUES (?1, ?2, ?3)
    ON CONFLICT (partition_key, flow_id, execution_id) DO NOTHING
"#;
294
/// Flips one member `ff_exec_core` row to cancelled as part of the
/// `cancel_flow_header` fan-out.
///
/// Binds: ?1 partition_key, ?2 execution_id BLOB, ?3 now, ?4 reason.
///
/// Besides `lifecycle_phase` / `public_state`, the statement also
/// moves `ownership_state`, `eligibility_state` and `attempt_state` to
/// their terminal forms so the `StateVector` assembled by
/// `read_execution_info` is internally consistent (cursor bugbot: a
/// live-running member would otherwise retain
/// `ownership_state='leased'` + `attempt_state='running_attempt'`
/// alongside `lifecycle_phase='cancelled'`).
///
/// The written state deliberately agrees with
/// `UPDATE_EXEC_CORE_CANCELLED_SQL`, the single-execution cancel:
/// * `eligibility_state` is `'not_applicable'`, so both cancel paths
///   leave the same terminal state vector.
/// * `raw_fields.last_mutation_at` is stamped from ?3, like the other
///   `ff_exec_core` mutations in this module. No extra bind is needed.
///
/// `terminal_at_ms`, `cancellation_reason` and `cancelled_by` go
/// through `COALESCE`, so values already present on the row win;
/// `cancelled_by` otherwise defaults to `'cancel_flow_header'`.
pub(crate) const UPDATE_EXEC_CORE_CANCEL_FROM_HEADER_SQL: &str = r#"
    UPDATE ff_exec_core
       SET lifecycle_phase = 'cancelled',
           ownership_state = 'unowned',
           eligibility_state = 'not_applicable',
           public_state = 'cancelled',
           attempt_state = 'cancelled',
           terminal_at_ms = COALESCE(terminal_at_ms, ?3),
           cancellation_reason = COALESCE(cancellation_reason, ?4),
           cancelled_by = COALESCE(cancelled_by, 'cancel_flow_header'),
           raw_fields = json_set(raw_fields, '$.last_mutation_at', CAST(?3 AS TEXT))
     WHERE partition_key = ?1 AND execution_id = ?2
"#;
315
316// ── ack_cancel_member (§4.2.3, Phase 3.3) ──────────────────────────
317
/// `ack_cancel_member`: removes a single row from
/// `ff_cancel_backlog_member`.
///
/// Binds: ?1 partition_key, ?2 flow_id, ?3 execution_id wire-string.
pub(crate) const DELETE_CANCEL_BACKLOG_MEMBER_SQL: &str = r#"
    DELETE FROM ff_cancel_backlog_member
     WHERE partition_key = ?1
       AND flow_id = ?2
       AND execution_id = ?3
"#;
326
/// Removes the cancel-backlog header for a flow, but only when no
/// member rows remain.
///
/// The `NOT EXISTS` subquery looks for remaining
/// `ff_cancel_backlog_member` rows with the same
/// `(partition_key, flow_id)`. Issued after the member delete, it lets
/// a last-member ack drop both the member and the header in a single
/// logical step.
///
/// Concurrent acks race at commit time and the loser retries through
/// `retry_serializable`.
/// NOTE(review): confirm this retry path still applies under SQLite's
/// `BEGIN IMMEDIATE` single-writer model.
///
/// Binds: ?1 partition_key, ?2 flow_id.
pub(crate) const DELETE_CANCEL_BACKLOG_IF_EMPTY_SQL: &str = r#"
    DELETE FROM ff_cancel_backlog
     WHERE partition_key = ?1
       AND flow_id = ?2
       AND NOT EXISTS (
           SELECT 1 FROM ff_cancel_backlog_member
            WHERE partition_key = ?1 AND flow_id = ?2
       )
"#;
341
342// ── operator_event outbox (co-transactional) ───────────────────────
343
/// Appends one row to the `ff_operator_event` outbox, filling
/// `namespace` and `instance_tag` from the `raw_fields` of the
/// co-transactional `ff_exec_core` row (Phase 3.2 fix — mirrors the
/// lease_event / completion_event back-fill).
///
/// The statement is a two-armed `UNION ALL`:
/// * the first arm selects from `ff_exec_core` and extracts
///   `$.namespace` and `$.tags."cairn.instance_id"`;
/// * the second arm, guarded by `NOT EXISTS`, supplies NULL for both
///   columns when no matching exec row is found.
///
/// Binds:
///
/// 1. execution_id TEXT — emitted on the outbox row.
/// 2. event_type TEXT.
/// 3. details TEXT (nullable JSON).
/// 4. occurred_at_ms (i64).
/// 5. partition_key (i64) — written to the outbox row and used for the
///    exec_core lookup.
/// 6. execution_id BLOB — `ff_exec_core.execution_id` is BLOB.
pub(crate) const INSERT_OPERATOR_EVENT_SQL: &str = r#"
    INSERT INTO ff_operator_event
        (execution_id, event_type, details, occurred_at_ms, partition_key,
         namespace, instance_tag)
    SELECT ?1, ?2, ?3, ?4, ?5,
           json_extract(raw_fields, '$.namespace'),
           json_extract(raw_fields, '$.tags."cairn.instance_id"')
      FROM ff_exec_core
     WHERE partition_key = ?5 AND execution_id = ?6
    UNION ALL
    SELECT ?1, ?2, ?3, ?4, ?5, NULL, NULL
     WHERE NOT EXISTS (
           SELECT 1 FROM ff_exec_core
            WHERE partition_key = ?5 AND execution_id = ?6
     )
"#;
372
/// Flow-level variant of the operator-event outbox insert: `namespace`
/// is filled from the `raw_fields` of the co-transactional
/// `ff_flow_core` row.
///
/// Used by `flow_cancel_requested`, where the outbox `execution_id`
/// column carries the FLOW id (Phase 3.3 parity with PG reference
/// `ff-backend-postgres/src/operator.rs:1118-1132`). `instance_tag` is
/// always NULL — flows don't carry `cairn.instance_id` tags today.
///
/// As with the execution variant, a `NOT EXISTS`-guarded second arm of
/// the `UNION ALL` inserts the row with NULL `namespace` when the flow
/// core row is missing.
///
/// Binds:
/// 1. flow_id TEXT (stringified UUID) — written on the outbox row.
/// 2. event_type TEXT ('flow_cancel_requested').
/// 3. details TEXT (nullable JSON).
/// 4. occurred_at_ms (i64).
/// 5. partition_key (i64).
/// 6. flow_id BLOB — for the co-transactional flow_core lookup.
pub(crate) const INSERT_OPERATOR_EVENT_FLOW_SQL: &str = r#"
    INSERT INTO ff_operator_event
        (execution_id, event_type, details, occurred_at_ms, partition_key,
         namespace, instance_tag)
    SELECT ?1, ?2, ?3, ?4, ?5,
           json_extract(raw_fields, '$.namespace'),
           NULL
      FROM ff_flow_core
     WHERE partition_key = ?5 AND flow_id = ?6
    UNION ALL
    SELECT ?1, ?2, ?3, ?4, ?5, NULL, NULL
     WHERE NOT EXISTS (
           SELECT 1 FROM ff_flow_core
            WHERE partition_key = ?5 AND flow_id = ?6
     )
"#;