use std::{collections::HashMap, io::Cursor, sync::Arc, time::Duration};
use arrow::ipc::reader::StreamReader;
use arrow_array::{ArrayRef, Decimal128Array, RecordBatch};
use bytes::Bytes;
use roaring::RoaringBitmap;
use tokio::time::sleep;
use uuid::Uuid;
use crate::{
runtime_bridge::bridge_sync_to_async,
storage::StorageError,
superfile::{ReadError, SuperfileReader, builder::SuperfileBuilder},
supertable::{
ManifestSnapshot, SupertableOptions,
handle::{Supertable, SupertableInner},
manifest::{
ClusterCentroids, FtsSummaryAgg, ScalarStatsAgg, SuperfileEntry, SuperfileUri,
VectorSummary, bloom::BloomBuilder,
},
options::{DECIMAL128_PRECISION, DECIMAL128_SCALE},
query::superfile_reader::superfile_reader,
utils::vector_split::split_vectors,
wal::{
persistence::{Etag, WalStore, WalStoreError},
state_doc::{
IdSpan, OpKind, RowId, TombstoneEntry, TombstoneOutcome, WalState, WalStateDoc,
},
tombstones_codec::TombstonesSidecar,
},
writer::{build_subsection_offsets, persist_commit},
},
};
/// What a successful [`run_append_phase`] call actually did.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendPhaseOutcome {
/// The manifest already referenced the WAL's preallocated superfile, so nothing was rebuilt;
/// at most the WAL state document was advanced.
AlreadyApplied,
/// This call built the superfile and committed it to the manifest.
Applied,
}
/// Failures that the append phase ([`run_append_phase`]) can report.
#[derive(Debug, thiserror::Error)]
pub enum AppendPhaseError {
/// The WAL document lacks an optional field the append phase needs. In this file the
/// reported names are `preallocated_superfile_id`, `new_row_content_hash` and `new_row_count`.
#[error("WAL is missing required field {field:?} for the append phase")]
MissingField { field: &'static str },
/// The WAL's `op_kind` is not `OpKind::Update`.
#[error("append phase invoked on a DELETE WAL; only UPDATE has an append phase")]
NotAnUpdateWal,
/// The supertable options carry no storage provider.
#[error("supertable has no storage attached; append phase requires durable storage")]
NoStorageAttached,
/// The WAL store rejected the Arrow sidecar because its content hash differs from the one
/// recorded in the WAL document.
#[error("IPC content hash mismatch for WAL {wal_id:?}: expected {expected:?}, got {got:?}")]
SidecarContentHashMismatch {
wal_id: String,
expected: String,
got: String,
},
/// The Arrow IPC payload could not be opened, could not be read, or did not contain exactly
/// one record batch.
#[error("IPC sidecar decode failed for WAL {wal_id:?}: {message}")]
IpcDecode { wal_id: String, message: String },
/// The flattened minted id spans disagree with the expected row count. `expected` carries
/// either the WAL's `new_row_count` or the decoded batch's row count, depending on which
/// check failed.
#[error(
"minted_id_spans flatten ({flat_len}) doesn't match new_row_count ({expected}) for WAL {wal_id:?}"
)]
IdSpansLengthMismatch {
wal_id: String,
flat_len: usize,
expected: u32,
},
/// Any step of assembling the superfile bytes failed (vector split, `_id` column
/// construction, builder construction, `add_batch`, or `finish`).
#[error("superfile build failed: {message}")]
SuperfileBuild { message: String },
/// The freshly built superfile bytes could not be reopened to compute manifest summaries.
#[error("superfile open for summary failed: {message}")]
SuperfileOpenForSummary { message: String },
/// `persist_commit` reported a failure.
#[error("manifest commit failed: {message}")]
ManifestCommit { message: String },
/// A storage-layer error, converted via `From`.
#[error("storage error: {0}")]
Storage(#[from] StorageError),
/// A WAL-store error (e.g. a failed etag-guarded update), converted via `From`.
#[error("WAL store error: {0}")]
WalStore(#[from] WalStoreError),
}
/// Runs the append phase of an UPDATE WAL.
///
/// If the current manifest already references the WAL's preallocated superfile the phase is
/// treated as already applied and only the WAL state is advanced (when needed). Otherwise the
/// superfile is built, committed, and the WAL state advanced.
///
/// Returns the outcome together with the WAL document and etag that are current after the call.
///
/// # Errors
///
/// * [`AppendPhaseError::NotAnUpdateWal`] when `wal_doc.op_kind` is not `Update`.
/// * [`AppendPhaseError::MissingField`] when `preallocated_superfile_id` is absent.
/// * Anything surfaced by the apply step or by the WAL state update.
pub async fn run_append_phase(
    supertable: &Supertable,
    wal_store: &WalStore,
    wal_doc: &WalStateDoc,
    wal_etag: &Etag,
) -> Result<(AppendPhaseOutcome, WalStateDoc, Etag), AppendPhaseError> {
    if wal_doc.op_kind != OpKind::Update {
        return Err(AppendPhaseError::NotAnUpdateWal);
    }
    let Some(superfile_id) = wal_doc.preallocated_superfile_id else {
        return Err(AppendPhaseError::MissingField {
            field: "preallocated_superfile_id",
        });
    };

    // The preallocated id doubles as the idempotency key: its presence in the manifest means a
    // previous attempt already committed the superfile.
    let snapshot = supertable.inner().manifest.load_full();
    let already_committed = manifest_contains(&snapshot, superfile_id);

    let (outcome, (doc, etag)) = if already_committed {
        (
            AppendPhaseOutcome::AlreadyApplied,
            advance_to_appended_if_needed(wal_store, wal_doc, wal_etag).await?,
        )
    } else {
        (
            AppendPhaseOutcome::Applied,
            do_apply(supertable, wal_store, wal_doc, wal_etag, superfile_id).await?,
        )
    };
    Ok((outcome, doc, etag))
}
/// Reports whether any superfile entry in `manifest` has a URI equal to `superfile_id`.
fn manifest_contains(manifest: &ManifestSnapshot, superfile_id: Uuid) -> bool {
    for entry in manifest.get_all_superfiles().iter() {
        if entry.uri.0 == superfile_id {
            return true;
        }
    }
    false
}
/// Moves the WAL document forward to `WalState::Appended` unless it is already there or beyond.
///
/// A document that is already `Appended` or `Complete` is returned unchanged together with the
/// caller's etag, and no write is issued. `Complete` is included so that replaying the append
/// phase on a finished WAL never regresses its state back to `Appended`; this matches
/// `run_tombstone_phase`, which also treats `Complete` as terminal.
///
/// Otherwise a copy of the document with `state = Appended` is written with an etag-guarded
/// update, and the new document and etag are returned.
///
/// # Errors
///
/// Propagates any `WalStoreError` from the etag-guarded update.
async fn advance_to_appended_if_needed(
    wal_store: &WalStore,
    wal_doc: &WalStateDoc,
    wal_etag: &Etag,
) -> Result<(WalStateDoc, Etag), AppendPhaseError> {
    // Only ever move forward: both states below mean the append already happened.
    if matches!(wal_doc.state, WalState::Appended | WalState::Complete) {
        return Ok((wal_doc.clone(), wal_etag.clone()));
    }
    let mut next = wal_doc.clone();
    next.state = WalState::Appended;
    let new_etag = wal_store
        .update_with_etag(wal_doc.wal_id, wal_etag, &next)
        .await?;
    Ok((next, new_etag))
}
/// Builds the superfile for an UPDATE WAL's new rows, commits it, and advances the WAL state.
///
/// Steps, in order:
/// 1. Require attached storage and the WAL's `new_row_content_hash`.
/// 2. Fetch the Arrow IPC sidecar (hash-checked by the WAL store) and decode its single batch.
/// 3. Flatten `minted_id_spans` and check the id count against both `new_row_count` and the
///    decoded batch's row count.
/// 4. Split vector columns off, prepend the `_id` column, and build the superfile bytes.
/// 5. Reopen the bytes to derive the FTS / vector summaries and compute scalar stats.
/// 6. Commit the new manifest entry via `persist_commit`, insert the bytes into the in-memory
///    store, and advance the WAL to `Appended`.
///
/// # Errors
///
/// Returns the matching [`AppendPhaseError`] variant for whichever step fails.
async fn do_apply(
    supertable: &Supertable,
    wal_store: &WalStore,
    wal_doc: &WalStateDoc,
    wal_etag: &Etag,
    preallocated_superfile_id: Uuid,
) -> Result<(WalStateDoc, Etag), AppendPhaseError> {
    let inner = supertable.inner();
    let storage = inner
        .options
        .storage
        .as_ref()
        .ok_or(AppendPhaseError::NoStorageAttached)?
        .clone();
    let hash = wal_doc
        .new_row_content_hash
        .as_deref()
        .ok_or(AppendPhaseError::MissingField {
            field: "new_row_content_hash",
        })?;
    // Re-type the store's hash-mismatch error so callers can match on it directly.
    let ipc_bytes = wal_store
        .get_arrow(wal_doc.wal_id, Some(hash))
        .await
        .map_err(|e| match e {
            WalStoreError::SidecarContentHashMismatch { expected, got, .. } => {
                AppendPhaseError::SidecarContentHashMismatch {
                    wal_id: wal_doc.wal_id.to_hex(),
                    expected,
                    got,
                }
            }
            other => AppendPhaseError::WalStore(other),
        })?;
    let user_batch = decode_ipc_batch(&ipc_bytes, wal_doc)?;
    let new_row_count = wal_doc
        .new_row_count
        .ok_or(AppendPhaseError::MissingField {
            field: "new_row_count",
        })?;
    let flat_ids = flatten_spans(&wal_doc.minted_id_spans);
    if flat_ids.len() != new_row_count as usize {
        return Err(AppendPhaseError::IdSpansLengthMismatch {
            wal_id: wal_doc.wal_id.to_hex(),
            flat_len: flat_ids.len(),
            expected: new_row_count,
        });
    }
    if user_batch.num_rows() != flat_ids.len() {
        return Err(AppendPhaseError::IdSpansLengthMismatch {
            wal_id: wal_doc.wal_id.to_hex(),
            flat_len: flat_ids.len(),
            // Checked conversion: a plain `as u32` would silently wrap for oversized batches
            // and report a misleading expected count.
            expected: u32::try_from(user_batch.num_rows()).unwrap_or(u32::MAX),
        });
    }
    let (scalar_no_id, vector_slices) =
        split_vectors(&user_batch, &inner.options).map_err(|e| {
            AppendPhaseError::SuperfileBuild {
                message: format!("vector_split: {e}"),
            }
        })?;
    let scalar_with_id = prepend_id_column(&scalar_no_id, &flat_ids, &inner.options)?;
    let bytes = {
        let mut builder = SuperfileBuilder::new(inner.options.builder_options()).map_err(|e| {
            AppendPhaseError::SuperfileBuild {
                message: format!("builder construction: {e}"),
            }
        })?;
        builder
            .add_batch(&scalar_with_id, &vector_slices)
            .map_err(|e| AppendPhaseError::SuperfileBuild {
                message: format!("add_batch: {e}"),
            })?;
        let raw = builder
            .finish()
            .map_err(|e| AppendPhaseError::SuperfileBuild {
                message: format!("finish: {e}"),
            })?;
        Bytes::from(raw)
    };
    // Reopen the bytes we just produced so the manifest summaries come from the same reader
    // path that queries use.
    let reader = SuperfileReader::open_with(bytes.clone(), inner.options.superfile_open_options())
        .map_err(|e| AppendPhaseError::SuperfileOpenForSummary {
            message: e.to_string(),
        })?;
    let fts_summary = build_fts_summary(&reader, &inner.options);
    let vector_summary = build_vector_summary(&reader, &inner.options);
    let scalar_stats =
        ScalarStatsAgg::from_batches(&inner.options.scalar_schema(), &[&scalar_with_id]);
    // Use the true extrema rather than the first/last element: the manifest range is used to
    // prune superfiles during id resolution, so it must cover every id even if the minted spans
    // are not listed in ascending order.
    let (id_min, id_max) = match (
        flat_ids.iter().copied().min(),
        flat_ids.iter().copied().max(),
    ) {
        (Some(lo), Some(hi)) => (lo, hi),
        _ => (0, 0),
    };
    let uri = SuperfileUri(preallocated_superfile_id);
    let entry = Arc::new(SuperfileEntry {
        superfile_id: preallocated_superfile_id,
        uri,
        n_docs: flat_ids.len() as u64,
        id_min,
        id_max,
        scalar_stats,
        fts_summary,
        vector_summary,
        partition_key: Vec::new(),
        partition_hint: None,
        subsection_offsets: build_subsection_offsets(&bytes),
    });
    persist_commit(inner, storage, vec![entry], &[], vec![(uri, bytes.clone())]).map_err(|e| {
        AppendPhaseError::ManifestCommit {
            message: format!("{e}"),
        }
    })?;
    // The result of the in-memory insert is deliberately ignored; the commit above is what
    // makes the superfile durable.
    let _ = inner.options.store.insert(uri, bytes);
    advance_to_appended_if_needed(wal_store, wal_doc, wal_etag).await
}
/// Expands each inclusive `[first, last]` span into its individual ids, concatenated in the
/// order the spans are given.
fn flatten_spans(spans: &[IdSpan]) -> Vec<i128> {
    // Reserve once up front from the spans' own reported lengths.
    let capacity = spans
        .iter()
        .fold(0usize, |acc, span| acc + span.len() as usize);
    let mut ids = Vec::with_capacity(capacity);
    ids.extend(spans.iter().flat_map(|span| span.first.0..=span.last.0));
    ids
}
/// Decodes an Arrow IPC stream that must contain exactly one record batch.
///
/// # Errors
///
/// Returns [`AppendPhaseError::IpcDecode`] (tagged with the WAL id) when the stream cannot be
/// opened, the batch cannot be read, or the stream holds zero or more than one batch.
fn decode_ipc_batch(
    ipc_bytes: &Bytes,
    wal_doc: &WalStateDoc,
) -> Result<RecordBatch, AppendPhaseError> {
    // Single place that stamps the WAL id onto every decode failure.
    let decode_error = |message: String| AppendPhaseError::IpcDecode {
        wal_id: wal_doc.wal_id.to_hex(),
        message,
    };

    let mut stream = StreamReader::try_new(Cursor::new(ipc_bytes.as_ref()), None)
        .map_err(|e| decode_error(format!("StreamReader::try_new: {e}")))?;

    let only_batch = match stream.next() {
        Some(Ok(batch)) => batch,
        Some(Err(e)) => return Err(decode_error(format!("batch read: {e}"))),
        None => {
            return Err(decode_error(
                "IPC stream had no batches; expected exactly one".into(),
            ));
        }
    };

    match stream.next() {
        None => Ok(only_batch),
        Some(_) => Err(decode_error(
            "IPC stream had more than one batch; expected exactly one".into(),
        )),
    }
}
/// Builds a record batch whose first column is a Decimal128 `_id` array made from `flat_ids`,
/// followed by every column of `scalar_no_id`, using `options.scalar_schema()` as the schema.
///
/// # Errors
///
/// Returns [`AppendPhaseError::SuperfileBuild`] when the precision/scale cannot be applied to
/// the id array or when the assembled columns do not fit the scalar schema.
fn prepend_id_column(
    scalar_no_id: &RecordBatch,
    flat_ids: &[i128],
    options: &SupertableOptions,
) -> Result<RecordBatch, AppendPhaseError> {
    let ids = Decimal128Array::from(flat_ids.to_vec())
        .with_precision_and_scale(DECIMAL128_PRECISION, DECIMAL128_SCALE)
        .map_err(|e| AppendPhaseError::SuperfileBuild {
            message: format!("Decimal128 precision/scale: {e}"),
        })?;
    let id_column: ArrayRef = Arc::new(ids);

    let all_columns: Vec<ArrayRef> = std::iter::once(id_column)
        .chain(scalar_no_id.columns().iter().cloned())
        .collect();

    match RecordBatch::try_new(options.scalar_schema(), all_columns) {
        Ok(batch) => Ok(batch),
        Err(e) => Err(AppendPhaseError::SuperfileBuild {
            message: format!("RecordBatch::try_new with _id prepended: {e}"),
        }),
    }
}
/// Produces one [`FtsSummaryAgg`] per configured FTS column from a freshly built superfile.
///
/// Each summary holds a bloom filter over the column's terms, the distinct term count, and the
/// first and last term returned by the reader (empty vectors when the column has no terms).
/// Returns an empty map when the superfile has no FTS section.
///
/// # Panics
///
/// Panics if the reader cannot iterate a column's terms; the bytes were just produced by the
/// builder, so this is treated as an invariant violation.
fn build_fts_summary(
    reader: &SuperfileReader,
    options: &SupertableOptions,
) -> HashMap<String, FtsSummaryAgg> {
    let Some(fts) = reader.fts() else {
        return HashMap::new();
    };
    options
        .fts_columns
        .iter()
        .map(|fc| {
            let terms = fts
                .iter_column_terms(&fc.column)
                .expect("FST bytes valid: superfile just built");
            let distinct = terms.len() as u32;
            let lowest = terms.first().cloned().unwrap_or_default();
            let highest = terms.last().cloned().unwrap_or_default();
            let mut bloom = BloomBuilder::new();
            for term in &terms {
                bloom.insert(term);
            }
            let summary =
                FtsSummaryAgg::new_with_params(bloom.finish(), distinct, (lowest, highest));
            (fc.column.clone(), summary)
        })
        .collect()
}
/// Produces one [`VectorSummary`] per configured vector column for which the superfile reports
/// a centroid/radius summary.
///
/// Columns without a summary are skipped; cluster centroids fall back to their default value
/// when the reader does not supply them. Returns an empty map when the superfile has no vector
/// section.
fn build_vector_summary(
    reader: &SuperfileReader,
    options: &SupertableOptions,
) -> HashMap<String, VectorSummary> {
    let mut summaries: HashMap<String, VectorSummary> = HashMap::new();
    if let Some(vectors) = reader.vec() {
        for vc in &options.vector_columns {
            let Some((centroid, radius)) = vectors.summary(&vc.column) else {
                continue;
            };
            let clusters = vectors
                .cluster_centroids(&vc.column)
                .map(|(n_cent, dim, fp32, counts)| {
                    ClusterCentroids::from_fp32(n_cent, dim, &fp32, counts)
                })
                .unwrap_or_default();
            let summary = VectorSummary {
                centroid,
                radius,
                clusters,
            };
            summaries.insert(vc.column.clone(), summary);
        }
    }
    summaries
}
/// What a successful [`run_tombstone_phase`] call did, with counts taken from the WAL's
/// `tombstone_progress` entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TombstonePhaseOutcome {
/// The WAL was already in `WalState::Complete`; nothing was written and the counts reflect
/// the progress already recorded in the document.
AlreadyComplete {
/// Number of progress entries whose outcome is `Tombstoned`.
n_tombstoned: usize,
/// Number of progress entries whose outcome is `NotFound`.
n_not_found: usize,
},
/// This call processed the pending entries and moved the WAL to `WalState::Complete`.
Applied {
/// Number of progress entries whose outcome is `Tombstoned`.
n_tombstoned: usize,
/// Number of progress entries whose outcome is `NotFound`.
n_not_found: usize,
},
}
/// Failures that the tombstone phase ([`run_tombstone_phase`]) can report.
#[derive(Debug, thiserror::Error)]
pub enum TombstonePhaseError {
/// The `(op_kind, state)` pair is none of `(Update, Appended)`, `(Delete, Intent)` or
/// `(_, Complete)`.
#[error(
"tombstone phase invoked on WAL in state {state:?} for op {op_kind:?}; expected \
Appended (UPDATE) or Intent (DELETE)"
)]
InvalidPreState { op_kind: OpKind, state: WalState },
/// The supertable options carry no storage provider.
#[error("supertable has no storage attached; tombstone phase requires durable storage")]
NoStorageAttached,
/// The target's sidecar was observed sealed more than `MAX_SEALED_RETRIES` times in a row.
#[error("tombstone sidecar for target {target_id:?} remained sealed past retry budget")]
SealedSidecarRetryExhausted { target_id: String },
/// Every one of the `MAX_CAS_RETRIES` compare-and-swap attempts on the sidecar lost the race.
#[error(
"tombstone sidecar CAS exhausted after {attempts} attempts for superfile {superfile_id}"
)]
CasRetryExhausted { superfile_id: Uuid, attempts: u32 },
/// Opening a candidate superfile or looking the id up inside it failed.
#[error("failed to scan _id column for target {target_id:?}: {message}")]
IdLookupFailed { target_id: String, message: String },
/// A tombstone sidecar codec error, converted via `From`.
#[error("tombstone sidecar codec error: {0}")]
SidecarCodec(#[from] crate::supertable::wal::tombstones_codec::SidecarCodecError),
/// A storage-layer error, converted via `From`.
#[error("storage error: {0}")]
Storage(#[from] StorageError),
/// A WAL-store error (sidecar read/write or WAL document update), converted via `From`.
#[error("WAL store error: {0}")]
WalStore(#[from] WalStoreError),
}
/// Runs the tombstone phase of a WAL.
///
/// Accepted pre-states are `(Update, Appended)` and `(Delete, Intent)`, which apply the pending
/// tombstones. A WAL of either kind that is already `Complete` is a no-op that reports the
/// counts recorded in its progress list and returns the document and etag unchanged.
///
/// # Errors
///
/// Returns [`TombstonePhaseError::InvalidPreState`] for any other `(op_kind, state)` pair, or
/// whatever the apply step surfaces.
pub async fn run_tombstone_phase(
    supertable: &Supertable,
    wal_store: &WalStore,
    wal_doc: &WalStateDoc,
    wal_etag: &Etag,
) -> Result<(TombstonePhaseOutcome, WalStateDoc, Etag), TombstonePhaseError> {
    match (wal_doc.op_kind, wal_doc.state) {
        (OpKind::Update, WalState::Appended) | (OpKind::Delete, WalState::Intent) => {
            do_tombstone_apply(supertable, wal_store, wal_doc, wal_etag).await
        }
        (_, WalState::Complete) => {
            let (n_tombstoned, n_not_found) = count_outcomes(&wal_doc.tombstone_progress);
            let outcome = TombstonePhaseOutcome::AlreadyComplete {
                n_tombstoned,
                n_not_found,
            };
            Ok((outcome, wal_doc.clone(), wal_etag.clone()))
        }
        (op_kind, state) => Err(TombstonePhaseError::InvalidPreState { op_kind, state }),
    }
}
/// Tallies the progress entries, returning `(tombstoned, not_found)`. Entries still `Pending`
/// are counted in neither bucket.
fn count_outcomes(progress: &[TombstoneEntry]) -> (usize, usize) {
    progress
        .iter()
        .fold((0, 0), |(tombstoned, not_found), entry| match entry.outcome {
            TombstoneOutcome::Tombstoned => (tombstoned + 1, not_found),
            TombstoneOutcome::NotFound => (tombstoned, not_found + 1),
            TombstoneOutcome::Pending => (tombstoned, not_found),
        })
}
/// Number of read-modify-write attempts `cas_tombstone_bit` makes against one tombstone
/// sidecar before giving up with `CasRetryExhausted`.
const MAX_CAS_RETRIES: u32 = 10;
/// Number of sealed-sidecar observations `resolve_and_tombstone_one` tolerates (sleeping after
/// each) before returning `SealedSidecarRetryExhausted`.
const MAX_SEALED_RETRIES: u32 = 16;
/// Base delay in milliseconds for the sealed-sidecar backoff; it is multiplied by a power of
/// two that grows with the attempt number.
const SEALED_RETRY_BASE_MS: u64 = 100;
/// Upper bound in milliseconds on a single sealed-sidecar backoff sleep.
const SEALED_RETRY_CAP_MS: u64 = 30_000;
/// Largest shift applied to the backoff multiplier, i.e. the multiplier never exceeds `1 << 8`.
const SEALED_RETRY_MAX_SHIFT: u32 = 8;
/// Tombstones every still-pending target of the WAL and then marks the WAL `Complete`.
///
/// Progress is persisted after each target with an etag-guarded update, so entries that are no
/// longer `Pending` are skipped on a later run. When a tombstone lands in a superfile and a
/// tombstone cache is configured, that superfile's cache entry is invalidated.
///
/// # Errors
///
/// Returns [`TombstonePhaseError::NoStorageAttached`] when no storage is configured, and
/// otherwise propagates resolution, sidecar and WAL-store failures.
async fn do_tombstone_apply(
    supertable: &Supertable,
    wal_store: &WalStore,
    wal_doc: &WalStateDoc,
    wal_etag: &Etag,
) -> Result<(TombstonePhaseOutcome, WalStateDoc, Etag), TombstonePhaseError> {
    let inner = supertable.inner();
    if inner.options.storage.is_none() {
        return Err(TombstonePhaseError::NoStorageAttached);
    }

    let mut doc = wal_doc.clone();
    let mut etag = wal_etag.clone();

    let n_entries = doc.tombstone_progress.len();
    for position in 0..n_entries {
        let target_id = {
            let entry = &doc.tombstone_progress[position];
            if entry.outcome != TombstoneOutcome::Pending {
                continue;
            }
            entry.target_id
        };

        let (outcome, landed_in) = resolve_and_tombstone_one(inner, wal_store, target_id).await?;
        {
            let entry = &mut doc.tombstone_progress[position];
            entry.outcome = outcome;
            entry.tombstoned_in_superfile = landed_in;
        }

        if let Some(superfile_id) = landed_in {
            if let Some(cache) = inner.tombstone_cache.as_ref() {
                cache.invalidate(superfile_id);
            }
        }

        // Checkpoint after every target so a crash resumes from the next pending entry.
        etag = wal_store
            .update_with_etag(doc.wal_id, &etag, &doc)
            .await?;
    }

    doc.state = WalState::Complete;
    etag = wal_store
        .update_with_etag(doc.wal_id, &etag, &doc)
        .await?;

    let (n_tombstoned, n_not_found) = count_outcomes(&doc.tombstone_progress);
    let outcome = TombstonePhaseOutcome::Applied {
        n_tombstoned,
        n_not_found,
    };
    Ok((outcome, doc, etag))
}
/// Locates `target_id` in the current manifest and sets its tombstone bit.
///
/// Returns `(NotFound, None)` when no superfile contains the id, or
/// `(Tombstoned, Some(superfile_id))` once the bit has landed. If the sidecar is sealed the
/// function sleeps with a capped exponential backoff and then re-resolves against a freshly
/// loaded manifest.
///
/// # Errors
///
/// Returns [`TombstonePhaseError::SealedSidecarRetryExhausted`] after more than
/// `MAX_SEALED_RETRIES` sealed observations; propagates lookup and sidecar errors.
async fn resolve_and_tombstone_one(
    inner: &Arc<SupertableInner>,
    wal_store: &WalStore,
    target_id: RowId,
) -> Result<(TombstoneOutcome, Option<Uuid>), TombstonePhaseError> {
    let mut sealed_seen = 0u32;
    loop {
        // Reload on every pass: the target may have moved to a different superfile while we
        // were waiting on a sealed sidecar.
        let snapshot = inner.manifest.load_full();
        let Some((superfile_id, doc_id)) =
            resolve_target_id_in_manifest(inner, &snapshot, target_id)?
        else {
            return Ok((TombstoneOutcome::NotFound, None));
        };

        match cas_tombstone_bit(wal_store, superfile_id, doc_id).await? {
            SidecarCasOutcome::Landed => {
                return Ok((TombstoneOutcome::Tombstoned, Some(superfile_id)));
            }
            SidecarCasOutcome::Sealed => {}
        }

        sealed_seen += 1;
        if sealed_seen > MAX_SEALED_RETRIES {
            return Err(TombstonePhaseError::SealedSidecarRetryExhausted {
                target_id: target_id.to_hex(),
            });
        }
        let shift = (sealed_seen - 1).min(SEALED_RETRY_MAX_SHIFT);
        let delay_ms = SEALED_RETRY_BASE_MS
            .saturating_mul(1u64 << shift)
            .min(SEALED_RETRY_CAP_MS);
        sleep(Duration::from_millis(delay_ms)).await;
    }
}
/// Result of one `cas_tombstone_bit` call that did not fail outright.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SidecarCasOutcome {
/// The sidecar containing the tombstone bit was written successfully.
Landed,
/// The existing sidecar carries a seal, so no write was attempted.
Sealed,
}
/// Sets bit `doc_id` in the tombstone sidecar of `superfile_id` using an etag-guarded
/// read-modify-write, retrying up to `MAX_CAS_RETRIES` times when the write loses a race.
///
/// Returns `Sealed` without writing when the existing sidecar carries a seal, and `Landed` once
/// a write succeeds. A missing sidecar is treated as an empty bitmap.
///
/// # Errors
///
/// Returns [`TombstonePhaseError::CasRetryExhausted`] when every attempt fails the CAS, and
/// propagates any other WAL-store error.
async fn cas_tombstone_bit(
    wal_store: &WalStore,
    superfile_id: Uuid,
    doc_id: u32,
) -> Result<SidecarCasOutcome, TombstonePhaseError> {
    for _ in 0..MAX_CAS_RETRIES {
        let (mut bitmap, prior_etag) = match wal_store.get_tombstones(superfile_id).await? {
            Some((sidecar, _)) if sidecar.seal.is_some() => {
                return Ok(SidecarCasOutcome::Sealed);
            }
            Some((sidecar, etag)) => (sidecar.bitmap, Some(etag)),
            None => (RoaringBitmap::new(), None),
        };
        bitmap.insert(doc_id);

        let updated = TombstonesSidecar { seal: None, bitmap };
        let write = wal_store
            .put_tombstones(superfile_id, prior_etag.as_ref(), &updated)
            .await;
        match write {
            Ok(_) => return Ok(SidecarCasOutcome::Landed),
            // Someone else wrote first: re-read and try again.
            Err(WalStoreError::CasFailed { .. }) => {}
            Err(other) => return Err(other.into()),
        }
    }
    Err(TombstonePhaseError::CasRetryExhausted {
        superfile_id,
        attempts: MAX_CAS_RETRIES,
    })
}
/// Finds the superfile and document id that hold row `target_id`.
///
/// Manifest entries whose `[id_min, id_max]` range excludes the target are skipped. For each
/// remaining entry a reader is obtained through `superfile_reader`; if that fails, or if the
/// lookup itself fails with `ReadError::Io`, the superfile bytes are fetched straight from
/// storage and opened eagerly instead.
///
/// Returns `Ok(Some((superfile_id, doc_id)))` for the first entry that contains the id and
/// `Ok(None)` when no entry does.
///
/// # Errors
///
/// Returns [`TombstonePhaseError::IdLookupFailed`] when a fallback open fails or when the
/// lookup fails with a non-I/O error (or fails again on the eager reader).
fn resolve_target_id_in_manifest(
    inner: &Arc<SupertableInner>,
    manifest: &ManifestSnapshot,
    target_id: RowId,
) -> Result<Option<(Uuid, u32)>, TombstonePhaseError> {
    let target = target_id.0;

    // Stamps the target id onto every failure produced by this function.
    let lookup_failed = |message: String| TombstonePhaseError::IdLookupFailed {
        target_id: target_id.to_hex(),
        message,
    };

    // Shared fallback: pull the whole superfile from storage and open it eagerly. `context`
    // names which primary path failed and is embedded in the error text.
    let open_eagerly = |superfile_id: Uuid, context: &str| {
        fetch_superfile_bytes_for_id_scan(inner, superfile_id)
            .map_err(|message| {
                lookup_failed(format!(
                    "open superfile {superfile_id} ({context}): {message}"
                ))
            })
            .and_then(|bytes| {
                SuperfileReader::open(bytes).map_err(|e| {
                    lookup_failed(format!(
                        "SuperfileReader::open {superfile_id} ({context}): {e}"
                    ))
                })
            })
    };

    for entry in manifest.get_all_superfiles().iter() {
        if target < entry.id_min || target > entry.id_max {
            continue;
        }

        let reader = match bridge_sync_to_async(superfile_reader(
            &inner.options.store,
            inner.options.disk_cache.as_ref(),
            inner.options.storage.as_ref(),
            &entry.uri,
            entry.subsection_offsets.as_ref(),
        )) {
            Ok(reader) => reader,
            // The primary error is intentionally dropped; only a failure of the fallback is
            // reported to the caller.
            Err(_) => Arc::new(open_eagerly(entry.uri.0, "storage fallback")?),
        };

        let lookup_result = match reader.id_lookup(target) {
            Ok(result) => result,
            Err(ReadError::Io(_)) => open_eagerly(entry.uri.0, "eager fallback for id_lookup")?
                .id_lookup(target)
                .map_err(|e| {
                    lookup_failed(format!("id_lookup in superfile {}: {e}", entry.uri.0))
                })?,
            Err(e) => {
                return Err(lookup_failed(format!(
                    "id_lookup in superfile {}: {e}",
                    entry.uri.0
                )));
            }
        };

        if let Some(doc_id) = lookup_result {
            return Ok(Some((entry.superfile_id, doc_id)));
        }
    }
    Ok(None)
}
/// Reads the complete bytes of a superfile directly from the attached storage provider.
///
/// # Errors
///
/// Returns a human-readable message when no storage is attached or the storage read fails.
fn fetch_superfile_bytes_for_id_scan(
    inner: &Arc<SupertableInner>,
    superfile_id: Uuid,
) -> Result<Bytes, String> {
    let Some(storage) = inner.options.storage.clone() else {
        return Err("no storage attached".to_string());
    };
    let path = SuperfileUri(superfile_id).storage_path();
    match bridge_sync_to_async(async move { storage.get(&path).await }) {
        Ok((bytes, _)) => Ok(bytes),
        Err(e) => Err(format!("storage get: {e}")),
    }
}
#[cfg(test)]
mod tests {
use arrow::ipc::writer::StreamWriter;
use chrono::Utc;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;
use super::*;
use crate::{
storage::{LocalFsStorageProvider, StorageProvider},
supertable::{
Supertable,
manifest::ManifestSnapshot,
wal::{
state_doc::{
OpKind, RowId, SCHEMA_VERSION, SealRecord, TombstoneEntry, TombstoneOutcome,
WalId, WalState,
},
tombstones_codec::TombstonesSidecar,
},
},
test_helpers::{build_title_batch, default_supertable_options},
};
/// Creates a temp-dir backed supertable and WAL store, and persists a one-row UPDATE WAL in
/// `Intent` state (WAL id 42, target row 1, minted id 100).
///
/// The WAL's content hash is a placeholder and no Arrow sidecar is written, so this fixture
/// only suits tests that fail before the sidecar is read.
async fn fixture() -> (TempDir, Supertable, WalStore, WalStateDoc, Etag) {
    let tmp = TempDir::new().expect("tempdir");
    let provider: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(tmp.path()).expect("provider"));
    let table =
        Supertable::create(default_supertable_options().with_storage(Arc::clone(&provider)))
            .expect("create");
    let store = WalStore::new(Arc::clone(&provider));

    let minted = IdSpan {
        first: RowId(100),
        last: RowId(100),
    };
    let pending = TombstoneEntry {
        target_id: RowId(1),
        outcome: TombstoneOutcome::Pending,
        tombstoned_in_superfile: None,
    };
    let doc = WalStateDoc {
        wal_id: WalId(42),
        schema_version: SCHEMA_VERSION,
        op_kind: OpKind::Update,
        state: WalState::Intent,
        created_at: Utc::now(),
        lease: None,
        predicate_repr: "_id = 1".into(),
        target_ids: vec![RowId(1)],
        new_row_count: Some(1),
        new_row_content_hash: Some("0".repeat(64)),
        preallocated_superfile_id: Some(Uuid::from_u128(0x1234_5678_9ABC)),
        minted_id_spans: vec![minted],
        tombstone_progress: vec![pending],
    };
    let etag = store.create(&doc).await.expect("wal create");
    (tmp, table, store, doc, etag)
}
/// The append phase exists only for UPDATE WALs; a DELETE WAL must be refused with
/// `NotAnUpdateWal`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn rejects_delete_wal_with_typed_error() {
    let (_dir, table, store, mut doc, etag) = fixture().await;
    doc.op_kind = OpKind::Delete;

    let result = run_append_phase(&table, &store, &doc, &etag).await;

    let err = result.expect_err("must error");
    assert!(matches!(err, AppendPhaseError::NotAnUpdateWal), "{err:?}");
}
/// An UPDATE WAL without a preallocated superfile id must fail with `MissingField` naming
/// that field.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn rejects_wal_missing_preallocated_superfile_id() {
    let (_dir, table, store, mut doc, etag) = fixture().await;
    doc.preallocated_superfile_id = None;

    let result = run_append_phase(&table, &store, &doc, &etag).await;

    let err = result.expect_err("must error");
    let names_the_field = matches!(
        err,
        AppendPhaseError::MissingField {
            field: "preallocated_superfile_id"
        }
    );
    assert!(names_the_field, "{err:?}");
}
/// Checks `manifest_contains` against an empty manifest.
///
/// NOTE(review): despite the test's name, only the negative case (an empty manifest does not
/// contain the nil UUID) is asserted here; the positive "matching uuid" case is not covered by
/// this test.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn manifest_contains_returns_true_for_matching_uuid() {
    // The fixture's values are not used; it is kept alive until the end of the test.
    let _unused_fixture = fixture().await;

    let empty_manifest = ManifestSnapshot::empty(Arc::new(default_supertable_options()));
    let found = manifest_contains(&empty_manifest, Uuid::nil());

    assert!(!found);
}
/// Serialises `batch` as an Arrow IPC stream containing exactly that one batch.
fn encode_ipc(batch: &RecordBatch) -> Bytes {
    let mut buffer = Vec::<u8>::new();
    let mut writer =
        StreamWriter::try_new(&mut buffer, &batch.schema()).expect("ipc writer init");
    writer.write(batch).expect("ipc write");
    writer.finish().expect("ipc finish");
    // Release the writer's borrow of `buffer` before handing the bytes over.
    drop(writer);
    Bytes::from(buffer)
}
/// Like `fixture`, but with a real Arrow sidecar: encodes one row per title, stores the IPC
/// bytes under the WAL id, and persists an `Intent` UPDATE WAL whose content hash, row count
/// and single minted span (starting at `minted_first`) all agree with that payload.
///
/// Target ids are `1000..1000 + titles.len()`, each with a pending tombstone entry.
async fn fixture_with_ipc_payload(
    titles: &[&str],
    wal_id_value: i128,
    minted_first: i128,
) -> (TempDir, Supertable, WalStore, WalStateDoc, Etag) {
    let tmp = TempDir::new().expect("tempdir");
    let provider: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(tmp.path()).expect("provider"));
    let table =
        Supertable::create(default_supertable_options().with_storage(Arc::clone(&provider)))
            .expect("create");
    let store = WalStore::new(Arc::clone(&provider));

    let payload = encode_ipc(&build_title_batch(titles));
    let payload_hash = blake3::hash(&payload).to_hex().to_string();
    let row_count = titles.len() as u32;
    let wal_id = WalId(wal_id_value);
    store.put_arrow(wal_id, payload).await.expect("put_arrow");

    let target_for = |i: u32| RowId(1000 + i as i128);
    let doc = WalStateDoc {
        wal_id,
        schema_version: SCHEMA_VERSION,
        op_kind: OpKind::Update,
        state: WalState::Intent,
        created_at: Utc::now(),
        lease: None,
        predicate_repr: "set up by test".into(),
        target_ids: (0..row_count).map(target_for).collect(),
        new_row_count: Some(row_count),
        new_row_content_hash: Some(payload_hash),
        preallocated_superfile_id: Some(Uuid::from_u128(0xDEAD_BEEF_CAFE)),
        minted_id_spans: vec![IdSpan {
            first: RowId(minted_first),
            last: RowId(minted_first + (row_count as i128) - 1),
        }],
        tombstone_progress: (0..row_count)
            .map(|i| TombstoneEntry {
                target_id: target_for(i),
                outcome: TombstoneOutcome::Pending,
                tombstoned_in_superfile: None,
            })
            .collect(),
    };
    let etag = store.create(&doc).await.expect("wal create");
    (tmp, table, store, doc, etag)
}
/// Happy path: the append phase commits the preallocated superfile to the manifest, moves the
/// WAL to `Appended`, and the persisted WAL matches what the call returned.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn end_to_end_appends_superfile_and_advances_state() {
    let (_dir, table, store, doc, etag) =
        fixture_with_ipc_payload(&["alpha bravo", "charlie delta"], 7, 5_000).await;
    let superfile_id = doc.preallocated_superfile_id.expect("set in fixture");

    let (outcome, updated_doc, updated_etag) = run_append_phase(&table, &store, &doc, &etag)
        .await
        .expect("append phase");

    assert_eq!(outcome, AppendPhaseOutcome::Applied);
    assert_eq!(updated_doc.state, WalState::Appended);
    assert_ne!(updated_etag, etag, "etag must advance after the state change");

    let snapshot = table.inner().manifest.load_full();
    assert!(
        manifest_contains(&snapshot, superfile_id),
        "manifest must reference the new superfile"
    );

    let (stored_doc, stored_etag) = store.read(doc.wal_id).await.expect("read back");
    assert_eq!(stored_doc.state, WalState::Appended);
    assert_eq!(stored_etag, updated_etag);
}
/// Running the append phase a second time on its own output must report `AlreadyApplied` and
/// leave both the state and the etag untouched.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn idempotent_replay_short_circuits_to_already_applied() {
    let (_dir, table, store, doc, etag) =
        fixture_with_ipc_payload(&["alpha", "beta"], 11, 6_000).await;

    let (outcome_1, doc_1, etag_1) = run_append_phase(&table, &store, &doc, &etag)
        .await
        .expect("first");
    assert_eq!(outcome_1, AppendPhaseOutcome::Applied);
    assert_eq!(doc_1.state, WalState::Appended);

    let (outcome_2, doc_2, etag_2) = run_append_phase(&table, &store, &doc_1, &etag_1)
        .await
        .expect("second");
    assert_eq!(outcome_2, AppendPhaseOutcome::AlreadyApplied);
    assert_eq!(doc_2.state, WalState::Appended);
    assert_eq!(etag_2, etag_1);
}
/// Recovery case: the manifest already holds the superfile but the stored WAL is back in
/// `Intent`. The append phase must report `AlreadyApplied` and only advance the WAL state.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn intent_already_committed_advances_state_without_rebuilding() {
    let (_dir, table, store, doc, etag) =
        fixture_with_ipc_payload(&["recovery"], 13, 7_000).await;
    let superfile_id = doc.preallocated_superfile_id.expect("set");

    // Seed run: commits the superfile and advances the WAL.
    let _seeded = run_append_phase(&table, &store, &doc, &etag)
        .await
        .expect("seed");

    // Rewind the stored WAL to `Intent` while leaving the manifest as the seed run left it.
    let mut rewound_doc = doc.clone();
    rewound_doc.state = WalState::Intent;
    let (_, current_etag) = store.read(doc.wal_id).await.expect("read");
    let rewound_etag = store
        .update_with_etag(doc.wal_id, &current_etag, &rewound_doc)
        .await
        .expect("reset");

    let (outcome, recovered_doc, recovered_etag) =
        run_append_phase(&table, &store, &rewound_doc, &rewound_etag)
            .await
            .expect("recovered");

    assert_eq!(outcome, AppendPhaseOutcome::AlreadyApplied);
    assert_eq!(recovered_doc.state, WalState::Appended);
    assert_ne!(recovered_etag, rewound_etag);
    assert!(
        manifest_contains(&table.inner().manifest.load_full(), superfile_id),
        "manifest still references the superfile we appended in the seed"
    );
}
/// Two independent environments fed identical WAL inputs must store byte-identical superfiles
/// and agree on the manifest entry's document count and id range.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn replay_produces_bit_identical_superfile_bytes() {
    // First environment.
    let (_dir_a, table_a, store_a, doc_a, etag_a) =
        fixture_with_ipc_payload(&["determinism check"], 17, 8_000).await;
    let _first = run_append_phase(&table_a, &store_a, &doc_a, &etag_a)
        .await
        .expect("first run");
    let snapshot_a = table_a.inner().manifest.load_full();
    let superfile_id = doc_a.preallocated_superfile_id.expect("set");
    let entry_a = snapshot_a
        .get_all_superfiles()
        .iter()
        .find(|e| e.uri.0 == superfile_id)
        .expect("entry");
    let storage_a = table_a.inner().options.storage.as_ref().expect("storage");
    let path = entry_a.uri.storage_path();
    let (bytes_a, _) = storage_a.get(&path).await.expect("get bytes");

    // Second environment, same inputs; the superfile lives at the same storage path.
    let (_dir_b, table_b, store_b, doc_b, etag_b) =
        fixture_with_ipc_payload(&["determinism check"], 17, 8_000).await;
    run_append_phase(&table_b, &store_b, &doc_b, &etag_b)
        .await
        .expect("second run");
    let storage_b = table_b.inner().options.storage.as_ref().expect("storage");
    let (bytes_b, _) = storage_b.get(&path).await.expect("get bytes");

    assert_eq!(
        bytes_a, bytes_b,
        "two independent runs with identical WAL inputs must produce \
         bit-identical superfile bytes — this is the replay-safety \
         invariant"
    );

    let snapshot_b = table_b.inner().manifest.load_full();
    let entry_b = snapshot_b
        .get_all_superfiles()
        .iter()
        .find(|e| e.uri.0 == superfile_id)
        .expect("entry");
    assert_eq!(entry_a.n_docs, entry_b.n_docs);
    assert_eq!(entry_a.id_min, entry_b.id_min);
    assert_eq!(entry_a.id_max, entry_b.id_max);
}
/// A WAL whose recorded content hash disagrees with the stored sidecar must fail with the
/// typed `SidecarContentHashMismatch` error.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn corrupt_ipc_payload_surfaces_typed_hash_mismatch() {
    let (_dir, table, store, mut doc, etag) =
        fixture_with_ipc_payload(&["foo"], 19, 9_000).await;

    // Record a hash that cannot match the payload written by the fixture.
    doc.new_row_content_hash = Some("ff".repeat(32));
    let tampered_etag = store
        .update_with_etag(doc.wal_id, &etag, &doc)
        .await
        .expect("re-cas with bad hash");

    let result = run_append_phase(&table, &store, &doc, &tampered_etag).await;

    let err = result.expect_err("must error on hash mismatch");
    assert!(
        matches!(err, AppendPhaseError::SidecarContentHashMismatch { .. }),
        "{err:?}"
    );
}
/// A supertable created without storage must make the append phase fail with
/// `NoStorageAttached`, even though the WAL store itself is storage-backed and the sidecar is
/// valid.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn append_phase_without_storage_is_rejected() {
    let tmp = TempDir::new().expect("tempdir");
    let provider: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(tmp.path()).expect("provider"));
    // Note: the supertable deliberately gets no storage; only the WAL store does.
    let table = Supertable::create(default_supertable_options()).expect("create");
    let store = WalStore::new(Arc::clone(&provider));

    let payload = encode_ipc(&build_title_batch(&["x"]));
    let payload_hash = blake3::hash(&payload).to_hex().to_string();
    let wal_id = WalId(701);
    store.put_arrow(wal_id, payload).await.expect("put_arrow");

    let doc = WalStateDoc {
        wal_id,
        schema_version: SCHEMA_VERSION,
        op_kind: OpKind::Update,
        state: WalState::Intent,
        created_at: Utc::now(),
        lease: None,
        predicate_repr: "no storage".into(),
        target_ids: vec![RowId(1)],
        new_row_count: Some(1),
        new_row_content_hash: Some(payload_hash),
        preallocated_superfile_id: Some(Uuid::from_u128(0x7070)),
        minted_id_spans: vec![IdSpan {
            first: RowId(1),
            last: RowId(1),
        }],
        tombstone_progress: vec![TombstoneEntry {
            target_id: RowId(1),
            outcome: TombstoneOutcome::Pending,
            tombstoned_in_superfile: None,
        }],
    };
    let etag = store.create(&doc).await.expect("create");

    let result = run_append_phase(&table, &store, &doc, &etag).await;

    let err = result.expect_err("must error without storage");
    assert!(
        matches!(err, AppendPhaseError::NoStorageAttached),
        "{err:?}"
    );
}
/// A WAL without `new_row_count` must fail with `MissingField` naming that field.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn append_phase_missing_new_row_count_is_rejected() {
    let (_dir, table, store, mut doc, etag) =
        fixture_with_ipc_payload(&["foo"], 711, 1_000).await;

    doc.new_row_count = None;
    let edited_etag = store
        .update_with_etag(doc.wal_id, &etag, &doc)
        .await
        .expect("re-cas");

    let result = run_append_phase(&table, &store, &doc, &edited_etag).await;

    let err = result.expect_err("must error");
    let names_the_field = matches!(
        err,
        AppendPhaseError::MissingField {
            field: "new_row_count"
        }
    );
    assert!(names_the_field, "{err:?}");
}
/// Minted spans that expand to two ids for a one-row WAL must fail with
/// `IdSpansLengthMismatch { flat_len: 2, expected: 1 }`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn append_phase_id_span_count_mismatch_is_rejected() {
    let (_dir, table, store, mut doc, etag) =
        fixture_with_ipc_payload(&["foo"], 721, 1_000).await;

    // One row in the payload, but a span covering two ids.
    doc.minted_id_spans = vec![IdSpan {
        first: RowId(1),
        last: RowId(2),
    }];
    let edited_etag = store
        .update_with_etag(doc.wal_id, &etag, &doc)
        .await
        .expect("re-cas");

    let result = run_append_phase(&table, &store, &doc, &edited_etag).await;

    let err = result.expect_err("must error");
    let reports_counts = matches!(
        err,
        AppendPhaseError::IdSpansLengthMismatch {
            flat_len: 2,
            expected: 1,
            ..
        }
    );
    assert!(reports_counts, "{err:?}");
}
/// A sidecar whose hash matches but whose bytes are not Arrow IPC must fail with `IpcDecode`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn append_phase_corrupt_ipc_surfaces_decode_error() {
    let tmp = TempDir::new().expect("tempdir");
    let provider: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(tmp.path()).expect("provider"));
    let table =
        Supertable::create(default_supertable_options().with_storage(Arc::clone(&provider)))
            .expect("create");
    let store = WalStore::new(Arc::clone(&provider));

    // Hash the garbage itself so the hash check passes and decoding is what fails.
    let wal_id = WalId(731);
    let not_arrow = Bytes::from_static(b"this is not arrow ipc");
    let not_arrow_hash = blake3::hash(&not_arrow).to_hex().to_string();
    store.put_arrow(wal_id, not_arrow).await.expect("put_arrow");

    let doc = WalStateDoc {
        wal_id,
        schema_version: SCHEMA_VERSION,
        op_kind: OpKind::Update,
        state: WalState::Intent,
        created_at: Utc::now(),
        lease: None,
        predicate_repr: "corrupt ipc".into(),
        target_ids: vec![RowId(1)],
        new_row_count: Some(1),
        new_row_content_hash: Some(not_arrow_hash),
        preallocated_superfile_id: Some(Uuid::from_u128(0x7373)),
        minted_id_spans: vec![IdSpan {
            first: RowId(1),
            last: RowId(1),
        }],
        tombstone_progress: vec![TombstoneEntry {
            target_id: RowId(1),
            outcome: TombstoneOutcome::Pending,
            tombstoned_in_superfile: None,
        }],
    };
    let etag = store.create(&doc).await.expect("create");

    let result = run_append_phase(&table, &store, &doc, &etag).await;

    let err = result.expect_err("must error on bad ipc");
    assert!(matches!(err, AppendPhaseError::IpcDecode { .. }), "{err:?}");
}
/// No spans in, no ids out.
#[test]
fn flatten_spans_empty_is_empty() {
    let flat = flatten_spans(&[]);
    assert!(flat.is_empty());
}
/// Spans are inclusive on both ends and are expanded in the order given.
#[test]
fn flatten_spans_concatenates_inclusive_ranges_in_order() {
    let three_ids = IdSpan {
        first: RowId(10),
        last: RowId(12),
    };
    let single_id = IdSpan {
        first: RowId(100),
        last: RowId(100),
    };

    let flat = flatten_spans(&[three_ids, single_id]);

    assert_eq!(flat, [10i128, 11, 12, 100]);
}
/// Starts from `fixture()` and overrides the WAL document's `op_kind`, `state`
/// and `tombstone_progress` with the supplied values.
///
/// The modified document is written back with a compare-and-swap unless it is
/// an UPDATE WAL in `Intent` state with exactly one progress entry; in that
/// case the original etag is returned and nothing is persisted.
///
/// NOTE(review): the skip condition only looks at the progress *length*, so a
/// one-entry `progress` whose content differs from what `fixture()` stored
/// would leave the returned document out of sync with storage — confirm this
/// is intended.
async fn tombstone_fixture(
    op_kind: OpKind,
    state: WalState,
    progress: Vec<TombstoneEntry>,
) -> (TempDir, Supertable, WalStore, WalStateDoc, Etag) {
    let (dir, st, ws, mut wal, etag) = fixture().await;
    wal.op_kind = op_kind;
    wal.state = state;
    wal.tombstone_progress = progress;

    let skip_persist = wal.state == WalState::Intent
        && wal.op_kind == OpKind::Update
        && wal.tombstone_progress.len() == 1;
    let etag = if skip_persist {
        etag
    } else {
        ws.update_with_etag(wal.wal_id, &etag, &wal)
            .await
            .expect("re-cas fixture")
    };

    (dir, st, ws, wal, etag)
}
/// Shorthand for a `TombstoneEntry` with the given target id and outcome and
/// no `tombstoned_in_superfile` recorded.
fn ts_entry(target_id: i128, outcome: TombstoneOutcome) -> TombstoneEntry {
    let target_id = RowId(target_id);
    TombstoneEntry {
        target_id,
        outcome,
        tombstoned_in_superfile: None,
    }
}
/// Running the tombstone phase on an UPDATE WAL that is still in `Intent`
/// must fail with `InvalidPreState` carrying that op kind and state.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tombstone_phase_rejects_update_wal_in_intent_state() {
    let pending = vec![ts_entry(1, TombstoneOutcome::Pending)];
    let (_dir, st, ws, wal, etag) =
        tombstone_fixture(OpKind::Update, WalState::Intent, pending).await;

    let attempt = run_tombstone_phase(&st, &ws, &wal, &etag).await;
    let err = attempt.expect_err("must error");

    let rejected_as_expected = matches!(
        err,
        TombstonePhaseError::InvalidPreState {
            op_kind: OpKind::Update,
            state: WalState::Intent
        }
    );
    assert!(rejected_as_expected, "{err:?}");
}
/// Running the tombstone phase on a DELETE WAL in `Appended` state must fail
/// with `InvalidPreState` carrying that op kind and state.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tombstone_phase_rejects_delete_wal_in_appended_state() {
    let pending = vec![ts_entry(2, TombstoneOutcome::Pending)];
    let (_dir, st, ws, wal, etag) =
        tombstone_fixture(OpKind::Delete, WalState::Appended, pending).await;

    let attempt = run_tombstone_phase(&st, &ws, &wal, &etag).await;
    let err = attempt.expect_err("must error");

    let rejected_as_expected = matches!(
        err,
        TombstonePhaseError::InvalidPreState {
            op_kind: OpKind::Delete,
            state: WalState::Appended
        }
    );
    assert!(rejected_as_expected, "{err:?}");
}
/// A WAL already in `Complete` must be reported as `AlreadyComplete` with
/// counts taken from its existing progress entries, and neither the returned
/// etag nor the stored document may change.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tombstone_phase_on_complete_wal_is_noop_with_existing_counts() {
    // Two tombstoned targets and one not-found target, nothing pending.
    let already_resolved = vec![
        ts_entry(10, TombstoneOutcome::Tombstoned),
        ts_entry(11, TombstoneOutcome::Tombstoned),
        ts_entry(12, TombstoneOutcome::NotFound),
    ];
    let (_dir, st, ws, wal, etag) =
        tombstone_fixture(OpKind::Update, WalState::Complete, already_resolved).await;

    let replay = run_tombstone_phase(&st, &ws, &wal, &etag).await;
    let (outcome, returned_wal, returned_etag) = replay.expect("ok");

    let expected_outcome = TombstonePhaseOutcome::AlreadyComplete {
        n_tombstoned: 2,
        n_not_found: 1,
    };
    assert_eq!(outcome, expected_outcome);
    assert_eq!(returned_wal.state, WalState::Complete);
    assert_eq!(returned_etag, etag, "etag must not advance on no-op");

    // The stored document must be untouched as well.
    let (stored_wal, stored_etag) = ws.read(wal.wal_id).await.expect("read back");
    assert_eq!(stored_wal.state, WalState::Complete);
    assert_eq!(stored_etag, etag);
}
/// `count_outcomes` must tally `Tombstoned` and `NotFound` entries and leave
/// `Pending` entries out of both counts.
#[test]
fn count_outcomes_sums_tombstoned_and_not_found_only() {
    // Two pending, two tombstoned, one not-found.
    let mixed = vec![
        ts_entry(1, TombstoneOutcome::Pending),
        ts_entry(2, TombstoneOutcome::Tombstoned),
        ts_entry(3, TombstoneOutcome::NotFound),
        ts_entry(4, TombstoneOutcome::Tombstoned),
        ts_entry(5, TombstoneOutcome::Pending),
    ];

    let counts = count_outcomes(&mixed);
    assert_eq!(counts.0, 2);
    assert_eq!(counts.1, 1);
}
/// Builds a WAL carrying an IPC payload for `titles`, runs the append phase on
/// it, and returns the environment together with the WAL's preallocated
/// superfile id and the inclusive id range `minted_first ..= minted_first +
/// titles.len() - 1`.
async fn published_superfile_fixture(
    titles: &[&str],
    minted_first: i128,
) -> (TempDir, Supertable, WalStore, Uuid, i128, i128) {
    let (dir, st, ws, wal, etag) = fixture_with_ipc_payload(titles, 101, minted_first).await;
    let superfile_id = wal.preallocated_superfile_id.expect("set");

    let appended = run_append_phase(&st, &ws, &wal, &etag).await;
    appended.expect("append phase");

    let row_count = titles.len() as i128;
    let minted_last = minted_first + row_count - 1;
    (dir, st, ws, superfile_id, minted_first, minted_last)
}
async fn create_delete_wal(
ws: &WalStore,
wal_id_value: i128,
target_ids: &[i128],
) -> (WalStateDoc, Etag) {
let wal_doc = WalStateDoc {
wal_id: WalId(wal_id_value),
schema_version: SCHEMA_VERSION,
op_kind: OpKind::Delete,
state: WalState::Intent,
created_at: Utc::now(),
lease: None,
predicate_repr: "test delete".into(),
target_ids: target_ids.iter().map(|&v| RowId(v)).collect(),
new_row_count: None,
new_row_content_hash: None,
preallocated_superfile_id: None,
minted_id_spans: Vec::new(),
tombstone_progress: target_ids
.iter()
.map(|&v| TombstoneEntry {
target_id: RowId(v),
outcome: TombstoneOutcome::Pending,
tombstoned_in_superfile: None,
})
.collect(),
};
let etag = ws.create(&wal_doc).await.expect("wal create");
(wal_doc, etag)
}
/// Returns the tombstone bitmap stored for `superfile_id`, or an empty bitmap
/// when no sidecar exists. Panics if the lookup itself fails.
async fn read_sidecar_bitmap(ws: &WalStore, superfile_id: Uuid) -> RoaringBitmap {
    let stored = ws.get_tombstones(superfile_id).await.expect("get");
    stored.map_or_else(RoaringBitmap::new, |(sidecar, _etag)| sidecar.bitmap)
}
/// Deleting the second of three appended rows must complete the WAL, mark the
/// target `Tombstoned` in the published superfile, and leave exactly one bit
/// (value 1) set in that superfile's sidecar bitmap.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tombstone_phase_marks_single_resolved_target_as_tombstoned() {
    let (_dir, st, ws, sf_id, id_min, _id_max) =
        published_superfile_fixture(&["aa", "bb", "cc"], 10_000).await;
    let (wal, etag) = create_delete_wal(&ws, 201, &[id_min + 1]).await;

    let run = run_tombstone_phase(&st, &ws, &wal, &etag).await;
    let (outcome, new_wal, _new_etag) = run.expect("ok");

    let expected_outcome = TombstonePhaseOutcome::Applied {
        n_tombstoned: 1,
        n_not_found: 0,
    };
    assert_eq!(outcome, expected_outcome);
    assert_eq!(new_wal.state, WalState::Complete);

    let only_entry = &new_wal.tombstone_progress[0];
    assert_eq!(only_entry.outcome, TombstoneOutcome::Tombstoned);
    assert_eq!(only_entry.tombstoned_in_superfile, Some(sf_id));

    // Bit 1 presumably corresponds to the target's row position (id_min + 1
    // is the second minted id) — confirm against the tombstone phase.
    let bitmap = read_sidecar_bitmap(&ws, sf_id).await;
    assert_eq!(bitmap.len(), 1);
    assert!(bitmap.contains(1u32));
}
/// A delete target beyond the minted id range must complete the WAL with the
/// entry marked `NotFound`, no superfile recorded, and an empty sidecar bitmap.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tombstone_phase_marks_unknown_target_as_not_found() {
    let (_dir, st, ws, sf_id, _id_min, id_max) =
        published_superfile_fixture(&["aa", "bb"], 20_000).await;
    let (wal, etag) = create_delete_wal(&ws, 202, &[id_max + 100]).await;

    let run = run_tombstone_phase(&st, &ws, &wal, &etag).await;
    let (outcome, new_wal, _) = run.expect("ok");

    let expected_outcome = TombstonePhaseOutcome::Applied {
        n_tombstoned: 0,
        n_not_found: 1,
    };
    assert_eq!(outcome, expected_outcome);
    assert_eq!(new_wal.state, WalState::Complete);

    let only_entry = &new_wal.tombstone_progress[0];
    assert_eq!(only_entry.outcome, TombstoneOutcome::NotFound);
    assert!(only_entry.tombstoned_in_superfile.is_none());

    let bitmap = read_sidecar_bitmap(&ws, sf_id).await;
    assert!(bitmap.is_empty());
}
/// Three targets inside the minted range plus one outside it must yield three
/// tombstoned and one not-found, with the sidecar bitmap holding exactly
/// 0, 2 and 3.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tombstone_phase_unions_multiple_targets_into_one_sidecar() {
    let (_dir, st, ws, sf_id, id_min, _id_max) =
        published_superfile_fixture(&["a", "b", "c", "d"], 30_000).await;
    // The last target lies far outside the four minted ids.
    let targets = [id_min, id_min + 2, id_min + 3, id_min + 999];
    let (wal, etag) = create_delete_wal(&ws, 203, &targets).await;

    let run = run_tombstone_phase(&st, &ws, &wal, &etag).await;
    let (outcome, new_wal, _) = run.expect("ok");

    let expected_outcome = TombstonePhaseOutcome::Applied {
        n_tombstoned: 3,
        n_not_found: 1,
    };
    assert_eq!(outcome, expected_outcome);
    assert_eq!(new_wal.state, WalState::Complete);

    let bitmap = read_sidecar_bitmap(&ws, sf_id).await;
    let set_bits = bitmap.iter().collect::<Vec<u32>>();
    assert_eq!(set_bits, vec![0u32, 2, 3]);
}
/// Re-running the tombstone phase with the document and etag returned by a
/// successful first run must report `AlreadyComplete`, keep the etag, and
/// leave the sidecar bitmap unchanged.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tombstone_phase_is_idempotent_on_replay() {
    let (_dir, st, ws, sf_id, id_min, _id_max) =
        published_superfile_fixture(&["x", "y"], 40_000).await;
    let (wal, etag) = create_delete_wal(&ws, 204, &[id_min]).await;

    // First pass applies the tombstone.
    let first_run = run_tombstone_phase(&st, &ws, &wal, &etag).await;
    let (first_outcome, after_first, etag_after_first) = first_run.expect("first");
    let expected_first = TombstonePhaseOutcome::Applied {
        n_tombstoned: 1,
        n_not_found: 0,
    };
    assert_eq!(first_outcome, expected_first);
    let bitmap_before_replay = read_sidecar_bitmap(&ws, sf_id).await;

    // Second pass starts from the first pass's output.
    let second_run = run_tombstone_phase(&st, &ws, &after_first, &etag_after_first).await;
    let (second_outcome, after_second, etag_after_second) = second_run.expect("second");
    let expected_second = TombstonePhaseOutcome::AlreadyComplete {
        n_tombstoned: 1,
        n_not_found: 0,
    };
    assert_eq!(second_outcome, expected_second);
    assert_eq!(etag_after_second, etag_after_first);
    assert_eq!(after_second.state, WalState::Complete);

    let bitmap_after_replay = read_sidecar_bitmap(&ws, sf_id).await;
    assert_eq!(bitmap_before_replay, bitmap_after_replay);
}
/// With the first of three targets already marked `Tombstoned` in the stored
/// WAL document, the phase must finish with all three counted as tombstoned.
///
/// The test pre-marks entry 0 in the WAL document only and writes no sidecar
/// itself; it then expects the sidecar bitmap to contain just 1 and 2.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tombstone_phase_resumes_partial_progress_on_replay() {
    let (_dir, st, ws, sf_id, id_min, _id_max) =
        published_superfile_fixture(&["p", "q", "r"], 50_000).await;
    let targets = [id_min, id_min + 1, id_min + 2];
    let (mut wal, created_etag) = create_delete_wal(&ws, 205, &targets).await;

    // Simulate earlier progress: entry 0 is recorded as done before the run.
    wal.tombstone_progress[0].outcome = TombstoneOutcome::Tombstoned;
    wal.tombstone_progress[0].tombstoned_in_superfile = Some(sf_id);
    let etag = ws
        .update_with_etag(wal.wal_id, &created_etag, &wal)
        .await
        .expect("pre-mark");

    let run = run_tombstone_phase(&st, &ws, &wal, &etag).await;
    let (outcome, new_wal, _) = run.expect("ok");

    let expected_outcome = TombstonePhaseOutcome::Applied {
        n_tombstoned: 3,
        n_not_found: 0,
    };
    assert_eq!(outcome, expected_outcome);
    assert_eq!(new_wal.state, WalState::Complete);

    let bitmap = read_sidecar_bitmap(&ws, sf_id).await;
    let set_bits = bitmap.iter().collect::<Vec<u32>>();
    assert_eq!(set_bits, vec![1u32, 2]);
}
/// Seeds a sealed tombstone sidecar for the published superfile, then runs the
/// tombstone phase under a 250 ms timeout and expects the timeout to fire.
/// Afterwards the stored WAL must still be in `Intent` with its target
/// `Pending`.
///
/// NOTE(review): the name mentions the error surfacing after the retry budget,
/// but the body only checks that the run has not finished within the timeout —
/// confirm which behaviour is intended.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sealed_sidecar_surfaces_after_retry_budget() {
    let (_dir, st, ws, sf_id, id_min, _id_max) =
        published_superfile_fixture(&["seal"], 60_000).await;

    let seal_record = SealRecord {
        compaction_id: Uuid::from_u128(0xC0DE_C0DE),
        sealed_at: Utc::now(),
    };
    let sealed = TombstonesSidecar {
        seal: Some(seal_record),
        bitmap: RoaringBitmap::new(),
    };
    ws.put_tombstones(sf_id, None, &sealed)
        .await
        .expect("seed sealed sidecar");

    let (wal, etag) = create_delete_wal(&ws, 206, &[id_min]).await;
    let budget = Duration::from_millis(250);
    let result = timeout(budget, run_tombstone_phase(&st, &ws, &wal, &etag)).await;
    assert!(
        result.is_err(),
        "expected the orchestrator to still be in sealed-retry; got {result:?}"
    );

    // Nothing may have been persisted for this WAL while it was blocked.
    let (stored, _stored_etag) = ws.read(wal.wal_id).await.expect("read back");
    assert_eq!(stored.state, WalState::Intent);
    assert_eq!(
        stored.tombstone_progress[0].outcome,
        TombstoneOutcome::Pending
    );
}
}