#![deny(clippy::unwrap_used)]
use std::sync::Arc;
use infino::supertable::{Supertable, manifest::commit::read_pointer};
const PUT_MULTIPART_THRESHOLD_BYTES: u64 = 1;
const BM25_TOP_K: usize = 5;
use infino::{
supertable::storage::{LocalFsStorageProvider, StorageProvider},
test_helpers::{build_title_batch, default_supertable_options},
};
use tempfile::TempDir;
#[test]
fn commit_persists_pointer_list_part_and_superfile() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["alpha bravo", "charlie delta"]))
.expect("append");
w.commit().expect("commit");
drop(w);
let (pointer, _) = futures::executor::block_on(read_pointer(&*storage))
.expect("read")
.expect("pointer present");
assert_eq!(pointer.get_manifest_id(), 1);
assert!(pointer.manifest_uri.starts_with("manifest/manifest-"));
let (list_bytes, _) =
futures::executor::block_on(storage.get(&pointer.manifest_uri)).expect("get list");
assert!(!list_bytes.is_empty());
let manifest_parts_dir = dir.path().join("manifest-parts");
let parts: Vec<_> = std::fs::read_dir(&manifest_parts_dir)
.expect("readdir")
.filter_map(|e| e.ok())
.collect();
assert_eq!(
parts.len(),
1,
"single-partition mode: exactly one manifest part on disk; got {parts:?}"
);
let data_dir = dir.path().join("data");
let superfiles: Vec<_> = std::fs::read_dir(&data_dir)
.expect("readdir")
.filter_map(|e| e.ok())
.collect();
assert_eq!(
superfiles.len(),
1,
"one shard committed → one superfile file on disk; got {superfiles:?}"
);
let r = st.reader();
assert_eq!(r.manifest_id(), 1);
assert_eq!(r.n_superfiles(), 1);
}
#[test]
fn two_successive_commits_both_publish() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let mut w = st.writer().expect("w1");
w.append(&build_title_batch(&["foo", "bar"]))
.expect("append1");
w.commit().expect("commit1");
drop(w);
let mut w = st.writer().expect("w2");
w.append(&build_title_batch(&["baz"])).expect("append2");
w.commit().expect("commit2");
drop(w);
let (pointer, _) = futures::executor::block_on(read_pointer(&*storage))
.expect("read")
.expect("pointer");
assert_eq!(
pointer.get_manifest_id(),
2,
"two commits ⇒ pointer at manifest_id=2"
);
let manifest_dir = dir.path().join("manifest");
let n_manifests = std::fs::read_dir(&manifest_dir)
.expect("readdir")
.filter_map(|e| e.ok())
.count();
assert_eq!(n_manifests, 2, "two manifest files (manifest_id 1 + 2)");
let manifest_parts_dir = dir.path().join("manifest-parts");
let n_parts = std::fs::read_dir(&manifest_parts_dir)
.expect("readdir")
.filter_map(|e| e.ok())
.count();
assert_eq!(n_parts, 2);
let r = st.reader();
assert_eq!(r.manifest_id(), 2);
assert_eq!(
r.n_superfiles(),
2,
"two shard commits ⇒ two superfiles visible"
);
}
#[test]
fn multipart_threshold_forces_superfile_through_put_multipart() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let opts = default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_put_multipart_threshold_bytes(PUT_MULTIPART_THRESHOLD_BYTES);
let producer = Supertable::create(opts).expect("create");
{
let mut w = producer.writer().expect("writer");
w.append(&build_title_batch(&["alpha bravo", "charlie delta"]))
.expect("append");
w.commit().expect("commit via multipart path");
}
drop(producer);
let data_dir = dir.path().join("data");
let superfiles: Vec<_> = std::fs::read_dir(&data_dir)
.expect("readdir data")
.filter_map(|e| e.ok())
.collect();
assert_eq!(
superfiles.len(),
1,
"one superfile file should land on disk after a multipart commit"
);
let consumer =
Supertable::open(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("open after multipart commit");
let r = consumer.reader();
assert_eq!(r.manifest_id(), 1);
assert_eq!(r.n_superfiles(), 1);
}
#[test]
fn no_storage_attached_takes_in_memory_path() {
let dir = TempDir::new().expect("tempdir");
let st = Supertable::create(default_supertable_options()).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["x", "y"])).expect("append");
w.commit().expect("commit");
drop(w);
let entries: Vec<_> = std::fs::read_dir(dir.path())
.expect("readdir")
.filter_map(|e| e.ok())
.collect();
assert_eq!(
entries.len(),
0,
"no-storage supertable must not touch the filesystem; got {entries:?}"
);
let r = st.reader();
assert_eq!(r.manifest_id(), 1);
assert_eq!(r.n_superfiles(), 1);
}
#[test]
fn committed_supertable_remains_in_memory_queryable_for_now() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&[
"nimblefox special token",
"ordinary common text",
]))
.expect("append");
w.commit().expect("commit");
drop(w);
let hits = st
.reader()
.bm25_hits(
"title",
"nimblefox",
BM25_TOP_K,
infino::supertable::query::fts::BoolMode::Or,
)
.expect("query");
assert_eq!(hits.len(), 1, "commit must not break in-memory reads");
}
#[test]
fn manifest_id_increments_only_on_non_empty_commits() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let mut w = st.writer().expect("w");
w.commit().expect("empty commit"); drop(w);
let pointer = futures::executor::block_on(read_pointer(&*storage)).expect("read");
assert!(pointer.is_none(), "empty commit must not publish a pointer");
let mut w = st.writer().expect("w");
w.append(&build_title_batch(&["only", "real"]))
.expect("append");
w.commit().expect("real commit");
drop(w);
let (pointer, _) = futures::executor::block_on(read_pointer(&*storage))
.expect("read")
.expect("pointer");
assert_eq!(pointer.get_manifest_id(), 1);
}