use uuid::Uuid;
use crate::supertable::wal::{
persistence::{WalStore, WalStoreError},
state_doc::SealRecord,
tombstones_codec::TombstonesSidecar,
};
#[derive(Debug, thiserror::Error)]
pub enum TombstonesAdminError {
#[error(
"tombstone sidecar for {superfile_id} is already sealed (compaction_id={existing_compaction_id})"
)]
AlreadySealed {
superfile_id: Uuid,
existing_compaction_id: Uuid,
},
#[error("CAS race lost while sealing sidecar for {superfile_id}")]
CasLost { superfile_id: Uuid },
#[error("wal store error: {0}")]
WalStore(#[from] WalStoreError),
}
/// Seals the tombstone sidecar of `superfile_id` on behalf of compaction `compaction_id`.
///
/// The current sidecar (and its etag) is read first. What happens next depends on its state:
/// - **Sealed by `compaction_id`:** the stored sidecar is returned unchanged and nothing is
///   written, so repeating the call is idempotent.
/// - **Sealed by another compaction:** the call fails with
///   [`TombstonesAdminError::AlreadySealed`].
/// - **Unsealed or absent:** a sealed copy is built that carries the existing bitmap (or an
///   empty one when absent). It is written back conditionally on the etag that was read, and
///   that copy is returned.
///
/// # Errors
/// - [`TombstonesAdminError::AlreadySealed`] when a different compaction holds the seal.
/// - [`TombstonesAdminError::CasLost`] when the conditional write reports `CasFailed`.
/// - [`TombstonesAdminError::WalStore`] for any other store failure.
pub async fn seal(
    wal_store: &WalStore,
    superfile_id: Uuid,
    compaction_id: Uuid,
    sealed_at: chrono::DateTime<chrono::Utc>,
) -> Result<TombstonesSidecar, TombstonesAdminError> {
    // Carry forward the tombstones already recorded, together with the etag that guards the
    // write below. An absent sidecar is written with no expected etag
    // (NOTE(review): presumably create-if-absent semantics — confirm in WalStore).
    let (bitmap, expected_etag) = match wal_store.get_tombstones(superfile_id).await? {
        None => (roaring::RoaringBitmap::new(), None),
        Some((current, etag)) => {
            // Copy the holder id out so no borrow of `current` outlives this check.
            if let Some(holder) = current.seal.as_ref().map(|record| record.compaction_id) {
                if holder != compaction_id {
                    return Err(TombstonesAdminError::AlreadySealed {
                        superfile_id,
                        existing_compaction_id: holder,
                    });
                }
                // Re-sealing with the same compaction id is a no-op.
                return Ok(current);
            }
            (current.bitmap, Some(etag))
        }
    };

    let sealed = TombstonesSidecar {
        seal: Some(SealRecord {
            compaction_id,
            sealed_at,
        }),
        bitmap,
    };

    let _new_etag = wal_store
        .put_tombstones(superfile_id, expected_etag.as_ref(), &sealed)
        .await
        .map_err(|err| match err {
            WalStoreError::CasFailed { .. } => TombstonesAdminError::CasLost { superfile_id },
            other => TombstonesAdminError::from(other),
        })?;
    Ok(sealed)
}
/// Returns, in ascending order, every doc id in `0..n_docs` that is not tombstoned.
///
/// The tombstone bitmap is read from the superfile's sidecar. A missing sidecar means nothing
/// is tombstoned, so the full range is returned.
///
/// Details:
/// - The seal state of the sidecar is not consulted; only its bitmap matters.
/// - Bits at or above `n_docs` are ignored.
///
/// # Errors
/// Propagates any store failure as [`TombstonesAdminError::WalStore`].
pub async fn live_rows(
    wal_store: &WalStore,
    superfile_id: Uuid,
    n_docs: u32,
) -> Result<Vec<u32>, TombstonesAdminError> {
    let tombstoned = match wal_store.get_tombstones(superfile_id).await? {
        None => roaring::RoaringBitmap::new(),
        Some((sidecar, _)) => sidecar.bitmap,
    };

    let mut survivors: Vec<u32> = Vec::with_capacity(n_docs as usize);
    survivors.extend((0..n_docs).filter(|candidate| !tombstoned.contains(*candidate)));
    Ok(survivors)
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use chrono::Utc;
    use tempfile::TempDir;
    use super::*;
    use crate::storage::{LocalFsStorageProvider, StorageProvider};

    /// Builds a `WalStore` over a fresh temp directory. The `TempDir` is returned so the
    /// directory stays alive for the whole test.
    fn fixture() -> (TempDir, WalStore) {
        let dir = TempDir::new().expect("tempdir");
        let storage: Arc<dyn StorageProvider> =
            Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
        (dir, WalStore::new(storage))
    }

    /// Builds a bitmap containing exactly `doc_ids`.
    fn bitmap_of(doc_ids: &[u32]) -> roaring::RoaringBitmap {
        let mut bitmap = roaring::RoaringBitmap::new();
        for &doc_id in doc_ids {
            bitmap.insert(doc_id);
        }
        bitmap
    }

    /// Writes an unsealed sidecar holding `bitmap`, as a tombstone writer would.
    async fn seed_unsealed(ws: &WalStore, sf: Uuid, bitmap: roaring::RoaringBitmap) {
        ws.put_tombstones(sf, None, &TombstonesSidecar { seal: None, bitmap })
            .await
            .expect("seed");
    }

    /// Reads back the persisted sidecar, panicking if it is absent.
    async fn persisted(ws: &WalStore, sf: Uuid) -> TombstonesSidecar {
        let (sidecar, _etag) = ws.get_tombstones(sf).await.expect("get").expect("present");
        sidecar
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn seal_on_absent_sidecar_creates_sealed_empty() {
        let (_dir, ws) = fixture();
        let sf = Uuid::from_u128(0x100);
        let cid = Uuid::from_u128(0xC0DE);
        let sealed = seal(&ws, sf, cid, Utc::now()).await.expect("seal");
        assert_eq!(sealed.seal.expect("set").compaction_id, cid);
        assert!(sealed.bitmap.is_empty());
        let post = persisted(&ws, sf).await;
        assert_eq!(post.seal.expect("set").compaction_id, cid);
        assert!(post.bitmap.is_empty());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn seal_preserves_existing_bitmap() {
        let (_dir, ws) = fixture();
        let sf = Uuid::from_u128(0x200);
        let bitmap = bitmap_of(&[1, 5, 7]);
        seed_unsealed(&ws, sf, bitmap.clone()).await;
        let cid = Uuid::from_u128(0xABCD);
        let sealed = seal(&ws, sf, cid, Utc::now()).await.expect("seal");
        assert_eq!(sealed.bitmap, bitmap, "seal must preserve the bitmap");
        assert_eq!(sealed.seal.expect("set").compaction_id, cid);
        // The write-back, not just the returned value, must keep the tombstones.
        let post = persisted(&ws, sf).await;
        assert_eq!(post.bitmap, bitmap, "persisted sidecar must keep the bitmap");
        assert_eq!(post.seal.expect("set").compaction_id, cid);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn seal_is_idempotent_on_same_compaction_id() {
        let (_dir, ws) = fixture();
        let sf = Uuid::from_u128(0x300);
        let cid = Uuid::from_u128(0xDEAD);
        let first = seal(&ws, sf, cid, Utc::now()).await.expect("seal-1");
        let again = seal(&ws, sf, cid, Utc::now()).await.expect("seal-2");
        assert_eq!(
            first.seal.as_ref().expect("set").compaction_id,
            again.seal.as_ref().expect("set").compaction_id
        );
        assert_eq!(first.bitmap, again.bitmap);
        let post = persisted(&ws, sf).await;
        assert_eq!(post.seal.expect("set").compaction_id, cid);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn seal_on_different_compaction_id_surfaces_already_sealed() {
        let (_dir, ws) = fixture();
        let sf = Uuid::from_u128(0x400);
        let cid_a = Uuid::from_u128(0x1111);
        let cid_b = Uuid::from_u128(0x2222);
        let _ = seal(&ws, sf, cid_a, Utc::now()).await.expect("seal-a");
        let err = seal(&ws, sf, cid_b, Utc::now())
            .await
            .expect_err("must error");
        assert!(matches!(
            err,
            TombstonesAdminError::AlreadySealed {
                existing_compaction_id,
                ..
            } if existing_compaction_id == cid_a
        ));
        // The rejected seal must not have overwritten the original holder.
        let post = persisted(&ws, sf).await;
        assert_eq!(post.seal.expect("set").compaction_id, cid_a);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn live_rows_absent_sidecar_returns_full_range() {
        let (_dir, ws) = fixture();
        let sf = Uuid::from_u128(0x500);
        let rows = live_rows(&ws, sf, 5).await.expect("live");
        assert_eq!(rows, vec![0u32, 1, 2, 3, 4]);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn live_rows_excludes_tombstoned_bits() {
        let (_dir, ws) = fixture();
        let sf = Uuid::from_u128(0x600);
        seed_unsealed(&ws, sf, bitmap_of(&[1, 3])).await;
        let rows = live_rows(&ws, sf, 5).await.expect("live");
        assert_eq!(rows, vec![0u32, 2, 4]);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn live_rows_works_on_sealed_sidecar() {
        let (_dir, ws) = fixture();
        let sf = Uuid::from_u128(0x700);
        seed_unsealed(&ws, sf, bitmap_of(&[2])).await;
        let cid = Uuid::from_u128(0xC0DEC0DE);
        let _ = seal(&ws, sf, cid, Utc::now()).await.expect("seal");
        let rows = live_rows(&ws, sf, 4).await.expect("live");
        assert_eq!(rows, vec![0u32, 1, 3]);
    }

    /// Deterministic (sequential) interleaving: a writer's tombstone that lands before the
    /// seal must survive sealing and be excluded from the compactor's live rows.
    ///
    /// NOTE: despite the name, this does not exercise truly concurrent writer/seal
    /// execution. The CAS-loss path (`TombstonesAdminError::CasLost`) is not covered here.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn race_writer_then_seal_landed_tombstone_visible_to_compactor() {
        let (_dir, ws) = fixture();
        let sf = Uuid::from_u128(0x800);
        seed_unsealed(&ws, sf, bitmap_of(&[3])).await;
        let cid = Uuid::from_u128(0xC0DEFACE);
        let _ = seal(&ws, sf, cid, Utc::now()).await.expect("seal");
        let rows = live_rows(&ws, sf, 5).await.expect("live");
        assert_eq!(rows, vec![0u32, 1, 2, 4]);
    }
}