forge_runtime/pg/change_log.rs
1//! Recovery helpers for `forge_change_log`.
2//!
3//! Doctrine: every consumer that needs at-least-once semantics over the
4//! framework's table-change feed reads the durable change log through this
5//! module. The table itself, the trigger that writes to it, and the trim
6//! function all live in system migration `__forge_v001`.
7//!
8//! # Recovery model
9//!
10//! The change log is a monotonic, partitioned-by-time tail of every row
11//! INSERT/UPDATE/DELETE in user tables that have been wired up to
12//! `forge_notify_change()`. Each row gets a `BIGSERIAL seq` and a server-side
13//! `created_at`. Consumers track the last `seq` they processed; on reconnect
14//! after a NOTIFY drop, they call [`drain_change_log`] with that high-water
15//! mark to replay anything they missed. If [`min_seq`] reports a value greater
16//! than the consumer's high-water mark, retention has trimmed past it and the
17//! consumer must fall back to a full resync of its derived state.
18//!
//! Default retention is 1 hour with a minimum floor of 1 000 rows — trimming
//! is skipped entirely when the log holds that many rows or fewer, even if
//! entries are older than the window. Run [`trim_change_log`] periodically
//! from a background task; the helpers themselves never trim implicitly.
23
24use chrono::{DateTime, Utc};
25use futures_util::stream::{Stream, StreamExt};
26use sqlx::PgPool;
27
28use forge_core::cluster::LeaderRole;
29use forge_core::error::{ForgeError, Result};
30
31/// One row of `forge_change_log`.
32///
33/// Mirrors the columns produced by the system migration's
34/// `forge_notify_change()` trigger. Kept deliberately close to the wire shape
35/// so callers can decide how to map `row_id` (TEXT, may not be a UUID for
36/// every table) and the comma-separated `changed_cols` string.
37#[derive(Debug, Clone, sqlx::FromRow)]
38pub struct ChangeRow {
39 /// Monotonic sequence number; the consumer's high-water mark.
40 pub seq: i64,
41 /// Table that produced the change.
42 pub table_name: String,
43 /// SQL operation: `INSERT`, `UPDATE`, or `DELETE`.
44 pub op: String,
45 /// Text-encoded primary key. `None` only when the trigger could not
46 /// resolve `OLD.id`/`NEW.id` (e.g. a table with a non-`id` PK).
47 pub row_id: Option<String>,
48 /// For `UPDATE`, comma-separated column names whose values changed. `None`
49 /// for `INSERT`/`DELETE` and for `UPDATE`s where no columns differ.
50 pub changed_cols: Option<String>,
51 /// Server-side timestamp at which the trigger inserted the row.
52 pub created_at: DateTime<Utc>,
53}
54
55/// Stream every change with `seq > since`, in ascending `seq` order.
56///
57/// Use to recover from a `LISTEN/NOTIFY` gap: track `last_seq`, and call this
58/// on reconnect with that value to replay anything missed. The result is
59/// already ordered so the consumer can advance its cursor as it processes
60/// rows.
61///
62/// Errors from PG flow through the stream as `Err(ForgeError::Database)`. The
63/// caller decides whether a transient error ends recovery or triggers a
64/// retry.
65pub fn drain_change_log(pool: &PgPool, since: i64) -> impl Stream<Item = Result<ChangeRow>> + '_ {
66 sqlx::query_as!(
67 ChangeRow,
68 r#"SELECT seq, table_name, op, row_id, changed_cols, created_at
69 FROM forge_change_log WHERE seq > $1 ORDER BY seq"#,
70 since,
71 )
72 .fetch(pool)
73 .map(|r| r.map_err(ForgeError::Database))
74}
75
/// Row-count floor for time-based trimming of `forge_change_log`.
///
/// No trimming happens until the log holds more than this many rows. On a
/// quiet system the rows may all be older than the retention window; without
/// the floor a trim would wipe the whole log, although consumers depend on it
/// for gap recovery and are better served by keeping a recent tail around.
pub const CHANGE_LOG_MIN_ROWS: i64 = 1_000;
82
83/// Delete every change-log row with `created_at < before`. Returns the number
84/// of rows deleted.
85///
86/// Run periodically from a background task; the reactor's gap-recovery only
87/// needs the recent tail. Default retention in the framework is 1 hour, set
88/// by passing `Utc::now() - chrono::Duration::hours(1)`.
89///
90/// Trimming is skipped when the total row count is at or below
91/// [`CHANGE_LOG_MIN_ROWS`] (default 1 000). This prevents over-aggressive
92/// cleanup on quiet systems where entries age past the window but the log
93/// itself is small.
94///
95/// Uses `pg_try_advisory_xact_lock` with the [`LeaderRole::LogCompactor`] lock
96/// ID so that only one node in the cluster runs the DELETE at a time. If
97/// another node already holds the lock the function returns `Ok(0)` immediately
98/// rather than blocking or racing.
99#[allow(clippy::disallowed_methods)]
100pub async fn trim_change_log(pool: &PgPool, before: DateTime<Utc>) -> Result<u64> {
101 let lock_id = LeaderRole::LogCompactor.lock_id();
102
103 let mut tx = pool.begin().await.map_err(ForgeError::Database)?;
104
105 // pg_try_advisory_xact_lock is a built-in function with no user-table
106 // schema, so it has no cached .sqlx metadata and must use the runtime API.
107 let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_xact_lock($1)")
108 .bind(lock_id)
109 .fetch_one(&mut *tx)
110 .await
111 .map_err(ForgeError::Database)?;
112
113 if !acquired {
114 // Another node is already trimming; skip without blocking.
115 return Ok(0);
116 }
117
118 let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM forge_change_log")
119 .fetch_one(&mut *tx)
120 .await
121 .map_err(ForgeError::Database)?;
122
123 if total <= CHANGE_LOG_MIN_ROWS {
124 return Ok(0);
125 }
126
127 let result = sqlx::query!("DELETE FROM forge_change_log WHERE created_at < $1", before,)
128 .execute(&mut *tx)
129 .await
130 .map_err(ForgeError::Database)?;
131
132 tx.commit().await.map_err(ForgeError::Database)?;
133
134 Ok(result.rows_affected())
135}
136
137/// Read the smallest `seq` currently in the log, or `None` if it is empty.
138///
139/// Pair with the consumer's high-water mark to detect unrecoverable gaps:
140/// if `min_seq()` is `Some(n)` with `n > last_seq`, retention has trimmed
141/// past the consumer's position and a full resync is required.
142pub async fn min_seq(pool: &PgPool) -> Result<Option<i64>> {
143 let row = sqlx::query!(r#"SELECT MIN(seq) AS min FROM forge_change_log"#)
144 .fetch_one(pool)
145 .await
146 .map_err(ForgeError::Database)?;
147 Ok(row.min)
148}
149
150/// Read the largest `seq` currently in the log, or `None` if it is empty.
151///
152/// Used at startup to initialize the listener's high-water mark so the first
153/// reconnect does not replay the entire log.
154#[allow(clippy::disallowed_methods)]
155pub async fn max_seq(pool: &PgPool) -> Result<Option<i64>> {
156 let row: (Option<i64>,) = sqlx::query_as("SELECT MAX(seq) FROM forge_change_log")
157 .fetch_one(pool)
158 .await
159 .map_err(ForgeError::Database)?;
160 Ok(row.0)
161}
162
/// Integration tests that run against a real Postgres instance (enabled by
/// the `testcontainers` feature). Each test gets its own isolated database.
#[cfg(all(test, feature = "testcontainers"))]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::panic,
    clippy::disallowed_methods
)]
mod integration_tests {
    use super::*;
    use forge_core::testing::{IsolatedTestDb, TestDatabase};
    use futures_util::stream::TryStreamExt;

