#![deny(clippy::unwrap_used)]
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use bytes::Bytes;
use infino::{
superfile::{SuperfileReader, builder::FtsConfig, fts::tokenize::Tokenizer},
supertable::{
Supertable, SupertableOptions,
manifest::SuperfileUri,
reader_cache::{InMemoryReaderCache, ReaderCacheError, SuperfileReaderCache},
},
test_helpers::{build_title_batch, default_tokenizer, schema_id_title},
};
/// Thread count given to the rayon pool that `options_with_counting_store`
/// builds and passes to `with_writer_pool`.
const RAYON_POOL_THREADS: usize = 1;
/// Value the exact-term and prefix tests compare `n_superfiles()` against
/// after their four append + commit rounds.
const EXACT_TERM_SUPERFILE_COUNT: usize = 4;
/// Third argument handed to the `bm25_hits` / `bm25_search_prefix` calls in
/// the tests below (presumably the result limit — confirm against the API).
const BM25_TOP_K: usize = 5;
/// Number of append + commit rounds the no-match test performs.
const NO_MATCH_SUPERFILE_COUNT: u64 = 3;
/// Test double that wraps an `InMemoryReaderCache` and records how many
/// times `reader` is requested for each superfile URI.
#[derive(Default)]
struct CountingStore {
// Real cache that every trait call is forwarded to.
inner: InMemoryReaderCache,
// Per-URI tally of `reader` calls; behind a mutex because the trait
// methods only receive `&self`.
reader_calls: Mutex<HashMap<SuperfileUri, usize>>,
}
impl CountingStore {
    /// Builds a store with an empty call tally and a default inner cache.
    fn new() -> Self {
        Self::default()
    }

    /// Returns a copy of the current per-URI `reader` call counts.
    ///
    /// Panics if the tally mutex is poisoned.
    fn snapshot(&self) -> HashMap<SuperfileUri, usize> {
        let tally = self.reader_calls.lock().expect("reader_calls mutex");
        tally.clone()
    }

    /// Returns, for every URI whose count grew since `before` was taken, the
    /// number of additional `reader` calls. URIs whose count did not grow are
    /// left out of the result.
    fn delta(&self, before: &HashMap<SuperfileUri, usize>) -> HashMap<SuperfileUri, usize> {
        self.snapshot()
            .into_iter()
            .filter_map(|(uri, now)| {
                // A URI missing from `before` counts as zero prior calls.
                let prior = before.get(&uri).copied().unwrap_or(0);
                // Lazy `then` so the subtraction only runs when it cannot underflow.
                (now > prior).then(|| (uri, now - prior))
            })
            .collect()
    }
}
impl SuperfileReaderCache for CountingStore {
    /// Bumps the call counter for `uri`, then forwards the lookup to the
    /// inner cache. The counter is incremented whether or not the inner
    /// lookup succeeds.
    fn reader(&self, uri: &SuperfileUri) -> Result<Arc<SuperfileReader>, ReaderCacheError> {
        {
            // Scope the guard so the tally lock is released before delegating.
            let mut tally = self.reader_calls.lock().expect("reader_calls mutex");
            *tally.entry(*uri).or_default() += 1;
        }
        self.inner.reader(uri)
    }

    /// Forwards to the inner cache; inserts are not counted.
    fn insert(&self, uri: SuperfileUri, bytes: Bytes) -> Result<(), ReaderCacheError> {
        self.inner.insert(uri, bytes)
    }

