Skip to main content

forge_runtime/pg/
change_log.rs

1//! Recovery helpers for `forge_change_log`.
2//!
3//! Doctrine: every consumer that needs at-least-once semantics over the
4//! framework's table-change feed reads the durable change log through this
5//! module. The table itself, the trigger that writes to it, and the trim
6//! function all live in system migration `__forge_v001`.
7//!
8//! # Recovery model
9//!
10//! The change log is a monotonic, partitioned-by-time tail of every row
11//! INSERT/UPDATE/DELETE in user tables that have been wired up to
12//! `forge_notify_change()`. Each row gets a `BIGSERIAL seq` and a server-side
13//! `created_at`. Consumers track the last `seq` they processed; on reconnect
14//! after a NOTIFY drop, they call [`drain_change_log`] with that high-water
15//! mark to replay anything they missed. If [`min_seq`] reports a value greater
16//! than the consumer's high-water mark, retention has trimmed past it and the
17//! consumer must fall back to a full resync of its derived state.
18//!
//! Default retention is 1 hour with a minimum floor of 1 000 rows — trimming
//! is skipped entirely when the log holds that many rows or fewer, even if
//! entries are older than the window. Run [`trim_change_log`] periodically
//! from a background task; the helpers themselves never trim implicitly.
23
24use chrono::{DateTime, Utc};
25use futures_util::stream::{Stream, StreamExt};
26use sqlx::PgPool;
27
28use forge_core::cluster::LeaderRole;
29use forge_core::error::{ForgeError, Result};
30
31/// One row of `forge_change_log`.
32///
33/// Mirrors the columns produced by the system migration's
34/// `forge_notify_change()` trigger. Kept deliberately close to the wire shape
35/// so callers can decide how to map `row_id` (TEXT, may not be a UUID for
36/// every table) and the comma-separated `changed_cols` string.
37#[derive(Debug, Clone, sqlx::FromRow)]
38pub struct ChangeRow {
39    /// Monotonic sequence number; the consumer's high-water mark.
40    pub seq: i64,
41    /// Table that produced the change.
42    pub table_name: String,
43    /// SQL operation: `INSERT`, `UPDATE`, or `DELETE`.
44    pub op: String,
45    /// Text-encoded primary key. `None` only when the trigger could not
46    /// resolve `OLD.id`/`NEW.id` (e.g. a table with a non-`id` PK).
47    pub row_id: Option<String>,
48    /// For `UPDATE`, comma-separated column names whose values changed. `None`
49    /// for `INSERT`/`DELETE` and for `UPDATE`s where no columns differ.
50    pub changed_cols: Option<String>,
51    /// Server-side timestamp at which the trigger inserted the row.
52    pub created_at: DateTime<Utc>,
53}
54
55/// Stream every change with `seq > since`, in ascending `seq` order.
56///
57/// Use to recover from a `LISTEN/NOTIFY` gap: track `last_seq`, and call this
58/// on reconnect with that value to replay anything missed. The result is
59/// already ordered so the consumer can advance its cursor as it processes
60/// rows.
61///
62/// Errors from PG flow through the stream as `Err(ForgeError::Database)`. The
63/// caller decides whether a transient error ends recovery or triggers a
64/// retry.
65pub fn drain_change_log(pool: &PgPool, since: i64) -> impl Stream<Item = Result<ChangeRow>> + '_ {
66    sqlx::query_as!(
67        ChangeRow,
68        r#"SELECT seq, table_name, op, row_id, changed_cols, created_at
69           FROM forge_change_log WHERE seq > $1 ORDER BY seq"#,
70        since,
71    )
72    .fetch(pool)
73    .map(|r| r.map_err(ForgeError::Database))
74}
75
/// Row-count floor that gates time-based trimming of `forge_change_log`.
///
/// [`trim_change_log`] does nothing while the log holds this many rows or
/// fewer. On quiet systems this keeps the log from being emptied just because
/// its few entries have aged past the retention window, so consumers still
/// have a recent tail available for gap-recovery.
///
/// The floor only decides whether a trim runs at all; it is not a guarantee
/// about how many rows remain after a trim that does run.
pub const CHANGE_LOG_MIN_ROWS: i64 = 1_000;
82
83/// Delete every change-log row with `created_at < before`. Returns the number
84/// of rows deleted.
85///
86/// Run periodically from a background task; the reactor's gap-recovery only
87/// needs the recent tail. Default retention in the framework is 1 hour, set
88/// by passing `Utc::now() - chrono::Duration::hours(1)`.
89///
90/// Trimming is skipped when the total row count is at or below
91/// [`CHANGE_LOG_MIN_ROWS`] (default 1 000). This prevents over-aggressive
92/// cleanup on quiet systems where entries age past the window but the log
93/// itself is small.
94///
95/// Uses `pg_try_advisory_xact_lock` with the [`LeaderRole::LogCompactor`] lock
96/// ID so that only one node in the cluster runs the DELETE at a time. If
97/// another node already holds the lock the function returns `Ok(0)` immediately
98/// rather than blocking or racing.
99#[allow(clippy::disallowed_methods)]
100pub async fn trim_change_log(pool: &PgPool, before: DateTime<Utc>) -> Result<u64> {
101    let lock_id = LeaderRole::LogCompactor.lock_id();
102
103    let mut tx = pool.begin().await.map_err(ForgeError::Database)?;
104
105    // pg_try_advisory_xact_lock is a built-in function with no user-table
106    // schema, so it has no cached .sqlx metadata and must use the runtime API.
107    let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_xact_lock($1)")
108        .bind(lock_id)
109        .fetch_one(&mut *tx)
110        .await
111        .map_err(ForgeError::Database)?;
112
113    if !acquired {
114        // Another node is already trimming; skip without blocking.
115        return Ok(0);
116    }
117
118    let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM forge_change_log")
119        .fetch_one(&mut *tx)
120        .await
121        .map_err(ForgeError::Database)?;
122
123    if total <= CHANGE_LOG_MIN_ROWS {
124        return Ok(0);
125    }
126
127    let result = sqlx::query!("DELETE FROM forge_change_log WHERE created_at < $1", before,)
128        .execute(&mut *tx)
129        .await
130        .map_err(ForgeError::Database)?;
131
132    tx.commit().await.map_err(ForgeError::Database)?;
133
134    Ok(result.rows_affected())
135}
136
137/// Read the smallest `seq` currently in the log, or `None` if it is empty.
138///
139/// Pair with the consumer's high-water mark to detect unrecoverable gaps:
140/// if `min_seq()` is `Some(n)` with `n > last_seq`, retention has trimmed
141/// past the consumer's position and a full resync is required.
142pub async fn min_seq(pool: &PgPool) -> Result<Option<i64>> {
143    let row = sqlx::query!(r#"SELECT MIN(seq) AS min FROM forge_change_log"#)
144        .fetch_one(pool)
145        .await
146        .map_err(ForgeError::Database)?;
147    Ok(row.min)
148}
149
150/// Read the largest `seq` currently in the log, or `None` if it is empty.
151///
152/// Used at startup to initialize the listener's high-water mark so the first
153/// reconnect does not replay the entire log.
154#[allow(clippy::disallowed_methods)]
155pub async fn max_seq(pool: &PgPool) -> Result<Option<i64>> {
156    let row: (Option<i64>,) = sqlx::query_as("SELECT MAX(seq) FROM forge_change_log")
157        .fetch_one(pool)
158        .await
159        .map_err(ForgeError::Database)?;
160    Ok(row.0)
161}
162
#[cfg(all(test, feature = "testcontainers"))]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::panic,
    clippy::disallowed_methods
)]
mod integration_tests {
    //! Database-backed tests for the change-log helpers. Each test runs
    //! against its own isolated database obtained from `TestDatabase`.

