cf-modkit-db 0.5.5

ModKit database library
Documentation
use std::time::Duration;

use sea_orm::{ConnectionTrait, DbBackend, Statement, TransactionTrait};
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};

use super::super::dialect::Dialect;
use super::super::taskward::{Directive, WorkerAction};
use super::super::types::OutboxError;
use crate::Db;

/// Max rows per bounded vacuum chunk (SELECT + DELETE).
const VACUUM_BATCH_SIZE: usize = 10_000;

/// SQL LIMIT value for vacuum batch size.
const VACUUM_BATCH_LIMIT: i64 = 10_000;

/// Page size for the dirty-partition cursor.
const DIRTY_PAGE_SIZE: usize = 64;

/// SQL LIMIT value for dirty-partition page size.
const DIRTY_PAGE_LIMIT: i64 = 64;

/// Report emitted by a vacuum sweep.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct VacuumReport {
    /// Number of partitions visited in this sweep.
    pub partitions_swept: usize,
    /// Total outgoing + body rows deleted across all partitions.
    pub rows_deleted: u64,
}

/// Standalone vacuum background task that garbage-collects processed
/// outgoing rows and their associated body rows.
///
/// Counter-driven: only visits partitions where the processor has
/// bumped `modkit_outbox_vacuum_counter` since the last vacuum.
///
/// Each sweep snapshots all dirty partitions, drains each one
/// (delete chunks until `deleted < VACUUM_BATCH_SIZE`), decrements
/// the counter by the snapshot value, then sleeps for `vacuum_cooldown`.
/// Partitions dirtied during the sweep are picked up in the next cycle.
///
/// Resilient to transient DB errors: a failed snapshot or per-partition
/// error is logged and the sweep continues (or retries after cooldown).
/// The vacuum never kills itself on a transient failure.
pub struct VacuumTask {
    db: Db,
    vacuum_cooldown: Duration,
}

impl VacuumTask {
    pub fn new(db: Db, vacuum_cooldown: Duration) -> Self {
        Self {
            db,
            vacuum_cooldown,
        }
    }
}

impl WorkerAction for VacuumTask {
    type Payload = VacuumReport;
    type Error = OutboxError;

    async fn execute(
        &mut self,
        cancel: &CancellationToken,
    ) -> Result<Directive<VacuumReport>, OutboxError> {
        let (backend, dialect) = {
            let sea_conn = self.db.sea_internal();
            let b = sea_conn.get_database_backend();
            (b, Dialect::from(b))
        };

        let sweep_start = tokio::time::Instant::now();

        // Phase 1: Snapshot dirty partitions (errors propagate to bulkhead)
        let dirty = Self::snapshot_dirty(&self.db, backend, &dialect, cancel).await?;

        // Phase 2: Drain each partition (per-partition errors logged, not propagated)
        let mut errors = 0u32;
        let mut total_deleted: u64 = 0;
        for (partition_id, snapshot_counter) in &dirty {
            if cancel.is_cancelled() {
                break;
            }
            match self
                .drain_partition(
                    &self.db,
                    backend,
                    &dialect,
                    *partition_id,
                    *snapshot_counter,
                    cancel,
                )
                .await
            {
                Ok(deleted) => total_deleted += deleted,
                Err(e) => {
                    warn!(
                        partition_id,
                        error = %e,
                        "vacuum: failed to drain partition, skipping",
                    );
                    errors += 1;
                }
            }
        }

        let elapsed = sweep_start.elapsed();
        debug!(
            partitions = dirty.len(),
            errors,
            elapsed_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
            "vacuum: sweep complete",
        );

        let report = VacuumReport {
            partitions_swept: dirty.len(),
            rows_deleted: total_deleted,
        };
        Ok(Directive::Sleep(self.vacuum_cooldown, report))
    }
}

impl VacuumTask {
    /// Drain a single partition and decrement its counter.
    /// Returns the number of rows deleted. Extracted so the caller can catch
    /// errors per-partition.
    async fn drain_partition(
        &self,
        db: &Db,
        backend: DbBackend,
        dialect: &Dialect,
        partition_id: i64,
        snapshot_counter: i64,
        cancel: &CancellationToken,
    ) -> Result<u64, OutboxError> {
        let deleted = self
            .vacuum_partition(db, backend, dialect, partition_id, cancel)
            .await?;

        // Only decrement counter if vacuum completed without cancellation.
        // A cancelled partial drain leaves rows behind — if we decrement to 0,
        // those rows become orphaned (no mechanism to rediscover them).
        if !cancel.is_cancelled() {
            let conn = db.sea_internal();
            conn.execute(Statement::from_sql_and_values(
                backend,
                dialect.decrement_vacuum_counter(),
                [snapshot_counter.into(), partition_id.into()],
            ))
            .await?;
        }

        Ok(deleted)
    }

