#![deny(clippy::unwrap_used)]
use std::sync::Arc;
use infino::{
superfile::{
builder::FtsConfig,
fts::{reader::BoolMode, tokenize::Tokenizer},
},
supertable::{
ManifestLoadError, OpenError, Supertable, SupertableOptions,
options::Consistency,
storage::{LocalFsStorageProvider, StorageProvider},
},
test_helpers::{build_title_batch, default_supertable_options, default_tokenizer},
};
const BM25_TOP_K: usize = 10;
const RAYON_POOL_THREADS: usize = 1;
use tempfile::TempDir;
#[test]
fn open_sees_writes_made_by_a_different_handle() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let producer =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let mut w = producer.writer().expect("writer");
w.append(&build_title_batch(&["alpha bravo", "charlie delta"]))
.expect("append");
w.commit().expect("commit");
drop(w);
drop(producer);
let consumer =
Supertable::open(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("open");
assert_eq!(consumer.manifest_id(), 1);
assert_eq!(consumer.reader().n_superfiles(), 1);
}
#[test]
fn open_on_fresh_tempdir_returns_pointer_unreadable() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let err = Supertable::open(default_supertable_options().with_storage(storage))
.expect_err("must reject fresh dir");
assert!(
matches!(err, OpenError::ManifestLoadError(_)),
"expected PointerUnreadable, got {err:?}"
);
}
#[test]
fn open_without_storage_rejects() {
let opts = default_supertable_options();
let err = Supertable::open(opts).expect_err("must reject");
assert!(matches!(err, OpenError::Build(_)), "{err:?}");
}
#[test]
fn strong_consistency_query_sees_another_writers_new_commit() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let producer =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let mut w = producer.writer().expect("w1");
w.append(&build_title_batch(&["initial"])).expect("append1");
w.commit().expect("commit1");
drop(w);
let consumer = Supertable::open(
default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_read_consistency(Consistency::Strong),
)
.expect("open");
assert_eq!(consumer.manifest_id(), 1);
let pinned_reader = consumer.reader();
let mut w = producer.writer().expect("w2");
w.append(&build_title_batch(&["added"])).expect("append2");
w.commit().expect("commit2");
drop(w);
let hits = consumer
.reader()
.bm25_hits("title", "added", BM25_TOP_K, BoolMode::Or)
.expect("query under strong consistency");
assert!(!hits.is_empty(), "strong query must see the v2 row");
assert_eq!(consumer.manifest_id(), 2);
assert_eq!(
consumer.reader().n_superfiles(),
2,
"post-query reader sees both commits"
);
assert_eq!(
pinned_reader.manifest_id(),
1,
"a reader captured earlier keeps its snapshot"
);
assert_eq!(pinned_reader.n_superfiles(), 1);
}
#[test]
fn strong_consistency_query_is_stable_when_pointer_unchanged() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let producer =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let mut w = producer.writer().expect("w");
w.append(&build_title_batch(&["only"])).expect("append");
w.commit().expect("commit");
drop(w);
let consumer = Supertable::open(
default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_read_consistency(Consistency::Strong),
)
.expect("open");
let _ = consumer
.reader()
.bm25_hits("title", "only", BM25_TOP_K, BoolMode::Or)
.expect("query");
assert_eq!(consumer.manifest_id(), 1);
}
#[test]
fn strong_consistency_query_on_uncommitted_table_stays_at_zero() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st = Supertable::create(
default_supertable_options()
.with_storage(storage)
.with_read_consistency(Consistency::Strong),
)
.expect("create");
let hits = st
.reader()
.bm25_hits("title", "anything", BM25_TOP_K, BoolMode::Or)
.expect("query on uncommitted table");
assert!(hits.is_empty());
assert_eq!(st.manifest_id(), 0);
}
#[test]
fn open_rejects_mismatched_options_via_options_hash() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
{
let producer =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let mut w = producer.writer().expect("writer");
w.append(&build_title_batch(&["alpha"])).expect("append");
w.commit().expect("commit");
}
let other_schema = Arc::new(arrow_schema::Schema::new(vec![
arrow_schema::Field::new("title", arrow_schema::DataType::LargeUtf8, false),
arrow_schema::Field::new("doc_id", arrow_schema::DataType::UInt64, false),
]));
let tk: Arc<dyn Tokenizer> = default_tokenizer();
let pool = Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(RAYON_POOL_THREADS)
.build()
.expect("pool"),
);
let mismatched_opts = SupertableOptions::new(
other_schema,
vec![FtsConfig {
column: "title".into(),
}],
vec![],
Some(tk),
)
.expect("opts")
.with_writer_pool(pool)
.with_storage(Arc::clone(&storage));
let err = Supertable::open(mismatched_opts)
.expect_err("open must surface OptionsHashMismatch for a reordered schema");
assert!(
matches!(
err,
OpenError::ManifestLoadError(ManifestLoadError::ContentHashMismatch { .. })
),
"expected OptionsHashMismatch; got {err:?}"
);
}
#[test]
fn open_with_matching_options_succeeds_under_options_hash_validation() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
{
let producer =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let mut w = producer.writer().expect("writer");
w.append(&build_title_batch(&["alpha"])).expect("append");
w.commit().expect("commit");
}
let consumer =
Supertable::open(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("open must succeed when options match");
assert_eq!(consumer.manifest_id(), 1);
}