use crate::btree::key_serde::serialize_datum;
use crate::btree::reader::BTreeIndexReader;
use crate::spec::{DataType, Datum, PredicateOperator};
use roaring::RoaringTreemap;
use std::cmp::Ordering;
use std::io;
#[async_trait::async_trait]
pub trait IndexQuery: Send + Sync {
async fn query(
&self,
op: PredicateOperator,
literals: &[Datum],
data_type: &DataType,
) -> io::Result<RoaringTreemap>;
}
#[async_trait::async_trait]
impl<F> IndexQuery for BTreeIndexReader<F>
where
F: Fn(&[u8], &[u8]) -> Ordering + Send + Sync,
{
async fn query(
&self,
op: PredicateOperator,
literals: &[Datum],
data_type: &DataType,
) -> io::Result<RoaringTreemap> {
match op {
PredicateOperator::Eq => {
let key = serialize_datum(&literals[0], data_type);
self.query_equal(&key).await
}
PredicateOperator::Lt => {
let key = serialize_datum(&literals[0], data_type);
self.query_less_than(&key).await
}
PredicateOperator::LtEq => {
let key = serialize_datum(&literals[0], data_type);
self.query_less_or_equal(&key).await
}
PredicateOperator::Gt => {
let key = serialize_datum(&literals[0], data_type);
self.query_greater_than(&key).await
}
PredicateOperator::GtEq => {
let key = serialize_datum(&literals[0], data_type);
self.query_greater_or_equal(&key).await
}
PredicateOperator::In => {
let keys: Vec<Vec<u8>> = literals
.iter()
.map(|lit| serialize_datum(lit, data_type))
.collect();
let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
self.query_in(&key_refs).await
}
PredicateOperator::IsNull => Ok(self.null_bitmap().clone()),
PredicateOperator::IsNotNull => self.all_non_null_rows().await,
PredicateOperator::NotEq => {
let key = serialize_datum(&literals[0], data_type);
self.query_not_equal(&key).await
}
PredicateOperator::NotIn => {
let mut all_non_null = self.all_non_null_rows().await?;
let keys: Vec<Vec<u8>> = literals
.iter()
.map(|lit| serialize_datum(lit, data_type))
.collect();
let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
let excluded = self.query_in(&key_refs).await?;
all_non_null -= excluded;
Ok(all_non_null)
}
}
}
}
pub(crate) struct BetweenInfo<'a> {
pub from: &'a Datum,
pub to: &'a Datum,
pub from_inclusive: bool,
pub to_inclusive: bool,
pub data_type: &'a DataType,
}
pub(crate) type ExtractBetweenResult<'a> = (
Option<BetweenInfo<'a>>,
Vec<(PredicateOperator, &'a [Datum], &'a DataType)>,
);
pub(crate) fn extract_between<'a>(
predicates: &[(PredicateOperator, &'a [Datum], &'a DataType)],
) -> ExtractBetweenResult<'a> {
if predicates.len() < 2 {
return (None, predicates.to_vec());
}
let mut lower: Option<(usize, bool)> = None; let mut upper: Option<(usize, bool)> = None;
for (i, (op, literals, _)) in predicates.iter().enumerate() {
if literals.len() != 1 {
continue;
}
match op {
PredicateOperator::GtEq if lower.is_none() => lower = Some((i, true)),
PredicateOperator::Gt if lower.is_none() => lower = Some((i, false)),
PredicateOperator::LtEq if upper.is_none() => upper = Some((i, true)),
PredicateOperator::Lt if upper.is_none() => upper = Some((i, false)),
_ => {}
}
}
match (lower, upper) {
(Some((li, from_inclusive)), Some((ui, to_inclusive))) => {
let between = BetweenInfo {
from: &predicates[li].1[0],
to: &predicates[ui].1[0],
from_inclusive,
to_inclusive,
data_type: predicates[li].2,
};
let remaining: Vec<_> = predicates
.iter()
.enumerate()
.filter(|(i, _)| *i != li && *i != ui)
.map(|(_, p)| *p)
.collect();
(Some(between), remaining)
}
_ => (None, predicates.to_vec()),
}
}