use std::{collections::HashSet, sync::Arc};
use datafusion::{
logical_expr::{Expr, Operator},
scalar::ScalarValue,
};
use futures::future::BoxFuture;
use roaring::RoaringBitmap;
use crate::superfile::{
ReadError, SuperfileReader,
fts::{reader::BoolMode, tokenize::Tokenizer},
};
/// A plan for computing a candidate document set from scan filters.
///
/// Built by [`CandidatePlan::from_filters`], run by `evaluate`, and sized by
/// `estimate`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum CandidatePlan {
/// Documents matching every token in `tokens` for `column`; evaluated with
/// `token_match` in `BoolMode::And`.
TermsAll { column: String, tokens: Vec<String> },
/// Intersection of the children's bitmaps; children that evaluate to `None`
/// are skipped.
And(Vec<CandidatePlan>),
/// Union of the children's bitmaps; evaluates to `None` as soon as any child
/// evaluates to `None`.
Or(Vec<CandidatePlan>),
/// No restriction could be derived: evaluates to `None` and is estimated at
/// the reader's full document count.
Unbounded,
}
impl CandidatePlan {
    /// Lowers `filters` (treated as a conjunction) into a candidate plan.
    ///
    /// Yields [`CandidatePlan::Unbounded`] when no tokenizer is supplied, when
    /// `fts_cols` is empty, or when none of the filters can be lowered.
    pub(crate) fn from_filters(
        filters: &[Expr],
        fts_cols: &HashSet<&str>,
        tokenizer: Option<&Arc<dyn Tokenizer>>,
    ) -> CandidatePlan {
        match tokenizer {
            Some(tok) if !fts_cols.is_empty() => {
                let lowered: Vec<CandidatePlan> = filters
                    .iter()
                    .map(|filter| lower(filter, fts_cols, tok.as_ref()))
                    .collect();
                and_combine(lowered)
            }
            _ => CandidatePlan::Unbounded,
        }
    }

