use std::{collections::HashSet, sync::Arc, time::Duration};
use chrono::{DateTime, Utc};
use thiserror::Error;
use crate::{
storage::{StorageError, StorageProvider},
supertable::{
handle::Supertable,
wal::{
persistence::{WalStore, WalStoreError},
state_doc::{WalId, WalState},
},
},
};
/// Default minimum age (five minutes) a `Complete` WAL state doc must exceed before
/// [`run_sweep`] may delete it.
pub const DEFAULT_WAL_GRACE: Duration = Duration::from_secs(300);
/// Default minimum age (one hour) an orphaned `.arrow` sidecar must exceed before
/// [`run_sweep`] may delete it.
pub const DEFAULT_SIDECAR_GRACE: Duration = Duration::from_secs(3_600);
/// Counters describing what a single [`run_sweep`] pass did.
///
/// Per-item failures are tallied here instead of aborting the sweep.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct GcReport {
/// Number of WAL ids returned by the state-doc listing; each is counted whether or not it
/// could subsequently be read.
pub n_state_docs_scanned: usize,
/// `Complete`, past-grace state docs whose delete succeeded.
pub n_state_docs_deleted: usize,
/// `.arrow` sidecars successfully deleted alongside their state doc. A failed sidecar
/// delete in this phase is not counted anywhere.
pub n_arrow_sidecars_deleted_with_state: usize,
/// `.arrow` sidecars with no matching listed state doc that were deleted after their grace.
pub n_orphan_arrow_sidecars_deleted: usize,
/// Failed state-doc reads, plus a failed listing of orphan sidecar candidates.
pub n_read_errors: usize,
/// Failed state-doc deletes plus failed orphan-sidecar deletes.
pub n_delete_errors: usize,
}
/// Errors that abort a [`run_sweep`] pass before any per-item work is attempted.
#[derive(Debug, Error)]
pub enum GcError {
/// The supertable's options carry no storage provider, so there is nothing to sweep.
#[error("GC sweep requires storage; supertable has none attached")]
NoStorageAttached,
/// Listing WAL ids via `WalStore::list_wal_ids` failed; wraps the underlying
/// [`WalStoreError`] (convertible with `?` thanks to `#[from]`).
#[error("failed to list wal/mutations prefix: {0}")]
ListFailed(#[from] WalStoreError),
}
/// Runs one garbage-collection pass over the WAL area of `supertable`'s storage.
///
/// The pass has two phases:
///
/// 1. **State docs.** Every id returned by [`WalStore::list_wal_ids`] is read. A doc whose
///    state is [`WalState::Complete`] and whose `created_at` lies strictly more than
///    `wal_grace` before `now` has its state doc deleted, then its `.arrow` sidecar.
/// 2. **Orphan sidecars.** `.arrow` sidecars whose id was not among the ids listed in
///    phase 1 are deleted once their modification time lies strictly more than
///    `sidecar_grace` before `now`. A candidate with no known modification time is kept.
///
/// Per-item read and delete failures are tallied in the returned [`GcReport`] instead of
/// aborting the pass.
///
/// Ages are compared as [`std::time::Duration`] values. A timestamp later than `now` (clock
/// skew) therefore never qualifies for deletion. A grace too large for `chrono` to represent
/// (e.g. [`Duration::MAX`]) is honoured as "never delete" rather than collapsing to zero.
///
/// # Errors
///
/// * [`GcError::NoStorageAttached`] if the supertable has no storage provider attached.
/// * [`GcError::ListFailed`] if listing the WAL ids fails.
pub async fn run_sweep(
    supertable: &Supertable,
    now: DateTime<Utc>,
    wal_grace: Duration,
    sidecar_grace: Duration,
) -> Result<GcReport, GcError> {
    let inner = supertable.inner();
    let storage = inner
        .options
        .storage
        .as_ref()
        .ok_or(GcError::NoStorageAttached)?
        .clone();
    let wal_store = WalStore::new(Arc::clone(&storage));
    let mut report = GcReport::default();

    // Phase 1: Complete state docs past `wal_grace`. A listing failure is fatal and becomes
    // `GcError::ListFailed` through the thiserror `#[from]` conversion.
    let wal_ids = wal_store.list_wal_ids().await?;
    for &wal_id in &wal_ids {
        report.n_state_docs_scanned += 1;
        let doc = match wal_store.read(wal_id).await {
            Ok((doc, _etag)) => doc,
            Err(_) => {
                report.n_read_errors += 1;
                continue;
            }
        };
        if doc.state != WalState::Complete || !is_older_than(now, doc.created_at, wal_grace) {
            continue;
        }
        // Both deletes are always attempted, state doc first, regardless of each other's
        // outcome.
        let state_deleted = wal_store.delete_state(wal_id).await.is_ok();
        let arrow_deleted = wal_store.delete_arrow(wal_id).await.is_ok();
        if state_deleted {
            report.n_state_docs_deleted += 1;
        } else {
            report.n_delete_errors += 1;
        }
        // A failed sidecar delete is deliberately not counted as an error here.
        // NOTE(review): presumably because not every WAL writes a sidecar — confirm.
        if arrow_deleted {
            report.n_arrow_sidecars_deleted_with_state += 1;
        }
    }

    // Phase 2: orphaned `.arrow` sidecars past `sidecar_grace`. Every id listed in phase 1
    // counts as known, even if its state doc was just deleted above.
    let known_ids: HashSet<WalId> = wal_ids.into_iter().collect();
    match list_arrow_orphans(&storage, &known_ids).await {
        Ok(orphans) => {
            for (wal_id, mtime) in orphans {
                // Without a modification time the sidecar's age is unknown, so keep it.
                // NOTE(review): `list_arrow_orphans` currently always reports `None`, so this
                // phase deletes nothing until it learns real modification times.
                if !matches!(mtime, Some(t) if is_older_than(now, t, sidecar_grace)) {
                    continue;
                }
                if wal_store.delete_arrow(wal_id).await.is_ok() {
                    report.n_orphan_arrow_sidecars_deleted += 1;
                } else {
                    report.n_delete_errors += 1;
                }
            }
        }
        // A failed listing is tallied as a read error; phase-1 results are still returned.
        Err(_) => report.n_read_errors += 1,
    }
    Ok(report)
}

/// Returns `true` when `then` lies strictly more than `grace` before `now`.
///
/// The age is converted to [`std::time::Duration`] instead of converting `grace` to a
/// `chrono` duration. As a result, a negative age (`then` after `now`) fails the conversion
/// and yields `false`, and an arbitrarily large `grace` is compared exactly instead of
/// overflowing.
fn is_older_than(now: DateTime<Utc>, then: DateTime<Utc>, grace: Duration) -> bool {
    matches!((now - then).to_std(), Ok(age) if age > grace)
}
/// Finds `.arrow` sidecars under the `wal/mutations` prefix whose WAL id is not in `known_ids`.
///
/// The file name of each listed URI is its final `/`-separated segment, or the whole URI when
/// it contains no `/`. Names that do not end in `.arrow`, and names whose stem is rejected by
/// [`WalId::from_hex`], are skipped.
///
/// Every candidate is paired with `None` for its modification time because no mtime is
/// looked up here. Callers therefore cannot prove that any candidate is stale.
///
/// # Errors
///
/// Propagates the [`StorageError`] returned when listing the prefix fails.
async fn list_arrow_orphans(
    storage: &Arc<dyn StorageProvider>,
    known_ids: &HashSet<WalId>,
) -> Result<Vec<(WalId, Option<DateTime<Utc>>)>, StorageError> {
    let listed = storage.list_with_prefix("wal/mutations").await?;
    let orphans: Vec<(WalId, Option<DateTime<Utc>>)> = listed
        .into_iter()
        .filter_map(|uri| {
            let file_name = uri.rsplit_once('/').map_or(uri.as_str(), |(_, tail)| tail);
            let hex = file_name.strip_suffix(".arrow")?;
            let id = WalId::from_hex(hex).ok()?;
            if known_ids.contains(&id) {
                None
            } else {
                Some((id, None))
            }
        })
        .collect();
    Ok(orphans)
}
// Tests for `run_sweep`, run against a `LocalFsStorageProvider` rooted in a temporary
// directory.
#[cfg(test)]
mod tests {
use chrono::{Duration as ChronoDuration, Utc};
use tempfile::TempDir;
use super::*;
use crate::{
storage::{LocalFsStorageProvider, StorageProvider},
supertable::{
Supertable,
wal::state_doc::{
OpKind, RowId, SCHEMA_VERSION, TombstoneEntry, TombstoneOutcome, WalStateDoc,
},
},
test_helpers::default_supertable_options,
};
/// Builds a minimal `Complete` delete-WAL state doc with id `wal_id_v`, created at
/// `created_at`, targeting a single row whose tombstone outcome is `NotFound`.
fn complete_wal(wal_id_v: i128, created_at: DateTime<Utc>) -> WalStateDoc {
WalStateDoc {
wal_id: WalId(wal_id_v),
schema_version: SCHEMA_VERSION,
op_kind: OpKind::Delete,
state: WalState::Complete,
created_at,
lease: None,
predicate_repr: "gc-test".into(),
target_ids: vec![RowId(1)],
new_row_count: None,
new_row_content_hash: None,
preallocated_superfile_id: None,
minted_id_spans: Vec::new(),
tombstone_progress: vec![TombstoneEntry {
target_id: RowId(1),
outcome: TombstoneOutcome::NotFound,
tombstoned_in_superfile: None,
}],
}
}
/// Same as `complete_wal`, but left in the `Intent` state with its single tombstone entry
/// still `Pending`.
fn intent_wal(wal_id_v: i128, created_at: DateTime<Utc>) -> WalStateDoc {
let mut doc = complete_wal(wal_id_v, created_at);
doc.state = WalState::Intent;
doc.tombstone_progress[0].outcome = TombstoneOutcome::Pending;
doc
}
// Storage attached but no WAL state docs: every counter in the report stays at zero.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sweep_empty_supertable_reports_zero_work() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let report = run_sweep(&st, Utc::now(), DEFAULT_WAL_GRACE, DEFAULT_SIDECAR_GRACE)
.await
.expect("sweep");
assert_eq!(report, GcReport::default());
}
// A `Complete` doc created 10 minutes before `now` exceeds the 5-minute default grace,
// so its state doc is deleted and no WAL ids remain.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sweep_deletes_complete_wal_past_grace() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let ws = WalStore::new(Arc::clone(&storage));
let now = Utc::now();
let old = now - ChronoDuration::seconds(10 * 60);
let doc = complete_wal(0x111, old);
ws.create(&doc).await.expect("seed");
let report = run_sweep(&st, now, DEFAULT_WAL_GRACE, DEFAULT_SIDECAR_GRACE)
.await
.expect("sweep");
assert_eq!(report.n_state_docs_scanned, 1);
assert_eq!(report.n_state_docs_deleted, 1);
let after = ws.list_wal_ids().await.expect("list");
assert!(after.is_empty());
}
// A `Complete` doc created exactly at `now` has age zero, which is not greater than the
// grace, so it must survive.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sweep_does_not_delete_fresh_complete_wal_within_grace() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let ws = WalStore::new(Arc::clone(&storage));
let now = Utc::now();
let doc = complete_wal(0x222, now);
ws.create(&doc).await.expect("seed");
let report = run_sweep(&st, now, DEFAULT_WAL_GRACE, DEFAULT_SIDECAR_GRACE)
.await
.expect("sweep");
assert_eq!(report.n_state_docs_scanned, 1);
assert_eq!(report.n_state_docs_deleted, 0);
let after = ws.list_wal_ids().await.expect("list");
assert_eq!(after, vec![WalId(0x222)]);
}
// Only `Complete` docs are collected: an `Intent` doc an hour old (well past the
// grace) is scanned but kept.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sweep_does_not_delete_intent_wal() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let ws = WalStore::new(Arc::clone(&storage));
let old = Utc::now() - ChronoDuration::seconds(60 * 60);
let doc = intent_wal(0x333, old);
ws.create(&doc).await.expect("seed");
let report = run_sweep(&st, Utc::now(), DEFAULT_WAL_GRACE, DEFAULT_SIDECAR_GRACE)
.await
.expect("sweep");
assert_eq!(report.n_state_docs_scanned, 1);
assert_eq!(report.n_state_docs_deleted, 0);
let after = ws.list_wal_ids().await.expect("list");
assert_eq!(after, vec![WalId(0x333)]);
}
// Without attached storage the sweep fails fast with `NoStorageAttached`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn in_memory_supertable_sweep_errors_cleanly() {
let st = Supertable::create(default_supertable_options()).expect("create");
let err = run_sweep(&st, Utc::now(), DEFAULT_WAL_GRACE, DEFAULT_SIDECAR_GRACE)
.await
.expect_err("must error");
assert!(matches!(err, GcError::NoStorageAttached));
}
}