ff_backend_sqlite/queries/stream.rs
1//! SQLite dialect-forked queries for the RFC-015 stream surface.
2//!
3//! Populated in Phase 2a.3 per RFC-023 §4.1 (write path) + Phase
4//! 2b.2.2 (read path). Mirrors `ff-backend-postgres/src/stream.rs`
5//! statement-for-statement: `append_frame`, `read_stream`,
6//! `tail_stream` (same SELECT shape; in-Rust broadcast replaces PG's
7//! `LISTEN/NOTIFY` at the backend layer — see
8//! `crate::pubsub::PubSub::stream_frame`), and `read_summary`.
9//!
10//! # Dialect notes
11//!
12//! * `ff_stream_summary.document_json` is TEXT (not `jsonb`) in the
13//! SQLite port; JSON Merge Patch is applied in Rust via
14//! `crate::backend::apply_json_merge_patch` and written back whole.
15//! Same observable behaviour as the PG `jsonb` path.
16//! * BestEffortLive trim uses a subquery-IN delete with the same
17//! shape as PG — SQLite supports correlated subqueries in `DELETE`.
18//! * `pg_advisory_xact_lock` is replaced by the enclosing
19//! `BEGIN IMMEDIATE` lock (§4.1 A3 single-writer).
20
21/// Read `MAX(seq)` for the current `(pkey, eid, aidx, ts_ms)` tuple so
22/// the caller can mint the next sequence under the txn lock. Mirror of
23/// PG at `ff-backend-postgres/src/stream.rs:163-176`.
24pub(crate) const SELECT_MAX_SEQ_SQL: &str = r#"
25 SELECT MAX(seq) AS s FROM ff_stream_frame
26 WHERE partition_key = ?1 AND execution_id = ?2
27 AND attempt_index = ?3 AND ts_ms = ?4
28"#;
29
30/// Insert one frame row into `ff_stream_frame`. Mirror of PG at
31/// `ff-backend-postgres/src/stream.rs:178-193`.
32pub(crate) const INSERT_STREAM_FRAME_SQL: &str = r#"
33 INSERT INTO ff_stream_frame (
34 partition_key, execution_id, attempt_index,
35 ts_ms, seq, fields, mode, created_at_ms
36 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
37"#;
38
39/// Fetch the current `ff_stream_summary` document + version for a
40/// given `(pkey, eid, aidx)`. Caller merges the patch in Rust.
41/// Mirror of PG's `FOR UPDATE` read at
42/// `ff-backend-postgres/src/stream.rs:210-220` — `BEGIN IMMEDIATE`
43/// already serializes writers on SQLite.
44pub(crate) const SELECT_STREAM_SUMMARY_SQL: &str = r#"
45 SELECT document_json, version
46 FROM ff_stream_summary
47 WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
48"#;
49
50/// Insert a fresh `ff_stream_summary` row (first-time document). Mirror
51/// of PG at `ff-backend-postgres/src/stream.rs:234-251`.
52pub(crate) const INSERT_STREAM_SUMMARY_SQL: &str = r#"
53 INSERT INTO ff_stream_summary (
54 partition_key, execution_id, attempt_index,
55 document_json, version, patch_kind,
56 last_updated_ms, first_applied_ms
57 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
58"#;
59
60/// Update an existing `ff_stream_summary` row with the merged document
61/// + bumped version. Mirror of PG at
62/// `ff-backend-postgres/src/stream.rs:253-267`.
63pub(crate) const UPDATE_STREAM_SUMMARY_SQL: &str = r#"
64 UPDATE ff_stream_summary
65 SET document_json = ?4,
66 version = ?5,
67 patch_kind = ?6,
68 last_updated_ms = ?7
69 WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
70"#;
71
72/// Fetch the BestEffort EMA state (`ema_rate_hz`, `last_append_ts_ms`).
73/// Mirror of PG at `ff-backend-postgres/src/stream.rs:274-284`.
74pub(crate) const SELECT_STREAM_META_SQL: &str = r#"
75 SELECT ema_rate_hz, last_append_ts_ms
76 FROM ff_stream_meta
77 WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
78"#;
79
80/// UPSERT the BestEffort EMA + last-append meta. Mirror of PG at
81/// `ff-backend-postgres/src/stream.rs:299-318`.
82pub(crate) const UPSERT_STREAM_META_SQL: &str = r#"
83 INSERT INTO ff_stream_meta (
84 partition_key, execution_id, attempt_index,
85 ema_rate_hz, last_append_ts_ms, maxlen_applied_last
86 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)
87 ON CONFLICT (partition_key, execution_id, attempt_index) DO UPDATE SET
88 ema_rate_hz = excluded.ema_rate_hz,
89 last_append_ts_ms = excluded.last_append_ts_ms,
90 maxlen_applied_last = excluded.maxlen_applied_last
91"#;
92
93/// Trim `ff_stream_frame` to the most-recent `?4` rows for the tuple.
94/// Mirror of PG at `ff-backend-postgres/src/stream.rs:322-331`.
95pub(crate) const TRIM_STREAM_FRAMES_SQL: &str = r#"
96 DELETE FROM ff_stream_frame
97 WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
98 AND (ts_ms, seq) NOT IN (
99 SELECT ts_ms, seq FROM ff_stream_frame
100 WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
101 ORDER BY ts_ms DESC, seq DESC
102 LIMIT ?4
103 )
104"#;
105
106/// Count frames for a tuple post-append — returned via
107/// [`ff_core::backend::AppendFrameOutcome`].
108pub(crate) const COUNT_STREAM_FRAMES_SQL: &str = r#"
109 SELECT COUNT(*) AS c FROM ff_stream_frame
110 WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
111"#;
112
113// ── Phase 2b.2.2 read surface ─────────────────────────────────────────
114
115/// `read_stream` — XRANGE-equivalent over the `(ts_ms, seq)` tuple
116/// ordering. Binds: `?1=partition_key`, `?2=execution_id` (BLOB),
117/// `?3=attempt_index`, `?4=from_ts`, `?5=from_seq`, `?6=to_ts`,
118/// `?7=to_seq`, `?8=limit`. Mirror of PG at
119/// `ff-backend-postgres/src/stream.rs:414-418` — SQLite supports the
120/// same row-value comparison shape `(a, b) >= (x, y)` so the SQL
121/// ports verbatim.
122pub(crate) const READ_STREAM_RANGE_SQL: &str = r#"
123 SELECT ts_ms, seq, fields FROM ff_stream_frame
124 WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
125 AND (ts_ms, seq) >= (?4, ?5) AND (ts_ms, seq) <= (?6, ?7)
126 ORDER BY ts_ms, seq
127 LIMIT ?8
128"#;
129
130/// `tail_stream` — rows strictly after `(after_ts, after_seq)`. Binds:
131/// `?1=partition_key`, `?2=execution_id` (BLOB), `?3=attempt_index`,
132/// `?4=after_ts`, `?5=after_seq`, `?6=limit`. Mirror of PG at
133/// `ff-backend-postgres/src/stream.rs:500-504`.
134pub(crate) const TAIL_STREAM_AFTER_SQL: &str = r#"
135 SELECT ts_ms, seq, fields FROM ff_stream_frame
136 WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
137 AND (ts_ms, seq) > (?4, ?5)
138 ORDER BY ts_ms, seq
139 LIMIT ?6
140"#;
141
142/// `tail_stream` with `TailVisibility::ExcludeBestEffort` — additive
143/// `mode <> 'best_effort'` filter. Binds identical to
144/// [`TAIL_STREAM_AFTER_SQL`].
145pub(crate) const TAIL_STREAM_AFTER_EXCLUDE_BE_SQL: &str = r#"
146 SELECT ts_ms, seq, fields FROM ff_stream_frame
147 WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
148 AND (ts_ms, seq) > (?4, ?5)
149 AND mode <> 'best_effort'
150 ORDER BY ts_ms, seq
151 LIMIT ?6
152"#;
153
154/// `read_summary` — fetch the full summary row for caller consumption.
155/// Binds: `?1=partition_key`, `?2=execution_id`, `?3=attempt_index`.
156/// Mirror of PG at `ff-backend-postgres/src/stream.rs:576-580`.
157pub(crate) const READ_SUMMARY_FULL_SQL: &str = r#"
158 SELECT document_json, version, patch_kind, last_updated_ms, first_applied_ms
159 FROM ff_stream_summary
160 WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
161"#;
162
163// ── Phase 2b.2.2 outbox-cursor-reader SQL ─────────────────────────────
164
165/// Cursor-resume tail of `ff_stream_frame` for the OutboxCursorReader
166/// primitive. Unlike the row-value `(ts_ms, seq)` cursor used by
167/// `tail_stream`, the outbox-cursor reader uses the table's ROWID as
168/// the monotonic event id — matches the `last_insert_rowid()` shape
169/// the producer emits via the `stream_frame` broadcast channel.
170///
171/// Binds: `?1=partition_key`, `?2=cursor_rowid`, `?3=batch_size`.
172///
173/// Only referenced from `outbox_cursor::tests` today — Phase 3's
174/// `subscribe_stream_frame` trait impl will be the first non-test
175/// consumer.
176#[cfg(test)]
177pub(crate) const OUTBOX_TAIL_STREAM_FRAME_SQL: &str = r#"
178 SELECT _rowid_ AS event_id,
179 partition_key, execution_id, attempt_index, ts_ms, seq, fields, mode
180 FROM ff_stream_frame
181 WHERE partition_key = ?1 AND _rowid_ > ?2
182 ORDER BY _rowid_ ASC
183 LIMIT ?3
184"#;