use std::{
borrow::Cow,
cmp::{Ordering, Reverse},
collections::BinaryHeap,
slice,
sync::{
Arc, Mutex,
atomic::{self, AtomicU32},
},
time::Instant,
};
use arrow::record_batch::RecordBatch;
use uuid::Uuid;
pub use crate::superfile::fts::reader::BoolMode;
use crate::{
InfinoError,
superfile::{
SuperfileReader,
fts::tokenize::{AsciiLowerTokenizer, Tokenizer},
},
supertable::{
error::QueryError,
handle::{Supertable, SupertableReader},
manifest::SuperfileEntry,
query::{
SuperfileHit, dispatch,
exec::common::resolve_hits_named,
prune::{PruneLeaf, select_superfiles},
},
},
};
/// Running top-`k` score tracker shared by every BM25 work unit of one query.
///
/// Each unit reads [`SharedTopK::floor`] before searching (the value is passed to the
/// `*_with_floor` search calls, presumably as a pruning threshold) and merges its live
/// hit scores back in afterwards.
struct SharedTopK {
    /// Number of best scores retained.
    k: usize,
    /// Min-heap (via `Reverse`) holding the best `k` scores merged so far.
    heap: Mutex<BinaryHeap<Reverse<OrdScore>>>,
    /// `f32::to_bits` of the published floor; starts as `NEG_INFINITY`.
    floor_bits: AtomicU32,
}
/// `f32` score with a total order (`f32::total_cmp`), so it can be stored in a
/// `BinaryHeap`.
///
/// Equality is defined through the same total order, which keeps `PartialEq`/`Eq`
/// consistent with `Ord`: a NaN equals itself, and `-0.0` and `0.0` are distinct. A
/// derived `PartialEq` would use IEEE `==` instead, violating `Eq` reflexivity for NaN
/// and disagreeing with `cmp` on signed zeros.
#[derive(Debug, Clone, Copy)]
struct OrdScore(f32);
impl PartialEq for OrdScore {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}
impl Eq for OrdScore {}
impl PartialOrd for OrdScore {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for OrdScore {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.total_cmp(&other.0)
    }
}
impl SharedTopK {
    /// Creates an empty tracker that retains the best `k` scores.
    ///
    /// The floor starts at `f32::NEG_INFINITY`, i.e. it prunes nothing until `k`
    /// scores have been merged.
    fn new(k: usize) -> Arc<Self> {
        Arc::new(Self {
            k,
            heap: Mutex::new(BinaryHeap::new()),
            floor_bits: AtomicU32::new(f32::NEG_INFINITY.to_bits()),
        })
    }

    /// Returns the most recently published floor.
    ///
    /// This is the k-th best score merged so far, or `NEG_INFINITY` while fewer than
    /// `k` scores have been merged.
    fn floor(&self) -> f32 {
        f32::from_bits(self.floor_bits.load(atomic::Ordering::Acquire))
    }

    /// Merges `scores` into the running top-`k` and republishes the floor once the
    /// heap holds `k` scores.
    ///
    /// Candidates are compared with the heap's own total order (`OrdScore`, i.e.
    /// `f32::total_cmp`), so the replacement test and the heap layout always agree,
    /// even for NaN. A NaN minimum is never published: a NaN floor compares false
    /// against every score.
    ///
    /// # Panics
    /// Panics if the internal mutex was poisoned by a panicking merger.
    fn merge(&self, scores: impl IntoIterator<Item = f32>) {
        let mut heap = self.heap.lock().expect("SharedTopK mutex poisoned");
        for s in scores {
            let candidate = OrdScore(s);
            if heap.len() < self.k {
                heap.push(Reverse(candidate));
            } else if let Some(mut weakest) = heap.peek_mut()
                && candidate > weakest.0
            {
                // Overwrite the current minimum in place; `PeekMut` re-sifts on drop,
                // which is one sift instead of a pop followed by a push.
                *weakest = Reverse(candidate);
            }
        }
        // Published under the lock, so concurrent merges publish in heap order and the
        // floor never moves backwards.
        if heap.len() == self.k
            && let Some(Reverse(OrdScore(min))) = heap.peek()
            && !min.is_nan()
        {
            self.floor_bits
                .store(min.to_bits(), atomic::Ordering::Release);
        }
    }
}
impl SupertableReader {
    /// BM25 top-`k` search over `column` across every superfile that survives pruning.
    ///
    /// `query` is parsed with [`AsciiLowerTokenizer`] into positive and negated terms.
    /// Superfiles are pruned on the positive terms only, so a negated term never drops a
    /// superfile. Each kept superfile is then scored on the reader pool. For multi-term
    /// OR queries without negation (see `fanout_for`), large superfiles may instead be
    /// scored as several doc-id sub-ranges. Results are merged into at most `k` hits in
    /// descending score order.
    ///
    /// Returns an empty vector when `k == 0` or when no superfile survives pruning.
    ///
    /// # Errors
    /// Propagates pruning and dispatch errors; per-superfile search failures are
    /// reported as [`QueryError::Parquet`].
    pub(crate) async fn bm25_search_async(
        &self,
        column: &str,
        query: &str,
        k: usize,
        mode: BoolMode,
    ) -> Result<Vec<SuperfileHit>, QueryError> {
        if k == 0 {
            return Ok(Vec::new());
        }
        let manifest = self.manifest();
        let pool_threads = manifest.options.reader_pool.current_num_threads();
        let column_owned = column.to_owned();
        // Split into positive and negated tokens; own them so they can be shared with
        // the per-unit kernel below.
        let parsed = AsciiLowerTokenizer.parse(query);
        let positives: Vec<String> = parsed.positives.into_iter().map(Cow::into_owned).collect();
        let negatives: Vec<String> = parsed.negatives.into_iter().map(Cow::into_owned).collect();
        // Prune on positive terms only: a negated term must never drop a superfile.
        let prune_leaf = PruneLeaf::TermPresence {
            column: column_owned.clone(),
            terms: positives,
            mode,
        };
        let kept = select_superfiles(manifest.as_ref(), slice::from_ref(&prune_leaf)).await?;
        // Move `positives` back out of the leaf instead of cloning it before pruning.
        let PruneLeaf::TermPresence {
            terms: positives, ..
        } = prune_leaf
        else {
            unreachable!("leaf constructed as TermPresence above")
        };
        if kept.is_empty() {
            return Ok(Vec::new());
        }
        let kept_refs: Vec<&Arc<SuperfileEntry>> = kept.iter().collect();
        let fanout = fanout_for(mode, positives.len(), !negatives.is_empty());
        let work_units = build_work_units(&kept_refs, fanout, pool_threads);
        // Each dispatch unit carries its optional doc-id range plus the superfile id,
        // which the kernel needs for the tombstone lookup.
        let units: Vec<(Arc<SuperfileEntry>, (Option<(u32, u32)>, Uuid))> = work_units
            .into_iter()
            .map(|u| {
                let suid = u.entry.superfile_id;
                (u.entry, (u.range, suid))
            })
            .collect();
        let term_arc: Arc<Vec<String>> = Arc::new(positives);
        let neg_arc: Arc<Vec<String>> = Arc::new(negatives);
        let column_arc = Arc::new(column_owned);
        // Running global top-k across all units; its floor is handed to each search.
        let shared = SharedTopK::new(k);
        let tombstones = self.tombstone_cache.clone();
        // One timestamp shared by every unit's tombstone lookup.
        let now = Instant::now();
        let kernel = move |r: Arc<SuperfileReader>, (range, suid): (Option<(u32, u32)>, Uuid)| {
            let column_arc = Arc::clone(&column_arc);
            let term_arc = Arc::clone(&term_arc);
            let neg_arc = Arc::clone(&neg_arc);
            let shared = Arc::clone(&shared);
            let tombstones = tombstones.clone();
            async move {
                let term_refs: Vec<&str> = term_arc.iter().map(|s| s.as_str()).collect();
                // Snapshot the floor before searching; other units may raise it
                // concurrently while this one runs.
                let floor = shared.floor();
                let hits = match range {
                    // Sub-range units are only produced for multi-term OR without
                    // negation, so the OR-specific range search is used here.
                    Some((start, end)) => r
                        .bm25_search_or_range_pretokenized_with_floor(
                            &column_arc,
                            &term_refs,
                            k,
                            start,
                            end,
                            floor,
                        )
                        .await
                        .map_err(|e| QueryError::Parquet(e.to_string()))?,
                    None if neg_arc.is_empty() => r
                        .bm25_search_pretokenized_with_floor(
                            &column_arc,
                            &term_refs,
                            k,
                            mode,
                            floor,
                        )
                        .await
                        .map_err(|e| QueryError::Parquet(e.to_string()))?,
                    // Negated query: the exclusion search takes no floor.
                    None => {
                        let neg_refs: Vec<&str> = neg_arc.iter().map(|s| s.as_str()).collect();
                        r.bm25_search_pretokenized_excluding(
                            &column_arc,
                            &term_refs,
                            &neg_refs,
                            k,
                            mode,
                        )
                        .await
                        .map_err(|e| QueryError::Parquet(e.to_string()))?
                    }
                };
                // Only live (non-tombstoned) scores feed the shared floor, so deleted
                // docs cannot raise it. On a tombstone-lookup error the merge is skipped;
                // the floor simply does not rise. `hits` is returned unfiltered either
                // way.
                // NOTE(review): presumably `dispatch::fanout` drops tombstoned hits
                // afterwards. Each unit fetches only `k` hits, so a unit whose top-k
                // includes deleted docs may contribute fewer than `k` live hits even when
                // more live matches exist. Confirm whether fanout compensates.
                match tombstones.as_ref().map(|c| c.bitmap_for(suid, now)) {
                    Some(Ok(bitmap)) if !bitmap.is_empty() => shared.merge(
                        hits.iter()
                            .filter(|(d, _)| !bitmap.contains(*d))
                            .map(|(_, s)| *s),
                    ),
                    Some(Err(_)) => {}
                    _ => shared.merge(hits.iter().map(|(_, s)| *s)),
                }
                Ok(hits)
            }
        };
        let per_unit = dispatch::fanout(self, units, kernel).await?;
        Ok(top_k_descending(per_unit, k))
    }

