use sea_orm::{ConnectionTrait, DbBackend, Statement, TransactionTrait};
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use super::super::dialect::Dialect;
use super::super::taskward::{Directive, WorkerAction};
use super::super::types::OutboxError;
use crate::Db;
/// Number of dirty-partition rows fetched per page while snapshotting.
const DIRTY_PAGE_SIZE: usize = 64;
/// The same page size as an `i64`, for binding as the SQL `LIMIT` parameter.
/// Derived from `DIRTY_PAGE_SIZE` so the query limit and the "short page"
/// termination check in `snapshot_dirty` can never drift apart.
const DIRTY_PAGE_LIMIT: i64 = DIRTY_PAGE_SIZE as i64;
/// Summary of one complete vacuum sweep, produced by `VacuumTask::execute`.
#[derive(Debug, Clone)]
#[allow(dead_code)] // NOTE(review): fields appear unread outside `Debug` output — confirm
pub struct VacuumReport {
    /// Number of partitions in the dirty snapshot for this sweep. Includes
    /// partitions that were skipped due to cancellation or per-partition
    /// errors, not only those actually drained.
    pub partitions_swept: usize,
    /// Total rows deleted across all partitions during the sweep.
    pub rows_deleted: u64,
}
/// Background worker that deletes already-processed outbox rows, partition by
/// partition, in bounded transactional chunks.
pub struct VacuumTask {
    /// Handle to the application database.
    db: Db,
    /// Maximum rows deleted per transaction chunk; must be > 0 (asserted in `new`).
    batch_size: usize,
}
impl VacuumTask {
pub fn new(db: Db, batch_size: usize) -> Self {
assert!(
batch_size > 0,
"vacuum batch_size must be greater than zero"
);
Self { db, batch_size }
}
}
impl WorkerAction for VacuumTask {
    type Payload = VacuumReport;
    type Error = OutboxError;

    /// Runs one full vacuum sweep: snapshots the set of dirty partitions,
    /// then drains each one in turn, checking `cancel` between partitions.
    ///
    /// Per-partition failures are logged at `warn` and counted, but do not
    /// abort the sweep; the task always resolves to `Directive::Idle` with a
    /// summary report.
    async fn execute(
        &mut self,
        cancel: &CancellationToken,
    ) -> Result<Directive<VacuumReport>, OutboxError> {
        // Resolve the backend and SQL dialect once up front; scoped so the
        // borrow of the internal connection ends before the sweep starts.
        let (backend, dialect) = {
            let sea_conn = self.db.sea_internal();
            let b = sea_conn.get_database_backend();
            (b, Dialect::from(b))
        };
        let sweep_start = tokio::time::Instant::now();
        // Point-in-time snapshot of (partition_id, dirty counter) pairs.
        let dirty = Self::snapshot_dirty(&self.db, backend, &dialect, cancel).await?;
        let mut errors = 0u32;
        let mut total_deleted: u64 = 0;
        for (partition_id, snapshot_counter) in &dirty {
            // Stop promptly on shutdown; partitions not yet drained keep
            // their dirty counters and are picked up by the next sweep.
            if cancel.is_cancelled() {
                break;
            }
            match self
                .drain_partition(
                    &self.db,
                    backend,
                    &dialect,
                    *partition_id,
                    *snapshot_counter,
                    cancel,
                )
                .await
            {
                Ok(deleted) => total_deleted += deleted,
                Err(e) => {
                    // Best-effort: a failing partition is skipped, not fatal.
                    warn!(
                        partition_id,
                        error = %e,
                        "vacuum: failed to drain partition, skipping",
                    );
                    errors += 1;
                }
            }
        }
        let elapsed = sweep_start.elapsed();
        debug!(
            partitions = dirty.len(),
            errors,
            elapsed_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
            "vacuum: sweep complete",
        );
        // NOTE(review): `partitions_swept` reports the snapshot size even when
        // cancellation cut the loop short — confirm that is the intended
        // meaning of "swept".
        let report = VacuumReport {
            partitions_swept: dirty.len(),
            rows_deleted: total_deleted,
        };
        Ok(Directive::Idle(report))
    }
}
impl VacuumTask {
    /// Vacuums a single partition, then decrements its dirty counter by the
    /// snapshotted amount.
    ///
    /// The decrement is skipped when the drain was interrupted by
    /// cancellation, so a partially-drained partition stays dirty and is
    /// revisited on the next sweep.
    async fn drain_partition(
        &self,
        db: &Db,
        backend: DbBackend,
        dialect: &Dialect,
        partition_id: i64,
        snapshot_counter: i64,
        cancel: &CancellationToken,
    ) -> Result<u64, OutboxError> {
        let deleted = self
            .vacuum_partition(db, backend, dialect, partition_id, cancel)
            .await?;
        if !cancel.is_cancelled() {
            let conn = db.sea_internal();
            // Bind order is (counter, partition_id). Subtracting the snapshot
            // value — rather than zeroing — presumably preserves increments
            // that raced in after the snapshot was taken; TODO confirm the
            // dialect SQL performs a relative decrement.
            conn.execute(Statement::from_sql_and_values(
                backend,
                dialect.decrement_vacuum_counter(),
                [snapshot_counter.into(), partition_id.into()],
            ))
            .await?;
        }
        Ok(deleted)
    }