    /// Forwards to the inner cache.
    fn resident_bytes(&self) -> usize {
        self.inner.resident_bytes()
    }
}
/// Builds `SupertableOptions` for the `id`/`title` test schema with an FTS
/// index on `title`, a `RAYON_POOL_THREADS`-thread writer pool, and `store`
/// installed as the table's store.
///
/// Panics if the thread pool or the options cannot be constructed.
fn options_with_counting_store(store: Arc<CountingStore>) -> SupertableOptions {
    let writer_pool = rayon::ThreadPoolBuilder::new()
        .num_threads(RAYON_POOL_THREADS)
        .build()
        .expect("build pool");
    let writer_pool = Arc::new(writer_pool);

    let tokenizer: Arc<dyn Tokenizer> = default_tokenizer();

    let schema = schema_id_title();
    let fts_columns = vec![FtsConfig {
        column: "title".into(),
    }];
    let base = SupertableOptions::new(schema, fts_columns, vec![], Some(tokenizer)).expect("opts");

    base.with_writer_pool(writer_pool).with_store(store)
}
/// An exact-term query for a token planted in only the first of four
/// commits must request a reader for that superfile alone.
#[test]
fn bm25_exact_term_skip_opens_only_matching_superfile() {
    let counting = Arc::new(CountingStore::new());
    let table =
        Supertable::create(options_with_counting_store(Arc::clone(&counting))).expect("create");
    let mut writer = table.writer().expect("writer");

    // Commit 1: the only batch containing `nimblefox`.
    writer
        .append(&build_title_batch(&[
            "lookup nimblefox special token",
            "ordinary common everyday text",
        ]))
        .expect("append");
    writer.commit().expect("commit");

    // Commits 2-4: filler batches without the planted token.
    writer
        .append(&build_title_batch(&[
            "another generic page",
            "more filler text",
        ]))
        .expect("append");
    writer.commit().expect("commit");
    writer
        .append(&build_title_batch(&[
            "yet another normal title",
            "wrapping up the corpus",
        ]))
        .expect("append");
    writer.commit().expect("commit");
    writer
        .append(&build_title_batch(&["filler bin", "extra padding"]))
        .expect("append");
    writer.commit().expect("commit");
    drop(writer);

    let reader = table.reader();
    assert_eq!(reader.n_superfiles(), EXACT_TERM_SUPERFILE_COUNT);
    let manifest = reader.manifest();
    let target_uri = manifest.superfiles[0].uri;

    // Take the baseline immediately before the query so only reader requests
    // made by the query itself show up in the delta.
    let baseline = counting.snapshot();
    let hits = reader
        .bm25_hits(
            "title",
            "nimblefox",
            BM25_TOP_K,
            infino::supertable::query::fts::BoolMode::Or,
        )
        .expect("query");
    assert_eq!(hits.len(), 1, "exactly one doc matches `nimblefox`");
    assert_eq!(hits[0].superfile, target_uri);

    let delta = counting.delta(&baseline);
    assert_eq!(
        delta.len(),
        1,
        "skip should open exactly one superfile for an exact-term query \
         where 3 of 4 superfiles have the term definitively absent — got {delta:?}"
    );
    assert!(
        delta.contains_key(&target_uri),
        "the one opened superfile must be the planted one"
    );
}
/// A prefix query for `quokka` must request a reader only for the second
/// commit's superfile, the sole one holding terms with that prefix.
#[test]
fn bm25_prefix_skip_opens_only_superfiles_overlapping_prefix_range() {
    let counting = Arc::new(CountingStore::new());
    let table =
        Supertable::create(options_with_counting_store(Arc::clone(&counting))).expect("create");
    let mut writer = table.writer().expect("writer");

    // Four commits; only the second carries `quokka`-prefixed terms.
    writer
        .append(&build_title_batch(&["apple bagel", "banana bread"]))
        .expect("append");
    writer.commit().expect("commit");
    writer
        .append(&build_title_batch(&["quokka cuddle", "quokkateer reviews"]))
        .expect("append");
    writer.commit().expect("commit");
    writer
        .append(&build_title_batch(&["cherry coke", "date butter"]))
        .expect("append");
    writer.commit().expect("commit");
    writer
        .append(&build_title_batch(&["edam fondue", "gouda henna"]))
        .expect("append");
    writer.commit().expect("commit");
    drop(writer);

    let reader = table.reader();
    assert_eq!(reader.n_superfiles(), EXACT_TERM_SUPERFILE_COUNT);
    let manifest = reader.manifest();
    let quokka_uri = manifest.superfiles[1].uri;

    // Baseline taken right before the query isolates its reader requests.
    let baseline = counting.snapshot();
    let hits = reader
        .bm25_search_prefix("title", "quokka", BM25_TOP_K)
        .expect("prefix query");
    assert_eq!(hits.len(), 2, "two docs in superfile 1 begin with `quokka`");
    for hit in &hits {
        assert_eq!(hit.superfile, quokka_uri);
    }

    let delta = counting.delta(&baseline);
    assert_eq!(
        delta.len(),
        1,
        "term-range skip should open exactly the one superfile whose \
         lex term range overlaps [quokka, quokka_upper_bound) — got {delta:?}"
    );
    assert!(delta.contains_key(&quokka_uri));
}
/// A query for a term that appears in no commit must return no hits and
/// must not request a reader for any superfile.
#[test]
fn bm25_search_with_no_matching_superfiles_opens_no_superfiles_at_all() {
    let counting = Arc::new(CountingStore::new());
    let table =
        Supertable::create(options_with_counting_store(Arc::clone(&counting))).expect("create");
    let mut writer = table.writer().expect("writer");

    // Identical filler batches, one commit per round.
    for _ in 0..NO_MATCH_SUPERFILE_COUNT {
        writer
            .append(&build_title_batch(&[
                "ordinary term filler",
                "another mundane title",
            ]))
            .expect("append");
        writer.commit().expect("commit");
    }
    drop(writer);

    // Baseline taken right before the query isolates its reader requests.
    let baseline = counting.snapshot();
    let hits = table
        .reader()
        .bm25_hits(
            "title",
            "definitelynotpresent",
            BM25_TOP_K,
            infino::supertable::query::fts::BoolMode::Or,
        )
        .expect("query");
    assert!(hits.is_empty());

    let delta = counting.delta(&baseline);
    assert!(
        delta.is_empty(),
        "an absent rare term should prune all superfiles — got {delta:?}"
    );
}
/// In AND mode, a two-term query must request a reader only for the
/// superfile that contains both terms.
///
/// The first commit holds both `alpha` and `beta`; the second holds `alpha`
/// but not the exact token `beta`. Besides the reader-request delta, the
/// test checks the superfile-count precondition it relies on when indexing
/// the manifest, and checks that the query returns hits that all come from
/// the kept superfile.
#[test]
fn bm25_and_mode_skip_requires_all_terms_present_in_superfile() {
    // Two commits are made below; the sibling tests assert one superfile per commit.
    const AND_MODE_SUPERFILE_COUNT: usize = 2;

    let store = Arc::new(CountingStore::new());
    let st = Supertable::create(options_with_counting_store(Arc::clone(&store))).expect("create");
    let mut w = st.writer().expect("writer");

    // Commit 1: contains both query terms.
    w.append(&build_title_batch(&["alpha beta gamma", "doc with beta"]))
        .expect("append");
    w.commit().expect("commit");

    // Commit 2: contains `alpha` only.
    w.append(&build_title_batch(&[
        "alpha only here",
        "no betas whatever",
    ]))
    .expect("append");
    w.commit().expect("commit");
    drop(w);

    let r = st.reader();
    // Guard the `superfiles[0]` indexing below and the "one of two pruned" claim.
    assert_eq!(r.n_superfiles(), AND_MODE_SUPERFILE_COUNT);
    let manifest = r.manifest();
    let kept_uri = manifest.superfiles[0].uri;

    // Baseline taken right before the query isolates its reader requests.
    let before = store.snapshot();
    let hits = r
        .bm25_hits(
            "title",
            "alpha beta",
            BM25_TOP_K,
            infino::supertable::query::fts::BoolMode::And,
        )
        .expect("AND query");

    // Skipping must not cost results: the doc holding both terms is still found,
    // and nothing is reported from the pruned superfile.
    assert!(
        !hits.is_empty(),
        "AND query should match the doc containing both `alpha` and `beta`"
    );
    assert!(
        hits.iter().all(|h| h.superfile == kept_uri),
        "every AND hit must come from the superfile holding both terms"
    );

    let delta = store.delta(&before);
    assert_eq!(
        delta.len(),
        1,
        "AND mode should prune the superfile missing one of the terms — got {delta:?}"
    );
    assert!(
        delta.contains_key(&kept_uri),
        "the one opened superfile must be the one holding both terms — got {delta:?}"
    );
}