// Source: ff_backend_sqlite/queries/stream.rs

//! SQLite dialect-forked queries for the RFC-015 stream surface.
//!
//! Populated in Phase 2a.3 per RFC-023 §4.1 (write path) + Phase
//! 2b.2.2 (read path). Mirrors `ff-backend-postgres/src/stream.rs`
//! statement-for-statement: `append_frame`, `read_stream`,
//! `tail_stream` (same SELECT shape; in-Rust broadcast replaces PG's
//! `LISTEN/NOTIFY` at the backend layer — see
//! `crate::pubsub::PubSub::stream_frame`), and `read_summary`.
//!
//! # Dialect notes
//!
//! * `ff_stream_summary.document_json` is TEXT (not `jsonb`) in the
//!   SQLite port; JSON Merge Patch is applied in Rust via
//!   `crate::backend::apply_json_merge_patch` and written back whole.
//!   Same observable behaviour as the PG `jsonb` path.
//! * BestEffortLive trim uses a subquery-IN delete with the same
//!   shape as PG — SQLite (3.15+) accepts the row-value
//!   `(ts_ms, seq) NOT IN (SELECT …)` form inside `DELETE`.
//! * `pg_advisory_xact_lock` is replaced by the enclosing
//!   `BEGIN IMMEDIATE` lock (§4.1 A3 single-writer).
20
/// Read `MAX(seq)` for the current `(pkey, eid, aidx, ts_ms)` tuple so
/// the caller can mint the next sequence under the txn lock. Mirror of
/// PG at `ff-backend-postgres/src/stream.rs:163-176`.
///
/// Binds: `?1=partition_key`, `?2=execution_id`, `?3=attempt_index`,
/// `?4=ts_ms`.
///
/// Because this is an aggregate without `GROUP BY`, SQLite always
/// yields exactly one row; column `s` is `NULL` when no frame exists
/// yet for the tuple, so the caller must decode it as optional.
pub(crate) const SELECT_MAX_SEQ_SQL: &str = r#"
    SELECT MAX(seq) AS s FROM ff_stream_frame
     WHERE partition_key = ?1 AND execution_id = ?2
       AND attempt_index = ?3 AND ts_ms = ?4