    /// Collect all dirty partitions (counter > 0) via paginated cursor.
    /// Returns `(partition_id, counter)` pairs, snapshot taken once per sweep.
    async fn snapshot_dirty(
        db: &Db,
        backend: DbBackend,
        dialect: &Dialect,
        cancel: &CancellationToken,
    ) -> Result<Vec<(i64, i64)>, OutboxError> {
        let mut dirty = Vec::new();
        let mut cursor: i64 = 0;

        loop {
            if cancel.is_cancelled() {
                break;
            }

            let conn = db.sea_internal();
            let page = DIRTY_PAGE_LIMIT;
            let rows = conn
                .query_all(Statement::from_sql_and_values(
                    backend,
                    dialect.fetch_dirty_partitions(),
                    [cursor.into(), page.into()],
                ))
                .await?;

            if rows.is_empty() {
                break;
            }

            for r in &rows {
                let pid: i64 = r.try_get_by_index(0).map_err(|e| {
                    OutboxError::Database(sea_orm::DbErr::Custom(format!(
                        "partition_id column: {e}"
                    )))
                })?;
                let counter: i64 = r.try_get_by_index(1).map_err(|e| {
                    OutboxError::Database(sea_orm::DbErr::Custom(format!("counter column: {e}")))
                })?;
                dirty.push((pid, counter));
            }

            cursor = dirty.last().map_or(cursor, |&(pid, _)| pid);

            if rows.len() < DIRTY_PAGE_SIZE {
                break;
            }
        }

        Ok(dirty)
    }

    /// Drain a single partition: read `processed_seq`, then delete all
    /// outgoing + body rows with `seq <= processed_seq` in bounded chunks
    /// until `deleted < VACUUM_BATCH_SIZE`.
    /// Returns total rows deleted for this partition.
    async fn vacuum_partition(
        &self,
        db: &Db,
        backend: DbBackend,
        dialect: &Dialect,
        partition_id: i64,
        cancel: &CancellationToken,
    ) -> Result<u64, OutboxError> {
        // Read processed_seq (PK lookup, cheap).
        let row = {
            let conn = db.sea_internal();
            conn.query_one(Statement::from_sql_and_values(
                backend,
                dialect.read_processor(),
                [partition_id.into()],
            ))
            .await?
        };

        let Some(row) = row else {
            return Ok(0);
        };
        let processed_seq: i64 = row.try_get_by_index(0).map_err(|e| {
            OutboxError::Database(sea_orm::DbErr::Custom(format!(
                "`processed_seq` column: {e}",
            )))
        })?;
        if processed_seq == 0 {
            return Ok(0);
        }

        let vacuum_sql = dialect.vacuum_cleanup();
        let mut total_deleted: u64 = 0;

        // Delete in bounded chunks until drained.
        // The bulkhead holds the maintenance semaphore for the entire sweep.
        loop {
            if cancel.is_cancelled() {
                break;
            }

            let deleted = Self::delete_chunk(
                db,
                backend,
                dialect,
                &vacuum_sql,
                partition_id,
                processed_seq,
            )
            .await?;

            total_deleted += deleted as u64;

            if deleted < VACUUM_BATCH_SIZE {
                break; // Partition drained.
            }
        }

        Ok(total_deleted)
    }

    /// Execute one bounded chunk of cleanup for a single partition.
    /// Returns the number of outgoing rows deleted.
    async fn delete_chunk(
        db: &Db,
        backend: DbBackend,
        dialect: &Dialect,
        vacuum_sql: &super::super::dialect::VacuumSql,
        partition_id: i64,
        processed_seq: i64,
    ) -> Result<usize, OutboxError> {
        let conn = db.sea_internal();
        let txn = conn.begin().await?;

        let limit = VACUUM_BATCH_LIMIT;

        let rows = txn
            .query_all(Statement::from_sql_and_values(
                backend,
                vacuum_sql.select_outgoing_chunk,
                [partition_id.into(), processed_seq.into(), limit.into()],
            ))
            .await?;

        if rows.is_empty() {
            txn.rollback().await?;
            return Ok(0);
        }

        let mut outgoing_ids: Vec<i64> = Vec::with_capacity(rows.len());
        let mut body_ids: Vec<i64> = Vec::with_capacity(rows.len());
        for r in &rows {
            let oid: i64 = r.try_get_by_index(0).map_err(|e| {
                OutboxError::Database(sea_orm::DbErr::Custom(format!("outgoing_id column: {e}")))
            })?;
            outgoing_ids.push(oid);
            if let Ok(bid) = r.try_get_by_index::<i64>(1) {
                body_ids.push(bid);
            }
        }

        let count = outgoing_ids.len();

        // DELETE outgoing rows by ID.
        if !outgoing_ids.is_empty() {
            let delete_sql = dialect.build_delete_outgoing_batch(outgoing_ids.len());
            let values: Vec<sea_orm::Value> = outgoing_ids.iter().map(|&id| id.into()).collect();
            txn.execute(Statement::from_sql_and_values(backend, &delete_sql, values))
                .await?;
        }

        // DELETE body rows by ID.
        if !body_ids.is_empty() {
            let delete_sql = dialect.build_delete_body_batch(body_ids.len());
            let values: Vec<sea_orm::Value> = body_ids.iter().map(|&id| id.into()).collect();
            txn.execute(Statement::from_sql_and_values(backend, &delete_sql, values))
                .await?;
        }

        txn.commit().await?;
        Ok(count)
    }
}