    /// Pages through the dirty-partition table and returns a point-in-time
    /// list of `(partition_id, counter)` pairs.
    ///
    /// Pagination is keyset-style: `cursor` holds the last partition id seen
    /// and is bound as the first query parameter, with `DIRTY_PAGE_LIMIT` as
    /// the page limit. NOTE(review): the short-page termination check below
    /// compares against `DIRTY_PAGE_SIZE` while the query binds
    /// `DIRTY_PAGE_LIMIT`; the two constants must stay equal.
    async fn snapshot_dirty(
        db: &Db,
        backend: DbBackend,
        dialect: &Dialect,
        cancel: &CancellationToken,
    ) -> Result<Vec<(i64, i64)>, OutboxError> {
        let mut dirty = Vec::new();
        let mut cursor: i64 = 0;
        loop {
            if cancel.is_cancelled() {
                break;
            }
            let conn = db.sea_internal();
            let page = DIRTY_PAGE_LIMIT;
            let rows = conn
                .query_all(Statement::from_sql_and_values(
                    backend,
                    dialect.fetch_dirty_partitions(),
                    [cursor.into(), page.into()],
                ))
                .await?;
            if rows.is_empty() {
                break;
            }
            // Column 0 = partition id, column 1 = dirty counter; decode
            // failures surface as database errors naming the column.
            for r in &rows {
                let pid: i64 = r.try_get_by_index(0).map_err(|e| {
                    OutboxError::Database(sea_orm::DbErr::Custom(format!(
                        "partition_id column: {e}"
                    )))
                })?;
                let counter: i64 = r.try_get_by_index(1).map_err(|e| {
                    OutboxError::Database(sea_orm::DbErr::Custom(format!("counter column: {e}")))
                })?;
                dirty.push((pid, counter));
            }
            // Advance the cursor to the last id pushed; `rows` is non-empty
            // here, so `dirty.last()` is the last row of the current page.
            cursor = dirty.last().map_or(cursor, |&(pid, _)| pid);
            // A short page means the table is exhausted.
            if rows.len() < DIRTY_PAGE_SIZE {
                break;
            }
        }
        Ok(dirty)
    }

    /// Deletes processed rows from one partition in `batch_size` chunks until
    /// a short chunk (no more eligible rows) or cancellation, returning the
    /// total number of rows deleted.
    ///
    /// The partition's processor watermark is read first; a missing processor
    /// row or a watermark of 0 means nothing is eligible, so nothing is
    /// deleted.
    async fn vacuum_partition(
        &self,
        db: &Db,
        backend: DbBackend,
        dialect: &Dialect,
        partition_id: i64,
        cancel: &CancellationToken,
    ) -> Result<u64, OutboxError> {
        let row = {
            let conn = db.sea_internal();
            conn.query_one(Statement::from_sql_and_values(
                backend,
                dialect.read_processor(),
                [partition_id.into()],
            ))
            .await?
        };
        let Some(row) = row else {
            // No processor row for this partition: nothing has been processed.
            return Ok(0);
        };
        let processed_seq: i64 = row.try_get_by_index(0).map_err(|e| {
            OutboxError::Database(sea_orm::DbErr::Custom(format!(
                "`processed_seq` column: {e}",
            )))
        })?;
        if processed_seq == 0 {
            return Ok(0);
        }
        let vacuum_sql = dialect.vacuum_cleanup();
        let mut total_deleted: u64 = 0;
        loop {
            // Cancellation between chunks leaves remaining rows for the next
            // sweep; the caller also skips the counter decrement in that case.
            if cancel.is_cancelled() {
                break;
            }
            let deleted = Self::delete_chunk(
                db,
                backend,
                dialect,
                &vacuum_sql,
                partition_id,
                processed_seq,
                // Clamp rather than truncate when usize exceeds i64::MAX
                // (only possible on exotic targets).
                i64::try_from(self.batch_size).unwrap_or(i64::MAX),
            )
            .await?;
            total_deleted += deleted as u64;
            // A chunk smaller than the batch size means no rows remain.
            if deleted < self.batch_size {
                break; }
        }
        Ok(total_deleted)
    }