    /// Provision an isolated database for the named test.
    async fn setup_db(test_name: &str) -> IsolatedTestDb {
        let base = TestDatabase::from_env()
            .await
            .expect("Failed to create test database");
        base.isolated(test_name)
            .await
            .expect("Failed to create isolated db")
    }

    /// Create a tracked table that fires the change trigger on every write.
    async fn install_tracked_table(pool: &PgPool, table: &str) {
        sqlx::query(&format!(
            "CREATE TABLE {table} (id UUID PRIMARY KEY, name TEXT)"
        ))
        .execute(pool)
        .await
        .unwrap();
        sqlx::query(&format!(
            "CREATE TRIGGER {table}_notify
             AFTER INSERT OR UPDATE OR DELETE ON {table}
             FOR EACH ROW EXECUTE FUNCTION forge_notify_change()"
        ))
        .execute(pool)
        .await
        .unwrap();
    }

    /// Empty the change log so a test starts from a known state.
    async fn clear_change_log(pool: &PgPool) {
        sqlx::query("TRUNCATE forge_change_log")
            .execute(pool)
            .await
            .unwrap();
    }

    /// Number of rows currently in the change log.
    async fn change_log_len(pool: &PgPool) -> i64 {
        sqlx::query_scalar("SELECT COUNT(*) FROM forge_change_log")
            .fetch_one(pool)
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn drain_returns_rows_after_high_water_mark() {
        let db = setup_db("change_log_drain").await;
        install_tracked_table(db.pool(), "drain_items").await;

        // Seed three rows so seq advances.
        for _ in 0..3 {
            sqlx::query("INSERT INTO drain_items (id, name) VALUES (gen_random_uuid(), 'x')")
                .execute(db.pool())
                .await
                .unwrap();
        }
        // Capture the seq of the first insert; we'll drain after it.
        let first_seq: i64 = sqlx::query_scalar(
            "SELECT MIN(seq) FROM forge_change_log WHERE table_name='drain_items'",
        )
        .fetch_one(db.pool())
        .await
        .unwrap();

        let rows: Vec<ChangeRow> = drain_change_log(db.pool(), first_seq)
            .try_collect()
            .await
            .unwrap();
        assert_eq!(rows.len(), 2);
        assert!(rows.iter().all(|r| r.seq > first_seq));
        assert!(rows.iter().all(|r| r.table_name == "drain_items"));
        assert!(rows.iter().all(|r| r.op == "INSERT"));
        // seq monotonic
        assert!(rows[0].seq < rows[1].seq);
    }

    #[tokio::test]
    async fn trim_deletes_only_rows_older_than_cutoff() {
        let db = setup_db("change_log_trim").await;
        install_tracked_table(db.pool(), "trim_items").await;
        clear_change_log(db.pool()).await;

        // Trimming is skipped while the log holds CHANGE_LOG_MIN_ROWS rows or
        // fewer, so seed one row more than the floor.
        sqlx::query(
            "INSERT INTO trim_items (id, name)
             SELECT gen_random_uuid(), 'old'
             FROM generate_series(1::bigint, $1::bigint)",
        )
        .bind(CHANGE_LOG_MIN_ROWS + 1)
        .execute(db.pool())
        .await
        .unwrap();

        let logged = change_log_len(db.pool()).await;
        assert!(logged > CHANGE_LOG_MIN_ROWS);

        // A cutoff well in the past matches none of the fresh rows.
        let past = Utc::now() - chrono::Duration::hours(1);
        assert_eq!(trim_change_log(db.pool(), past).await.unwrap(), 0);
        assert_eq!(change_log_len(db.pool()).await, logged);

        // Forge a future cutoff so every row qualifies as "old".
        let cutoff = Utc::now() + chrono::Duration::seconds(10);
        let deleted = trim_change_log(db.pool(), cutoff).await.unwrap();
        assert_eq!(deleted, u64::try_from(logged).unwrap());

        let remaining: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM forge_change_log WHERE table_name='trim_items'",
        )
        .fetch_one(db.pool())
        .await
        .unwrap();
        assert_eq!(remaining, 0);
    }