    /// Runs the plan against `reader`.
    ///
    /// Resolves to `Ok(Some(bitmap))` with the candidate documents, or
    /// `Ok(None)` when the plan places no bound on the candidates. Errors from
    /// the reader are propagated unchanged.
    pub(crate) fn evaluate<'a>(
        &'a self,
        reader: &'a SuperfileReader,
    ) -> BoxFuture<'a, Result<Option<RoaringBitmap>, ReadError>> {
        Box::pin(async move {
            let bitmap: Option<RoaringBitmap> = match self {
                CandidatePlan::Unbounded => None,
                CandidatePlan::TermsAll { column, tokens } => {
                    let terms: Vec<&str> = tokens.iter().map(|t| t.as_str()).collect();
                    let hits = reader.token_match(column, &terms, BoolMode::And).await?;
                    Some(hits.into_iter().collect())
                }
                CandidatePlan::And(children) => {
                    let mut intersection: Option<RoaringBitmap> = None;
                    for child in children {
                        // A child without a bound does not narrow the intersection.
                        let Some(bm) = child.evaluate(reader).await? else {
                            continue;
                        };
                        let narrowed = match intersection.take() {
                            Some(prev) => prev & bm,
                            None => bm,
                        };
                        // Once empty, no further child can add documents back.
                        if narrowed.is_empty() {
                            return Ok(Some(RoaringBitmap::new()));
                        }
                        intersection = Some(narrowed);
                    }
                    intersection
                }
                CandidatePlan::Or(children) => {
                    let mut merged = RoaringBitmap::new();
                    for child in children {
                        // One unbounded branch makes the whole union unbounded.
                        let Some(bm) = child.evaluate(reader).await? else {
                            return Ok(None);
                        };
                        merged |= bm;
                    }
                    Some(merged)
                }
            };
            Ok(bitmap)
        })
    }
}
impl CandidatePlan {
    /// Estimates an upper bound on the number of candidate documents.
    ///
    /// The result never exceeds `reader.n_docs()`:
    /// - `Unbounded` and a token-less `TermsAll` count as every document;
    /// - `TermsAll` uses the smallest document frequency among its tokens;
    /// - `And` takes the minimum of its children;
    /// - `Or` takes the saturating sum of its children.
    pub(crate) fn estimate<'a>(
        &'a self,
        reader: &'a SuperfileReader,
    ) -> BoxFuture<'a, Result<u64, ReadError>> {
        Box::pin(async move {
            let total = reader.n_docs();
            match self {
                CandidatePlan::Unbounded => Ok(total),
                CandidatePlan::TermsAll { tokens, .. } if tokens.is_empty() => Ok(total),
                CandidatePlan::TermsAll { column, tokens } => {
                    let mut rarest = u64::MAX;
                    for token in tokens {
                        let df = reader.term_df(column, token).await?;
                        rarest = rarest.min(df);
                    }
                    Ok(rarest.min(total))
                }
                CandidatePlan::And(children) => {
                    let mut smallest = total;
                    for child in children {
                        let child_estimate = child.estimate(reader).await?;
                        smallest = smallest.min(child_estimate);
                    }
                    Ok(smallest)
                }
                CandidatePlan::Or(children) => {
                    let mut combined: u64 = 0;
                    for child in children {
                        let child_estimate = child.estimate(reader).await?;
                        combined = combined.saturating_add(child_estimate);
                    }
                    Ok(combined.min(total))
                }
            }
        })
    }
}
/// Lowers a single filter expression into a candidate plan.
///
/// Only `AND`, `OR`, `=` and non-negated `IN` lists are recognised; every
/// other expression becomes [`CandidatePlan::Unbounded`].
fn lower(expr: &Expr, fts_cols: &HashSet<&str>, tok: &dyn Tokenizer) -> CandidatePlan {
    match expr {
        Expr::BinaryExpr(binary) => {
            let recurse = |side: &Expr| lower(side, fts_cols, tok);
            match binary.op {
                Operator::And => {
                    let lhs = recurse(&*binary.left);
                    let rhs = recurse(&*binary.right);
                    and_combine(vec![lhs, rhs])
                }
                Operator::Or => {
                    let lhs = recurse(&*binary.left);
                    let rhs = recurse(&*binary.right);
                    or_combine(vec![lhs, rhs])
                }
                Operator::Eq => eq_leaf(&binary.left, &binary.right, fts_cols, tok),
                _ => CandidatePlan::Unbounded,
            }
        }
        Expr::InList(in_list) if !in_list.negated => in_list_leaf(in_list, fts_cols, tok),
        _ => CandidatePlan::Unbounded,
    }
}
/// Lowers `column = literal` (operands in either order) into a term lookup.
///
/// Any other operand shape yields [`CandidatePlan::Unbounded`].
fn eq_leaf(
    left: &Expr,
    right: &Expr,
    fts_cols: &HashSet<&str>,
    tok: &dyn Tokenizer,
) -> CandidatePlan {
    match (left, right) {
        (Expr::Column(column), Expr::Literal(literal, _))
        | (Expr::Literal(literal, _), Expr::Column(column)) => {
            terms_all(&column.name, literal, fts_cols, tok)
        }
        _ => CandidatePlan::Unbounded,
    }
}
/// Lowers `column IN (lit, ...)` into a union of per-literal term lookups.
///
/// Yields [`CandidatePlan::Unbounded`] when the tested expression is not a
/// plain column or when any list item is not a literal.
fn in_list_leaf(
    il: &datafusion::logical_expr::expr::InList,
    fts_cols: &HashSet<&str>,
    tok: &dyn Tokenizer,
) -> CandidatePlan {
    let Expr::Column(column) = il.expr.as_ref() else {
        return CandidatePlan::Unbounded;
    };
    // Collecting into `Option` stops at the first non-literal item.
    let branches: Option<Vec<CandidatePlan>> = il
        .list
        .iter()
        .map(|item| match item {
            Expr::Literal(value, _) => Some(terms_all(&column.name, value, fts_cols, tok)),
            _ => None,
        })
        .collect();
    match branches {
        Some(branches) => or_combine(branches),
        None => CandidatePlan::Unbounded,
    }
}
/// Builds a [`CandidatePlan::TermsAll`] for `column` from a string literal.
///
/// Yields [`CandidatePlan::Unbounded`] when `column` is not in `fts_cols`, the
/// literal is not a non-null string, or tokenizing it produces no tokens.
fn terms_all(
    column: &str,
    value: &ScalarValue,
    fts_cols: &HashSet<&str>,
    tok: &dyn Tokenizer,
) -> CandidatePlan {
    let text = match scalar_str(value) {
        Some(text) if fts_cols.contains(column) => text,
        _ => return CandidatePlan::Unbounded,
    };
    let tokens: Vec<String> = tok.tokenize(text).collect();
    if tokens.is_empty() {
        CandidatePlan::Unbounded
    } else {
        CandidatePlan::TermsAll {
            column: column.to_owned(),
            tokens,
        }
    }
}
fn scalar_str(v: &ScalarValue) -> Option<&str> {
match v {
ScalarValue::Utf8(Some(s))
| ScalarValue::LargeUtf8(Some(s))
| ScalarValue::Utf8View(Some(s)) => Some(s.as_str()),
_ => None,
}
}
/// Combines `children` into a conjunction.
///
/// `Unbounded` children are dropped, since they do not narrow an intersection,
/// and nested `And` nodes are flattened into the parent.
fn and_combine(children: Vec<CandidatePlan>) -> CandidatePlan {
    let conjuncts: Vec<CandidatePlan> = children
        .into_iter()
        .flat_map(|child| match child {
            CandidatePlan::Unbounded => Vec::new(),
            CandidatePlan::And(nested) => nested,
            other => vec![other],
        })
        .collect();
    collapse(conjuncts, true)
}
/// Combines `children` into a disjunction.
///
/// A single `Unbounded` child makes the whole result `Unbounded`; nested `Or`
/// nodes are flattened into the parent.
fn or_combine(children: Vec<CandidatePlan>) -> CandidatePlan {
    let has_unbounded = children
        .iter()
        .any(|child| matches!(child, CandidatePlan::Unbounded));
    if has_unbounded {
        return CandidatePlan::Unbounded;
    }
    let disjuncts: Vec<CandidatePlan> = children
        .into_iter()
        .flat_map(|child| match child {
            CandidatePlan::Or(nested) => nested,
            other => vec![other],
        })
        .collect();
    collapse(disjuncts, false)
}
/// Turns a flattened child list into the smallest equivalent plan.
///
/// No children gives `Unbounded`, a single child is returned as-is, and two
/// or more are wrapped in `And` when `is_and` is true, `Or` otherwise.
fn collapse(mut flat: Vec<CandidatePlan>, is_and: bool) -> CandidatePlan {
    match (flat.len(), is_and) {
        (0, _) => CandidatePlan::Unbounded,
        (1, _) => flat.pop().expect("len checked == 1"),
        (_, true) => CandidatePlan::And(flat),
        (_, false) => CandidatePlan::Or(flat),
    }
}
#[cfg(test)]
mod tests {
    use datafusion::{
        logical_expr::expr::InList,
        prelude::{col, lit},
    };