    /// BM25 top-`k` search for documents containing a term that starts with `prefix`.
    ///
    /// Superfiles are pruned with the ASCII-lowercased prefix. Every surviving
    /// superfile is eligible for sub-range fan-out; actual slicing depends on its size
    /// and the pool's thread count. Returns at most `k` hits in descending score order,
    /// or an empty vector when `k == 0` or nothing survives pruning.
    ///
    /// # Errors
    /// Propagates pruning and dispatch errors; per-superfile search failures are
    /// reported as [`QueryError::Parquet`].
    pub(crate) async fn bm25_search_prefix_async(
        &self,
        column: &str,
        prefix: &str,
        k: usize,
    ) -> Result<Vec<SuperfileHit>, QueryError> {
        if k == 0 {
            return Ok(Vec::new());
        }
        let manifest = self.manifest();
        let pool_threads = manifest.options.reader_pool.current_num_threads();
        let column_owned = column.to_owned();
        let prefix_owned = prefix.to_owned();
        // Pruning compares against lowercased index data, so lowercase the prefix here.
        let prefix_lower = prefix_owned.to_ascii_lowercase();
        let kept = select_superfiles(
            manifest.as_ref(),
            &[PruneLeaf::Prefix {
                column: column_owned.clone(),
                prefix: prefix_lower.as_bytes().to_vec(),
            }],
        )
        .await?;
        if kept.is_empty() {
            return Ok(Vec::new());
        }
        let kept_refs: Vec<&Arc<SuperfileEntry>> = kept.iter().collect();
        let work_units = build_work_units(&kept_refs, FanOut::SubRanges, pool_threads);
        let units: Vec<(Arc<SuperfileEntry>, Option<(u32, u32)>)> =
            work_units.into_iter().map(|u| (u.entry, u.range)).collect();
        let column_arc = Arc::new(column_owned);
        // NOTE(review): the kernel receives the original-case `prefix`, while pruning
        // used `prefix_lower`. Case folding is presumably done by the per-superfile
        // reader (the `bm25_search_prefix_lowercases_input` test depends on it) —
        // confirm.
        let prefix_arc = Arc::new(prefix_owned);
        let kernel = move |r: Arc<SuperfileReader>, range: Option<(u32, u32)>| {
            let column_arc = Arc::clone(&column_arc);
            let prefix_arc = Arc::clone(&prefix_arc);
            async move {
                match range {
                    Some((start, end)) => r
                        .bm25_search_prefix_range(&column_arc, &prefix_arc, k, start, end)
                        .await
                        .map_err(|e| QueryError::Parquet(e.to_string())),
                    None => r
                        .bm25_search_prefix(&column_arc, &prefix_arc, k)
                        .await
                        .map_err(|e| QueryError::Parquet(e.to_string())),
                }
            }
        };
        let per_unit = dispatch::fanout(self, units, kernel).await?;
        Ok(top_k_descending(per_unit, k))
    }

