mod catalog_integrity_helpers;
use nodedb::control::security::catalog::SystemCatalog;
use nodedb::data::executor::handlers::reclaim;
use catalog_integrity_helpers::{TENANT, make_catalog, make_collection};
/// Stores an active `users` collection, deletes it, and checks that
/// `get_collection` stops returning it. The delete is then issued two more
/// times against the now-missing row; each call must still succeed and the
/// lookup must keep returning `None`.
#[test]
fn delete_collection_is_idempotent_and_reflected_in_get() {
    let (_tmp, catalog) = make_catalog();

    // Small helpers so every step below reads as a single intent.
    let lookup = || {
        catalog
            .get_collection(nodedb_types::DatabaseId::DEFAULT, TENANT, "users")
            .unwrap()
    };
    let drop_users = || {
        catalog
            .delete_collection(nodedb_types::DatabaseId::DEFAULT, TENANT, "users")
            .unwrap();
    };

    let mut users = make_collection("users");
    users.is_active = true;
    catalog
        .put_collection(nodedb_types::DatabaseId::DEFAULT, &users)
        .unwrap();
    assert!(lookup().is_some());

    // First delete: the row must disappear.
    drop_users();
    assert!(
        lookup().is_none(),
        "post-delete get_collection must return None"
    );

    // Repeated deletes of a missing row must not error.
    for _ in 0..2 {
        drop_users();
    }
    assert!(lookup().is_none());
}
/// Simulates a soft delete by re-storing a collection with `is_active`
/// cleared, then checks that the row is still readable by name, reports
/// `is_active == false`, and is listed by `load_dropped_collections`.
#[test]
fn soft_delete_preserves_row_and_clears_active_flag() {
    let (_tmp, catalog) = make_catalog();

    // Reads the `logs` row back, panicking if it is missing.
    let read_logs = || {
        catalog
            .get_collection(nodedb_types::DatabaseId::DEFAULT, TENANT, "logs")
            .unwrap()
            .unwrap()
    };

    let mut fresh = make_collection("logs");
    fresh.is_active = true;
    catalog
        .put_collection(nodedb_types::DatabaseId::DEFAULT, &fresh)
        .unwrap();

    // Flip the flag on the stored copy and write it back.
    let mut record = read_logs();
    record.is_active = false;
    catalog
        .put_collection(nodedb_types::DatabaseId::DEFAULT, &record)
        .unwrap();

    let reloaded = read_logs();
    assert!(
        !reloaded.is_active,
        "is_active must be false after soft-delete"
    );
    assert_eq!(reloaded.name, "logs", "row must still exist for UNDROP");

    let tombstones = catalog
        .load_dropped_collections(nodedb_types::DatabaseId::DEFAULT)
        .unwrap();
    let listed = tombstones.iter().any(|c| c.name == "logs");
    assert!(
        listed,
        "soft-deleted row must appear in load_dropped_collections"
    );
}
/// Walks one `(TENANT, "events")` L2 cleanup entry through its whole
/// lifecycle and checks that every step tolerates being repeated:
///
/// 1. enqueueing twice leaves a single row carrying the later values,
/// 2. recording two failed attempts bumps `attempts` to 2 and stores the
///    error text without changing the number of rows,
/// 3. removing twice succeeds and leaves the queue empty,
/// 4. recording an attempt for a removed entry succeeds and does not bring
///    the row back.
#[test]
fn l2_cleanup_queue_is_idempotent_end_to_end() {
    use nodedb::control::security::catalog::StoredL2CleanupEntry;

    let (_tmp, catalog) = make_catalog();
    let entry = |lsn: u64, bytes: u64, attempts: u32, err: &str| StoredL2CleanupEntry {
        tenant_id: TENANT,
        name: "events".into(),
        purge_lsn: lsn,
        enqueued_at_ns: 100,
        bytes_pending: bytes,
        last_error: err.to_string(),
        attempts,
    };

    // Step 1: the second enqueue must replace the first, not add a row.
    catalog
        .enqueue_l2_cleanup(&entry(500, 2_000, 0, ""))
        .unwrap();
    catalog
        .enqueue_l2_cleanup(&entry(700, 9_000, 0, ""))
        .unwrap();
    let rows = catalog.load_l2_cleanup_queue().unwrap();
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].purge_lsn, 700);
    assert_eq!(rows[0].bytes_pending, 9_000);

    // Step 2: two failed attempts against the same entry.
    for _ in 0..2 {
        catalog
            .record_l2_cleanup_attempt(TENANT, "events", "s3: 503")
            .unwrap();
    }
    let rows = catalog.load_l2_cleanup_queue().unwrap();
    // Check the row count before indexing: a duplicated row would otherwise
    // go unnoticed, and a dropped row would surface only as an index panic.
    assert_eq!(
        rows.len(),
        1,
        "recording attempts must neither add nor drop queue rows"
    );
    assert_eq!(rows[0].attempts, 2);
    assert_eq!(rows[0].last_error, "s3: 503");

    // Step 3: removal is repeatable.
    catalog.remove_l2_cleanup(TENANT, "events").unwrap();
    catalog.remove_l2_cleanup(TENANT, "events").unwrap();
    assert!(catalog.load_l2_cleanup_queue().unwrap().is_empty());

    // Step 4: an attempt recorded for a missing entry must not recreate it.
    catalog
        .record_l2_cleanup_attempt(TENANT, "events", "doesn't matter")
        .unwrap();
    assert!(catalog.load_l2_cleanup_queue().unwrap().is_empty());
}
/// Runs the vector, spatial, sparse-vector and timeseries reclaim handlers
/// twice against an empty temporary directory. Neither pass may panic, and
/// each pass must report `files_unlinked == 0` for every handler, so a repeat
/// run is checked for its reported result and not only for "did not crash".
#[test]
fn reclaim_handlers_are_idempotent_on_missing_files() {
    let tmp = tempfile::tempdir().unwrap();
    let base = tmp.path();

    for pass in 1..=2 {
        let vector = reclaim::vector::reclaim_vector_checkpoints(base, TENANT, "x");
        let spatial = reclaim::spatial::reclaim_spatial_checkpoints(base, TENANT, "x");
        let sparse = reclaim::sparse_vector::reclaim_sparse_vector_checkpoints(base, TENANT, "x");
        let ts = reclaim::timeseries::reclaim_timeseries_partitions(base, TENANT, "x");

        assert_eq!(vector.files_unlinked, 0, "vector reclaim, pass {}", pass);
        assert_eq!(spatial.files_unlinked, 0, "spatial reclaim, pass {}", pass);
        assert_eq!(
            sparse.files_unlinked, 0,
            "sparse-vector reclaim, pass {}",
            pass
        );
        assert_eq!(ts.files_unlinked, 0, "timeseries reclaim, pass {}", pass);
    }
}
/// Seeds `vector-ckpt/` with checkpoints for tenant 1 (`users`, `orders`) and
/// tenant 2 (`users`), reclaims tenant 1's `users`, and checks that exactly
/// that one file was unlinked while the other two are still on disk.
#[test]
fn reclaim_is_scoped_to_tenant_and_collection() {
    let tmp = tempfile::tempdir().unwrap();
    let base = tmp.path();
    let vec_dir = base.join("vector-ckpt");
    std::fs::create_dir_all(&vec_dir).unwrap();

    // (file name, contents) for each seeded checkpoint.
    let fixtures: [(&str, &[u8]); 3] = [
        ("1:users.ckpt", b"a"),
        ("1:orders.ckpt", b"b"),
        ("2:users.ckpt", b"c"),
    ];
    for &(file_name, contents) in &fixtures {
        std::fs::write(vec_dir.join(file_name), contents).unwrap();
    }

    let stats = reclaim::vector::reclaim_vector_checkpoints(base, 1, "users");
    assert_eq!(stats.files_unlinked, 1);

    let on_disk = |file_name: &str| vec_dir.join(file_name).exists();
    assert!(!on_disk("1:users.ckpt"));
    assert!(on_disk("1:orders.ckpt"));
    assert!(on_disk("2:users.ckpt"));
}
// Empty function that is never called in the visible part of this file; its
// signature is the only place the imported `SystemCatalog` type is named.
// NOTE(review): presumably kept so that `use` stays referenced (avoiding an
// unused-import warning) — confirm before removing either one.
fn _cat_ref_witness(_cat: &SystemCatalog) {}