"#;
29
/// Insert one frame row into `ff_stream_frame`. Mirror of PG at
/// `ff-backend-postgres/src/stream.rs:178-193`.
///
/// Binds (in column order): `?1=partition_key`, `?2=execution_id`,
/// `?3=attempt_index`, `?4=ts_ms`, `?5=seq`, `?6=fields`, `?7=mode`,
/// `?8=created_at_ms`.
pub(crate) const INSERT_STREAM_FRAME_SQL: &str = r#"
    INSERT INTO ff_stream_frame (
        partition_key, execution_id, attempt_index,
        ts_ms, seq, fields, mode, created_at_ms
    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
"#;
38
/// Fetch the current `ff_stream_summary` document + version for a
/// given `(pkey, eid, aidx)`. Caller merges the patch in Rust.
/// Mirror of PG's `FOR UPDATE` read at
/// `ff-backend-postgres/src/stream.rs:210-220` — `BEGIN IMMEDIATE`
/// already serializes writers on SQLite.
///
/// Binds: `?1=partition_key`, `?2=execution_id`, `?3=attempt_index`.
/// Returns columns `document_json, version`; no row comes back when
/// the summary has not been created yet.
pub(crate) const SELECT_STREAM_SUMMARY_SQL: &str = r#"
    SELECT document_json, version
      FROM ff_stream_summary
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;
49
/// Insert a fresh `ff_stream_summary` row (first-time document). Mirror
/// of PG at `ff-backend-postgres/src/stream.rs:234-251`.
///
/// Binds (in column order): `?1=partition_key`, `?2=execution_id`,
/// `?3=attempt_index`, `?4=document_json`, `?5=version`,
/// `?6=patch_kind`, `?7=last_updated_ms`, `?8=first_applied_ms`.
pub(crate) const INSERT_STREAM_SUMMARY_SQL: &str = r#"
    INSERT INTO ff_stream_summary (
        partition_key, execution_id, attempt_index,
        document_json, version, patch_kind,
        last_updated_ms, first_applied_ms
    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
"#;
59
/// Update an existing `ff_stream_summary` row with the merged document
/// and the bumped version. Mirror of PG at
/// `ff-backend-postgres/src/stream.rs:253-267`.
///
/// Binds: `?1=partition_key`, `?2=execution_id`, `?3=attempt_index`
/// (row selector), `?4=document_json`, `?5=version`, `?6=patch_kind`,
/// `?7=last_updated_ms` (new values). The positions line up with the
/// first seven binds of [`INSERT_STREAM_SUMMARY_SQL`];
/// `first_applied_ms` is deliberately absent from the `SET` list, so
/// the stored value is left untouched.
pub(crate) const UPDATE_STREAM_SUMMARY_SQL: &str = r#"
    UPDATE ff_stream_summary
       SET document_json = ?4,
           version = ?5,
           patch_kind = ?6,
           last_updated_ms = ?7
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;
71
/// Fetch the BestEffort EMA state (`ema_rate_hz`, `last_append_ts_ms`).
/// Mirror of PG at `ff-backend-postgres/src/stream.rs:274-284`.
///
/// Binds: `?1=partition_key`, `?2=execution_id`, `?3=attempt_index`.
/// No row is returned when the tuple has no meta row yet.
pub(crate) const SELECT_STREAM_META_SQL: &str = r#"
    SELECT ema_rate_hz, last_append_ts_ms
      FROM ff_stream_meta
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;
79
/// UPSERT the BestEffort EMA + last-append meta. Mirror of PG at
/// `ff-backend-postgres/src/stream.rs:299-318`.
///
/// Binds: `?1=partition_key`, `?2=execution_id`, `?3=attempt_index`,
/// `?4=ema_rate_hz`, `?5=last_append_ts_ms`, `?6=maxlen_applied_last`.
///
/// On a key collision every non-key column is overwritten with the
/// incoming value (`excluded.*` names the row that failed to insert).
/// The `ON CONFLICT` target requires a `UNIQUE`/`PRIMARY KEY`
/// constraint over `(partition_key, execution_id, attempt_index)`.
pub(crate) const UPSERT_STREAM_META_SQL: &str = r#"
    INSERT INTO ff_stream_meta (
        partition_key, execution_id, attempt_index,
        ema_rate_hz, last_append_ts_ms, maxlen_applied_last
    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)
    ON CONFLICT (partition_key, execution_id, attempt_index) DO UPDATE SET
        ema_rate_hz = excluded.ema_rate_hz,
        last_append_ts_ms = excluded.last_append_ts_ms,
        maxlen_applied_last = excluded.maxlen_applied_last
"#;
92
/// Trim `ff_stream_frame` to the most-recent `?4` rows for the tuple.
/// Mirror of PG at `ff-backend-postgres/src/stream.rs:322-331`.
///
/// Binds: `?1=partition_key`, `?2=execution_id`, `?3=attempt_index`,
/// `?4=number of newest rows to keep`. The numbered placeholders are
/// reused inside the subquery, so each value is bound only once.
///
/// "Most recent" is defined by `(ts_ms DESC, seq DESC)`; every row of
/// the tuple outside that window is deleted. Per SQLite's `LIMIT`
/// rules, `?4 = 0` deletes every row of the tuple and a negative `?4`
/// means "no limit", i.e. nothing is deleted.
pub(crate) const TRIM_STREAM_FRAMES_SQL: &str = r#"
    DELETE FROM ff_stream_frame
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
       AND (ts_ms, seq) NOT IN (
           SELECT ts_ms, seq FROM ff_stream_frame
            WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
            ORDER BY ts_ms DESC, seq DESC
            LIMIT ?4
       )
"#;
105
/// Count frames for a tuple post-append — returned via
/// [`ff_core::backend::AppendFrameOutcome`].
///
/// Binds: `?1=partition_key`, `?2=execution_id`, `?3=attempt_index`.
/// Always yields one row with the count in column `c` (0 when the
/// tuple has no frames).
pub(crate) const COUNT_STREAM_FRAMES_SQL: &str = r#"
    SELECT COUNT(*) AS c FROM ff_stream_frame
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;
112
// ── Phase 2b.2.2 read surface ─────────────────────────────────────────
114
/// `read_stream` — XRANGE-equivalent over the `(ts_ms, seq)` tuple
/// ordering. Binds: `?1=partition_key`, `?2=execution_id` (BLOB),
/// `?3=attempt_index`, `?4=from_ts`, `?5=from_seq`, `?6=to_ts`,
/// `?7=to_seq`, `?8=limit`. Mirror of PG at
/// `ff-backend-postgres/src/stream.rs:414-418` — SQLite supports the
/// same row-value comparison shape `(a, b) >= (x, y)` so the SQL
/// ports verbatim.
///
/// Both bounds are inclusive. Rows come back as `ts_ms, seq, fields`
/// in ascending `(ts_ms, seq)` order, capped at `?8` rows.
pub(crate) const READ_STREAM_RANGE_SQL: &str = r#"
    SELECT ts_ms, seq, fields FROM ff_stream_frame
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
       AND (ts_ms, seq) >= (?4, ?5) AND (ts_ms, seq) <= (?6, ?7)
     ORDER BY ts_ms, seq
     LIMIT ?8
"#;
129
/// `tail_stream` — rows strictly after `(after_ts, after_seq)`. Binds:
/// `?1=partition_key`, `?2=execution_id` (BLOB), `?3=attempt_index`,
/// `?4=after_ts`, `?5=after_seq`, `?6=limit`. Mirror of PG at
/// `ff-backend-postgres/src/stream.rs:500-504`.
///
/// The cursor row itself is excluded (`>` rather than `>=`). Rows come
/// back as `ts_ms, seq, fields` in ascending `(ts_ms, seq)` order,
/// capped at `?6` rows.
pub(crate) const TAIL_STREAM_AFTER_SQL: &str = r#"
    SELECT ts_ms, seq, fields FROM ff_stream_frame
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
       AND (ts_ms, seq) > (?4, ?5)
     ORDER BY ts_ms, seq
     LIMIT ?6
"#;
141
/// `tail_stream` with `TailVisibility::ExcludeBestEffort` — additive
/// `mode <> 'best_effort'` filter. Binds identical to
/// [`TAIL_STREAM_AFTER_SQL`]: `?1=partition_key`, `?2=execution_id`,
/// `?3=attempt_index`, `?4=after_ts`, `?5=after_seq`, `?6=limit`.
///
/// NOTE(review): a row whose `mode` is `NULL` would also be filtered
/// out, because `NULL <> 'best_effort'` is not true in SQL — presumably
/// the column is `NOT NULL`; confirm against the schema.
pub(crate) const TAIL_STREAM_AFTER_EXCLUDE_BE_SQL: &str = r#"
    SELECT ts_ms, seq, fields FROM ff_stream_frame
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
       AND (ts_ms, seq) > (?4, ?5)
       AND mode <> 'best_effort'
     ORDER BY ts_ms, seq
     LIMIT ?6
"#;
153
/// `read_summary` — fetch the full summary row for caller consumption.
/// Binds: `?1=partition_key`, `?2=execution_id`, `?3=attempt_index`.
/// Mirror of PG at `ff-backend-postgres/src/stream.rs:576-580`.
///
/// Returns columns `document_json, version, patch_kind,
/// last_updated_ms, first_applied_ms`; no row comes back when the
/// tuple has no summary.
pub(crate) const READ_SUMMARY_FULL_SQL: &str = r#"
    SELECT document_json, version, patch_kind, last_updated_ms, first_applied_ms
      FROM ff_stream_summary
     WHERE partition_key = ?1 AND execution_id = ?2 AND attempt_index = ?3
"#;
162
// ── Phase 2b.2.2 outbox-cursor-reader SQL ─────────────────────────────
164
/// Cursor-resume tail of `ff_stream_frame` for the OutboxCursorReader
/// primitive. Unlike the row-value `(ts_ms, seq)` cursor used by
/// `tail_stream`, the outbox-cursor reader uses the table's ROWID as
/// the monotonic event id — matches the `last_insert_rowid()` shape
/// the producer emits via the `stream_frame` broadcast channel.
///
/// Binds: `?1=partition_key`, `?2=cursor_rowid`, `?3=batch_size`.
///
/// The filter is partition-wide: frames of every execution/attempt in
/// the partition whose rowid is strictly greater than `?2` are
/// returned, in ascending rowid order, with the rowid exposed as
/// `event_id`.
///
/// NOTE(review): SQLite only guarantees rowids are never reused when
/// the table uses `INTEGER PRIMARY KEY AUTOINCREMENT`; confirm the
/// schema before relying on strict monotonicity across deletes.
///
/// Only referenced from `outbox_cursor::tests` today — Phase 3's
/// `subscribe_stream_frame` trait impl will be the first non-test
/// consumer.
#[cfg(test)]
pub(crate) const OUTBOX_TAIL_STREAM_FRAME_SQL: &str = r#"
    SELECT _rowid_ AS event_id,
           partition_key, execution_id, attempt_index, ts_ms, seq, fields, mode
      FROM ff_stream_frame
     WHERE partition_key = ?1 AND _rowid_ > ?2
     ORDER BY _rowid_ ASC
     LIMIT ?3
"#;