use std::sync::Arc;
use bytes::Bytes;
use thiserror::Error;
use crate::{
storage::{StorageError, StorageProvider},
supertable::wal::{
state_doc::{WalId, WalStateDoc},
tombstones_codec::{self, SidecarCodecError, TombstonesSidecar},
},
};
/// Version tag handed back by the storage provider for an object.
///
/// Throughout this module a missing provider etag is represented as the empty string
/// (the write/read paths call `unwrap_or_default()` on the provider's optional etag).
pub type Etag = String;
/// Errors returned by [`WalStore`] operations.
///
/// Every variant carries the storage `path` (or listing prefix) the failing call was addressed to.
#[derive(Debug, Error)]
pub enum WalStoreError {
/// A conditional write lost. Produced by `update_with_etag` and `put_tombstones` when the
/// provider reports `PreconditionFailed`; `update_with_etag` additionally maps `NotFound` here.
#[error("CAS failed for {path:?}: storage etag has advanced past expected")]
CasFailed { path: String },
/// `create` found the provider rejecting the create-only write with `PreconditionFailed`.
#[error("WAL state doc already exists at {path:?}")]
AlreadyExists { path: String },
/// `read` was told by the provider that the state document does not exist.
#[error("WAL state doc not found at {path:?}")]
NotFound { path: String },
/// Any other storage-provider failure, wrapped together with the path it occurred on.
#[error("storage error at {path:?}: {source}")]
Storage {
path: String,
#[source]
source: StorageError,
},
/// JSON (de)serialization of a state document failed.
#[error("state-doc serde error at {path:?}: {source}")]
Serde {
path: String,
#[source]
source: serde_json::Error,
},
/// Encoding or decoding a tombstones sidecar through `tombstones_codec` failed.
#[error("sidecar codec error at {path:?}: {source}")]
SidecarCodec {
path: String,
#[source]
source: SidecarCodecError,
},
/// `get_arrow` computed a BLAKE3 hex digest that differs from the one the caller expected.
#[error("sidecar content hash mismatch at {path:?}: expected {expected:?}, got {got:?}")]
SidecarContentHashMismatch {
path: String,
expected: String,
got: String,
},
}
/// Key prefix under which WAL state documents and their `.arrow` payloads are stored.
const WAL_DIR: &str = "wal/mutations";
/// File extension of a JSON-serialized WAL state document.
const STATE_EXT: &str = "json";
/// File extension of the per-WAL payload written by `put_arrow`.
const ARROW_EXT: &str = "arrow";
/// Key prefix under which tombstone sidecars live, one per superfile id.
const SUPERFILES_DIR: &str = "superfiles";
/// File extension of a tombstones sidecar.
const TOMBSTONES_EXT: &str = "tombstones";
/// Typed access layer over a [`StorageProvider`] for WAL state documents, per-WAL `.arrow`
/// payloads, and per-superfile tombstone sidecars.
///
/// Clones share the same underlying provider through the `Arc`.
#[derive(Debug, Clone)]
pub struct WalStore {
/// Backing object store used for every list, read, write, and delete in this type.
storage: Arc<dyn StorageProvider>,
}
impl WalStore {
pub fn new(storage: Arc<dyn StorageProvider>) -> Self {
Self { storage }
}
/// Storage key of the JSON state document for `wal_id`: `wal/mutations/<hex>.json`,
/// where `<hex>` is whatever `WalId::to_hex` renders.
fn state_path(wal_id: WalId) -> String {
    let stem = wal_id.to_hex();
    format!("{WAL_DIR}/{stem}.{STATE_EXT}")
}
/// Storage key of the `.arrow` payload for `wal_id`: `wal/mutations/<hex>.arrow`,
/// where `<hex>` is whatever `WalId::to_hex` renders.
fn arrow_path(wal_id: WalId) -> String {
    let stem = wal_id.to_hex();
    format!("{WAL_DIR}/{stem}.{ARROW_EXT}")
}
/// Storage key of the tombstones sidecar for `superfile_id`:
/// `superfiles/<uuid>.tombstones`, using the UUID's `Display` rendering.
fn tombstones_path(superfile_id: uuid::Uuid) -> String {
    format!("{}/{}.{}", SUPERFILES_DIR, superfile_id, TOMBSTONES_EXT)
}
/// Lists the ids of all WAL state documents, sorted ascending by the id's inner value.
///
/// Only entries whose file name (the part after the last `/`, or the whole URI when it
/// has none) ends in `.json` and whose stem is accepted by `WalId::from_hex` are
/// returned; everything else under the prefix is skipped silently.
///
/// # Errors
///
/// Returns [`WalStoreError::Storage`], with the listing prefix as `path`, when the
/// provider's listing call fails.
pub async fn list_wal_ids(&self) -> Result<Vec<WalId>, WalStoreError> {
    let listing = self.storage.list_with_prefix(WAL_DIR).await;
    let uris = match listing {
        Ok(uris) => uris,
        Err(source) => {
            return Err(WalStoreError::Storage {
                path: WAL_DIR.into(),
                source,
            });
        }
    };

    let suffix = format!(".{STATE_EXT}");
    let mut ids: Vec<WalId> = uris
        .into_iter()
        .filter_map(|uri| {
            // Keep only the final path segment; a URI without `/` is its own file name.
            let file_name = uri.rsplit_once('/').map_or(uri.as_str(), |(_, tail)| tail);
            let stem = file_name.strip_suffix(suffix.as_str())?;
            WalId::from_hex(stem).ok()
        })
        .collect();

    ids.sort_unstable_by_key(|id| id.0);
    Ok(ids)
}
/// Lists the superfile ids that currently have a tombstones sidecar, sorted ascending.
///
/// Only entries whose file name (the part after the last `/`, or the whole URI when it
/// has none) ends in `.tombstones` and whose stem parses as a UUID are returned;
/// everything else under the prefix is skipped silently.
///
/// The result is sorted so that callers get a deterministic order regardless of how the
/// storage provider enumerates objects, matching what `list_wal_ids` already guarantees.
///
/// # Errors
///
/// Returns [`WalStoreError::Storage`], with the listing prefix as `path`, when the
/// provider's listing call fails.
pub async fn list_tombstone_ids(&self) -> Result<Vec<uuid::Uuid>, WalStoreError> {
    let uris = self
        .storage
        .list_with_prefix(SUPERFILES_DIR)
        .await
        .map_err(|source| WalStoreError::Storage {
            path: SUPERFILES_DIR.into(),
            source,
        })?;

    let suffix = format!(".{TOMBSTONES_EXT}");
    let mut ids: Vec<uuid::Uuid> = uris
        .into_iter()
        .filter_map(|uri| {
            // Keep only the final path segment; a URI without `/` is its own file name.
            let file_name = uri.rsplit_once('/').map_or(uri.as_str(), |(_, tail)| tail);
            let stem = file_name.strip_suffix(suffix.as_str())?;
            uuid::Uuid::parse_str(stem).ok()
        })
        .collect();

    // Listing order is provider-dependent; sort so repeated calls agree with each other.
    ids.sort_unstable();
    Ok(ids)
}
/// Writes a brand-new state document for `state.wal_id` and returns the etag the
/// provider reported (empty string when it reported none).
///
/// # Errors
///
/// * [`WalStoreError::Serde`] if `state` cannot be serialized to JSON.
/// * [`WalStoreError::AlreadyExists`] if the provider rejects the write with
///   `PreconditionFailed`.
/// * [`WalStoreError::Storage`] for any other provider failure.
pub async fn create(&self, state: &WalStateDoc) -> Result<Etag, WalStoreError> {
    let path = Self::state_path(state.wal_id);
    let encoded = match serde_json::to_vec(state) {
        Ok(encoded) => encoded,
        Err(source) => return Err(WalStoreError::Serde { path, source }),
    };

    let outcome = self.storage.put_atomic(&path, Bytes::from(encoded)).await;
    outcome
        .map(|etag| etag.unwrap_or_default())
        .map_err(|err| match err {
            StorageError::PreconditionFailed { .. } => WalStoreError::AlreadyExists { path },
            other => WalStoreError::Storage {
                path,
                source: other,
            },
        })
}
/// Loads and deserializes the state document for `wal_id`, returning it together with
/// the object's etag (empty string when the provider's metadata carries none).
///
/// # Errors
///
/// * [`WalStoreError::NotFound`] if the provider reports the object as missing.
/// * [`WalStoreError::Storage`] for any other provider failure.
/// * [`WalStoreError::Serde`] if the stored bytes are not a valid state document.
pub async fn read(&self, wal_id: WalId) -> Result<(WalStateDoc, Etag), WalStoreError> {
    let path = Self::state_path(wal_id);
    let fetched = self.storage.get(&path).await;
    let (raw, meta) = match fetched {
        Ok(pair) => pair,
        Err(StorageError::NotFound { .. }) => return Err(WalStoreError::NotFound { path }),
        Err(source) => return Err(WalStoreError::Storage { path, source }),
    };

    serde_json::from_slice::<WalStateDoc>(&raw)
        .map(|doc| (doc, meta.etag.unwrap_or_default()))
        .map_err(|source| WalStoreError::Serde { path, source })
}
/// Replaces the state document for `wal_id` with `new_state`, conditional on the stored
/// object still carrying `expected_etag`. Returns the new etag (empty string when the
/// provider reported none).
///
/// An empty `expected_etag` is forwarded to the provider as "no etag" (`None`).
///
/// # Errors
///
/// * [`WalStoreError::Serde`] if `new_state` cannot be serialized to JSON.
/// * [`WalStoreError::CasFailed`] if the provider answers `PreconditionFailed` or
///   `NotFound`.
/// * [`WalStoreError::Storage`] for any other provider failure.
pub async fn update_with_etag(
    &self,
    wal_id: WalId,
    expected_etag: &Etag,
    new_state: &WalStateDoc,
) -> Result<Etag, WalStoreError> {
    let path = Self::state_path(wal_id);
    let encoded = match serde_json::to_vec(new_state) {
        Ok(encoded) => encoded,
        Err(source) => return Err(WalStoreError::Serde { path, source }),
    };

    // The empty string stands for "no etag known"; the provider receives `None` then.
    let precondition = Some(expected_etag.as_str()).filter(|tag| !tag.is_empty());

    let outcome = self
        .storage
        .put_if_match(&path, Bytes::from(encoded), precondition)
        .await;
    match outcome {
        Ok(etag) => Ok(etag.unwrap_or_default()),
        // A vanished document is reported the same way as a lost race.
        Err(StorageError::PreconditionFailed { .. } | StorageError::NotFound { .. }) => {
            Err(WalStoreError::CasFailed { path })
        }
        Err(source) => Err(WalStoreError::Storage { path, source }),
    }
}
/// Deletes the state document for `wal_id`.
///
/// # Errors
///
/// Returns [`WalStoreError::Storage`] when the provider's delete call fails.
pub async fn delete_state(&self, wal_id: WalId) -> Result<(), WalStoreError> {
    let path = Self::state_path(wal_id);
    match self.storage.delete(&path).await {
        Ok(_) => Ok(()),
        Err(source) => Err(WalStoreError::Storage { path, source }),
    }
}
/// Deletes the `.arrow` payload for `wal_id`.
///
/// # Errors
///
/// Returns [`WalStoreError::Storage`] when the provider's delete call fails.
pub async fn delete_arrow(&self, wal_id: WalId) -> Result<(), WalStoreError> {
    let path = Self::arrow_path(wal_id);
    match self.storage.delete(&path).await {
        Ok(_) => Ok(()),
        Err(source) => Err(WalStoreError::Storage { path, source }),
    }
}
/// Stores `bytes` as the `.arrow` payload for `wal_id`.
///
/// A `PreconditionFailed` answer from the provider (the same answer `create` interprets
/// as "already exists") is treated as success, so calling this again for the same
/// `wal_id` does not fail.
///
/// # Errors
///
/// Returns [`WalStoreError::Storage`] for any provider failure other than
/// `PreconditionFailed`.
pub async fn put_arrow(&self, wal_id: WalId, bytes: Bytes) -> Result<(), WalStoreError> {
    let path = Self::arrow_path(wal_id);
    let outcome = self.storage.put_atomic(&path, bytes).await;
    match outcome {
        Ok(_) | Err(StorageError::PreconditionFailed { .. }) => Ok(()),
        Err(source) => Err(WalStoreError::Storage { path, source }),
    }
}
/// Fetches the `.arrow` payload for `wal_id`, optionally verifying its BLAKE3 digest.
///
/// When `expected_blake3_hex` is `Some`, the payload's BLAKE3 hash is rendered as hex and
/// compared to it as a plain string (so the comparison is case-sensitive). With `None`
/// the bytes are returned unchecked.
///
/// # Errors
///
/// * [`WalStoreError::Storage`] for any provider failure, including a missing object
///   (no dedicated not-found mapping is applied here).
/// * [`WalStoreError::SidecarContentHashMismatch`] when the digest differs from the
///   expected one.
pub async fn get_arrow(
    &self,
    wal_id: WalId,
    expected_blake3_hex: Option<&str>,
) -> Result<Bytes, WalStoreError> {
    let path = Self::arrow_path(wal_id);
    let payload = match self.storage.get(&path).await {
        Ok((payload, _meta)) => payload,
        Err(source) => return Err(WalStoreError::Storage { path, source }),
    };

    // Without an expected digest there is nothing to verify.
    let Some(want) = expected_blake3_hex else {
        return Ok(payload);
    };

    let got = blake3::hash(&payload).to_hex().to_string();
    if got == want {
        Ok(payload)
    } else {
        Err(WalStoreError::SidecarContentHashMismatch {
            path,
            expected: want.to_string(),
            got,
        })
    }
}
/// Loads the tombstones sidecar for `superfile_id`, if one exists.
///
/// Returns `Ok(None)` when the provider reports the object as missing; otherwise the
/// decoded sidecar plus the object's etag (empty string when the metadata carries none).
///
/// # Errors
///
/// * [`WalStoreError::Storage`] for any provider failure other than `NotFound`.
/// * [`WalStoreError::SidecarCodec`] if the stored bytes cannot be decoded.
pub async fn get_tombstones(
    &self,
    superfile_id: uuid::Uuid,
) -> Result<Option<(TombstonesSidecar, Etag)>, WalStoreError> {
    let path = Self::tombstones_path(superfile_id);
    let fetched = self.storage.get(&path).await;
    let (raw, meta) = match fetched {
        Ok(pair) => pair,
        // Absence is an ordinary outcome for a sidecar, not an error.
        Err(StorageError::NotFound { .. }) => return Ok(None),
        Err(source) => return Err(WalStoreError::Storage { path, source }),
    };

    match tombstones_codec::decode_sidecar(&raw) {
        Ok(sidecar) => Ok(Some((sidecar, meta.etag.unwrap_or_default()))),
        Err(source) => Err(WalStoreError::SidecarCodec { path, source }),
    }
}
/// Encodes `sidecar` and writes it as the tombstones sidecar for `superfile_id`,
/// conditional on `expected_etag`. Returns the new etag (empty string when the provider
/// reported none).
///
/// `None` and `Some("")` are both forwarded to the provider as "no etag" (`None`).
///
/// # Errors
///
/// * [`WalStoreError::SidecarCodec`] if the sidecar cannot be encoded.
/// * [`WalStoreError::CasFailed`] if the provider answers `PreconditionFailed`.
/// * [`WalStoreError::Storage`] for any other provider failure.
///
/// NOTE(review): unlike `update_with_etag`, a `NotFound` answer is not folded into
/// `CasFailed` here and surfaces as `Storage` — confirm whether that difference is
/// intended.
pub async fn put_tombstones(
    &self,
    superfile_id: uuid::Uuid,
    expected_etag: Option<&Etag>,
    sidecar: &TombstonesSidecar,
) -> Result<Etag, WalStoreError> {
    let path = Self::tombstones_path(superfile_id);
    let encoded = match tombstones_codec::encode_sidecar(sidecar) {
        Ok(encoded) => encoded,
        Err(source) => return Err(WalStoreError::SidecarCodec { path, source }),
    };

    // An empty etag carries no information, so it is dropped before reaching the provider.
    let precondition = expected_etag
        .map(|tag| tag.as_str())
        .filter(|tag| !tag.is_empty());

    let outcome = self
        .storage
        .put_if_match(&path, Bytes::from(encoded), precondition)
        .await;
    match outcome {
        Ok(etag) => Ok(etag.unwrap_or_default()),
        Err(StorageError::PreconditionFailed { .. }) => Err(WalStoreError::CasFailed { path }),
        Err(source) => Err(WalStoreError::Storage { path, source }),
    }
}
}
#[cfg(test)]
mod tests {
use chrono::Utc;
use tempfile::TempDir;
use super::*;
use crate::{
storage::LocalFsStorageProvider,
supertable::wal::state_doc::{
OpKind, RowId, SCHEMA_VERSION, TombstoneEntry, TombstoneOutcome, WalState,
},
};
/// Builds a `WalStore` backed by a local-filesystem provider rooted in a fresh temporary
/// directory. The `TempDir` handle is returned alongside so the test can keep it bound
/// for as long as it uses the store.
fn store() -> (TempDir, WalStore) {
    let tmp = TempDir::new().expect("tempdir");
    let fs_provider = LocalFsStorageProvider::new(tmp.path()).expect("provider");
    let shared: Arc<dyn StorageProvider> = Arc::new(fs_provider);
    let wal_store = WalStore::new(shared);
    (tmp, wal_store)
}
/// Fixture: a `Delete` state doc in the `Intent` state targeting rows 1 and 2, each with a
/// still-pending tombstone entry. Only `wal_id` varies between callers.
fn sample_state(wal_id: i128) -> WalStateDoc {
    // Every progress entry starts out identical apart from the row it refers to.
    let pending = |row| TombstoneEntry {
        target_id: RowId(row),
        outcome: TombstoneOutcome::Pending,
        tombstoned_in_superfile: None,
    };

    WalStateDoc {
        wal_id: WalId(wal_id),
        schema_version: SCHEMA_VERSION,
        op_kind: OpKind::Delete,
        state: WalState::Intent,
        created_at: Utc::now(),
        lease: None,
        predicate_repr: "for tests".into(),
        target_ids: vec![RowId(1), RowId(2)],
        new_row_count: None,
        new_row_content_hash: None,
        preallocated_superfile_id: None,
        minted_id_spans: Vec::new(),
        tombstone_progress: vec![pending(1), pending(2)],
    }
}
/// A freshly created state doc reads back equal, with the same etag `create` returned.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn create_then_read_roundtrips_state() {
    let (_dir, wal_store) = store();
    let original = sample_state(7);

