#![deny(clippy::unwrap_used)]
use std::{
collections::HashMap,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::SystemTime,
};
use async_trait::async_trait;
use bytes::Bytes;
use infino::{
storage::{LocalFsStorageProvider, ObjectMeta, StorageError, StorageProvider},
supertable::{
CommitError, ManifestLoadError, ManifestSnapshot,
manifest::{
commit::{
self, MANIFEST_DIR, MANIFEST_PARTS_DIR, POINTER_PATH, PointerFile, manifest_uri,
part_uri, read_pointer, write_pointer,
},
list::{
FORMAT_VERSION as LIST_FORMAT_VERSION, Manifest, ManifestPartEntry,
PartitionStrategy,
},
part::{self as part_mod, ContentHash, ManifestPart, PartId},
},
},
test_helpers::default_supertable_options,
};
use tempfile::TempDir;
use tokio::sync::{Barrier, Mutex};
use uuid::Uuid;
const POINTER_ROUNDTRIP_MANIFEST_ID: u64 = 42;
const POINTER_FORWARD_COMPAT_MANIFEST_ID: u64 = 7;
const FIXTURE_CONTENT_HASH_BYTE: u8 = 0xab;
const DEFAULT_HASH_N_BUCKETS: u32 = 64;
const PARALLEL_PUT_COUNT_BEFORE_POINTER: usize = 2;
const EXPECTED_COMMIT_PUT_COUNT: usize = 3;
const PARALLEL_PUT_POLL_TIMEOUT_SECS: u64 = 5;
const PARALLEL_PUT_POLL_INTERVAL_MS: u64 = 10;
async fn commit_manifest(
storage: &Arc<dyn StorageProvider>,
expected_prev_etag: Option<&str>,
new_list: &Manifest,
parts: &[&ManifestPart],
) -> Result<PointerFile, CommitError> {
let encoded: Vec<Vec<u8>> = parts.iter().map(|p| part_mod::encode(p, 3)).collect();
let encoded_refs: Vec<&[u8]> = encoded.iter().map(|b| b.as_slice()).collect();
let manifest = ManifestSnapshot::new(
new_list.manifest_id,
Arc::new(default_supertable_options()),
Vec::new(),
Some(Arc::clone(storage)),
Some(new_list.clone()),
);
manifest
.write(storage.as_ref(), expected_prev_etag, &encoded_refs)
.await?;
let (pointer, _) = read_pointer(storage.as_ref())
.await
.expect("pointer readable after commit")
.expect("pointer present after commit");
Ok(pointer)
}
#[test]
fn pointer_file_text_format_roundtrip() {
let p = PointerFile {
manifest_id: POINTER_ROUNDTRIP_MANIFEST_ID,
manifest_uri: "manifest/manifest-000042.json".into(),
content_hash: ContentHash([FIXTURE_CONTENT_HASH_BYTE; 32]),
};
let bytes = p.to_bytes();
let s = std::str::from_utf8(&bytes).expect("utf-8");
assert!(
s.contains("manifest_id=42"),
"must spell out manifest_id; got {s:?}"
);
assert!(s.contains("manifest_uri=manifest/manifest-000042.json"));
assert!(s.contains("content_hash=blake3:"));
let parsed = PointerFile::from_bytes(&bytes).expect("parse");
assert_eq!(parsed, p);
}
#[test]
fn pointer_file_rejects_truncated() {
let bad = b"manifest_id=1\nmanifest_uri=foo\n"; let err = PointerFile::from_bytes(bad).expect_err("must reject");
assert!(matches!(err, ManifestLoadError::PointerParse(_)), "{err:?}");
}
#[test]
fn pointer_file_tolerates_unknown_keys_for_forward_compat() {
let s = b"manifest_id=7\n\
manifest_uri=manifest/manifest-000007.json\n\
content_hash=blake3:0000000000000000000000000000000000000000000000000000000000000000\n\
future_field=whatever\n";
let p = PointerFile::from_bytes(s).expect("parse");
assert_eq!(p.get_manifest_id(), POINTER_FORWARD_COMPAT_MANIFEST_ID);
}
fn fresh_part(seed: u8) -> ManifestPart {
ManifestPart {
format_version: part_mod::FORMAT_VERSION.into(),
part_id: PartId(Uuid::from_bytes([seed; 16])),
superfiles: vec![],
}
}
fn empty_list(manifest_id: u64, parts: Vec<ManifestPartEntry>) -> Manifest {
Manifest {
format_version: LIST_FORMAT_VERSION.into(),
manifest_id,
options_hash: ContentHash([0u8; 32]),
schema: Vec::new(),
id_column: "doc_id".into(),
fts_columns: vec![],
vector_columns: vec![],
partition_strategy: PartitionStrategy::Hash {
column: "doc_id".into(),
n_buckets: DEFAULT_HASH_N_BUCKETS,
},
parts,
}
}
fn entry_for(part: &ManifestPart) -> ManifestPartEntry {
let encoded = part_mod::encode(part, 3);
let hash = ContentHash::of(&encoded);
let uri = part_uri(&hash);
let size_compressed = encoded.len() as u64;
let size_uncompressed = zstd::stream::decode_all(encoded.as_slice())
.expect("self-decode")
.len() as u64;
ManifestPartEntry {
part_id: part.part_id,
uri,
n_superfiles: part.superfiles.len() as u64,
size_bytes_compressed: size_compressed,
size_bytes_uncompressed: size_uncompressed,
content_hash: hash,
partition_key: Vec::new(),
id_range: (0, 0),
scalar_stats_agg: Default::default(),
fts_summary_agg: Default::default(),
vector_summary_agg: Default::default(),
}
}
#[tokio::test]
async fn initial_commit_writes_list_part_pointer() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let part = fresh_part(1);
let list = empty_list(0, vec![entry_for(&part)]);
let pointer = commit_manifest(&storage, None, &list, &[&part])
.await
.expect("initial commit");
assert_eq!(pointer.get_manifest_id(), 0);
assert_eq!(pointer.manifest_uri, manifest_uri(0));
let (read, _) = read_pointer(storage.as_ref())
.await
.expect("read")
.expect("some");
assert_eq!(read, pointer);
let (list_bytes, _) = storage.get(&manifest_uri(0)).await.expect("list bytes");
assert!(!list_bytes.is_empty());
let (part_bytes, _) = storage
.get(&entry_for(&part).uri)
.await
.expect("part bytes");
assert!(!part_bytes.is_empty());
}
#[tokio::test]
async fn no_prior_pointer_is_none() {
let dir = TempDir::new().expect("tempdir");
let storage = LocalFsStorageProvider::new(dir.path()).expect("provider");
let read = read_pointer(&storage).await.expect("read");
assert!(read.is_none(), "fresh supertable has no pointer yet");
}
#[tokio::test]
async fn second_commit_with_valid_prev_etag_succeeds() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let part_v0 = fresh_part(2);
let list_v0 = empty_list(0, vec![entry_for(&part_v0)]);
commit_manifest(&storage, None, &list_v0, &[&part_v0])
.await
.expect("v0");
let etag_v0 = storage
.head(POINTER_PATH)
.await
.expect("head v0")
.etag
.expect("etag");
let part_v1 = fresh_part(3);
let list_v1 = empty_list(1, vec![entry_for(&part_v0), entry_for(&part_v1)]);
let pointer = commit_manifest(&storage, Some(&etag_v0), &list_v1, &[&part_v1])
.await
.expect("v1");
assert_eq!(pointer.get_manifest_id(), 1);
let (read, _) = read_pointer(storage.as_ref())
.await
.expect("read")
.expect("some");
assert_eq!(read.get_manifest_id(), 1);
}
#[tokio::test]
async fn stale_prev_etag_surfaces_write_contention_exhausted() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let part_v0 = fresh_part(4);
let list_v0 = empty_list(0, vec![entry_for(&part_v0)]);
commit_manifest(&storage, None, &list_v0, &[&part_v0])
.await
.expect("v0");
let etag_v0 = storage
.head(POINTER_PATH)
.await
.expect("head")
.etag
.expect("etag");
let part_v1 = fresh_part(5);
let list_v1 = empty_list(1, vec![entry_for(&part_v0), entry_for(&part_v1)]);
commit_manifest(&storage, Some(&etag_v0), &list_v1, &[&part_v1])
.await
.expect("v1");
let part_v1_stale = fresh_part(6);
let list_v1_stale = empty_list(1, vec![entry_for(&part_v1_stale)]);
let err = commit_manifest(&storage, Some(&etag_v0), &list_v1_stale, &[&part_v1_stale])
.await
.expect_err("stale etag must fail");
assert!(
matches!(err, CommitError::WriteContentionExhausted),
"expected WriteContentionExhausted, got {err:?}"
);
}
#[tokio::test]
async fn part_reuse_writes_zero_new_part_files() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let part = fresh_part(7);
let list_v0 = empty_list(0, vec![entry_for(&part)]);
commit_manifest(&storage, None, &list_v0, &[&part])
.await
.expect("v0");
let parts_dir = dir.path().join(MANIFEST_PARTS_DIR);
let count_before = std::fs::read_dir(&parts_dir).expect("readdir").count();
let etag_v0 = storage
.head(POINTER_PATH)
.await
.expect("head")
.etag
.expect("etag");
let list_v1 = empty_list(1, vec![entry_for(&part)]);
commit_manifest(&storage, Some(&etag_v0), &list_v1, &[])
.await
.expect("v1 no new parts");
let count_after = std::fs::read_dir(&parts_dir).expect("readdir").count();
assert_eq!(
count_after, count_before,
"part-reuse commit must write zero new part files \
(before={count_before}, after={count_after})"
);
let (read, _) = read_pointer(storage.as_ref())
.await
.expect("read")
.expect("some");
assert_eq!(read.get_manifest_id(), 1);
}
#[tokio::test]
async fn idempotent_content_addressed_part_put() {
let dir = TempDir::new().expect("tempdir");
let storage = LocalFsStorageProvider::new(dir.path()).expect("provider");
let part = fresh_part(9);
let r1 = commit::write_manifest_part(&storage, &part, 3)
.await
.expect("first write");
let r2 = commit::write_manifest_part(&storage, &part, 3)
.await
.expect("second write (idempotent)");
assert_eq!(r1.uri, r2.uri);
assert_eq!(r1.content_hash, r2.content_hash);
}
#[derive(Debug)]
struct BarrierMockStorage {
barrier: Arc<Barrier>,
objects: Mutex<HashMap<String, Bytes>>,
put_calls: AtomicUsize,
}
impl BarrierMockStorage {
fn new(barrier_n: usize) -> Arc<Self> {
Arc::new(Self {
barrier: Arc::new(Barrier::new(barrier_n)),
objects: Mutex::new(HashMap::new()),
put_calls: AtomicUsize::new(0),
})
}
}
#[async_trait]
impl StorageProvider for BarrierMockStorage {
async fn head(&self, uri: &str) -> Result<ObjectMeta, StorageError> {
let objs = self.objects.lock().await;
match objs.get(uri) {
Some(b) => Ok(ObjectMeta {
size: b.len() as u64,
etag: Some("mock-etag".into()),
last_modified: SystemTime::now(),
}),
None => Err(StorageError::NotFound { uri: uri.into() }),
}
}
async fn get(&self, uri: &str) -> Result<(Bytes, ObjectMeta), StorageError> {
let objs = self.objects.lock().await;
match objs.get(uri) {
Some(b) => Ok((
b.clone(),
ObjectMeta {
size: b.len() as u64,
etag: Some("mock-etag".into()),
last_modified: SystemTime::now(),
},
)),
None => Err(StorageError::NotFound { uri: uri.into() }),
}
}
async fn get_range(
&self,
_uri: &str,
_range: std::ops::Range<u64>,
) -> Result<Bytes, StorageError> {
Err(StorageError::Permanent {
uri: "barrier-mock".into(),
source: "get_range unused".into(),
})
}
async fn put_atomic(&self, uri: &str, bytes: Bytes) -> Result<Option<String>, StorageError> {
let prior = self.put_calls.fetch_add(1, Ordering::AcqRel);
if prior < PARALLEL_PUT_COUNT_BEFORE_POINTER {
self.barrier.wait().await;
}
let mut objs = self.objects.lock().await;
if objs.contains_key(uri) {
return Err(StorageError::PreconditionFailed { uri: uri.into() });
}
objs.insert(uri.into(), bytes);
Ok(Some("mock-etag".into()))
}
async fn put_if_match(
&self,
uri: &str,
bytes: Bytes,
_expected: Option<&str>,
) -> Result<Option<String>, StorageError> {
let prior = self.put_calls.fetch_add(1, Ordering::AcqRel);
if prior < PARALLEL_PUT_COUNT_BEFORE_POINTER {
self.barrier.wait().await;
}
let mut objs = self.objects.lock().await;
objs.insert(uri.into(), bytes);
Ok(Some("mock-etag".into()))
}
async fn put_multipart(
&self,
_uri: &str,
) -> Result<Box<dyn object_store::MultipartUpload>, StorageError> {
Err(StorageError::Permanent {
uri: "barrier-mock".into(),
source: "put_multipart unused".into(),
})
}
async fn delete(&self, _uri: &str) -> Result<(), StorageError> {
Ok(())
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn commit_issues_list_and_part_in_parallel() {
let storage = BarrierMockStorage::new(PARALLEL_PUT_COUNT_BEFORE_POINTER);
let storage_dyn: Arc<dyn StorageProvider> = storage.clone();
let part = fresh_part(20);
let list = empty_list(0, vec![entry_for(&part)]);
let commit_handle = {
let storage_dyn = Arc::clone(&storage_dyn);
let part = part.clone();
let list = list.clone();
tokio::spawn(async move { commit_manifest(&storage_dyn, None, &list, &[&part]).await })
};
let deadline = tokio::time::Instant::now()
+ std::time::Duration::from_secs(PARALLEL_PUT_POLL_TIMEOUT_SECS);
loop {
if storage.put_calls.load(Ordering::Acquire) >= PARALLEL_PUT_COUNT_BEFORE_POINTER {
break;
}
if tokio::time::Instant::now() >= deadline {
panic!(
"parallel-issue verification failed: only {} put calls \
arrived at the barrier within 5s — commit_manifest \
appears to be serial",
storage.put_calls.load(Ordering::Acquire)
);
}
tokio::time::sleep(std::time::Duration::from_millis(
PARALLEL_PUT_POLL_INTERVAL_MS,
))
.await;
}
let pointer = commit_handle.await.expect("join").expect("commit");
assert_eq!(pointer.get_manifest_id(), 0);
assert_eq!(
storage.put_calls.load(Ordering::Acquire),
EXPECTED_COMMIT_PUT_COUNT
);
}
#[tokio::test]
async fn write_pointer_initial_then_update() {
let dir = TempDir::new().expect("tempdir");
let storage = LocalFsStorageProvider::new(dir.path()).expect("provider");
let p0 = PointerFile {
manifest_id: 0,
manifest_uri: manifest_uri(0),
content_hash: ContentHash([FIXTURE_CONTENT_HASH_BYTE; 32]),
};
write_pointer(&storage, &p0, None).await.expect("initial");
let etag = storage
.head(POINTER_PATH)
.await
.expect("head")
.etag
.expect("etag");
let p1 = PointerFile {
manifest_id: 1,
manifest_uri: manifest_uri(1),
content_hash: ContentHash([0xcd; 32]),
};
write_pointer(&storage, &p1, Some(&etag))
.await
.expect("update");
let (read, _) = read_pointer(&storage).await.expect("read").expect("some");
assert_eq!(read, p1);
}
#[tokio::test]
async fn write_pointer_initial_rejects_existing() {
let dir = TempDir::new().expect("tempdir");
let storage = LocalFsStorageProvider::new(dir.path()).expect("provider");
let p0 = PointerFile {
manifest_id: 0,
manifest_uri: manifest_uri(0),
content_hash: ContentHash([0u8; 32]),
};
write_pointer(&storage, &p0, None).await.expect("first");
let err = write_pointer(&storage, &p0, None)
.await
.expect_err("second initial must fail");
assert!(
matches!(err, CommitError::WriteContentionExhausted),
"expected WriteContentionExhausted, got {err:?}"
);
}
#[test]
fn directory_layout_constants_match_plan() {
assert_eq!(POINTER_PATH, "_supertable/current");
assert_eq!(MANIFEST_DIR, "manifest");
assert_eq!(MANIFEST_PARTS_DIR, "manifest-parts");
assert_eq!(
manifest_uri(POINTER_ROUNDTRIP_MANIFEST_ID),
"manifest/manifest-000042.json"
);
let h = ContentHash([0u8; 32]);
let u = part_uri(&h);
assert!(u.starts_with("manifest-parts/part-"));
assert!(u.ends_with(".avro.zst"));
}