1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
//! SQLite dialect-forked queries for flow-staging ingress
//! (`add_execution_to_flow`, `stage_dependency_edge`,
//! `apply_dependency_to_child`).
//!
//! Populated in Phase 2b.1 per RFC-023 §4.1. Mirrors the PG reference
//! at `ff-backend-postgres/src/flow_staging.rs` statement-by-statement.
// ── add_execution_to_flow ───────────────────────────────────────────────
/// Read + lock the flow_core row. SQLite's `BEGIN IMMEDIATE` serializes
/// writers so the PG `FOR UPDATE` is unnecessary; the query shape is a
/// plain SELECT.
///
/// Returns: (public_flow_state, raw_fields).
pub const SELECT_FLOW_CORE_FOR_STAGE_SQL: &str = r#"
SELECT public_flow_state, raw_fields
FROM ff_flow_core
WHERE partition_key = ?1 AND flow_id = ?2
"#;
/// Read the exec_core row's flow back-pointer. Returns `flow_id` as a
/// nullable `BLOB`.
pub const SELECT_EXEC_FLOW_ID_SQL: &str = r#"
SELECT flow_id
FROM ff_exec_core
WHERE partition_key = ?1 AND execution_id = ?2
"#;
/// Stamp `exec_core.flow_id` on a successful `add_execution_to_flow`.
pub const UPDATE_EXEC_SET_FLOW_ID_SQL: &str = r#"
UPDATE ff_exec_core
SET flow_id = ?3
WHERE partition_key = ?1 AND execution_id = ?2
"#;
/// Bump `graph_revision` + `raw_fields.node_count` on
/// `add_execution_to_flow`. Mirror of PG's `jsonb_build_object` path
/// at `ff-backend-postgres/src/flow_staging.rs:239-258`; JSON1's
/// `json_set` is the equivalent in-place mutation shape.
///
/// Binds:
/// 1. partition_key (i64)
/// 2. flow_id (Uuid)
/// 3. now_ms (i64)
pub const BUMP_FLOW_NODE_COUNT_SQL: &str = r#"
UPDATE ff_flow_core
SET graph_revision = graph_revision + 1,
raw_fields = json_set(
json_set(
raw_fields,
'$.node_count',
COALESCE(json_extract(raw_fields, '$.node_count'), 0) + 1
),
'$.last_mutation_at_ms',
?3
)
WHERE partition_key = ?1 AND flow_id = ?2
"#;
/// Read the post-bump `node_count` so the caller can return it in
/// `AddExecutionToFlowResult::Added`.
pub const SELECT_FLOW_NODE_COUNT_SQL: &str = r#"
SELECT COALESCE(json_extract(raw_fields, '$.node_count'), 0) AS node_count
FROM ff_flow_core
WHERE partition_key = ?1 AND flow_id = ?2
"#;
// ── stage_dependency_edge ──────────────────────────────────────────────
/// CAS on `graph_revision` — succeed only if (state, rev) match and the
/// flow is open. Bump `graph_revision` + `edge_count` in the same
/// UPDATE. Returns the new revision via a follow-up SELECT (SQLite
/// `RETURNING` is available ≥3.35 but kept as a second query for
/// symmetry with the rest of this module).
///
/// Binds:
/// 1. partition_key (i64)
/// 2. flow_id (Uuid)
/// 3. expected_graph_revision (i64)
/// 4. now_ms (i64)
pub const CAS_BUMP_FLOW_REV_SQL: &str = r#"
UPDATE ff_flow_core
SET graph_revision = graph_revision + 1,
raw_fields = json_set(
json_set(
raw_fields,
'$.edge_count',
COALESCE(json_extract(raw_fields, '$.edge_count'), 0) + 1
),
'$.last_mutation_at_ms',
?4
)
WHERE partition_key = ?1
AND flow_id = ?2
AND graph_revision = ?3
AND public_flow_state = 'open'
"#;
pub const SELECT_FLOW_REV_AND_STATE_SQL: &str = r#"
SELECT graph_revision, public_flow_state
FROM ff_flow_core
WHERE partition_key = ?1 AND flow_id = ?2
"#;
/// Check that BOTH upstream + downstream execs belong to the given
/// flow. Returns one row per member that matches. `?3` and `?4` are
/// the two exec UUIDs.
pub const SELECT_FLOW_MEMBERSHIP_PAIR_SQL: &str = r#"
SELECT execution_id
FROM ff_exec_core
WHERE partition_key = ?1
AND flow_id = ?2
AND (execution_id = ?3 OR execution_id = ?4)
"#;
/// Insert a staged edge row. `policy` JSON carries the immutable edge
/// record (dependency_kind / satisfaction_condition / data_passing_ref
/// / edge_state / created_at_ms / created_by / staged_at_ms /
/// applied_at_ms).
///
/// Binds:
/// 1. partition_key (i64)
/// 2. flow_id (Uuid)
/// 3. edge_id (Uuid)
/// 4. upstream_eid (Uuid)
/// 5. downstream_eid (Uuid)
/// 6. policy (TEXT JSON)
pub const INSERT_EDGE_SQL: &str = r#"
INSERT INTO ff_edge
(partition_key, flow_id, edge_id, upstream_eid, downstream_eid,
policy, policy_version)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0)
ON CONFLICT (partition_key, flow_id, edge_id) DO NOTHING
"#;
// ── apply_dependency_to_child ──────────────────────────────────────────
/// Load the edge's policy JSON (all mutable + immutable edge state
/// rides in this column per §4.1 on-disk layout decision).
pub const SELECT_EDGE_POLICY_SQL: &str = r#"
SELECT policy
FROM ff_edge
WHERE partition_key = ?1 AND flow_id = ?2 AND edge_id = ?3
"#;
/// Write back the mutated policy JSON (applied_at_ms + edge_state set
/// by the caller in Rust).
pub const UPDATE_EDGE_POLICY_SQL: &str = r#"
UPDATE ff_edge
SET policy = ?4
WHERE partition_key = ?1 AND flow_id = ?2 AND edge_id = ?3
"#;
/// Upsert the edge_group aggregate — create with default AllOf when
/// missing, bump `running_count` when present. Mirrors PG at
/// `ff-backend-postgres/src/flow_staging.rs:530-546`.
///
/// Binds:
/// 1. partition_key (i64)
/// 2. flow_id (Uuid)
/// 3. downstream_eid (Uuid)
/// 4. policy_json (TEXT JSON — default '{"kind":"all_of"}')
pub const UPSERT_EDGE_GROUP_APPLY_SQL: &str = r#"
INSERT INTO ff_edge_group
(partition_key, flow_id, downstream_eid, policy, running_count)
VALUES (?1, ?2, ?3, ?4, 1)
ON CONFLICT (partition_key, flow_id, downstream_eid) DO UPDATE
SET running_count = ff_edge_group.running_count + 1
"#;
/// Read the post-upsert `running_count` for the caller's result payload.
pub const SELECT_EDGE_GROUP_RUNNING_COUNT_SQL: &str = r#"
SELECT running_count
FROM ff_edge_group
WHERE partition_key = ?1 AND flow_id = ?2 AND downstream_eid = ?3
"#;