    let created_etag = wal_store.create(&original).await.expect("create");
    let (loaded, loaded_etag) = wal_store.read(original.wal_id).await.expect("read");

    assert_eq!(loaded, original);
    assert_eq!(loaded_etag, created_etag);
}
/// Reading an id that was never created yields the typed `NotFound` error.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn read_missing_returns_not_found() {
    let (_dir, wal_store) = store();

    let outcome = wal_store.read(WalId(9999)).await;

    let err = outcome.expect_err("must error");
    assert!(matches!(err, WalStoreError::NotFound { .. }), "{err:?}");
}
/// `create` is create-only: a second call for the same id reports `AlreadyExists`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn create_twice_fails_with_already_exists() {
    let (_dir, wal_store) = store();
    let doc = sample_state(11);
    wal_store.create(&doc).await.expect("first create");

    let second_attempt = wal_store.create(&doc).await;

    let err = second_attempt.expect_err("second must fail");
    assert!(
        matches!(err, WalStoreError::AlreadyExists { .. }),
        "{err:?}"
    );
}
/// Updating with the current etag succeeds, yields a different etag, and the new state
/// and etag are what a subsequent `read` observes.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn update_with_correct_etag_advances_state() {
    let (_dir, wal_store) = store();
    let mut doc = sample_state(13);
    let e1 = wal_store.create(&doc).await.expect("create");

    doc.state = WalState::Appended;
    let update = wal_store.update_with_etag(doc.wal_id, &e1, &doc).await;
    let e2 = update.expect("update");
    assert_ne!(e1, e2, "etag must advance after a real write");

    let (stored, stored_etag) = wal_store.read(doc.wal_id).await.expect("read");
    assert_eq!(stored.state, WalState::Appended);
    assert_eq!(stored_etag, e2);
}
/// Once an update has landed, re-using the pre-update etag is rejected with `CasFailed`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn update_with_stale_etag_fails_cas() {
    let (_dir, wal_store) = store();
    let mut doc = sample_state(17);
    let stale_etag = wal_store.create(&doc).await.expect("create");

    // First update consumes the creation etag.
    doc.state = WalState::Appended;
    wal_store
        .update_with_etag(doc.wal_id, &stale_etag, &doc)
        .await
        .expect("first update");

    // Second update presents the same, now outdated, etag.
    doc.state = WalState::Complete;
    let second = wal_store
        .update_with_etag(doc.wal_id, &stale_etag, &doc)
        .await;

    let err = second.expect_err("stale etag must lose");
    assert!(matches!(err, WalStoreError::CasFailed { .. }), "{err:?}");
}
/// Spawns `N` tasks that all try to update the same doc from the same starting etag and
/// checks that exactly one succeeds while every other one reports `CasFailed`.
#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn concurrent_updates_have_exactly_one_winner() {
    const N: usize = 12;

    let (_dir, wal_store) = store();
    let base = sample_state(23);
    let initial_etag = wal_store.create(&base).await.expect("create");
    let shared = Arc::new(wal_store);

    // Each racer writes a distinguishable document but presents the same etag.
    let racers: Vec<_> = (0..N)
        .map(|i| {
            let ws = Arc::clone(&shared);
            let etag = initial_etag.clone();
            let mut contender = base.clone();
            contender.state = WalState::Appended;
            contender.predicate_repr = format!("racer {i}");
            tokio::spawn(async move {
                ws.update_with_etag(contender.wal_id, &etag, &contender)
                    .await
            })
        })
        .collect();

    let (mut ok_count, mut cas_failed, mut other_err) = (0usize, 0usize, 0usize);
    for racer in racers {
        let outcome = racer.await.expect("join");
        match outcome {
            Ok(_) => ok_count += 1,
            Err(WalStoreError::CasFailed { .. }) => cas_failed += 1,
            Err(_) => other_err += 1,
        }
    }

    assert_eq!(ok_count, 1, "exactly one task must win CAS");
    assert_eq!(cas_failed, N - 1, "all losers must report CasFailed");
    assert_eq!(other_err, 0, "no spurious failures");
}
/// A payload written with `put_arrow` is returned unchanged by `get_arrow` when the
/// caller supplies the matching BLAKE3 hex digest.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn arrow_sidecar_round_trips_with_hash_verify() {
    let (_dir, wal_store) = store();
    let wal_id = WalId(29);
    let payload = Bytes::from_static(b"hello-payload");
    let digest = blake3::hash(&payload).to_hex().to_string();

    wal_store
        .put_arrow(wal_id, payload.clone())
        .await
        .expect("put");
    let fetched = wal_store.get_arrow(wal_id, Some(&digest)).await;

    assert_eq!(fetched.expect("hashed read"), payload);
}
/// Repeated `put_arrow` calls for the same id all succeed — with identical bytes and with
/// different bytes — and the stored payload stays the one written first.
///
/// `put_arrow` goes through the create-only `put_atomic` and swallows its
/// `PreconditionFailed`, so later calls are accepted but do not replace the object. The
/// read-back at the end pins that, so the test can tell "accepted and ignored" apart from
/// "accepted and replaced".
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn arrow_sidecar_overwrite_is_legal() {
    let (_dir, ws) = store();
    let wal_id = WalId(31);
    ws.put_arrow(wal_id, Bytes::from_static(b"first"))
        .await
        .expect("first");
    ws.put_arrow(wal_id, Bytes::from_static(b"first"))
        .await
        .expect("idempotent re-write");
    ws.put_arrow(wal_id, Bytes::from_static(b"second"))
        .await
        .expect("replacement");

    // The later writes must not have clobbered the original payload.
    let stored = ws.get_arrow(wal_id, None).await.expect("read back");
    assert_eq!(
        stored,
        Bytes::from_static(b"first"),
        "first write must win; later puts are accepted but ignored"
    );
}
/// Supplying a digest that cannot match makes `get_arrow` fail with
/// `SidecarContentHashMismatch`, echoing the expected digest and reporting a different
/// actual one.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn arrow_sidecar_hash_mismatch_surfaces_typed_error() {
    let (_dir, wal_store) = store();
    let wal_id = WalId(37);
    let bogus_hash = "00".repeat(32);
    wal_store
        .put_arrow(wal_id, Bytes::from_static(b"actual"))
        .await
        .expect("put");

    let outcome = wal_store.get_arrow(wal_id, Some(&bogus_hash)).await;

    match outcome.expect_err("hash check must fail") {
        WalStoreError::SidecarContentHashMismatch { expected, got, .. } => {
            assert_eq!(expected, bogus_hash);
            assert_ne!(got, bogus_hash);
        }
        other => panic!("expected SidecarContentHashMismatch; got {other:?}"),
    }
}
/// A superfile without a sidecar is reported as `Ok(None)`, not as an error.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tombstones_sidecar_absent_returns_none() {
    let (_dir, wal_store) = store();

    let lookup = wal_store.get_tombstones(uuid::Uuid::nil()).await;

    assert!(lookup.expect("query").is_none());
}
/// A sidecar written with `put_tombstones` comes back from `get_tombstones` with the same
/// bitmap contents, no seal, and the etag the write returned.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tombstones_sidecar_roundtrips_through_storage() {
    let (_dir, wal_store) = store();
    let superfile_id = uuid::Uuid::from_u128(0xCAFE_BABE_DEAD_BEEF);

    let mut written_bitmap = roaring::RoaringBitmap::new();
    written_bitmap.insert(7);
    written_bitmap.insert(42);
    let sidecar = TombstonesSidecar {
        seal: None,
        bitmap: written_bitmap.clone(),
    };

    let write_etag = wal_store
        .put_tombstones(superfile_id, None, &sidecar)
        .await
        .expect("first put");
    let lookup = wal_store.get_tombstones(superfile_id).await;
    let (loaded, read_etag) = lookup.expect("get").expect("present");

    assert!(loaded.seal.is_none());
    let loaded_ids: Vec<u32> = loaded.bitmap.iter().collect();
    let written_ids: Vec<u32> = written_bitmap.iter().collect();
    assert_eq!(loaded_ids, written_ids);
    assert_eq!(read_etag, write_etag);
}
/// After one successful conditional update, presenting the original etag again is
/// rejected with `CasFailed`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tombstones_sidecar_stale_etag_fails_cas() {
    let (_dir, wal_store) = store();
    let superfile_id = uuid::Uuid::from_u128(0xFEED_FACE_BEEF_CAFE);

    let empty_sidecar = TombstonesSidecar {
        seal: None,
        bitmap: roaring::RoaringBitmap::new(),
    };
    let first_etag = wal_store
        .put_tombstones(superfile_id, None, &empty_sidecar)
        .await
        .expect("first put");

    let mut one_row = roaring::RoaringBitmap::new();
    one_row.insert(3);
    let updated_sidecar = TombstonesSidecar {
        seal: None,
        bitmap: one_row,
    };

    // This write consumes `first_etag`.
    wal_store
        .put_tombstones(superfile_id, Some(&first_etag), &updated_sidecar)
        .await
        .expect("update");

    // Same etag again: it no longer matches what is stored.
    let retry = wal_store
        .put_tombstones(superfile_id, Some(&first_etag), &updated_sidecar)
        .await;
    let err = retry.expect_err("stale etag");
    assert!(matches!(err, WalStoreError::CasFailed { .. }), "{err:?}");
}
/// Deleting a state doc twice succeeds both times.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn delete_state_is_idempotent() {
    let (_dir, wal_store) = store();
    let doc = sample_state(41);
    let wal_id = doc.wal_id;
    wal_store.create(&doc).await.expect("create");

    wal_store.delete_state(wal_id).await.expect("first delete");
    wal_store.delete_state(wal_id).await.expect("second delete");
}
/// After `delete_arrow` the payload can no longer be fetched, and deleting again still
/// succeeds.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn delete_arrow_is_idempotent_and_removes_sidecar() {
    let (_dir, wal_store) = store();
    let wal_id = WalId(43);
    wal_store
        .put_arrow(wal_id, Bytes::from_static(b"payload"))
        .await
        .expect("put_arrow");

    wal_store.delete_arrow(wal_id).await.expect("first delete");

    let fetch_after_delete = wal_store.get_arrow(wal_id, None).await;
    fetch_after_delete.expect_err("sidecar must be gone after delete");

    wal_store.delete_arrow(wal_id).await.expect("second delete");
}
/// `list_wal_ids` returns ids of state docs only (an `.arrow` object under the same
/// prefix is ignored) and returns them in ascending order regardless of creation order.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn list_wal_ids_returns_only_state_docs_sorted_ascending() {
    let (_dir, wal_store) = store();

    // Created out of order on purpose.
    for raw_id in [50i128, 10, 30] {
        let doc = sample_state(raw_id);
        wal_store.create(&doc).await.expect("create");
    }
    // A payload without a state doc must not show up in the listing.
    wal_store
        .put_arrow(WalId(99), Bytes::from_static(b"ignored"))
        .await
        .expect("put_arrow");

    let listed = wal_store.list_wal_ids().await.expect("list");

    let raw: Vec<i128> = listed.iter().map(|id| id.0).collect();
    assert_eq!(raw, vec![10, 30, 50], "ascending oldest-first order");
}
/// Listing WAL ids on a store with nothing written succeeds with an empty result.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn list_wal_ids_on_empty_prefix_is_empty() {
    let (_dir, wal_store) = store();

    let listed = wal_store.list_wal_ids().await.expect("list");

    assert!(listed.is_empty());
}
/// Every superfile that had a sidecar written is reported by `list_tombstone_ids`.
/// Both sides are sorted before comparing, so the test does not depend on listing order.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn list_tombstone_ids_returns_superfiles_with_sidecars() {
    let (_dir, wal_store) = store();
    let first = uuid::Uuid::from_u128(0x1111);
    let second = uuid::Uuid::from_u128(0x2222);
    let empty_sidecar = TombstonesSidecar {
        seal: None,
        bitmap: roaring::RoaringBitmap::new(),
    };
    wal_store
        .put_tombstones(first, None, &empty_sidecar)
        .await
        .expect("put a");
    wal_store
        .put_tombstones(second, None, &empty_sidecar)
        .await
        .expect("put b");

    let mut listed = wal_store.list_tombstone_ids().await.expect("list");
    listed.sort();

    let mut expected = vec![first, second];
    expected.sort();
    assert_eq!(listed, expected);
}
/// Listing tombstone ids on a store with nothing written succeeds with an empty result.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn list_tombstone_ids_on_empty_prefix_is_empty() {
    let (_dir, wal_store) = store();

    let listed = wal_store.list_tombstone_ids().await.expect("list");

    assert!(listed.is_empty());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn put_tombstones_with_empty_expected_etag_is_create_only() {
let (_dir, ws) = store();
let superfile_id = uuid::Uuid::from_u128(0xABCD);
let mut bitmap = roaring::RoaringBitmap::new();
bitmap.insert(1);
let sidecar = TombstonesSidecar { seal: None, bitmap };
let empty_etag: Etag = String::new();
ws.put_tombstones(superfile_id, Some(&empty_etag), &sidecar)
.await
.expect("first create-only put lands");
let err = ws
.put_tombstones(superfile_id, Some(&empty_etag), &sidecar)
.await
.expect_err("second create-only put must lose CAS");
assert!(matches!(err, WalStoreError::CasFailed { .. }), "{err:?}");
}
/// Bytes that are not a valid sidecar, planted directly through a second provider on the
/// same directory, make `get_tombstones` fail with `SidecarCodec`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn get_tombstones_surfaces_codec_error_on_corrupt_bytes() {
    let (dir, wal_store) = store();
    let superfile_id = uuid::Uuid::from_u128(0xDEAD);
    let sidecar_key = WalStore::tombstones_path(superfile_id);

    // Bypass the store's encoder by writing raw garbage at the sidecar's key.
    let raw_provider: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
    raw_provider
        .put_atomic(&sidecar_key, Bytes::from_static(b"not a valid sidecar"))
        .await
        .expect("write garbage");

    let lookup = wal_store.get_tombstones(superfile_id).await;

    let err = lookup.expect_err("decode must fail");
    assert!(matches!(err, WalStoreError::SidecarCodec { .. }), "{err:?}");
}
}