use chrono::{DateTime, Utc};
use futures_util::stream::{Stream, StreamExt};
use sqlx::PgPool;
use forge_core::cluster::LeaderRole;
use forge_core::error::{ForgeError, Result};
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct ChangeRow {
pub seq: i64,
pub table_name: String,
pub op: String,
pub row_id: Option<String>,
pub changed_cols: Option<String>,
pub created_at: DateTime<Utc>,
}
pub fn drain_change_log(pool: &PgPool, since: i64) -> impl Stream<Item = Result<ChangeRow>> + '_ {
sqlx::query_as!(
ChangeRow,
r#"SELECT seq, table_name, op, row_id, changed_cols, created_at
FROM forge_change_log WHERE seq > $1 ORDER BY seq"#,
since,
)
.fetch(pool)
.map(|r| r.map_err(ForgeError::Database))
}
pub const CHANGE_LOG_MIN_ROWS: i64 = 1_000;
#[allow(clippy::disallowed_methods)]
pub async fn trim_change_log(pool: &PgPool, before: DateTime<Utc>) -> Result<u64> {
let lock_id = LeaderRole::LogCompactor.lock_id();
let mut tx = pool.begin().await.map_err(ForgeError::Database)?;
let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_xact_lock($1)")
.bind(lock_id)
.fetch_one(&mut *tx)
.await
.map_err(ForgeError::Database)?;
if !acquired {
return Ok(0);
}
let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM forge_change_log")
.fetch_one(&mut *tx)
.await
.map_err(ForgeError::Database)?;
if total <= CHANGE_LOG_MIN_ROWS {
return Ok(0);
}
let result = sqlx::query!("DELETE FROM forge_change_log WHERE created_at < $1", before,)
.execute(&mut *tx)
.await
.map_err(ForgeError::Database)?;
tx.commit().await.map_err(ForgeError::Database)?;
Ok(result.rows_affected())
}
pub async fn min_seq(pool: &PgPool) -> Result<Option<i64>> {
let row = sqlx::query!(r#"SELECT MIN(seq) AS min FROM forge_change_log"#)
.fetch_one(pool)
.await
.map_err(ForgeError::Database)?;
Ok(row.min)
}
#[allow(clippy::disallowed_methods)]
pub async fn max_seq(pool: &PgPool) -> Result<Option<i64>> {
let row: (Option<i64>,) = sqlx::query_as("SELECT MAX(seq) FROM forge_change_log")
.fetch_one(pool)
.await
.map_err(ForgeError::Database)?;
Ok(row.0)
}
#[cfg(all(test, feature = "testcontainers"))]
#[allow(
clippy::unwrap_used,
clippy::indexing_slicing,
clippy::panic,
clippy::disallowed_methods
)]
mod integration_tests {
use super::*;
use forge_core::testing::{IsolatedTestDb, TestDatabase};
use futures_util::stream::TryStreamExt;
async fn setup_db(test_name: &str) -> IsolatedTestDb {
let base = TestDatabase::from_env()
.await
.expect("Failed to create test database");
base.isolated(test_name)
.await
.expect("Failed to create isolated db")
}
async fn install_tracked_table(pool: &PgPool, table: &str) {
sqlx::query(&format!(
"CREATE TABLE {table} (id UUID PRIMARY KEY, name TEXT)"
))
.execute(pool)
.await
.unwrap();
sqlx::query(&format!(
"CREATE TRIGGER {table}_notify
AFTER INSERT OR UPDATE OR DELETE ON {table}
FOR EACH ROW EXECUTE FUNCTION forge_notify_change()"
))
.execute(pool)
.await
.unwrap();
}
#[tokio::test]
async fn drain_returns_rows_after_high_water_mark() {
let db = setup_db("change_log_drain").await;
install_tracked_table(db.pool(), "drain_items").await;
for _ in 0..3 {
sqlx::query("INSERT INTO drain_items (id, name) VALUES (gen_random_uuid(), 'x')")
.execute(db.pool())
.await
.unwrap();
}
let first_seq: i64 = sqlx::query_scalar(
"SELECT MIN(seq) FROM forge_change_log WHERE table_name='drain_items'",
)
.fetch_one(db.pool())
.await
.unwrap();
let rows: Vec<ChangeRow> = drain_change_log(db.pool(), first_seq)
.try_collect()
.await
.unwrap();
assert_eq!(rows.len(), 2);
assert!(rows.iter().all(|r| r.seq > first_seq));
assert!(rows.iter().all(|r| r.table_name == "drain_items"));
assert!(rows.iter().all(|r| r.op == "INSERT"));
assert!(rows[0].seq < rows[1].seq);
}
#[tokio::test]
async fn trim_deletes_only_rows_older_than_cutoff() {
let db = setup_db("change_log_trim").await;
install_tracked_table(db.pool(), "trim_items").await;
sqlx::query("INSERT INTO trim_items (id, name) VALUES (gen_random_uuid(), 'old')")
.execute(db.pool())
.await
.unwrap();
let cutoff = Utc::now() + chrono::Duration::seconds(10);
let deleted = trim_change_log(db.pool(), cutoff).await.unwrap();
assert_eq!(deleted, 1);
let remaining: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM forge_change_log WHERE table_name='trim_items'",
)
.fetch_one(db.pool())
.await
.unwrap();
assert_eq!(remaining, 0);
}
#[tokio::test]
async fn min_seq_is_none_on_empty_log() {
let db = setup_db("change_log_min_empty").await;
sqlx::query("TRUNCATE forge_change_log")
.execute(db.pool())
.await
.unwrap();
assert_eq!(min_seq(db.pool()).await.unwrap(), None);
}
#[tokio::test]
async fn min_seq_reports_oldest_after_writes() {
let db = setup_db("change_log_min_after").await;
install_tracked_table(db.pool(), "min_items").await;
sqlx::query("TRUNCATE forge_change_log")
.execute(db.pool())
.await
.unwrap();
sqlx::query("INSERT INTO min_items (id, name) VALUES (gen_random_uuid(), 'a')")
.execute(db.pool())
.await
.unwrap();
sqlx::query("INSERT INTO min_items (id, name) VALUES (gen_random_uuid(), 'b')")
.execute(db.pool())
.await
.unwrap();
let min = min_seq(db.pool()).await.unwrap();
let actual: i64 = sqlx::query_scalar("SELECT MIN(seq) FROM forge_change_log")
.fetch_one(db.pool())
.await
.unwrap();
assert_eq!(min, Some(actual));
}
}