#![deny(clippy::unwrap_used)]
use std::{
ops::Range,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
use async_trait::async_trait;
use bytes::Bytes;
use infino::{
superfile::fts::reader::BoolMode,
supertable::{
Supertable,
options::Consistency,
storage::{LocalFsStorageProvider, ObjectMeta, StorageError, StorageProvider},
},
test_helpers::{build_title_batch, default_supertable_options},
};
use tempfile::TempDir;
const BM25_TOP_K: usize = 10;
const MANIFEST_PARTS_PREFIX: &str = "manifests/";
#[test]
fn query_after_first_commit_on_same_handle_succeeds() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st = Supertable::create(
default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_read_consistency(Consistency::Snapshot),
)
.expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["alpha bravo", "charlie delta"]))
.expect("append");
w.commit().expect("commit");
drop(w);
let hits = st
.reader()
.bm25_hits("title", "alpha", BM25_TOP_K, BoolMode::Or)
.expect("same-handle query after first commit must resolve parts");
assert_eq!(hits.len(), 1, "expected the one matching row");
}
#[test]
fn query_after_second_commit_on_same_handle_succeeds() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st = Supertable::create(
default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_read_consistency(Consistency::Snapshot),
)
.expect("create");
let mut w = st.writer().expect("writer 1");
w.append(&build_title_batch(&["alpha bravo"]))
.expect("append 1");
w.commit().expect("commit 1");
drop(w);
let mut w = st.writer().expect("writer 2");
w.append(&build_title_batch(&["echo foxtrot"]))
.expect("append 2");
w.commit().expect("commit 2");
drop(w);
let old_hits = st
.reader()
.bm25_hits("title", "alpha", BM25_TOP_K, BoolMode::Or)
.expect("query for first-commit term");
assert_eq!(
old_hits.len(),
1,
"first-commit row must survive the rewrite"
);
let new_hits = st
.reader()
.bm25_hits("title", "echo", BM25_TOP_K, BoolMode::Or)
.expect("query for second-commit term");
assert_eq!(new_hits.len(), 1, "second-commit row must be queryable");
}
#[test]
fn same_handle_query_after_commit_refetches_no_manifest_parts() {
let dir = TempDir::new().expect("tempdir");
let local: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let counter = Arc::new(PartGetCounter::new(local));
let storage: Arc<dyn StorageProvider> = Arc::clone(&counter) as Arc<dyn StorageProvider>;
let st = Supertable::create(
default_supertable_options()
.with_storage(storage)
.with_read_consistency(Consistency::Snapshot),
)
.expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["alpha bravo", "charlie delta"]))
.expect("append");
w.commit().expect("commit");
drop(w);
let before = counter.part_gets();
let hits = st
.reader()
.bm25_hits("title", "alpha", BM25_TOP_K, BoolMode::Or)
.expect("query");
let after = counter.part_gets();
assert_eq!(hits.len(), 1);
assert_eq!(
after - before,
0,
"post-commit query must not refetch manifest parts (seeded into cache)"
);
}
#[test]
fn parts_cache_stays_bounded_across_repeated_commits() {
const COMMITS: usize = 6;
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st = Supertable::create(
default_supertable_options()
.with_storage(storage)
.with_read_consistency(Consistency::Snapshot),
)
.expect("create");
for i in 0..COMMITS {
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["alpha bravo"]))
.expect("append");
w.commit().expect("commit");
drop(w);
let s = st.stats();
assert!(
s.n_manifest_parts_loaded <= s.n_manifest_parts,
"commit {}: cache ({}) exceeds live parts ({}) — superseded \
parts are accumulating",
i + 1,
s.n_manifest_parts_loaded,
s.n_manifest_parts,
);
}
}
#[derive(Debug)]
struct PartGetCounter {
inner: Arc<dyn StorageProvider>,
part_gets: AtomicUsize,
}
impl PartGetCounter {
fn new(inner: Arc<dyn StorageProvider>) -> Self {
Self {
inner,
part_gets: AtomicUsize::new(0),
}
}
fn part_gets(&self) -> usize {
self.part_gets.load(Ordering::Acquire)
}
}
#[async_trait]
impl StorageProvider for PartGetCounter {
async fn head(&self, uri: &str) -> Result<ObjectMeta, StorageError> {
self.inner.head(uri).await
}
async fn get(&self, uri: &str) -> Result<(Bytes, ObjectMeta), StorageError> {
if uri.starts_with(MANIFEST_PARTS_PREFIX) {
self.part_gets.fetch_add(1, Ordering::AcqRel);
}
self.inner.get(uri).await
}
async fn get_range(&self, uri: &str, range: Range<u64>) -> Result<Bytes, StorageError> {
self.inner.get_range(uri, range).await
}
async fn put_atomic(&self, uri: &str, bytes: Bytes) -> Result<Option<String>, StorageError> {
self.inner.put_atomic(uri, bytes).await
}
async fn put_if_match(
&self,
uri: &str,
bytes: Bytes,
expected: Option<&str>,
) -> Result<Option<String>, StorageError> {
self.inner.put_if_match(uri, bytes, expected).await
}
async fn put_multipart(
&self,
uri: &str,
) -> Result<Box<dyn object_store::MultipartUpload>, StorageError> {
self.inner.put_multipart(uri).await
}
async fn delete(&self, uri: &str) -> Result<(), StorageError> {
self.inner.delete(uri).await
}
}