    use super::*;
    use crate::superfile::fts::tokenize::AsciiLowerTokenizer;

    /// The set of FTS columns used by every test: just `title`.
    fn fts_cols() -> HashSet<&'static str> {
        HashSet::from(["title"])
    }

    /// Tokenizer shared by every test.
    fn tok() -> Arc<dyn Tokenizer> {
        Arc::new(AsciiLowerTokenizer)
    }

    /// Lowers a single filter expression with the test columns and tokenizer.
    fn plan(expr: Expr) -> CandidatePlan {
        let columns = fts_cols();
        let tokenizer = tok();
        CandidatePlan::from_filters(&[expr], &columns, Some(&tokenizer))
    }

    /// `title = <value>` as a filter expression.
    fn title_eq(value: &str) -> Expr {
        col("title").eq(lit(value))
    }

    /// The `TermsAll` plan expected for the `title` column with `tokens`.
    fn title_terms(tokens: &[&str]) -> CandidatePlan {
        CandidatePlan::TermsAll {
            column: "title".into(),
            tokens: tokens.iter().map(|t| (*t).to_owned()).collect(),
        }
    }

    #[test]
    fn eq_on_fts_column_lowers_to_terms_all() {
        let p = plan(title_eq("rust async"));
        assert_eq!(p, title_terms(&["rust", "async"]));
    }

    #[test]
    fn eq_operands_reversed_still_lowers() {
        let p = plan(lit("rust").eq(col("title")));
        assert_eq!(p, title_terms(&["rust"]));
    }

    #[test]
    fn eq_on_non_fts_column_is_unbounded() {
        let p = plan(col("category").eq(lit("rust")));
        assert_eq!(p, CandidatePlan::Unbounded);
    }

    #[test]
    fn empty_literal_is_unbounded() {
        let p = plan(title_eq(""));
        assert_eq!(p, CandidatePlan::Unbounded);
    }

    #[test]
    fn range_op_is_unbounded() {
        let p = plan(col("title").gt(lit("m")));
        assert_eq!(p, CandidatePlan::Unbounded);
    }

    #[test]
    fn and_of_fts_and_non_fts_keeps_only_fts_branch() {
        let filter = title_eq("rust").and(col("category").eq(lit("lang")));
        assert_eq!(plan(filter), title_terms(&["rust"]));
    }

    #[test]
    fn and_of_two_fts_equalities_intersects() {
        let filter = title_eq("rust").and(title_eq("async"));
        match plan(filter) {
            CandidatePlan::And(children) => assert_eq!(children.len(), 2),
            other => panic!("expected And, got {other:?}"),
        }
    }

    #[test]
    fn or_of_two_fts_equalities_unions() {
        let filter = title_eq("rust").or(title_eq("python"));
        match plan(filter) {
            CandidatePlan::Or(children) => assert_eq!(children.len(), 2),
            other => panic!("expected Or, got {other:?}"),
        }
    }

    #[test]
    fn or_with_non_fts_branch_is_unbounded() {
        let filter = title_eq("rust").or(col("category").eq(lit("lang")));
        assert_eq!(plan(filter), CandidatePlan::Unbounded);
    }

    #[test]
    fn not_is_unbounded() {
        let filter = !title_eq("rust");
        assert_eq!(plan(filter), CandidatePlan::Unbounded);
    }

    #[test]
    fn not_eq_is_unbounded() {
        let filter = col("title").not_eq(lit("rust"));
        assert_eq!(plan(filter), CandidatePlan::Unbounded);
    }

    #[test]
    fn and_with_not_child_keeps_fts_branch() {
        let filter = title_eq("rust").and(!title_eq("compiler"));
        assert_eq!(plan(filter), title_terms(&["rust"]));
    }

    #[test]
    fn like_is_unbounded() {
        let filter = col("title").like(lit("rust%"));
        assert_eq!(plan(filter), CandidatePlan::Unbounded);
    }

    #[test]
    fn in_list_on_fts_column_is_or_of_terms_all() {
        let items = vec![lit("rust"), lit("python")];
        let expr = Expr::InList(InList::new(Box::new(col("title")), items, false));
        match plan(expr) {
            CandidatePlan::Or(children) => {
                assert_eq!(children.len(), 2);
                assert!(matches!(children[0], CandidatePlan::TermsAll { .. }));
            }
            other => panic!("expected Or, got {other:?}"),
        }
    }

    #[test]
    fn negated_in_list_is_unbounded() {
        let items = vec![lit("rust")];
        let expr = Expr::InList(InList::new(Box::new(col("title")), items, true));
        assert_eq!(plan(expr), CandidatePlan::Unbounded);
    }

    #[test]
    fn no_tokenizer_is_unbounded() {
        let filters = [title_eq("rust")];
        let p = CandidatePlan::from_filters(&filters, &fts_cols(), None);
        assert_eq!(p, CandidatePlan::Unbounded);
    }
}