    /// Deletes one chunk of up to `batch_limit` processed rows — and any
    /// associated body rows — inside a single transaction.
    ///
    /// Returns the number of outgoing rows selected for deletion; a value
    /// below `batch_limit` signals `vacuum_partition` to stop looping.
    async fn delete_chunk(
        db: &Db,
        backend: DbBackend,
        dialect: &Dialect,
        vacuum_sql: &super::super::dialect::VacuumSql,
        partition_id: i64,
        processed_seq: i64,
        batch_limit: i64,
    ) -> Result<usize, OutboxError> {
        let conn = db.sea_internal();
        // Select-then-delete runs inside one transaction so the chosen ids
        // cannot change between the read and the deletes.
        let txn = conn.begin().await?;
        let limit = batch_limit;
        let rows = txn
            .query_all(Statement::from_sql_and_values(
                backend,
                vacuum_sql.select_outgoing_chunk,
                [partition_id.into(), processed_seq.into(), limit.into()],
            ))
            .await?;
        if rows.is_empty() {
            // Nothing to delete; release the transaction explicitly.
            txn.rollback().await?;
            return Ok(0);
        }
        let mut outgoing_ids: Vec<i64> = Vec::with_capacity(rows.len());
        let mut body_ids: Vec<i64> = Vec::with_capacity(rows.len());
        for r in &rows {
            // Column 0 = outgoing row id (required).
            let oid: i64 = r.try_get_by_index(0).map_err(|e| {
                OutboxError::Database(sea_orm::DbErr::Custom(format!("outgoing_id column: {e}")))
            })?;
            outgoing_ids.push(oid);
            // Column 1 = optional body id; rows without one have no body row.
            // NOTE(review): this also silently skips body ids that fail to
            // decode — confirm that is intentional rather than masking errors.
            if let Ok(bid) = r.try_get_by_index::<i64>(1) {
                body_ids.push(bid);
            }
        }
        // Every row yields an outgoing id, so this equals rows.len().
        let count = outgoing_ids.len();
        if !outgoing_ids.is_empty() {
            // Batch DELETE built with one placeholder per id.
            let delete_sql = dialect.build_delete_outgoing_batch(outgoing_ids.len());
            let values: Vec<sea_orm::Value> = outgoing_ids.iter().map(|&id| id.into()).collect();
            txn.execute(Statement::from_sql_and_values(backend, &delete_sql, values))
                .await?;
        }
        if !body_ids.is_empty() {
            let delete_sql = dialect.build_delete_body_batch(body_ids.len());
            let values: Vec<sea_orm::Value> = body_ids.iter().map(|&id| id.into()).collect();
            txn.execute(Statement::from_sql_and_values(backend, &delete_sql, values))
                .await?;
        }
        txn.commit().await?;
        Ok(count)
    }
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;

    /// The vacuum worker's batch size must flow from `WorkerTuning` rather
    /// than a hard-coded constant.
    #[test]
    fn vacuum_task_has_configurable_batch_size() {
        // Touch the type so the struct itself is exercised by this test.
        let _task_size = std::mem::size_of::<VacuumTask>();
        let defaults = super::super::super::types::WorkerTuning::vacuum();
        assert_ne!(
            defaults.batch_size, 500,
            "sanity: default vacuum batch_size is not 500"
        );
        let tuned = defaults.batch_size(500);
        assert_eq!(
            tuned.batch_size as usize, 500,
            "VacuumTask should use the configured batch_size (500)",
        );
    }
}