use std::sync::Arc;
use datafusion::scalar::ScalarValue;
use super::skip::{
ScalarPredicate, fts_bloom_skip, fts_prefix_skip, scalar_skip, scalar_value_may_match,
};
use crate::{
superfile::fts::reader::BoolMode,
supertable::{
error::QueryError,
manifest::{
ManifestSnapshot, SuperfileEntry,
list::Manifest,
list_prune::{prune_parts_for_fts_prefix, prune_parts_for_fts_terms},
part::PartId,
},
},
};
/// One conjunct of a query's pruning predicate.
///
/// A leaf can be evaluated against whole manifest parts (via
/// [`PruneLeaf::keep_parts`]) or against individual superfiles (via
/// [`select_superfiles`], which ANDs the verdicts of all leaves it is given).
pub(crate) enum PruneLeaf {
    /// Presence of full-text terms in an FTS-indexed column.
    TermPresence {
        /// Column whose FTS summaries are consulted.
        column: String,
        /// Terms to look for.
        terms: Vec<String>,
        /// How per-term results combine: `And` needs every term, `Or` any term.
        mode: BoolMode,
    },
    /// A term-prefix condition on an FTS-indexed column.
    Prefix {
        /// Column whose FTS summaries are consulted.
        column: String,
        /// Prefix as raw bytes.
        prefix: Vec<u8>,
    },
    /// Comparison of a scalar column against a literal, checked against
    /// min/max statistics.
    Scalar(ScalarPredicate),
}
impl PruneLeaf {
    /// Returns the ids of the manifest parts in `list` that may contain rows
    /// satisfying this leaf.
    ///
    /// Every variant currently produces a verdict, so the result is always
    /// `Some`. The `Option` leaves room for leaves that cannot prune at part
    /// level.
    pub(crate) fn keep_parts(&self, list: &Manifest) -> Option<Vec<PartId>> {
        let kept = match self {
            Self::TermPresence {
                column,
                terms,
                mode,
            } => {
                // The part-level pruner takes borrowed string slices.
                let term_refs: Vec<&str> = terms.iter().map(String::as_str).collect();
                prune_parts_for_fts_terms(list, column, &term_refs, *mode)
            }
            Self::Prefix { column, prefix } => prune_parts_for_fts_prefix(list, column, prefix),
            Self::Scalar(predicate) => scalar_keep_parts(list, predicate),
        };
        Some(kept)
    }
}
/// Returns the ids of the manifest parts whose per-part scalar aggregate for
/// `pred.column` may satisfy `pred`, in manifest order.
///
/// Pruning is conservative: a part is kept whenever its statistics cannot be
/// used to rule it out. That covers:
/// - no aggregate for the column;
/// - an empty min/max array;
/// - a bound that fails to convert to a [`ScalarValue`];
/// - a null bound.
///
/// Only a part whose non-null `[min, max]` range is rejected by
/// [`scalar_value_may_match`] is dropped.
fn scalar_keep_parts(list: &Manifest, pred: &ScalarPredicate) -> Vec<PartId> {
    list.parts
        .iter()
        .filter_map(|entry| {
            let keep = match entry.scalar_stats_agg.get(&pred.column) {
                None => true,
                // Row 0 is read below; don't index into an empty aggregate
                // array (the callee may not bounds-check it).
                Some(agg) if agg.min.is_empty() || agg.max.is_empty() => true,
                Some(agg) => match (
                    ScalarValue::try_from_array(agg.min.as_ref(), 0).ok(),
                    ScalarValue::try_from_array(agg.max.as_ref(), 0).ok(),
                ) {
                    // A null bound carries no range information. Comparing it
                    // would treat it as an extreme value and could wrongly
                    // drop the part, so only non-null bounds may prune.
                    (Some(min), Some(max)) if !min.is_null() && !max.is_null() => {
                        scalar_value_may_match(&min, &max, pred.op, &pred.value)
                    }
                    _ => true,
                },
            };
            keep.then_some(entry.part_id)
        })
        .collect()
}
/// Resolves the superfiles a query must scan, given its AND-ed pruning leaves.
///
/// `ManifestSnapshot::get_pruned_superfiles` first narrows the candidates.
/// Each remaining superfile is then checked against per-superfile statistics:
/// - all `PruneLeaf::Scalar` leaves together, in one `scalar_skip` call;
/// - each `PruneLeaf::TermPresence` leaf with `fts_bloom_skip`;
/// - each `PruneLeaf::Prefix` leaf with `fts_prefix_skip`.
///
/// A superfile survives only if every check keeps it. Survivors are returned
/// in the order the manifest produced them.
///
/// # Errors
///
/// Returns `QueryError::ManifestLoad` if loading the pruned superfile list fails.
pub(crate) async fn select_superfiles(
    manifest: &ManifestSnapshot,
    leaves: &[PruneLeaf],
) -> Result<Vec<Arc<SuperfileEntry>>, QueryError> {
    let candidates = manifest
        .get_pruned_superfiles(leaves)
        .await
        .map_err(QueryError::ManifestLoad)?;
    if candidates.is_empty() {
        return Ok(Vec::new());
    }

    // One keep-flag per candidate; every check below can only clear flags.
    let mut survivors = vec![true; candidates.len()];

    // Scalar leaves are batched into a single skip evaluation.
    let scalar_preds: Vec<ScalarPredicate> = leaves
        .iter()
        .filter_map(|leaf| {
            if let PruneLeaf::Scalar(p) = leaf {
                Some(p.clone())
            } else {
                None
            }
        })
        .collect();
    if !scalar_preds.is_empty() {
        let scalar_verdict = scalar_skip(&candidates, &scalar_preds);
        and_into(&mut survivors, &scalar_verdict);
    }

    // FTS leaves are evaluated one at a time, in leaf order.
    for leaf in leaves {
        match leaf {
            PruneLeaf::TermPresence {
                column,
                terms,
                mode,
            } => {
                let term_refs: Vec<&str> = terms.iter().map(String::as_str).collect();
                let bloom_verdict = fts_bloom_skip(&candidates, column, &term_refs, *mode);
                and_into(&mut survivors, &bloom_verdict);
            }
            PruneLeaf::Prefix { column, prefix } => {
                let prefix_verdict = fts_prefix_skip(&candidates, column, prefix);
                and_into(&mut survivors, &prefix_verdict);
            }
            // Already applied in the batched scalar pass above.
            PruneLeaf::Scalar(_) => {}
        }
    }

    Ok(candidates
        .into_iter()
        .zip(survivors)
        .filter(|&(_, keep)| keep)
        .map(|(superfile, _)| superfile)
        .collect())
}
/// Clears every flag in `dst` whose counterpart in `src` is `false`, i.e.
/// element-wise `dst[i] &= src[i]`.
///
/// Both slices are expected to have the same length; this is checked only in
/// debug builds. In release builds iteration stops at the shorter slice, so any
/// trailing flags in `dst` are left unchanged.
fn and_into(dst: &mut [bool], src: &[bool]) {
    debug_assert_eq!(dst.len(), src.len());
    dst.iter_mut()
        .zip(src)
        .for_each(|(keep, &other)| *keep = *keep && other);
}
// Tests for part-level pruning (`scalar_keep_parts`) and superfile-level
// selection (`select_superfiles`), built from hand-constructed manifest fixtures.
#[cfg(test)]
mod tests {
use std::{collections::HashMap, slice::from_ref};
use arrow_array::{Int64Array, LargeStringArray};
use arrow_schema::{DataType, Field, Schema};
use uuid::Uuid;
use super::*;
use crate::{
superfile::builder::FtsConfig,
supertable::{
SupertableOptions,
manifest::{
FtsSummaryAgg, ManifestSnapshot, ScalarStatsAgg, SuperfileEntry, SuperfileUri,
aggregates,
bloom::BloomBuilder,
list::{FORMAT_VERSION, Manifest, ManifestPartEntry, PartitionStrategy},
part::{ContentHash, PartId},
},
query::skip::ScalarOp,
},
test_helpers::default_tokenizer,
};
/// Builds a one-doc superfile whose only scalar stats are an Int64 min/max
/// pair for `col`; its FTS and vector summaries are empty.
fn seg_int(col: &str, min: i64, max: i64) -> Arc<SuperfileEntry> {
let id = Uuid::new_v4();
let mut cols: HashMap<String, ScalarStatsAgg> = HashMap::new();
cols.insert(
col.to_string(),
ScalarStatsAgg::from_min_max(
Arc::new(Int64Array::from(vec![min])),
Arc::new(Int64Array::from(vec![max])),
),
);
Arc::new(SuperfileEntry {
superfile_id: id,
uri: SuperfileUri(id),
n_docs: 1,
id_min: 0,
id_max: 0,
scalar_stats: cols,
fts_summary: HashMap::new(),
vector_summary: HashMap::new(),
partition_key: Vec::new(),
partition_hint: None,
subsection_offsets: None,
})
}
/// Wraps `segs` in a manifest part entry whose aggregates come from
/// `aggregates::compute`. `seed` fills the part id, URI and content hash so
/// parts built with different seeds are distinguishable.
fn part_from(segs: &[Arc<SuperfileEntry>], seed: u8) -> ManifestPartEntry {
let aggs = aggregates::compute(segs, None);
ManifestPartEntry {
part_id: PartId(Uuid::from_bytes([seed; 16])),
uri: format!("manifests/part-{seed:02x}.avro.zst"),
n_superfiles: segs.len() as u64,
size_bytes_compressed: 1,
size_bytes_uncompressed: 1,
content_hash: ContentHash([seed; 32]),
partition_key: Vec::new(),
id_range: aggs.id_range,
scalar_stats_agg: aggs.scalar_stats_agg,
fts_summary_agg: aggs.fts_summary_agg,
vector_summary_agg: aggs.vector_summary_agg,
}
}
/// Builds a minimal manifest list holding `parts`; all other fields are fixed
/// placeholder values.
fn list_with(parts: Vec<ManifestPartEntry>) -> Manifest {
Manifest {
format_version: FORMAT_VERSION.into(),
manifest_id: 1,
options_hash: ContentHash([0u8; 32]),
schema: Vec::new(),
id_column: "_id".into(),
fts_columns: vec![],
vector_columns: vec![],
partition_strategy: PartitionStrategy::Hash {
column: "_id".into(),
n_buckets: 64,
},
parts,
}
}
/// Shorthand for an Int64 scalar predicate `col <op> v`.
fn pred(col: &str, op: ScalarOp, v: i64) -> ScalarPredicate {
ScalarPredicate {
column: col.to_string(),
op,
value: ScalarValue::Int64(Some(v)),
}
}
/// Parts with disjoint `x` ranges ([0, 10] and [100, 110]): each predicate
/// keeps only the part whose range can satisfy it.
#[test]
fn scalar_keep_parts_prunes_non_overlapping_part() {
let p0 = part_from(&[seg_int("x", 0, 10)], 0);
let p1 = part_from(&[seg_int("x", 100, 110)], 1);
let list = list_with(vec![p0.clone(), p1.clone()]);
assert_eq!(
scalar_keep_parts(&list, &pred("x", ScalarOp::Eq, 5)),
vec![p0.part_id]
);
assert_eq!(
scalar_keep_parts(&list, &pred("x", ScalarOp::Eq, 105)),
vec![p1.part_id]
);
assert_eq!(
scalar_keep_parts(&list, &pred("x", ScalarOp::Gt, 50)),
vec![p1.part_id]
);
}
/// A predicate on a column with no part-level aggregate keeps the part.
#[test]
fn scalar_keep_parts_keeps_on_missing_column_aggregate() {
let p0 = part_from(&[seg_int("x", 0, 10)], 0);
let list = list_with(vec![p0.clone()]);
assert_eq!(
scalar_keep_parts(&list, &pred("other", ScalarOp::Eq, 5)),
vec![p0.part_id]
);
}
/// Supertable options with one non-nullable LargeUtf8 `title` column
/// configured for FTS, using the default test tokenizer.
fn opts_title_fts() -> Arc<SupertableOptions> {
let schema = Arc::new(Schema::new(vec![Field::new(
"title",
DataType::LargeUtf8,
false,
)]));
let tk = default_tokenizer();
Arc::new(
SupertableOptions::new(
schema,
vec![FtsConfig {
column: "title".into(),
}],
vec![],
Some(tk),
)
.expect("opts"),
)
}
/// Builds a superfile from `titles`:
/// - the `title` scalar min/max and the FTS term range are both the
///   lexicographic min/max of `titles`;
/// - each whole title string is inserted into the bloom as one token.
///
/// `titles` must be non-empty (`sorted[0]` is indexed).
fn seg_title(titles: &[&str]) -> Arc<SuperfileEntry> {
let mut sorted = titles.to_vec();
sorted.sort();
let (mn, mx) = (sorted[0], sorted[sorted.len() - 1]);
let mut cols: HashMap<String, ScalarStatsAgg> = HashMap::new();
cols.insert(
"title".to_string(),
ScalarStatsAgg::from_min_max(
Arc::new(LargeStringArray::from(vec![mn])),
Arc::new(LargeStringArray::from(vec![mx])),
),
);
let mut bb = BloomBuilder::new();
for t in titles {
bb.insert(t.as_bytes());
}
let mut fts = HashMap::new();
fts.insert(
"title".to_string(),
FtsSummaryAgg::new_with_params(
bb.finish(),
titles.len() as u32,
(mn.as_bytes().to_vec(), mx.as_bytes().to_vec()),
),
);
let id = Uuid::new_v4();
Arc::new(SuperfileEntry {
superfile_id: id,
uri: SuperfileUri(id),
n_docs: titles.len() as u64,
id_min: 0,
id_max: 0,
scalar_stats: cols,
fts_summary: fts,
vector_summary: HashMap::new(),
partition_key: Vec::new(),
partition_hint: None,
subsection_offsets: None,
})
}
/// Both superfiles' title ranges ("apple".."zebra" and "kiwi".."mango")
/// contain "mango", so a scalar leaf alone keeps both. Adding a term-presence
/// leaf lets the bloom drop `a`, which never holds the token.
#[tokio::test]
async fn fts_bloom_prunes_superfile_that_scalar_minmax_cannot() {
let a = seg_title(&["apple", "zebra"]);
let b = seg_title(&["kiwi", "mango"]);
let manifest =
ManifestSnapshot::empty(opts_title_fts()).with_appended(vec![a.clone(), b.clone()]);
let scalar_leaf = PruneLeaf::Scalar(ScalarPredicate {
column: "title".into(),
op: ScalarOp::Eq,
value: ScalarValue::Utf8(Some("mango".into())),
});
let term_leaf = PruneLeaf::TermPresence {
column: "title".into(),
terms: vec!["mango".into()],
mode: BoolMode::And,
};
let scalar_only = select_superfiles(&manifest, from_ref(&scalar_leaf))
.await
.expect("select");
assert_eq!(
scalar_only.len(),
2,
"scalar min/max alone cannot prune either superfile"
);
let with_fts = select_superfiles(&manifest, &[scalar_leaf, term_leaf])
.await
.expect("select");
let kept: Vec<_> = with_fts.iter().map(|e| e.superfile_id).collect();
assert_eq!(
kept,
vec![b.superfile_id],
"FTS bloom prunes the superfile plain min/max could not, keeping only the real match"
);
}
/// Builds a superfile whose `title` scalar min/max are given explicitly and
/// independently of the bloom contents. Its FTS bloom holds `bloom_tokens`, and
/// its term range spans their lexicographic min/max (an empty token list gives
/// an empty range). Tests use a wide `("a", "z")` scalar range so that min/max
/// pruning never fires and only the FTS checks can prune.
fn seg(scalar_min: &str, scalar_max: &str, bloom_tokens: &[&str]) -> Arc<SuperfileEntry> {
let mut cols: HashMap<String, ScalarStatsAgg> = HashMap::new();
cols.insert(
"title".to_string(),
ScalarStatsAgg::from_min_max(
Arc::new(LargeStringArray::from(vec![scalar_min])),
Arc::new(LargeStringArray::from(vec![scalar_max])),
),
);
let mut bb = BloomBuilder::new();
for t in bloom_tokens {
bb.insert(t.as_bytes());
}
let term_range = if bloom_tokens.is_empty() {
(Vec::new(), Vec::new())
} else {
let mut sorted = bloom_tokens.to_vec();
sorted.sort();
(
sorted[0].as_bytes().to_vec(),
sorted[sorted.len() - 1].as_bytes().to_vec(),
)
};
let mut fts = HashMap::new();
fts.insert(
"title".to_string(),
FtsSummaryAgg::new_with_params(bb.finish(), bloom_tokens.len() as u32, term_range),
);
let id = Uuid::new_v4();
Arc::new(SuperfileEntry {
superfile_id: id,
uri: SuperfileUri(id),
n_docs: bloom_tokens.len().max(1) as u64,
id_min: 0,
id_max: 0,
scalar_stats: cols,
fts_summary: fts,
vector_summary: HashMap::new(),
partition_key: Vec::new(),
partition_hint: None,
subsection_offsets: None,
})
}
/// Snapshot over `segs` using the `title`-FTS options.
fn manifest(segs: Vec<Arc<SuperfileEntry>>) -> ManifestSnapshot {
ManifestSnapshot::empty(opts_title_fts()).with_appended(segs)
}
/// Runs `select_superfiles` and returns the kept superfile ids in order;
/// panics if selection fails.
async fn ids(m: &ManifestSnapshot, leaves: &[PruneLeaf]) -> Vec<Uuid> {
select_superfiles(m, leaves)
.await
.expect("select")
.iter()
.map(|e| e.superfile_id)
.collect()
}
/// Shorthand for a Utf8 scalar leaf `col <op> v`.
fn scalar(col: &str, op: ScalarOp, v: &str) -> PruneLeaf {
PruneLeaf::Scalar(ScalarPredicate {
column: col.into(),
op,
value: ScalarValue::Utf8(Some(v.into())),
})
}
/// Shorthand for a Utf8 equality leaf `col = v`.
fn eq(col: &str, v: &str) -> PruneLeaf {
scalar(col, ScalarOp::Eq, v)
}
/// Shorthand for a term-presence leaf over `terms`.
fn term(col: &str, terms: &[&str], mode: BoolMode) -> PruneLeaf {
PruneLeaf::TermPresence {
column: col.into(),
terms: terms.iter().map(|s| s.to_string()).collect(),
mode,
}
}
/// Shorthand for a prefix leaf with `p` as raw bytes.
fn pfx(col: &str, p: &str) -> PruneLeaf {
PruneLeaf::Prefix {
column: col.into(),
prefix: p.as_bytes().to_vec(),
}
}
/// `a`'s bloom lacks "async". Scalar equality alone keeps it; an AND term leaf
/// over the literal's tokens prunes it.
#[tokio::test]
async fn multi_token_equality_prunes_when_any_token_absent() {
let a = seg("a", "z", &["rust", "tokio"]);
let m = manifest(vec![a.clone()]);
assert_eq!(
ids(&m, &[eq("title", "rust async")]).await,
vec![a.superfile_id]
);
assert!(
ids(
&m,
&[
eq("title", "rust async"),
term("title", &["rust", "async"], BoolMode::And),
],
)
.await
.is_empty(),
"AND-bloom prunes a superfile missing one of the literal's tokens"
);
}
/// Four superfiles share the same wide [a, z] range, so only the bloom can
/// distinguish the one holding "mango".
#[tokio::test]
async fn bloom_keeps_only_the_token_holder_across_many_wide_superfiles() {
let s1 = seg("a", "z", &["alpha", "omega"]);
let s2 = seg("a", "z", &["beta", "gamma"]);
let hit = seg("a", "z", &["mango", "kiwi"]);
let s3 = seg("a", "z", &["delta", "sigma"]);
let m = manifest(vec![s1, s2, hit.clone(), s3]);
assert_eq!(
ids(&m, &[eq("title", "mango")]).await.len(),
4,
"min/max cannot prune any wide-range superfile"
);
assert_eq!(
ids(
&m,
&[
eq("title", "mango"),
term("title", &["mango"], BoolMode::And)
],
)
.await,
vec![hit.superfile_id],
"bloom keeps exactly the superfile that holds the token"
);
}
/// A "rust" prefix keeps the superfile whose terms span "rustic".."rusty" and
/// drops the one spanning "apple".."banana".
#[tokio::test]
async fn prefix_leaf_prunes_by_term_range() {
let outside = seg("a", "z", &["apple", "banana"]);
let inside = seg("a", "z", &["rustic", "rusty"]);
let m = manifest(vec![outside, inside.clone()]);
assert_eq!(
ids(&m, &[pfx("title", "rust")]).await,
vec![inside.superfile_id]
);
}
/// OR mode keeps a superfile holding any of the terms.
#[tokio::test]
async fn term_presence_or_keeps_any_match() {
let a = seg("a", "z", &["alpha", "beta"]);
let b = seg("a", "z", &["gamma", "delta"]);
let m = manifest(vec![a.clone(), b]);
assert_eq!(
ids(&m, &[term("title", &["alpha", "missing"], BoolMode::Or)]).await,
vec![a.superfile_id],
"OR keeps a superfile with any matching term, prunes one with none"
);
}
/// `title >= "m" AND title <= "p"` keeps only the [m, o] superfile; the
/// [a, c] and [x, z] superfiles fall outside.
#[tokio::test]
async fn scalar_conjunction_prunes_outside_range() {
let lo = seg("a", "c", &[]);
let mid = seg("m", "o", &[]);
let hi = seg("x", "z", &[]);
let m = manifest(vec![lo, mid.clone(), hi]);
assert_eq!(
ids(
&m,
&[
scalar("title", ScalarOp::GtEq, "m"),
scalar("title", ScalarOp::LtEq, "p"),
],
)
.await,
vec![mid.superfile_id]
);
}
/// With no leaves, every superfile is selected.
#[tokio::test]
async fn empty_predicate_keeps_all_superfiles() {
let m = manifest(vec![seg("a", "b", &[]), seg("c", "d", &[])]);
assert_eq!(ids(&m, &[]).await.len(), 2, "no leaves → full scan");
}
/// Leaves on a column with no stats or FTS summary must not prune.
#[tokio::test]
async fn unknown_column_leaves_never_prune() {
let a = seg("a", "z", &["x"]);
let m = manifest(vec![a.clone()]);
assert_eq!(
ids(&m, &[eq("missing", "v")]).await,
vec![a.superfile_id],
"scalar on a column with no stats keeps the superfile"
);
assert_eq!(
ids(&m, &[term("missing", &["v"], BoolMode::And)]).await,
vec![a.superfile_id],
"term presence on a column with no FTS summary keeps the superfile"
);
}
/// Soundness: a superfile holding every token of the literal is never dropped.
#[tokio::test]
async fn superfile_holding_all_tokens_is_never_dropped() {
let a = seg("a", "z", &["rust", "async", "tokio"]);
let m = manifest(vec![a.clone()]);
assert_eq!(
ids(
&m,
&[
eq("title", "rust async"),
term("title", &["rust", "async"], BoolMode::And),
],
)
.await,
vec![a.superfile_id],
"a superfile whose terms cover the literal is always kept"
);
}
}