use std::{cmp::Ordering, sync::Arc};
use datafusion::scalar::ScalarValue;
use crate::{
superfile::{
fts::reader::BoolMode,
vector::distance::{Metric, distance},
},
supertable::manifest::{ManifestSnapshot, SuperfileEntry},
};
/// Builds a keep-mask over `superfiles` for a full-text term query on `column`.
///
/// The result has one entry per superfile, in input order. `true` means the
/// superfile still has to be searched; `false` means its FTS summary rules out
/// a match and the superfile can be skipped.
///
/// * An empty `query_terms` keeps every superfile.
/// * A superfile with no summary for `column` is always kept.
/// * [`BoolMode::Or`] keeps a superfile when its summary may contain at least
///   one term; [`BoolMode::And`] keeps it only when the summary may contain
///   every term.
pub fn fts_bloom_skip(
    superfiles: &[Arc<SuperfileEntry>],
    column: &str,
    query_terms: &[&str],
    mode: BoolMode,
) -> Vec<bool> {
    // Without terms there is nothing to prune on.
    if query_terms.is_empty() {
        return vec![true; superfiles.len()];
    }

    let mut keep = Vec::with_capacity(superfiles.len());
    for entry in superfiles {
        // No summary for this column: we cannot prove absence, so keep it.
        let Some(summary) = entry.fts_summary.get(column) else {
            keep.push(true);
            continue;
        };

        // Lazy probe per term; `any` / `all` short-circuit on the first
        // decisive answer.
        let mut probes = query_terms
            .iter()
            .map(|term| summary.may_contain(term.as_bytes()));
        let verdict = match mode {
            BoolMode::Or => probes.any(|hit| hit),
            BoolMode::And => probes.all(|hit| hit),
        };
        keep.push(verdict);
    }
    keep
}
/// Builds a keep-mask over `superfiles` for a prefix query on `column`.
///
/// The result has one entry per superfile, in input order; `true` means the
/// superfile must still be searched. An empty `prefix` keeps everything, and
/// a superfile with no FTS summary for `column` is always kept. Otherwise the
/// decision is delegated to the summary's `may_match_prefix`.
pub fn fts_prefix_skip(
    superfiles: &[Arc<SuperfileEntry>],
    column: &str,
    prefix: &[u8],
) -> Vec<bool> {
    // An empty prefix matches every term, so no superfile can be excluded.
    let keep_everything = prefix.is_empty();

    superfiles
        .iter()
        .map(|entry| {
            keep_everything
                || entry
                    .fts_summary
                    .get(column)
                    .map_or(true, |summary| summary.may_match_prefix(prefix))
        })
        .collect()
}
/// Keep-mask for a vector query: currently a no-op that keeps every superfile.
///
/// `_column` and `_query` are accepted but not inspected; the result is one
/// `true` per superfile in `manifest`, in manifest order.
pub fn vector_centroid_skip(
    manifest: &ManifestSnapshot,
    _column: &str,
    _query: &[f32],
) -> Vec<bool> {
    manifest.superfiles.iter().map(|_| true).collect()
}
/// Returns the indices of `manifest.superfiles`, ordered by ascending distance
/// between `query` and each superfile's centroid for `column`.
///
/// * Distances are computed with `metric` via `distance`.
/// * A superfile with no vector summary for `column`, or whose centroid has a
///   different dimension than `query`, is scored `+INFINITY` and therefore
///   sorts last.
/// * A NaN distance is also treated as `+INFINITY`, so an undefined score can
///   neither jump to the front nor break the sort.
/// * Ties are broken by ascending superfile index, which makes the returned
///   order deterministic.
pub fn superfiles_sorted_by_centroid_distance(
    manifest: &ManifestSnapshot,
    column: &str,
    query: &[f32],
    metric: Metric,
) -> Vec<usize> {
    let mut scored: Vec<(usize, f32)> = manifest
        .superfiles
        .iter()
        .enumerate()
        .map(|(i, entry)| {
            let score = match entry.vector_summary.get(column) {
                Some(vs) if vs.centroid.len() == query.len() => {
                    let d = distance(metric, query, &vs.centroid);
                    // NaN is incomparable; demote it to "unknown / farthest".
                    if d.is_nan() { f32::INFINITY } else { d }
                }
                _ => f32::INFINITY,
            };
            (i, score)
        })
        .collect();

    // `total_cmp` is a genuine total order, which the sort requires; the old
    // `partial_cmp(..).unwrap_or(Equal)` was inconsistent whenever a NaN was
    // present.
    scored.sort_unstable_by(|a, b| a.1.total_cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
    scored.into_iter().map(|(i, _)| i).collect()
}
/// Comparison operator of a [`ScalarPredicate`], read as `column <op> value`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScalarOp {
/// `column == value`
Eq,
/// `column != value`
NotEq,
/// `column < value`
Lt,
/// `column <= value`
LtEq,
/// `column > value`
Gt,
/// `column >= value`
GtEq,
}
/// A single `column <op> literal` comparison used to prune superfiles by
/// their per-column min/max statistics (see [`scalar_skip`]).
#[derive(Debug, Clone)]
pub struct ScalarPredicate {
/// Name of the column; used as the lookup key into a superfile's scalar stats.
pub column: String,
/// Comparison operator applied between the column and `value`.
pub op: ScalarOp,
/// Literal the column is compared against.
pub value: ScalarValue,
}
/// Builds a keep-mask over `superfiles` for a conjunction of scalar predicates.
///
/// The result has one entry per superfile, in input order. A superfile is
/// kept (`true`) only when every predicate may match it according to its
/// min/max statistics. With no predicates, every superfile is kept.
pub fn scalar_skip(
    superfiles: &[Arc<SuperfileEntry>],
    predicates: &[ScalarPredicate],
) -> Vec<bool> {
    let mut mask = Vec::with_capacity(superfiles.len());
    for entry in superfiles {
        // One predicate that provably cannot match is enough to drop the
        // superfile; evaluation stops at the first such predicate.
        let excluded = predicates
            .iter()
            .any(|predicate| !superfile_may_match(entry, predicate));
        mask.push(!excluded);
    }
    mask
}
/// Decides whether `entry` could contain a row satisfying `pred`.
///
/// Looks up the min/max statistics for the predicate's column, reads element
/// 0 of each as a scalar, and defers to [`scalar_value_may_match`]. Returns
/// `true` (keep) when the column has no statistics or when either bound
/// cannot be read as a scalar.
fn superfile_may_match(entry: &SuperfileEntry, pred: &ScalarPredicate) -> bool {
    let stats = match entry.scalar_stats.get(&pred.column) {
        Some(stats) => stats,
        // No statistics recorded for this column: nothing to prune on.
        None => return true,
    };

    let lower = ScalarValue::try_from_array(stats.min.as_ref(), 0);
    let upper = ScalarValue::try_from_array(stats.max.as_ref(), 0);

    match (lower, upper) {
        (Ok(lower), Ok(upper)) => scalar_value_may_match(&lower, &upper, pred.op, &pred.value),
        // Unreadable statistics are treated as "unknown", i.e. keep.
        _ => true,
    }
}
/// Decides whether a column whose values lie in `[min, max]` could contain a
/// value satisfying `column <op> value`.
///
/// The answer is conservative: `false` is returned only when the statistics
/// prove that no value can match; every uncertain case yields `true`.
///
/// Returns `true` without comparing when:
/// * `min` or `max` is null,
/// * `value` cannot be cast to the statistics' data type, or casts to null,
/// * the cast is lossy, i.e. casting the result back to `value`'s own type
///   does not reproduce `value`. A lossy cast (for example a fractional
///   literal against integer statistics) may move the literal across a bound
///   and would otherwise allow a matching superfile to be pruned.
///
/// Otherwise, with `v` the cast literal:
/// * `Eq`    — keep when `min <= v <= max`,
/// * `NotEq` — prune only when `min == max == v`,
/// * `Lt`    — keep when `min < v`,
/// * `LtEq`  — keep when `min <= v`,
/// * `Gt`    — keep when `max > v`,
/// * `GtEq`  — keep when `max >= v`.
///
/// An incomparable pair (`partial_cmp` returns `None`) always keeps.
pub(crate) fn scalar_value_may_match(
    min: &ScalarValue,
    max: &ScalarValue,
    op: ScalarOp,
    value: &ScalarValue,
) -> bool {
    if min.is_null() || max.is_null() {
        return true;
    }

    let stats_type = min.data_type();
    let literal_type = value.data_type();
    let v = match value.cast_to(&stats_type) {
        Ok(v) if !v.is_null() => v,
        _ => return true,
    };

    // Guard against value-changing casts: only trust the comparison when the
    // literal survives a round trip through the statistics' type unchanged.
    if literal_type != stats_type {
        match v.cast_to(&literal_type) {
            Ok(round_trip) if round_trip == *value => {}
            _ => return true,
        }
    }

    let cmp_v_min = v.partial_cmp(min);
    let cmp_v_max = v.partial_cmp(max);
    match op {
        ScalarOp::Eq => match (cmp_v_min, cmp_v_max) {
            (Some(lo), Some(hi)) => lo != Ordering::Less && hi != Ordering::Greater,
            _ => true,
        },
        ScalarOp::NotEq => {
            // Only a constant column equal to the literal can be excluded.
            let constant = min.partial_cmp(max) == Some(Ordering::Equal);
            let equals_v = cmp_v_min == Some(Ordering::Equal);
            !(constant && equals_v)
        }
        ScalarOp::Lt => matches!(cmp_v_min, Some(Ordering::Greater) | None),
        ScalarOp::LtEq => !matches!(cmp_v_min, Some(Ordering::Less)),
        ScalarOp::Gt => matches!(cmp_v_max, Some(Ordering::Less) | None),
        ScalarOp::GtEq => !matches!(cmp_v_max, Some(Ordering::Greater)),
    }
}
/// Unit tests for the skip-mask helpers. Fixtures are built in memory from
/// hand-made `SuperfileEntry` values; nothing is read from storage.
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{ArrayRef, Int64Array, LargeStringArray};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::scalar::ScalarValue;
    use uuid::Uuid;

    use super::*;
    use crate::{
        superfile::{
            builder::{FtsConfig, VectorConfig},
            vector::{distance::Metric, rerank_codec::RerankCodec},
        },
        supertable::{
            SupertableOptions,
            manifest::{
                ClusterCentroids, FtsSummaryAgg, ManifestSnapshot, ScalarStatsAgg, SuperfileEntry,
                SuperfileUri, VectorSummary, bloom::BloomBuilder,
            },
        },
        test_helpers::default_tokenizer,
    };

    /// Options for a table with a single FTS-indexed `title` column.
    fn opts_simple() -> Arc<SupertableOptions> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "title",
            DataType::LargeUtf8,
            false,
        )]));
        let tk = default_tokenizer();
        Arc::new(
            SupertableOptions::new(
                schema,
                vec![FtsConfig {
                    column: "title".into(),
                }],
                vec![],
                Some(tk),
            )
            .expect("opts"),
        )
    }

    /// Options for a table with a single 16-dimensional vector column `emb`.
    fn opts_with_vector() -> Arc<SupertableOptions> {
        let dim = 16;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "emb",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dim as i32,
            ),
            false,
        )]));
        Arc::new(
            SupertableOptions::new(
                schema,
                vec![],
                vec![VectorConfig {
                    column: "emb".into(),
                    dim,
                    n_cent: 4,
                    rot_seed: 0,
                    metric: Metric::Cosine,
                    rerank_codec: RerankCodec::Fp32,
                }],
                None,
            )
            .expect("opts"),
        )
    }

    /// A superfile entry with no documents and no summaries of any kind.
    fn empty_superfile() -> SuperfileEntry {
        let uri = SuperfileUri::new_v4();
        SuperfileEntry {
            superfile_id: Uuid::new_v4(),
            uri,
            n_docs: 0,
            id_min: 0,
            id_max: 0,
            scalar_stats: HashMap::new(),
            fts_summary: HashMap::new(),
            vector_summary: HashMap::new(),
            partition_key: Vec::new(),
            partition_hint: None,
            subsection_offsets: None,
        }
    }

    /// Builds an FTS summary for `column` containing exactly `terms`.
    ///
    /// The term range is the lexicographic (byte-wise) minimum and maximum of
    /// `terms`, so callers do not have to pass the terms in sorted order.
    fn fts_summary_with(column: &str, terms: &[&str]) -> (String, FtsSummaryAgg) {
        let mut bb = BloomBuilder::new();
        for t in terms {
            bb.insert(t.as_bytes());
        }
        let term_range = match (terms.iter().min(), terms.iter().max()) {
            (Some(min), Some(max)) => (min.as_bytes().to_vec(), max.as_bytes().to_vec()),
            _ => (Vec::new(), Vec::new()),
        };
        let summary = FtsSummaryAgg::new_with_params(bb.finish(), terms.len() as u32, term_range);
        (column.to_string(), summary)
    }

    /// A superfile whose only summary is an FTS summary over `terms`.
    fn superfile_with_terms(column: &str, terms: &[&str]) -> Arc<SuperfileEntry> {
        let mut e = empty_superfile();
        let (k, v) = fts_summary_with(column, terms);
        e.fts_summary.insert(k, v);
        Arc::new(e)
    }

    /// A superfile whose only summary is a vector summary with the given
    /// centroid and radius (and no cluster centroids).
    fn superfile_with_centroid(
        column: &str,
        centroid: Vec<f32>,
        radius: f32,
    ) -> Arc<SuperfileEntry> {
        let mut e = empty_superfile();
        e.vector_summary.insert(
            column.to_string(),
            VectorSummary {
                centroid,
                radius,
                clusters: ClusterCentroids::empty(),
            },
        );
        Arc::new(e)
    }

    #[test]
    fn bloom_skip_keeps_superfiles_with_any_query_term_in_or_mode() {
        let s_a = superfile_with_terms("title", &["alpha", "beta"]);
        let s_b = superfile_with_terms("title", &["gamma", "delta"]);
        let m = ManifestSnapshot::new_from_superfiles(opts_simple(), vec![s_a, s_b]);
        let mask = fts_bloom_skip(&m.superfiles, "title", &["alpha", "missing"], BoolMode::Or);
        assert_eq!(mask, vec![true, false]);
    }

    #[test]
    fn bloom_skip_requires_all_terms_present_in_and_mode() {
        let s_a = superfile_with_terms("title", &["alpha", "beta"]);
        let s_b = superfile_with_terms("title", &["alpha", "gamma"]);
        let m = ManifestSnapshot::new_from_superfiles(opts_simple(), vec![s_a, s_b]);
        let mask = fts_bloom_skip(&m.superfiles, "title", &["alpha", "beta"], BoolMode::And);
        assert_eq!(mask, vec![true, false]);
    }

    #[test]
    fn bloom_skip_unknown_column_keeps_all() {
        let s = superfile_with_terms("title", &["alpha"]);
        let m = ManifestSnapshot::new_from_superfiles(opts_simple(), vec![s]);
        let mask = fts_bloom_skip(&m.superfiles, "no_such_column", &["alpha"], BoolMode::Or);
        assert_eq!(mask, vec![true]);
    }

    #[test]
    fn bloom_skip_empty_terms_keeps_all() {
        let s = superfile_with_terms("title", &["alpha"]);
        let m = ManifestSnapshot::new_from_superfiles(opts_simple(), vec![s]);
        let mask = fts_bloom_skip(&m.superfiles, "title", &[], BoolMode::Or);
        assert_eq!(mask, vec![true]);
    }

    #[test]
    fn bloom_skip_with_no_superfiles_returns_empty_vec() {
        let m = ManifestSnapshot::new_from_superfiles(opts_simple(), vec![]);
        let mask = fts_bloom_skip(&m.superfiles, "title", &["alpha"], BoolMode::Or);
        assert!(mask.is_empty());
    }

    #[test]
    fn prefix_skip_prunes_superfiles_outside_prefix_range() {
        let s_a = superfile_with_terms("title", &["apple", "banana"]);
        let s_b = superfile_with_terms("title", &["python", "rust"]);
        let m = ManifestSnapshot::new_from_superfiles(opts_simple(), vec![s_a, s_b]);
        let mask = fts_prefix_skip(&m.superfiles, "title", b"rust");
        assert_eq!(mask, vec![false, true]);
    }

    #[test]
    fn prefix_skip_keeps_superfiles_with_matching_prefix_inside_range() {
        let s = superfile_with_terms("title", &["rusting", "rusty"]);
        let m = ManifestSnapshot::new_from_superfiles(opts_simple(), vec![s]);
        let mask = fts_prefix_skip(&m.superfiles, "title", b"rust");
        assert_eq!(mask, vec![true]);
    }

    #[test]
    fn prefix_skip_empty_prefix_keeps_all() {
        let s = superfile_with_terms("title", &["alpha"]);
        let m = ManifestSnapshot::new_from_superfiles(opts_simple(), vec![s]);
        let mask = fts_prefix_skip(&m.superfiles, "title", b"");
        assert_eq!(mask, vec![true]);
    }

    #[test]
    fn prefix_skip_unknown_column_keeps_all() {
        let s = superfile_with_terms("title", &["alpha"]);
        let m = ManifestSnapshot::new_from_superfiles(opts_simple(), vec![s]);
        let mask = fts_prefix_skip(&m.superfiles, "no_such_column", b"alp");
        assert_eq!(mask, vec![true]);
    }

    /// A superfile that carries no FTS summary at all cannot be pruned by a
    /// prefix query, so it is kept. (The entry here has an empty
    /// `fts_summary` map; it does not model a summary with zero terms.)
    #[test]
    fn prefix_skip_superfile_without_fts_summary_is_kept() {
        let s = Arc::new(empty_superfile());
        let m = ManifestSnapshot::new_from_superfiles(opts_simple(), vec![s]);
        let mask = fts_prefix_skip(&m.superfiles, "title", b"rust");
        assert_eq!(mask, vec![true]);
    }

    #[test]
    fn vector_centroid_skip_v1_keeps_all_superfiles() {
        let s_a = superfile_with_centroid("emb", vec![0.0; 16], 0.5);
        let s_b = superfile_with_centroid("emb", vec![10.0; 16], 0.5);
        let m = ManifestSnapshot::new_from_superfiles(opts_with_vector(), vec![s_a, s_b]);
        let q = vec![0.0f32; 16];
        let mask = vector_centroid_skip(&m, "emb", &q);
        assert_eq!(mask, vec![true, true]);
    }

    #[test]
    fn superfiles_sorted_by_centroid_distance_orders_by_metric() {
        let opts = opts_with_vector();
        let near = superfile_with_centroid(
            "emb",
            {
                let mut v = vec![0.0f32; 16];
                v[0] = 1.0;
                v
            },
            0.0,
        );
        let far = superfile_with_centroid(
            "emb",
            {
                let mut v = vec![0.0f32; 16];
                v[7] = 1.0;
                v
            },
            0.0,
        );
        // `far` is deliberately placed first so that sorting has to reorder.
        let m = ManifestSnapshot::new_from_superfiles(opts, vec![far, near]);
        let q = {
            let mut v = vec![0.0f32; 16];
            v[0] = 1.0;
            v
        };
        let order = superfiles_sorted_by_centroid_distance(&m, "emb", &q, Metric::L2Sq);
        assert_eq!(order, vec![1, 0]);
    }

    #[test]
    fn superfiles_sorted_by_centroid_distance_pushes_missing_summary_to_end() {
        let with_v = superfile_with_centroid("emb", vec![1.0f32; 16], 0.0);
        let without_v = Arc::new(empty_superfile());
        let m = ManifestSnapshot::new_from_superfiles(opts_with_vector(), vec![without_v, with_v]);
        let q = vec![1.0f32; 16];
        let order = superfiles_sorted_by_centroid_distance(&m, "emb", &q, Metric::L2Sq);
        assert_eq!(order, vec![1, 0]);
    }

    /// A superfile whose only statistics are Int64 min/max for `col`.
    fn seg_with_int_stats(col: &str, min: i64, max: i64) -> Arc<SuperfileEntry> {
        let mut e = empty_superfile();
        let mn: ArrayRef = Arc::new(Int64Array::from(vec![min]));
        let mx: ArrayRef = Arc::new(Int64Array::from(vec![max]));
        e.scalar_stats
            .insert(col.to_string(), ScalarStatsAgg::from_min_max(mn, mx));
        Arc::new(e)
    }

    /// A superfile whose only statistics are LargeUtf8 min/max for `col`.
    fn seg_with_str_stats(col: &str, min: &str, max: &str) -> Arc<SuperfileEntry> {
        let mut e = empty_superfile();
        let mn: ArrayRef = Arc::new(LargeStringArray::from(vec![min]));
        let mx: ArrayRef = Arc::new(LargeStringArray::from(vec![max]));
        e.scalar_stats
            .insert(col.to_string(), ScalarStatsAgg::from_min_max(mn, mx));
        Arc::new(e)
    }

    /// Shorthand constructor for a [`ScalarPredicate`].
    fn pred(column: &str, op: ScalarOp, value: ScalarValue) -> ScalarPredicate {
        ScalarPredicate {
            column: column.to_string(),
            op,
            value,
        }
    }

    #[test]
    fn scalar_skip_empty_predicates_keeps_all() {
        let segs = vec![
            seg_with_int_stats("x", 0, 10),
            seg_with_int_stats("x", 100, 110),
        ];
        assert_eq!(scalar_skip(&segs, &[]), vec![true, true]);
    }

    #[test]
    fn scalar_skip_eq_prunes_superfiles_whose_range_excludes_value() {
        let segs = vec![
            seg_with_int_stats("x", 0, 10),
            seg_with_int_stats("x", 100, 110),
        ];
        let mask = scalar_skip(
            &segs,
            &[pred("x", ScalarOp::Eq, ScalarValue::Int64(Some(5)))],
        );
        assert_eq!(mask, vec![true, false]);
        let mask = scalar_skip(
            &segs,
            &[pred("x", ScalarOp::Eq, ScalarValue::Int64(Some(105)))],
        );
        assert_eq!(mask, vec![false, true]);
        // The upper bound itself is inside the range.
        let mask = scalar_skip(
            &segs,
            &[pred("x", ScalarOp::Eq, ScalarValue::Int64(Some(10)))],
        );
        assert_eq!(mask, vec![true, false]);
    }

    #[test]
    fn scalar_skip_range_ops_prune_by_min_or_max() {
        let segs = vec![
            seg_with_int_stats("x", 0, 10),
            seg_with_int_stats("x", 100, 110),
        ];
        assert_eq!(
            scalar_skip(
                &segs,
                &[pred("x", ScalarOp::Gt, ScalarValue::Int64(Some(50)))]
            ),
            vec![false, true]
        );
        assert_eq!(
            scalar_skip(
                &segs,
                &[pred("x", ScalarOp::Lt, ScalarValue::Int64(Some(50)))]
            ),
            vec![true, false]
        );
        assert_eq!(
            scalar_skip(
                &segs,
                &[pred("x", ScalarOp::GtEq, ScalarValue::Int64(Some(110)))]
            ),
            vec![false, true]
        );
        assert_eq!(
            scalar_skip(
                &segs,
                &[pred("x", ScalarOp::LtEq, ScalarValue::Int64(Some(0)))]
            ),
            vec![true, false]
        );
    }

    #[test]
    fn scalar_skip_conjunction_prunes_when_any_predicate_excludes() {
        let segs = vec![seg_with_int_stats("x", 0, 3), seg_with_int_stats("x", 6, 7)];
        let preds = [
            pred("x", ScalarOp::GtEq, ScalarValue::Int64(Some(5))),
            pred("x", ScalarOp::LtEq, ScalarValue::Int64(Some(8))),
        ];
        assert_eq!(scalar_skip(&segs, &preds), vec![false, true]);
    }

    #[test]
    fn scalar_skip_unknown_column_keeps_all() {
        let segs = vec![seg_with_int_stats("x", 0, 10)];
        let mask = scalar_skip(
            &segs,
            &[pred("not_a_col", ScalarOp::Eq, ScalarValue::Int64(Some(5)))],
        );
        assert_eq!(mask, vec![true]);
    }

    #[test]
    fn scalar_skip_coerces_utf8_literal_against_largeutf8_stats() {
        let segs = vec![
            seg_with_str_stats("name", "apple", "mango"),
            seg_with_str_stats("name", "tango", "zulu"),
        ];
        let mask = scalar_skip(
            &segs,
            &[pred(
                "name",
                ScalarOp::Eq,
                ScalarValue::Utf8(Some("banana".into())),
            )],
        );
        assert_eq!(mask, vec![true, false]);
    }

    #[test]
    fn scalar_skip_null_stats_keeps_superfile() {
        let mut e = empty_superfile();
        let mn: ArrayRef = Arc::new(Int64Array::from(vec![None::<i64>]));
        let mx: ArrayRef = Arc::new(Int64Array::from(vec![None::<i64>]));
        e.scalar_stats
            .insert("x".to_string(), ScalarStatsAgg::from_min_max(mn, mx));
        let segs = vec![Arc::new(e)];
        let mask = scalar_skip(
            &segs,
            &[pred("x", ScalarOp::Eq, ScalarValue::Int64(Some(5)))],
        );
        assert_eq!(mask, vec![true]);
    }

    #[test]
    fn scalar_skip_not_eq_prunes_only_constant_superfile() {
        let segs = vec![seg_with_int_stats("x", 5, 5), seg_with_int_stats("x", 5, 9)];
        let mask = scalar_skip(
            &segs,
            &[pred("x", ScalarOp::NotEq, ScalarValue::Int64(Some(5)))],
        );
        assert_eq!(mask, vec![false, true]);
    }
}