    #[tokio::test]
    async fn trim_is_skipped_at_or_below_row_floor() {
        let db = setup_db("change_log_trim_floor").await;
        install_tracked_table(db.pool(), "floor_items").await;
        clear_change_log(db.pool()).await;

        sqlx::query("INSERT INTO floor_items (id, name) VALUES (gen_random_uuid(), 'old')")
            .execute(db.pool())
            .await
            .unwrap();
        let logged = change_log_len(db.pool()).await;
        assert!(logged > 0);
        assert!(logged <= CHANGE_LOG_MIN_ROWS);

        // The row is older than the cutoff, but the log is under the floor,
        // so nothing may be deleted.
        let cutoff = Utc::now() + chrono::Duration::seconds(10);
        let deleted = trim_change_log(db.pool(), cutoff).await.unwrap();
        assert_eq!(deleted, 0);
        assert_eq!(change_log_len(db.pool()).await, logged);
    }

    #[tokio::test]
    async fn min_seq_is_none_on_empty_log() {
        let db = setup_db("change_log_min_empty").await;
        // Trim everything the migrations may have produced (none expected, but
        // be explicit so this test is hermetic).
        clear_change_log(db.pool()).await;
        assert_eq!(min_seq(db.pool()).await.unwrap(), None);
    }

    #[tokio::test]
    async fn min_seq_reports_oldest_after_writes() {
        let db = setup_db("change_log_min_after").await;
        install_tracked_table(db.pool(), "min_items").await;
        clear_change_log(db.pool()).await;

        sqlx::query("INSERT INTO min_items (id, name) VALUES (gen_random_uuid(), 'a')")
            .execute(db.pool())
            .await
            .unwrap();
        sqlx::query("INSERT INTO min_items (id, name) VALUES (gen_random_uuid(), 'b')")
            .execute(db.pool())
            .await
            .unwrap();

        let min = min_seq(db.pool()).await.unwrap();
        let actual: i64 = sqlx::query_scalar("SELECT MIN(seq) FROM forge_change_log")
            .fetch_one(db.pool())
            .await
            .unwrap();
        assert_eq!(min, Some(actual));
    }
}
293}