    use super::*;
    use forge_core::testing::{IsolatedTestDb, TestDatabase};
    use futures_util::stream::TryStreamExt;

    /// Provision an isolated database named after the calling test.
    async fn setup_db(test_name: &str) -> IsolatedTestDb {
        let base = TestDatabase::from_env()
            .await
            .expect("Failed to create test database");
        base.isolated(test_name)
            .await
            .expect("Failed to create isolated db")
    }

    /// Create a tracked table that fires the change trigger on every write.
    async fn install_tracked_table(pool: &PgPool, table: &str) {
        sqlx::query(&format!(
            "CREATE TABLE {table} (id UUID PRIMARY KEY, name TEXT)"
        ))
        .execute(pool)
        .await
        .unwrap();
        sqlx::query(&format!(
            "CREATE TRIGGER {table}_notify
             AFTER INSERT OR UPDATE OR DELETE ON {table}
             FOR EACH ROW EXECUTE FUNCTION forge_notify_change()"
        ))
        .execute(pool)
        .await
        .unwrap();
    }

    /// Empty the change log so assertions on row counts are hermetic.
    async fn truncate_change_log(pool: &PgPool) {
        sqlx::query("TRUNCATE forge_change_log")
            .execute(pool)
            .await
            .unwrap();
    }

    /// Count the change-log rows recorded for `table`.
    async fn change_log_rows_for(pool: &PgPool, table: &str) -> i64 {
        sqlx::query_scalar("SELECT COUNT(*) FROM forge_change_log WHERE table_name = $1")
            .bind(table)
            .fetch_one(pool)
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn drain_returns_rows_after_high_water_mark() {
        let db = setup_db("change_log_drain").await;
        install_tracked_table(db.pool(), "drain_items").await;

        // Seed three rows so seq advances.
        for _ in 0..3 {
            sqlx::query("INSERT INTO drain_items (id, name) VALUES (gen_random_uuid(), 'x')")
                .execute(db.pool())
                .await
                .unwrap();
        }
        // Capture the seq of the first insert; we'll drain after it.
        let first_seq: i64 = sqlx::query_scalar(
            "SELECT MIN(seq) FROM forge_change_log WHERE table_name='drain_items'",
        )
        .fetch_one(db.pool())
        .await
        .unwrap();

        let rows: Vec<ChangeRow> = drain_change_log(db.pool(), first_seq)
            .try_collect()
            .await
            .unwrap();
        assert_eq!(rows.len(), 2);
        assert!(rows.iter().all(|r| r.seq > first_seq));
        assert!(rows.iter().all(|r| r.table_name == "drain_items"));
        assert!(rows.iter().all(|r| r.op == "INSERT"));
        // seq monotonic
        assert!(rows[0].seq < rows[1].seq);
    }

    #[tokio::test]
    async fn trim_deletes_only_rows_older_than_cutoff() {
        let db = setup_db("change_log_trim").await;
        install_tracked_table(db.pool(), "trim_items").await;
        truncate_change_log(db.pool()).await;

        // The trim is a no-op at or below CHANGE_LOG_MIN_ROWS, so seed one row
        // more than the floor. The trigger is FOR EACH ROW, so a single
        // set-based insert yields one change-log entry per inserted row.
        let seeded = CHANGE_LOG_MIN_ROWS + 1;
        sqlx::query(
            "INSERT INTO trim_items (id, name)
             SELECT gen_random_uuid(), 'old' FROM generate_series(1, $1::bigint)",
        )
        .bind(seeded)
        .execute(db.pool())
        .await
        .unwrap();
        assert_eq!(change_log_rows_for(db.pool(), "trim_items").await, seeded);

        // A cutoff in the past matches none of the freshly written rows.
        let past_cutoff = Utc::now() - chrono::Duration::hours(1);
        let deleted = trim_change_log(db.pool(), past_cutoff).await.unwrap();
        assert_eq!(deleted, 0);
        assert_eq!(change_log_rows_for(db.pool(), "trim_items").await, seeded);

        // Forge a future cutoff so every row qualifies as "old".
        let cutoff = Utc::now() + chrono::Duration::seconds(10);
        let deleted = trim_change_log(db.pool(), cutoff).await.unwrap();
        assert_eq!(deleted, u64::try_from(seeded).unwrap());
        assert_eq!(change_log_rows_for(db.pool(), "trim_items").await, 0);
    }

    #[tokio::test]
    async fn trim_is_skipped_at_or_below_row_floor() {
        let db = setup_db("change_log_trim_floor").await;
        install_tracked_table(db.pool(), "floor_items").await;
        truncate_change_log(db.pool()).await;

        sqlx::query("INSERT INTO floor_items (id, name) VALUES (gen_random_uuid(), 'old')")
            .execute(db.pool())
            .await
            .unwrap();

        // The row is older than the cutoff, but the log is far below the
        // floor, so nothing may be deleted.
        let cutoff = Utc::now() + chrono::Duration::seconds(10);
        let deleted = trim_change_log(db.pool(), cutoff).await.unwrap();
        assert_eq!(deleted, 0);
        assert_eq!(change_log_rows_for(db.pool(), "floor_items").await, 1);
    }

    #[tokio::test]
    async fn min_seq_is_none_on_empty_log() {
        let db = setup_db("change_log_min_empty").await;
        // Trim everything the migrations may have produced (none expected, but
        // be explicit so this test is hermetic).
        truncate_change_log(db.pool()).await;
        assert_eq!(min_seq(db.pool()).await.unwrap(), None);
    }

    #[tokio::test]
    async fn min_seq_reports_oldest_after_writes() {
        let db = setup_db("change_log_min_after").await;
        install_tracked_table(db.pool(), "min_items").await;
        truncate_change_log(db.pool()).await;

        sqlx::query("INSERT INTO min_items (id, name) VALUES (gen_random_uuid(), 'a')")
            .execute(db.pool())
            .await
            .unwrap();
        sqlx::query("INSERT INTO min_items (id, name) VALUES (gen_random_uuid(), 'b')")
            .execute(db.pool())
            .await
            .unwrap();

        let min = min_seq(db.pool()).await.unwrap();
        let actual: i64 = sqlx::query_scalar("SELECT MIN(seq) FROM forge_change_log")
            .fetch_one(db.pool())
            .await
            .unwrap();
        assert_eq!(min, Some(actual));
    }
}