1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
//! SQLite dialect-forked queries for the execution core (create / claim /
//! progress / complete / fail).
//!
//! Populated in Phase 2a.2 per RFC-023 §4.1. The SQL strings are kept
//! as module-level `const`s so `backend.rs` call sites reference them
//! by name and migration-parity review can line them up against the
//! PG reference statement-by-statement.
/// Flip exec_core to `active/leased/running` on a successful claim
/// (mirror of PG's claim-path UPDATE at
/// `ff-backend-postgres/src/attempt.rs:236-250`).
///
/// Besides the lifecycle / ownership flip, the statement also writes
/// `eligibility_state = 'not_applicable'` and
/// `attempt_state = 'running_attempt'`.
///
/// `public_state = 'running'` mirrors the Postgres resume-claim write
/// at `ff-backend-postgres/src/suspend_ops.rs:958-960` and normalises
/// to [`ff_core::types::PublicState::Active`] via
/// [`crate::reads::normalise_public_state`]. Without this write the
/// column remained at its create-time `'waiting'` literal on SQLite
/// while Spine-B consumers (and any direct `SELECT public_state`)
/// expected the claimed-execution literal.
///
/// `started_at_ms` is assigned `COALESCE(started_at_ms, ?3)`, so the
/// bound value is only stored while the column is still NULL; a value
/// that is already present is kept as-is.
///
/// Binds: `?1 = partition_key`, `?2 = execution_id`,
/// `?3 = candidate value for started_at_ms`.
///
/// Bind-order caveat: the row keys come FIRST in this statement, whereas
/// the complete / fail / progress updates in this module bind the row
/// keys LAST. Do not copy bind code between the call sites blindly.
pub const UPDATE_EXEC_CORE_CLAIM_SQL: &str = r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'active',
ownership_state = 'leased',
eligibility_state = 'not_applicable',
public_state = 'running',
attempt_state = 'running_attempt',
started_at_ms = COALESCE(started_at_ms, ?3)
WHERE partition_key = ?1 AND execution_id = ?2
"#;
/// Flip exec_core to terminal success, recording the result payload
/// (mirror of `ff-backend-postgres/src/attempt.rs:554-572`).
///
/// Writes `lifecycle_phase = 'terminal'`, `ownership_state = 'unowned'`,
/// `eligibility_state = 'not_applicable'` and
/// `attempt_state = 'attempt_terminal'`, then stamps `terminal_at_ms`
/// and `result` from the binds.
///
/// Binds: `?1 = terminal_at_ms`, `?2 = result`, `?3 = partition_key`,
/// `?4 = execution_id` (row keys are bound last).
///
/// NOTE(review): this statement does not write `public_state`, while the
/// claim update in this module sets it to `'running'` — confirm that
/// leaving the column untouched on completion is intended.
pub const UPDATE_EXEC_CORE_COMPLETE_SQL: &str = r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'terminal',
ownership_state = 'unowned',
eligibility_state = 'not_applicable',
attempt_state = 'attempt_terminal',
terminal_at_ms = ?1,
result = ?2
WHERE partition_key = ?3 AND execution_id = ?4
"#;
/// Re-enqueue the execution for a retry attempt: flip lifecycle back to
/// runnable, bump attempt_index, stash the last failure message in the
/// TEXT-encoded `raw_fields` JSON document via SQLite's `json_set` (JSON1).
/// Mirrors PG's `jsonb_build_object(...)` concat at
/// `ff-backend-postgres/src/attempt.rs:647-664`.
///
/// Column writes: `lifecycle_phase = 'runnable'`,
/// `ownership_state = 'unowned'`, `eligibility_state = 'eligible_now'`,
/// `attempt_state = 'pending_retry_attempt'`,
/// `attempt_index = attempt_index + 1`, and the
/// `$.last_failure_message` path inside `raw_fields`.
///
/// Binds: `?1 = last failure message`, `?2 = partition_key`,
/// `?3 = execution_id` (row keys are bound last).
///
/// NOTE(review): this statement does not write `public_state`, while the
/// claim update in this module sets it to `'running'` — confirm that a
/// re-enqueued row is meant to keep its previous `public_state`.
pub const UPDATE_EXEC_CORE_FAIL_RETRY_SQL: &str = r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'runnable',
ownership_state = 'unowned',
eligibility_state = 'eligible_now',
attempt_state = 'pending_retry_attempt',
attempt_index = attempt_index + 1,
raw_fields = json_set(raw_fields, '$.last_failure_message', ?1)
WHERE partition_key = ?2 AND execution_id = ?3
"#;
/// Merge `progress_pct` + `progress_message` into `ff_exec_core.raw_fields`
/// (TEXT-encoded JSON). A NULL bind must leave the corresponding field
/// untouched, INCLUDING when the JSON path is currently absent. A naive
/// `json_set(x, '$.k', coalesce(NULL, json_extract(absent)))` collapses
/// to `json_set(x, '$.k', NULL)`, which materializes an explicit JSON
/// `null` and diverges from the PG `raw_fields ||` no-op semantics for
/// an empty patch (PR #376 Copilot review).
///
/// The `CASE WHEN ? IS NULL THEN <inner> ELSE json_set(<inner>, '$.k', ?) END`
/// shape skips the `json_set` call entirely when the bind is NULL,
/// preserving absent fields AND preserving prior non-NULL values.
/// Mirror of PG at `ff-backend-postgres/src/attempt.rs:498-518`; PG
/// uses `raw_fields ||` on a jsonb object, we re-express the same
/// observable write shape via conditional `json_set` calls over a
/// TEXT document.
///
/// Outcome per bind combination (the pct `CASE` appears twice because it
/// is the `<inner>` document for both arms of the message `CASE`):
///
/// | `?1` (pct) | `?2` (message) | value assigned to `raw_fields`                    |
/// |------------|----------------|---------------------------------------------------|
/// | NULL       | NULL           | `raw_fields` (assigned back unchanged)            |
/// | set        | NULL           | `$.progress_pct` written                          |
/// | NULL       | set            | `$.progress_message` written                      |
/// | set        | set            | `$.progress_pct`, then `$.progress_message`       |
///
/// Binds: `?1 = pct (nullable INT)`, `?2 = message (nullable TEXT)`,
/// `?3 = partition_key`, `?4 = execution_id`.
pub const UPDATE_EXEC_CORE_PROGRESS_SQL: &str = r#"
UPDATE ff_exec_core
SET raw_fields = CASE
WHEN ?2 IS NULL THEN
CASE
WHEN ?1 IS NULL THEN raw_fields
ELSE json_set(raw_fields, '$.progress_pct', ?1)
END
ELSE json_set(
CASE
WHEN ?1 IS NULL THEN raw_fields
ELSE json_set(raw_fields, '$.progress_pct', ?1)
END,
'$.progress_message',
?2
)
END
WHERE partition_key = ?3 AND execution_id = ?4
"#;
/// Flip exec_core to terminal failed — retry budget exhausted or
/// classification was permanent. Mirror of PG at
/// `ff-backend-postgres/src/attempt.rs:700-718`.
///
/// Writes the same terminal state literals as the success path
/// (`'terminal'` / `'unowned'` / `'not_applicable'` /
/// `'attempt_terminal'`) and stamps `terminal_at_ms`, but instead of a
/// `result` it records the failure text at `$.last_failure_message`
/// inside the TEXT-encoded `raw_fields` JSON via `json_set`.
///
/// Binds: `?1 = terminal_at_ms`, `?2 = last failure message`,
/// `?3 = partition_key`, `?4 = execution_id` (row keys are bound last).
///
/// NOTE(review): this statement does not write `public_state`, while the
/// claim update in this module sets it to `'running'` — confirm that
/// leaving the column untouched on terminal failure is intended.
pub const UPDATE_EXEC_CORE_FAIL_TERMINAL_SQL: &str = r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'terminal',
ownership_state = 'unowned',
eligibility_state = 'not_applicable',
attempt_state = 'attempt_terminal',
terminal_at_ms = ?1,
raw_fields = json_set(raw_fields, '$.last_failure_message', ?2)
WHERE partition_key = ?3 AND execution_id = ?4
"#;
// ── Phase 2b.1 producer-side: create_execution inserts ──────────────────
/// Idempotent insert of one `ff_exec_core` row. `ON CONFLICT DO NOTHING`
/// mirrors the PG path (`ff-backend-postgres/src/exec_core.rs:157-188`)
/// and the Valkey `ff_create_execution` FCALL's `Duplicate` outcome —
/// caller detects duplicate via post-insert changes() == 0.
///
/// The conflict target is `(partition_key, execution_id)`, so a second
/// insert for the same pair is silently skipped rather than erroring.
///
/// Columns that are NOT bound but written as fixed values:
/// - `flow_id` = `NULL`
/// - `attempt_index` = `0`
/// - `lifecycle_phase` = `'submitted'`
/// - `ownership_state` = `'unowned'`
/// - `eligibility_state` = `'eligible_now'`
/// - `public_state` = `'waiting'`
/// - `attempt_state` = `'pending'`
///
/// Binds (in order):
///   1. partition_key (i64)
///   2. execution_id (Uuid)
///   3. lane_id (TEXT)
///   4. priority (i64)
///   5. created_at_ms (i64)
///   6. deadline_at_ms (Option<i64>)
///   7. payload (Option<Vec<u8>>)
///   8. policy (Option<TEXT JSON>)
///   9. raw_fields (TEXT JSON)
pub const INSERT_EXEC_CORE_SQL: &str = r#"
INSERT INTO ff_exec_core (
partition_key, execution_id, flow_id, lane_id,
attempt_index,
lifecycle_phase, ownership_state, eligibility_state,
public_state, attempt_state,
priority, created_at_ms, deadline_at_ms,
payload, policy, raw_fields
) VALUES (
?1, ?2, NULL, ?3,
0,
'submitted', 'unowned', 'eligible_now',
'waiting', 'pending',
?4, ?5, ?6,
?7, ?8, ?9
)
ON CONFLICT (partition_key, execution_id) DO NOTHING
"#;
/// Idempotent insert into the normalized capability junction
/// (RFC-023 §4.1 A4 — replaces PG's `text[]` + GIN). One row per
/// `(execution_id, capability)` pair; junction rows persist as long
/// as their parent `ff_exec_core` row does.
///
/// `ON CONFLICT (execution_id, capability) DO NOTHING` makes re-inserting
/// an existing pair a no-op. The statement inserts a single pair, so it
/// has to be executed once per capability.
///
/// Binds: `?1 = execution_id`, `?2 = capability`.
pub const INSERT_EXEC_CAPABILITY_SQL: &str = r#"
INSERT INTO ff_execution_capabilities (execution_id, capability)
VALUES (?1, ?2)
ON CONFLICT (execution_id, capability) DO NOTHING
"#;
/// Idempotent seed of the lane registry (Q6 — dynamic lanes seed
/// here on first use). Mirror of PG at
/// `ff-backend-postgres/src/exec_core.rs:191-203`.
///
/// `ON CONFLICT (lane_id) DO NOTHING` keeps the existing row — including
/// its original `registered_at_ms` — when the lane is already known.
/// `registered_by` is not bound; it is always written as the literal
/// `'create_execution'`.
///
/// Binds: `?1 = lane_id`, `?2 = registered_at_ms`.
pub const INSERT_LANE_REGISTRY_SQL: &str = r#"
INSERT INTO ff_lane_registry (lane_id, registered_at_ms, registered_by)
VALUES (?1, ?2, 'create_execution')
ON CONFLICT (lane_id) DO NOTHING
"#;