    /// Returns every doc in `column` matching the tokenized `query` under `mode`.
    ///
    /// Results are unranked: each hit carries a score of `0.0`, and the per-superfile
    /// lists are concatenated. A query that tokenizes to nothing yields an empty result
    /// without touching any superfile.
    ///
    /// # Errors
    /// Propagates pruning and dispatch errors; per-superfile failures are reported as
    /// [`QueryError::Parquet`].
    pub(crate) async fn token_match_async(
        &self,
        column: &str,
        query: &str,
        mode: BoolMode,
    ) -> Result<Vec<SuperfileHit>, QueryError> {
        let manifest = self.manifest();
        let term_strings: Vec<String> = AsciiLowerTokenizer.tokenize(query).collect();
        if term_strings.is_empty() {
            return Ok(Vec::new());
        }
        let kept = select_superfiles(
            manifest.as_ref(),
            &[PruneLeaf::TermPresence {
                column: column.to_owned(),
                terms: term_strings.clone(),
                mode,
            }],
        )
        .await?;
        if kept.is_empty() {
            return Ok(Vec::new());
        }
        // One unit per kept superfile; the kernel needs no per-unit parameters.
        let units: Vec<(Arc<SuperfileEntry>, ())> = kept.into_iter().map(|e| (e, ())).collect();
        let column_arc = Arc::new(column.to_owned());
        let term_arc: Arc<Vec<String>> = Arc::new(term_strings);
        let kernel = move |r: Arc<SuperfileReader>, _: ()| {
            let column_arc = Arc::clone(&column_arc);
            let term_arc = Arc::clone(&term_arc);
            async move {
                let refs: Vec<&str> = term_arc.iter().map(|s| s.as_str()).collect();
                let docs = r
                    .token_match(&column_arc, &refs, mode)
                    .await
                    .map_err(|e| QueryError::Parquet(e.to_string()))?;
                // Boolean match: attach a placeholder score of 0.0.
                Ok(docs.into_iter().map(|d| (d, 0.0f32)).collect::<Vec<_>>())
            }
        };
        let per_unit = dispatch::fanout(self, units, kernel).await?;
        Ok(per_unit.into_iter().flatten().collect())
    }

    /// Counts docs in `column` matching the tokenized `query` under `mode`.
    ///
    /// Per kept superfile:
    /// - If it has tombstones, the matching doc ids are materialized and tombstoned
    ///   ones are excluded from the count.
    /// - Otherwise a single-term query uses the term's document frequency (`term_df`)
    ///   and a multi-term query uses `token_match_count`.
    ///
    /// The per-superfile counts are summed. An empty token list counts as 0.
    ///
    /// # Errors
    /// A tombstone-cache failure is reported as [`QueryError::Store`]; reader failures
    /// as [`QueryError::Parquet`].
    pub(crate) async fn token_match_count_async(
        &self,
        column: &str,
        query: &str,
        mode: BoolMode,
    ) -> Result<u64, QueryError> {
        let manifest = self.manifest();
        let term_strings: Vec<String> = AsciiLowerTokenizer.tokenize(query).collect();
        if term_strings.is_empty() {
            return Ok(0);
        }
        let kept = select_superfiles(
            manifest.as_ref(),
            &[PruneLeaf::TermPresence {
                column: column.to_owned(),
                terms: term_strings.clone(),
                mode,
            }],
        )
        .await?;
        if kept.is_empty() {
            return Ok(0);
        }
        let single_term = term_strings.len() == 1;
        let column_arc = Arc::new(column.to_owned());
        let term_arc: Arc<Vec<String>> = Arc::new(term_strings);
        let units: Vec<(Arc<SuperfileEntry>, ())> = kept.into_iter().map(|e| (e, ())).collect();
        let per_superfile = dispatch::fanout_with(
            self,
            units,
            move |r, entry, tombstone_cache, now, _params: ()| {
                let column_arc = Arc::clone(&column_arc);
                let term_arc = Arc::clone(&term_arc);
                async move {
                    // Resolve this superfile's tombstones; an empty bitmap is treated as
                    // "no tombstones" so the cheaper counting paths below can be used.
                    let tomb = match tombstone_cache.as_ref() {
                        Some(c) => {
                            let b = c
                                .bitmap_for(entry.superfile_id, now)
                                .map_err(|e| QueryError::Store(format!("tombstone cache: {e}")))?;
                            if b.is_empty() { None } else { Some(b) }
                        }
                        None => None,
                    };
                    let refs: Vec<&str> = term_arc.iter().map(|s| s.as_str()).collect();
                    // With deletes, count the live matches explicitly.
                    if let Some(b) = tomb {
                        let docs = r
                            .token_match(&column_arc, &refs, mode)
                            .await
                            .map_err(|e| QueryError::Parquet(e.to_string()))?;
                        return Ok::<u64, QueryError>(
                            docs.iter().filter(|d| !b.contains(**d)).count() as u64,
                        );
                    }
                    let n = if single_term {
                        r.term_df(&column_arc, &term_arc[0])
                            .await
                            .map_err(|e| QueryError::Parquet(e.to_string()))?
                    } else {
                        r.token_match_count(&column_arc, &refs, mode)
                            .await
                            .map_err(|e| QueryError::Parquet(e.to_string()))?
                    };
                    Ok(n)
                }
            },
        )
        .await?;
        Ok(per_superfile.into_iter().sum())
    }

    /// Returns the docs reported by each kept superfile's `exact_match(column, value)`.
    ///
    /// The tokenized `value` is used only to prune superfiles (all tokens required,
    /// `BoolMode::And`). When `value` yields no tokens, no pruning leaf is supplied, so
    /// presumably every superfile is checked. Hits carry a score of `0.0`.
    ///
    /// # Errors
    /// Propagates pruning and dispatch errors; per-superfile failures are reported as
    /// [`QueryError::Parquet`].
    pub(crate) async fn exact_match_async(
        &self,
        column: &str,
        value: &str,
    ) -> Result<Vec<SuperfileHit>, QueryError> {
        let manifest = self.manifest();
        let term_strings: Vec<String> = AsciiLowerTokenizer.tokenize(value).collect();
        let leaves = if term_strings.is_empty() {
            Vec::new()
        } else {
            vec![PruneLeaf::TermPresence {
                column: column.to_owned(),
                terms: term_strings,
                mode: BoolMode::And,
            }]
        };
        let kept = select_superfiles(manifest.as_ref(), &leaves).await?;
        if kept.is_empty() {
            return Ok(Vec::new());
        }
        let units: Vec<(Arc<SuperfileEntry>, ())> = kept.into_iter().map(|e| (e, ())).collect();
        let column_arc = Arc::new(column.to_owned());
        let value_arc = Arc::new(value.to_owned());
        let kernel = move |r: Arc<SuperfileReader>, _: ()| {
            let column_arc = Arc::clone(&column_arc);
            let value_arc = Arc::clone(&value_arc);
            async move {
                let docs = r
                    .exact_match(&column_arc, &value_arc)
                    .await
                    .map_err(|e| QueryError::Parquet(e.to_string()))?;
                Ok(docs.into_iter().map(|d| (d, 0.0f32)).collect::<Vec<_>>())
            }
        };
        let per_unit = dispatch::fanout(self, units, kernel).await?;
        Ok(per_unit.into_iter().flatten().collect())
    }
}
impl SupertableReader {
    /// Blocking BM25 search that also resolves the hits into a record batch.
    ///
    /// Runs the search and the row resolution inside one `block_on` call; resolution
    /// failures are reported as [`QueryError::Execute`].
    pub fn bm25_search(
        &self,
        column: &str,
        query: &str,
        k: usize,
        mode: BoolMode,
        projection: Option<&[&str]>,
    ) -> Result<Vec<RecordBatch>, QueryError> {
        let search_then_resolve = async {
            let ranked = self.bm25_search_async(column, query, k, mode).await?;
            resolve_hits_named(self, &ranked, projection, "bm25_search")
                .await
                .map(|resolved| vec![resolved])
                .map_err(|err| QueryError::Execute(err.to_string()))
        };
        self.block_on(search_then_resolve)
    }

    /// Blocking BM25 search returning raw hits (superfile + local doc id + score).
    pub fn bm25_hits(
        &self,
        column: &str,
        query: &str,
        k: usize,
        mode: BoolMode,
    ) -> Result<Vec<SuperfileHit>, QueryError> {
        let pending = self.bm25_search_async(column, query, k, mode);
        self.block_on(pending)
    }

    /// Blocking BM25 prefix search returning raw hits.
    pub fn bm25_search_prefix(
        &self,
        column: &str,
        prefix: &str,
        k: usize,
    ) -> Result<Vec<SuperfileHit>, QueryError> {
        let pending = self.bm25_search_prefix_async(column, prefix, k);
        self.block_on(pending)
    }

    /// Blocking boolean token match returning unranked hits.
    pub fn token_match(
        &self,
        column: &str,
        query: &str,
        mode: BoolMode,
    ) -> Result<Vec<SuperfileHit>, QueryError> {
        let pending = self.token_match_async(column, query, mode);
        self.block_on(pending)
    }

    /// Blocking count of docs matching the tokenized query.
    pub fn count(&self, column: &str, query: &str, mode: BoolMode) -> Result<u64, QueryError> {
        let pending = self.token_match_count_async(column, query, mode);
        self.block_on(pending)
    }

    /// Blocking exact-value match returning unranked hits.
    pub fn exact_match(&self, column: &str, value: &str) -> Result<Vec<SuperfileHit>, QueryError> {
        let pending = self.exact_match_async(column, value);
        self.block_on(pending)
    }
}
/// One unit of search work dispatched to the reader pool: a kept superfile and,
/// optionally, a doc-id sub-range of it.
struct WorkUnit {
    /// The superfile to search.
    entry: Arc<SuperfileEntry>,
    /// `None` searches the whole superfile. `Some((start, end))` searches one slice;
    /// consecutive slices share endpoints (each `end` is the next `start`), so they are
    /// presumably half-open `[start, end)`.
    range: Option<(u32, u32)>,
}
/// A superfile is cut into at most `n_docs / SUBRANGE_MIN_DOCS` sub-ranges, so
/// superfiles with fewer than twice this many docs are never sliced.
const SUBRANGE_MIN_DOCS: u32 = 50_000;
/// Minimum number of positive OR terms before sub-range fan-out is considered.
const OR_FANOUT_MIN_TERMS: usize = 2;
/// How a query's kept superfiles are split into work units (see `build_work_units`).
///
/// A plain tag enum, so it derives the standard value traits and can be copied,
/// compared and printed in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FanOut {
    /// Exactly one unranged unit per kept superfile.
    PerSuperfile,
    /// Large superfiles may be sliced into doc-id sub-ranges to use spare pool threads.
    SubRanges,
}
/// Chooses the fan-out strategy for a BM25 query.
///
/// Only OR queries with at least `OR_FANOUT_MIN_TERMS` positive terms and no negated
/// terms may be split into sub-ranges; every other query runs one unit per superfile.
fn fanout_for(mode: BoolMode, n_positives: usize, has_negatives: bool) -> FanOut {
    let multi_term_or = mode == BoolMode::Or && n_positives >= OR_FANOUT_MIN_TERMS;
    match (multi_term_or, has_negatives) {
        (true, false) => FanOut::SubRanges,
        _ => FanOut::PerSuperfile,
    }
}
/// Expands the kept superfiles into dispatchable work units.
///
/// With [`FanOut::PerSuperfile`], or when the pool has no spare threads per
/// superfile, every kept superfile becomes one unranged unit. Otherwise each superfile
/// is cut into up to `ceil(pool_threads / kept.len())` contiguous doc-id sub-ranges.
/// The cut is capped so that no superfile gets more than `n_docs / SUBRANGE_MIN_DOCS`
/// sub-ranges. Superfiles that end up with a single range are emitted unranged, and
/// superfiles reporting zero docs are skipped.
///
/// A doc count that does not fit in `u32` (the range type) is never truncated. The
/// old `as u32` cast could turn it into 0 (skipping the superfile) or into a small
/// number (tiling only part of it). Such a superfile is now emitted unranged, so all
/// of its docs stay searchable.
fn build_work_units(
    kept: &[&Arc<SuperfileEntry>],
    fanout: FanOut,
    pool_threads: usize,
) -> Vec<WorkUnit> {
    /// One unit that searches the whole superfile.
    fn whole(entry: &Arc<SuperfileEntry>) -> WorkUnit {
        WorkUnit {
            entry: Arc::clone(entry),
            range: None,
        }
    }

    let want_subranges = pool_threads.div_ceil(kept.len().max(1)).max(1);
    if matches!(fanout, FanOut::PerSuperfile) || want_subranges <= 1 {
        return kept.iter().map(|e| whole(e)).collect();
    }
    let mut units: Vec<WorkUnit> =
        Vec::with_capacity(kept.len().saturating_mul(want_subranges));
    for &entry in kept {
        let Ok(n_docs) = u32::try_from(entry.n_docs) else {
            // Too large to express as u32 sub-ranges: search it as one unit.
            units.push(whole(entry));
            continue;
        };
        if n_docs == 0 {
            continue;
        }
        let cap_by_floor = (n_docs / SUBRANGE_MIN_DOCS).max(1) as usize;
        let n_sub = want_subranges.min(cap_by_floor);
        if n_sub <= 1 {
            units.push(whole(entry));
            continue;
        }
        // `n_sub <= n_docs / SUBRANGE_MIN_DOCS`, so it fits in u32.
        let stride = n_docs.div_ceil(n_sub as u32);
        let mut start: u32 = 0;
        while start < n_docs {
            let end = start.saturating_add(stride).min(n_docs);
            units.push(WorkUnit {
                entry: Arc::clone(entry),
                range: Some((start, end)),
            });
            start = end;
        }
    }
    units
}
/// Merges per-unit hit lists into the global best `k`, sorted by descending score.
///
/// Scores are ordered with `total_cmp`, a genuine total order. The previous
/// `partial_cmp(..).unwrap_or(Equal)` was not transitive once a NaN appeared, which can
/// scramble the heap and make `sort_unstable_by` panic. Ties are broken arbitrarily.
/// The heap is pre-sized to `min(k, total hits)`, so a huge caller-supplied `k`
/// (e.g. `usize::MAX`) neither overflows `k + 1` nor over-allocates.
fn top_k_descending(per_superfile: Vec<Vec<SuperfileHit>>, k: usize) -> Vec<SuperfileHit> {
    /// Heap entry with reversed score order, so the max-heap's top is the lowest
    /// retained score (the first to be evicted).
    struct MinByScore(SuperfileHit);
    // Equality goes through `cmp` so it stays consistent with `Ord`.
    impl PartialEq for MinByScore {
        fn eq(&self, other: &Self) -> bool {
            self.cmp(other) == Ordering::Equal
        }
    }
    impl Eq for MinByScore {}
    impl PartialOrd for MinByScore {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Ord for MinByScore {
        fn cmp(&self, other: &Self) -> Ordering {
            other.0.score.total_cmp(&self.0.score)
        }
    }

    let total_hits: usize = per_superfile.iter().map(Vec::len).sum();
    let mut heap: BinaryHeap<MinByScore> = BinaryHeap::with_capacity(k.min(total_hits));
    for hit in per_superfile.into_iter().flatten() {
        if heap.len() < k {
            heap.push(MinByScore(hit));
        } else if let Some(mut worst) = heap.peek_mut()
            && hit.score.total_cmp(&worst.0.score).is_gt()
        {
            // Replace the current minimum in place; `PeekMut` re-sifts on drop.
            *worst = MinByScore(hit);
        }
    }
    let mut result: Vec<SuperfileHit> = heap.into_iter().map(|m| m.0).collect();
    result.sort_unstable_by(|a, b| b.score.total_cmp(&a.score));
    result
}
impl Supertable {
    /// BM25 search over `column`, returning matching rows as record batches.
    ///
    /// Delegates to the reader's `bm25_search` on `self.reader()`.
    pub fn bm25_search(
        &self,
        column: &str,
        query: &str,
        k: usize,
        mode: BoolMode,
        projection: Option<&[&str]>,
    ) -> Result<Vec<RecordBatch>, InfinoError> {
        let batches = self
            .reader()
            .bm25_search(column, query, k, mode, projection)?;
        Ok(batches)
    }

    /// Boolean token match over `column`, with the hits resolved into rows.
    ///
    /// Row-resolution failures are reported as `InfinoError::Query`.
    pub fn token_match(
        &self,
        column: &str,
        query: &str,
        mode: BoolMode,
        projection: Option<&[&str]>,
    ) -> Result<Vec<RecordBatch>, InfinoError> {
        let reader = self.reader();
        let matched = reader.token_match(column, query, mode)?;
        let rows = resolve_hits_named(&reader, &matched, projection, "token_match");
        match self.block_on_query(rows) {
            Ok(batch) => Ok(vec![batch]),
            Err(err) => Err(InfinoError::Query(err.to_string())),
        }
    }

    /// Exact-value match over `column`, with the hits resolved into rows.
    ///
    /// Row-resolution failures are reported as `InfinoError::Query`.
    pub fn exact_match(
        &self,
        column: &str,
        value: &str,
        projection: Option<&[&str]>,
    ) -> Result<Vec<RecordBatch>, InfinoError> {
        let reader = self.reader();
        let matched = reader.exact_match(column, value)?;
        let rows = resolve_hits_named(&reader, &matched, projection, "exact_match");
        match self.block_on_query(rows) {
            Ok(batch) => Ok(vec![batch]),
            Err(err) => Err(InfinoError::Query(err.to_string())),
        }
    }

    /// Number of docs in `column` matching the tokenized `query` under `mode`.
    pub fn count(&self, column: &str, query: &str, mode: BoolMode) -> Result<u64, InfinoError> {
        let total = self.reader().count(column, query, mode)?;
        Ok(total)
    }
}
#[cfg(test)]
mod tests {
    // Tests for the supertable full-text entry points above, plus unit tests for the
    // fan-out helpers. Most tests use `options_one_superfile_per_commit` and commit
    // after each append; several assert `n_superfiles()` to confirm that layout.
    use std::{collections::HashSet, future::Future, sync::Arc};
    use arrow_array::{Decimal128Array, LargeStringArray, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use bytes::Bytes;
    use datafusion::prelude::{col, lit};
    use tokio::runtime::Builder;
    use super::{BoolMode, FanOut, build_work_units, fanout_for};
    use crate::{
        storage::{LocalFsStorageProvider, StorageProvider},
        superfile::{
            SuperfileReader,
            builder::{BuilderOptions, FtsConfig, SuperfileBuilder},
        },
        supertable::{
            Supertable, SupertableOptions,
            error::QueryError,
            options::{DECIMAL128_PRECISION, DECIMAL128_SCALE},
        },
        test_helpers::default_tokenizer as tok,
    };

    /// Runs `fut` to completion on a fresh current-thread Tokio runtime.
    fn block_on<F: Future>(fut: F) -> F::Output {
        Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("test runtime")
            .block_on(fut)
    }

    /// Supertable schema: a single non-nullable `LargeUtf8` column named `title`.
    fn schema_id_title() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new(
            "title",
            DataType::LargeUtf8,
            false,
        )]))
    }

    /// Options with FTS on `title`, the default test tokenizer and a one-thread writer
    /// pool.
    fn options_one_superfile_per_commit() -> SupertableOptions {
        let pool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(1)
                .build()
                .expect("pool"),
        );
        SupertableOptions::new(
            schema_id_title(),
            vec![FtsConfig {
                column: "title".into(),
            }],
            vec![],
            Some(tok()),
        )
        .expect("valid options")
        .with_writer_pool(pool)
    }

    /// Builds a `title`-only batch from `titles`; `_start` is currently unused.
    fn build_batch(_start: u64, titles: &[&str]) -> RecordBatch {
        let titles_arr = LargeStringArray::from(titles.to_vec());
        RecordBatch::try_new(schema_id_title(), vec![Arc::new(titles_arr)]).expect("batch")
    }

    /// Builds one in-memory superfile holding all `titles` (`_id` = row index). It
    /// serves as the ground-truth oracle for multi-superfile results.
    fn build_oracle_superfile(titles: &[&str]) -> Arc<SuperfileReader> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "_id",
                DataType::Decimal128(DECIMAL128_PRECISION, DECIMAL128_SCALE),
                false,
            ),
            Field::new("title", DataType::LargeUtf8, false),
        ]));
        let opts = BuilderOptions::new(
            schema.clone(),
            "_id",
            vec![FtsConfig {
                column: "title".into(),
            }],
            vec![],
            Some(tok()),
        );
        let mut b = SuperfileBuilder::new(opts).expect("builder");
        let n = titles.len();
        let ids = Decimal128Array::from((0..n as i128).collect::<Vec<_>>())
            .with_precision_and_scale(DECIMAL128_PRECISION, DECIMAL128_SCALE)
            .expect("decimal128");
        let titles_arr = LargeStringArray::from(titles.to_vec());
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(titles_arr)]).expect("batch");
        b.add_batch(&batch, &[]).expect("add_batch");
        let bytes = Bytes::from(b.finish().expect("finish"));
        Arc::new(SuperfileReader::open(bytes).expect("open"))
    }

    // A negated term removes matching docs even when they live in other superfiles.
    #[test]
    fn negation_excludes_across_superfiles() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(0, &["alpha beta", "alpha gamma"]))
            .expect("append");
        w.commit().expect("commit");
        w.append(&build_batch(2, &["alpha delta"])).expect("append");
        w.commit().expect("commit");
        w.append(&build_batch(3, &["beta gamma"])).expect("append");
        w.commit().expect("commit");
        let r = st.reader();
        let hits = r
            .bm25_hits("title", "alpha -beta", 10, BoolMode::Or)
            .expect("negation search");
        assert_eq!(hits.len(), 2, "alpha minus beta: {hits:?}");
        let hits = r
            .bm25_hits("title", "alpha", 10, BoolMode::Or)
            .expect("positive search");
        assert_eq!(hits.len(), 3);
    }

    // Pruning must ignore negated terms: superfiles lacking `delta` still contribute.
    #[test]
    fn negated_term_does_not_prune_superfiles() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(0, &["alpha one", "alpha two"]))
            .expect("append");
        w.commit().expect("commit");
        w.append(&build_batch(2, &["alpha delta"])).expect("append");
        w.commit().expect("commit");
        w.append(&build_batch(3, &["gamma three"])).expect("append");
        w.commit().expect("commit");
        let r = st.reader();
        let hits = r
            .bm25_hits("title", "alpha -delta", 10, BoolMode::And)
            .expect("negation search");
        assert_eq!(hits.len(), 2, "alpha minus delta: {hits:?}");
    }

    // A query made only of negated terms is rejected.
    #[test]
    fn negation_only_query_errors() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(0, &["alpha beta"])).expect("append");
        w.commit().expect("commit");
        let r = st.reader();
        let res = r.bm25_hits("title", "-alpha", 10, BoolMode::Or);
        assert!(res.is_err(), "negation-only must error; got {res:?}");
    }

    // Only the empty result is asserted; the absence of store calls is not checked here.
    #[test]
    fn bm25_search_empty_supertable_returns_empty_without_store_calls() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let r = st.reader();
        let hits = r
            .bm25_hits("title", "rust", 5, BoolMode::Or)
            .expect("query");
        assert!(hits.is_empty());
    }

    #[test]
    fn bm25_search_k_zero_short_circuits() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(0, &["rust async"])).expect("append");
        w.commit().expect("commit");
        let r = st.reader();
        let hits = r
            .bm25_hits("title", "rust", 0, BoolMode::Or)
            .expect("query");
        assert!(hits.is_empty());
    }

    // Three of four docs mention `rust`; results must come back best-first.
    #[test]
    fn bm25_search_returns_descending_score_order() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(
            0,
            &[
                "rust rust rust async",
                "rust async runtime",
                "rust embedded",
                "python data",
            ],
        ))
        .expect("append");
        w.commit().expect("commit");
        let r = st.reader();
        let hits = r
            .bm25_hits("title", "rust", 4, BoolMode::Or)
            .expect("query");
        assert_eq!(hits.len(), 3);
        for w in hits.windows(2) {
            assert!(w[0].score >= w[1].score);
        }
    }

    // Each hit identifies the superfile it came from, matching the manifest URIs.
    #[test]
    fn bm25_search_carries_superfile_uri_for_each_hit() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(0, &["rust rust async"])).expect("a1");
        w.commit().expect("c1");
        w.append(&build_batch(10, &["rust runtime"])).expect("a2");
        w.commit().expect("c2");
        let r = st.reader();
        assert_eq!(r.n_superfiles(), 2);
        let hits = r
            .bm25_hits("title", "rust", 5, BoolMode::Or)
            .expect("query");
        assert_eq!(hits.len(), 2);
        let mut uris: Vec<_> = hits.iter().map(|h| h.superfile).collect();
        uris.sort();
        let expected: Vec<_> = {
            let mut v: Vec<_> = r.manifest().superfiles.iter().map(|e| e.uri).collect();
            v.sort();
            v
        };
        assert_eq!(uris, expected);
    }

    // Splitting 12 docs into 3 superfiles of 4 must yield the same top-k doc set as
    // one oracle superfile. Global id = superfile index * 4 + local id, which assumes
    // `manifest.superfiles` is in commit order.
    #[test]
    fn bm25_search_oracle_top_k_set_matches_single_superfile() {
        let titles = vec![
            "lookup nimblefox special token",
            "ordinary common everyday text",
            "more usual filler corpus copy",
            "something boring without it",
            "mid corpus another nimblefox row",
            "generic page that adds nothing",
            "another stuffer no rare terms",
            "more padding here for filler",
            "tail nimblefox final superfile",
            "another tail row",
            "yet another normal title",
            "wrapping up the corpus today",
        ];
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        for chunk_start in (0..titles.len()).step_by(4) {
            let end = (chunk_start + 4).min(titles.len());
            let chunk = &titles[chunk_start..end];
            w.append(&build_batch(chunk_start as u64, chunk))
                .expect("append");
            w.commit().expect("commit");
        }
        assert_eq!(st.reader().n_superfiles(), 3);
        let oracle = build_oracle_superfile(&titles);
        let oracle_hits = block_on(oracle.bm25_hits_async("title", "nimblefox", 5, BoolMode::Or))
            .expect("oracle");
        assert_eq!(oracle_hits.len(), 3);
        let oracle_set: HashSet<u32> = oracle_hits.iter().map(|(d, _)| *d).collect();
        assert_eq!(oracle_set, [0u32, 4, 8].iter().copied().collect());
        let st_reader = st.reader();
        let st_hits = st_reader
            .bm25_hits("title", "nimblefox", 5, BoolMode::Or)
            .expect("supertable query");
        assert_eq!(st_hits.len(), 3);
        let manifest = st_reader.manifest();
        let st_globals: HashSet<u32> = st_hits
            .iter()
            .map(|h| {
                let seg_idx = manifest
                    .superfiles
                    .iter()
                    .position(|e| e.uri == h.superfile)
                    .expect("superfile in manifest");
                (seg_idx as u32) * 4 + h.local_doc_id
            })
            .collect();
        assert_eq!(st_globals, oracle_set);
    }

    // Prefix variant of the oracle test, with superfiles of 2 docs each.
    #[test]
    fn bm25_search_prefix_oracle_top_k_set_matches_single_superfile() {
        let titles = vec![
            "rust async runtime",
            "rust embedded systems",
            "ruby gemfile config",
            "rustacean conference",
            "python machine learning",
            "python web framework",
            "rusty pipe rebuild",
            "go concurrency model",
        ];
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        for chunk_start in (0..titles.len()).step_by(2) {
            let end = (chunk_start + 2).min(titles.len());
            let chunk = &titles[chunk_start..end];
            w.append(&build_batch(chunk_start as u64, chunk))
                .expect("append");
            w.commit().expect("commit");
        }
        let oracle = build_oracle_superfile(&titles);
        let oracle_hits = block_on(oracle.bm25_search_prefix("title", "rust", 5)).expect("oracle");
        let oracle_globals: HashSet<u32> = oracle_hits.iter().map(|(d, _)| *d).collect();
        let st_reader = st.reader();
        let st_hits = st_reader
            .bm25_search_prefix("title", "rust", 5)
            .expect("supertable query");
        let manifest = st_reader.manifest();
        let st_globals: HashSet<u32> = st_hits
            .iter()
            .map(|h| {
                let seg_idx = manifest
                    .superfiles
                    .iter()
                    .position(|e| e.uri == h.superfile)
                    .expect("superfile in manifest");
                (seg_idx as u32) * 2 + h.local_doc_id
            })
            .collect();
        assert_eq!(st_hits.len(), oracle_hits.len());
        assert_eq!(st_globals, oracle_globals);
        assert!(st_hits.len() >= 4);
    }

    #[test]
    fn bm25_search_prefix_unmatched_prefix_returns_empty() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(0, &["rust async"])).expect("append");
        w.commit().expect("commit");
        let r = st.reader();
        let hits = r.bm25_search_prefix("title", "zzzz", 10).expect("query");
        assert!(hits.is_empty());
    }

    // An upper-case prefix must still match lower-cased index terms.
    #[test]
    fn bm25_search_prefix_lowercases_input() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(0, &["Rust async runtime"]))
            .expect("append");
        w.commit().expect("commit");
        let r = st.reader();
        let hits = r.bm25_search_prefix("title", "RUST", 5).expect("query");
        assert_eq!(hits.len(), 1);
    }

    // A column without an FTS index surfaces as a per-superfile Parquet error.
    #[test]
    fn bm25_search_unknown_column_errors() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(0, &["rust"])).expect("append");
        w.commit().expect("commit");
        let r = st.reader();
        let err = r
            .bm25_hits("missing_column", "rust", 5, BoolMode::Or)
            .expect_err("expected error");
        assert!(matches!(err, QueryError::Parquet(_)), "got {err:?}");
    }

    // Four matching superfiles, k = 2: the global merge must cap the result at k.
    #[test]
    fn bm25_search_results_global_top_k_caps_at_k() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        for i in 0..4 {
            w.append(&build_batch(i * 10, &["rust async runtime"]))
                .expect("a");
            w.commit().expect("c");
        }
        let r = st.reader();
        let hits = r
            .bm25_hits("title", "rust", 2, BoolMode::Or)
            .expect("query");
        assert_eq!(hits.len(), 2);
    }

    /// One superfile with three short titles, shared by the row-resolution tests.
    fn seeded_three_doc_supertable() -> Supertable {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(
            0,
            &["the quick brown fox", "a lazy dog", "quick thinking"],
        ))
        .expect("append");
        w.commit().expect("commit");
        st
    }

    // Without a projection the rows carry `_id` + `score`; a projection selects columns.
    #[test]
    fn supertable_bm25_search_rows_default_and_projected() {
        let st = seeded_three_doc_supertable();
        let bare = st
            .bm25_search("title", "fox", 10, BoolMode::Or, None)
            .expect("bm25 rows");
        assert_eq!(bare.iter().map(|b| b.num_rows()).sum::<usize>(), 1);
        assert_eq!(bare[0].num_columns(), 2, "_id + score");
        let rows = st
            .bm25_search(
                "title",
                "fox",
                10,
                BoolMode::Or,
                Some(&["_id", "title", "score"]),
            )
            .expect("bm25 projected rows");
        assert_eq!(rows[0].num_columns(), 3);
    }

    #[test]
    fn supertable_token_match_and_exact_match_rows() {
        let st = seeded_three_doc_supertable();
        let tm = st
            .token_match("title", "quick", BoolMode::Or, None)
            .expect("token_match");
        assert_eq!(tm.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
        let em = st
            .exact_match("title", "a lazy dog", Some(&["_id", "title"]))
            .expect("exact_match");
        assert_eq!(em.iter().map(|b| b.num_rows()).sum::<usize>(), 1);
        assert_eq!(em[0].num_columns(), 2);
    }

    // A punctuation-only value tokenizes to nothing and must still match nothing.
    #[test]
    fn reader_token_match_and_exact_match_hits() {
        let st = seeded_three_doc_supertable();
        let r = st.reader();
        let any = r.token_match("title", "quick", BoolMode::And).expect("tm");
        assert_eq!(any.len(), 2);
        let none = r.exact_match("title", "!!!").expect("em punctuation");
        assert!(none.is_empty());
        let one = r.exact_match("title", "quick thinking").expect("em");
        assert_eq!(one.len(), 1);
    }

    #[test]
    fn token_match_empty_query_short_circuits() {
        let st = seeded_three_doc_supertable();
        let r = st.reader();
        let hits = r
            .token_match("title", " ", BoolMode::Or)
            .expect("tm empty");
        assert!(hits.is_empty());
    }

    #[test]
    fn token_match_no_match_returns_empty() {
        let st = seeded_three_doc_supertable();
        let r = st.reader();
        let hits = r
            .token_match("title", "nonexistentterm", BoolMode::Or)
            .expect("tm");
        assert!(hits.is_empty());
    }

    #[test]
    fn fanout_for_only_multi_term_or_without_negation_subranges() {
        assert!(matches!(
            fanout_for(BoolMode::Or, 2, false),
            FanOut::SubRanges
        ));
        assert!(matches!(
            fanout_for(BoolMode::Or, 1, false),
            FanOut::PerSuperfile
        ));
        assert!(matches!(
            fanout_for(BoolMode::Or, 2, true),
            FanOut::PerSuperfile
        ));
        assert!(matches!(
            fanout_for(BoolMode::And, 2, false),
            FanOut::PerSuperfile
        ));
    }

    // Small superfiles (below SUBRANGE_MIN_DOCS) are never sliced, whatever the pool
    // size or fan-out mode.
    #[test]
    fn build_work_units_per_superfile_is_one_unranged_unit_each() {
        use std::collections::HashMap;
        use uuid::Uuid;
        use crate::supertable::manifest::{SuperfileEntry, SuperfileUri};
        fn entry(n_docs: u64) -> Arc<SuperfileEntry> {
            let id = Uuid::new_v4();
            Arc::new(SuperfileEntry {
                superfile_id: id,
                uri: SuperfileUri(id),
                n_docs,
                id_min: 0,
                id_max: n_docs.saturating_sub(1) as i128,
                scalar_stats: HashMap::new(),
                fts_summary: HashMap::new(),
                vector_summary: HashMap::new(),
                partition_key: Vec::new(),
                partition_hint: None,
                subsection_offsets: None,
            })
        }
        let e0 = entry(100);
        let e1 = entry(200);
        let kept = vec![&e0, &e1];
        let units = build_work_units(&kept, FanOut::PerSuperfile, 8);
        assert_eq!(units.len(), 2);
        assert!(units.iter().all(|u| u.range.is_none()));
        let units = build_work_units(&kept, FanOut::SubRanges, 1);
        assert_eq!(units.len(), 2);
        assert!(units.iter().all(|u| u.range.is_none()));
        let units = build_work_units(&kept, FanOut::SubRanges, 16);
        assert_eq!(units.len(), 2);
        assert!(units.iter().all(|u| u.range.is_none()));
    }

    // A 200k-doc superfile with 4 threads is sliced into contiguous ranges covering
    // [0, 200_000).
    #[test]
    fn build_work_units_slices_large_superfiles_when_threads_spare() {
        use std::collections::HashMap;
        use uuid::Uuid;
        use crate::supertable::manifest::{SuperfileEntry, SuperfileUri};
        let id = Uuid::new_v4();
        let big = Arc::new(SuperfileEntry {
            superfile_id: id,
            uri: SuperfileUri(id),
            n_docs: 200_000,
            id_min: 0,
            id_max: 199_999,
            scalar_stats: HashMap::new(),
            fts_summary: HashMap::new(),
            vector_summary: HashMap::new(),
            partition_key: Vec::new(),
            partition_hint: None,
            subsection_offsets: None,
        });
        let kept = vec![&big];
        let units = build_work_units(&kept, FanOut::SubRanges, 4);
        assert!(units.len() > 1, "large superfile sliced into sub-ranges");
        let mut cursor = 0u32;
        for u in &units {
            let (start, end) = u.range.expect("ranged unit");
            assert_eq!(start, cursor);
            cursor = end;
        }
        assert_eq!(cursor, 200_000, "sub-ranges tile the whole superfile");
    }

    // Single-term counts (document-frequency path) summed across superfiles.
    #[test]
    fn count_single_term_sums_df_across_superfiles() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(0, &["alpha beta", "alpha gamma"]))
            .expect("append");
        w.commit().expect("commit");
        w.append(&build_batch(2, &["alpha delta"])).expect("append");
        w.commit().expect("commit");
        w.append(&build_batch(3, &["beta gamma"])).expect("append");
        w.commit().expect("commit");
        assert_eq!(st.count("title", "alpha", BoolMode::Or).expect("count"), 3);
        assert_eq!(st.count("title", "beta", BoolMode::Or).expect("count"), 2);
        assert_eq!(st.count("title", "gamma", BoolMode::Or).expect("count"), 2);
        assert_eq!(st.count("title", "absent", BoolMode::Or).expect("count"), 0);
    }

    // Multi-term counts across superfiles, cross-checked against token_match lengths.
    #[test]
    fn count_multi_term_sums_across_superfiles() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(0, &["alpha beta", "alpha gamma"]))
            .expect("append");
        w.commit().expect("commit");
        w.append(&build_batch(2, &["beta gamma", "delta"]))
            .expect("append");
        w.commit().expect("commit");
        w.append(&build_batch(4, &["alpha delta", "beta"]))
            .expect("append");
        w.commit().expect("commit");
        assert_eq!(st.count("title", "alpha beta", BoolMode::Or).expect("c"), 5);
        assert_eq!(
            st.count("title", "gamma delta", BoolMode::Or).expect("c"),
            4
        );
        assert_eq!(
            st.count("title", "alpha beta", BoolMode::And).expect("c"),
            1
        );
        assert_eq!(
            st.count("title", "alpha delta", BoolMode::And).expect("c"),
            1
        );
        let r = st.reader();
        for (q, mode) in [
            ("alpha beta", BoolMode::Or),
            ("gamma delta", BoolMode::Or),
            ("alpha beta", BoolMode::And),
            ("alpha delta", BoolMode::And),
        ] {
            let c = r.count("title", q, mode).expect("count");
            let n = r.token_match("title", q, mode).expect("token_match").len() as u64;
            assert_eq!(c, n, "count vs token_match for {q:?} {mode:?}");
        }
    }

    #[test]
    fn count_honors_or_and_modes() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(
            0,
            &["alpha beta", "alpha gamma", "beta delta"],
        ))
        .expect("append");
        w.commit().expect("commit");
        assert_eq!(
            st.count("title", "alpha delta", BoolMode::Or).expect("c"),
            3
        );
        assert_eq!(
            st.count("title", "alpha beta", BoolMode::And).expect("c"),
            1
        );
        assert_eq!(
            st.count("title", "gamma delta", BoolMode::And).expect("c"),
            0
        );
    }

    #[test]
    fn count_agrees_with_token_match_len() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(
            0,
            &["alpha beta", "alpha gamma", "beta delta"],
        ))
        .expect("append");
        w.commit().expect("commit");
        let r = st.reader();
        for (q, mode) in [
            ("alpha", BoolMode::Or),
            ("alpha delta", BoolMode::Or),
            ("alpha beta", BoolMode::And),
        ] {
            let c = r.count("title", q, mode).expect("count");
            let n = r.token_match("title", q, mode).expect("token_match").len() as u64;
            assert_eq!(c, n, "count vs token_match for {q:?} {mode:?}");
        }
    }

    #[test]
    fn count_empty_query_and_empty_supertable_are_zero() {
        let st = Supertable::create(options_one_superfile_per_commit()).expect("create");
        assert_eq!(st.count("title", "alpha", BoolMode::Or).expect("c"), 0);
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(0, &["alpha beta"])).expect("append");
        w.commit().expect("commit");
        assert_eq!(st.count("title", "", BoolMode::Or).expect("c"), 0);
        assert_eq!(st.count("title", " ", BoolMode::Or).expect("c"), 0);
    }

    // After deleting one of three matching docs, the count must drop to 2
    // (exercises the tombstone path of token_match_count_async).
    #[test]
    fn count_excludes_tombstoned_docs() {
        let dir = tempfile::TempDir::new().expect("tempdir");
        let storage: Arc<dyn StorageProvider> =
            Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
        let st = Supertable::create(options_one_superfile_per_commit().with_storage(storage))
            .expect("create");
        let mut w = st.writer().expect("writer");
        w.append(&build_batch(0, &["alpha one", "alpha two", "alpha three"]))
            .expect("append");
        w.commit().expect("commit");
        drop(w);
        assert_eq!(st.count("title", "alpha", BoolMode::Or).expect("count"), 3);
        let stats = st
            .delete(col("title").eq(lit("alpha two")))
            .expect("delete");
        assert_eq!(stats.matched(), 1);
        assert_eq!(
            st.count("title", "alpha", BoolMode::Or)
                .expect("count after delete"),
            2
        );
    }
}