use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use iqdb_types::{Filter, Metadata, Value};
use crate::FilterEvaluator;
use crate::selectivity;
/// Hashable form of a metadata [`Value`], used as the key of a posting list.
///
/// There is one variant per `Value` kind that `IndexKey::from_value` accepts.
/// `Value::Float` has no counterpart here, so float values are never indexed
/// (presumably because floats cannot implement `Eq`/`Hash` — confirm against
/// the `Value` definition).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum IndexKey {
// Built from `Value::String`; owns a copy of the string.
Str(String),
// Built from `Value::Int`.
Int(i64),
// Built from `Value::Bool`.
Bool(bool),
// Built from `Value::Null`.
Null,
}
impl IndexKey {
    /// Converts a metadata value into its hashable index key.
    ///
    /// Returns `None` for `Value::Float`, the only kind that is not indexed;
    /// every other kind maps to the variant of the same shape. String values
    /// are cloned so the key owns its data.
    fn from_value(value: &Value) -> Option<Self> {
        let key = match value {
            // Floats are the one kind the index refuses to key on.
            Value::Float(_) => return None,
            Value::String(text) => Self::Str(text.clone()),
            Value::Int(number) => Self::Int(*number),
            Value::Bool(flag) => Self::Bool(*flag),
            Value::Null => Self::Null,
        };
        Some(key)
    }
}
/// Equality index from metadata field values to the keys of the records that
/// carry them.
///
/// Constructed by `MetadataIndex::build`; `K` is the caller's record key type.
#[derive(Debug, Clone)]
pub struct MetadataIndex<K> {
// Indexed field name -> (hashable field value -> keys of records with that value).
fields: HashMap<String, HashMap<IndexKey, Vec<K>>>,
// Number of records seen while building, including records without metadata.
total: usize,
}
impl<K> MetadataIndex<K>
where
K: Clone + Eq + Hash,
{
/// Builds an index over the named `fields` from `records`.
///
/// Every record is counted towards `len()`, whether or not it has metadata.
/// A record key is appended to a posting list only when the record's
/// metadata contains the field and the value converts to an `IndexKey`
/// (float values are skipped). Fields listed in `fields` always appear in
/// the index, even if no record posts to them.
pub fn build<'a, I>(fields: &[&str], records: I) -> Self
where
    I: IntoIterator<Item = (K, Option<&'a Metadata>)>,
{
    // Seed one (initially empty) posting map per requested field so that
    // `is_indexed` is true for it even when no record carries the field.
    let mut postings_by_field: HashMap<String, HashMap<IndexKey, Vec<K>>> = HashMap::new();
    for name in fields {
        postings_by_field.insert((*name).to_string(), HashMap::new());
    }

    let mut seen = 0usize;
    for (key, metadata) in records {
        seen += 1;
        if let Some(metadata) = metadata {
            for (field, postings) in &mut postings_by_field {
                if let Some(value) = metadata.get(field) {
                    if let Some(index_key) = IndexKey::from_value(value) {
                        postings.entry(index_key).or_default().push(key.clone());
                    }
                }
            }
        }
    }

    Self {
        fields: postings_by_field,
        total: seen,
    }
}
/// Returns the number of records the index was built from.
///
/// This counts every record passed to `build`, including records that had no
/// metadata and therefore appear in no posting list.
#[must_use]
pub fn len(&self) -> usize {
self.total
}
/// Returns `true` when the index was built from zero records.
#[must_use]
pub fn is_empty(&self) -> bool {
self.total == 0
}
/// Returns `true` when `field` was one of the fields the index was built over.
///
/// A field counts as indexed even if no record posted a value for it.
#[must_use]
pub fn is_indexed(&self, field: &str) -> bool {
self.fields.contains_key(field)
}
/// Iterates over the names of the indexed fields.
///
/// The names come straight from a `HashMap`'s keys, so the order is unspecified.
pub fn indexed_fields(&self) -> impl Iterator<Item = &str> {
self.fields.keys().map(String::as_str)
}
/// Resolves the evaluator's filter against the index.
///
/// Returns `None` when the index cannot narrow the filter at all, and
/// `Some(keys)` otherwise. The keys are distinct but in unspecified order.
/// For `And` filters with children the index cannot resolve, the result is
/// narrowed by the resolvable children only, so it may contain keys that do
/// not satisfy the full filter.
#[must_use]
pub fn candidates(&self, evaluator: &FilterEvaluator) -> Option<Vec<K>> {
    let matched = self.resolve(evaluator.filter())?;
    Some(matched.into_iter().collect())
}
/// Recursively resolves `filter` to a set of record keys, or `None` when the
/// index cannot answer it.
///
/// * `Eq` — the posting list for the literal.
/// * `In` — union of the posting lists; `None` if any literal is unresolvable.
/// * `And` — intersection of the children that resolve; children that do not
///   resolve are ignored, and the result is `None` only if no child resolves.
/// * `Or` — union of the children; `None` if any child is unresolvable.
/// * `Neq`, `Not` and the range comparisons are never resolved.
fn resolve(&self, filter: &Filter) -> Option<HashSet<K>> {
    match filter {
        Filter::Neq { .. }
        | Filter::Not(_)
        | Filter::Lt { .. }
        | Filter::Lte { .. }
        | Filter::Gt { .. }
        | Filter::Gte { .. } => None,
        Filter::Eq { field, value } => self.postings_for(field, value),
        // A union must see every operand, so the first unresolvable literal
        // aborts the whole fold.
        Filter::In { field, values } => {
            values
                .iter()
                .try_fold(HashSet::<K>::new(), |mut union, value| {
                    union.extend(self.postings_for(field, value)?);
                    Some(union)
                })
        }
        // An intersection may skip operands it cannot resolve: the result is
        // then merely less narrow.
        Filter::And(children) => children
            .iter()
            .filter_map(|child| self.resolve(child))
            .reduce(|narrowed, set| intersect(narrowed, &set)),
        Filter::Or(children) => {
            children
                .iter()
                .try_fold(HashSet::<K>::new(), |mut union, child| {
                    union.extend(self.resolve(child)?);
                    Some(union)
                })
        }
    }
}
/// Returns the keys of the records whose `field` equals `value`.
///
/// `None` means the index cannot answer: either `field` is not indexed or
/// `value` has no index key (a float). An indexed field with no posting for
/// the value yields `Some` of an empty set.
fn postings_for(&self, field: &str, value: &Value) -> Option<HashSet<K>> {
    let by_value = self.fields.get(field)?;
    let index_key = IndexKey::from_value(value)?;
    let matched = match by_value.get(&index_key) {
        Some(keys) => keys.iter().cloned().collect(),
        None => HashSet::new(),
    };
    Some(matched)
}
/// Estimates the fraction of records the evaluator's filter matches,
/// clamped to `[0.0, 1.0]`.
///
/// An index built from zero records has no counts to draw on, so it defers
/// to `selectivity::structural`; otherwise posting-list sizes are used
/// wherever the index can supply them.
#[must_use]
pub fn estimate_selectivity(&self, evaluator: &FilterEvaluator) -> f64 {
    let filter = evaluator.filter();
    let raw = if self.total == 0 {
        selectivity::structural(filter)
    } else {
        self.estimate(filter)
    };
    raw.clamp(0.0, 1.0)
}
fn estimate(&self, filter: &Filter) -> f64 {
match filter {
Filter::Eq { field, value } => self
.leaf_fraction(field, value)
.unwrap_or(selectivity::EQ_SELECTIVITY),
Filter::Neq { field, value } => {
1.0 - self
.leaf_fraction(field, value)
.unwrap_or(selectivity::EQ_SELECTIVITY)
}
Filter::In { field, values } => {
let mut acc = 0.0;
for value in values {
match self.leaf_fraction(field, value) {
Some(f) => acc += f,
None => return selectivity::structural(filter),
}
}
acc.min(1.0)
}
Filter::Lt { .. } | Filter::Lte { .. } | Filter::Gt { .. } | Filter::Gte { .. } => {
selectivity::RANGE_SELECTIVITY
}
Filter::And(children) => children.iter().map(|c| self.estimate(c)).product(),
Filter::Or(children) => {
1.0 - children
.iter()
.map(|c| 1.0 - self.estimate(c))
.product::<f64>()
}
Filter::Not(inner) => 1.0 - self.estimate(inner),
}
}
/// Returns the fraction of all records whose `field` equals `value`.
///
/// `None` means the index has no count to offer: `field` is not indexed or
/// `value` has no index key (a float). The divisor is `self.total`, so the
/// result is only meaningful when the index is non-empty.
fn leaf_fraction(&self, field: &str, value: &Value) -> Option<f64> {
    let by_value = self.fields.get(field)?;
    let index_key = IndexKey::from_value(value)?;
    let hits = match by_value.get(&index_key) {
        Some(keys) => keys.len(),
        None => 0,
    };
    Some(hits as f64 / self.total as f64)
}
}
/// Returns the elements of `a` that are also present in `b`.
///
/// The work is proportional to the smaller of the two sets. When `b` is
/// smaller, its elements are used to pull the matching entries out of `a`;
/// otherwise `a` is filtered in place, reusing its allocation.
fn intersect<K: Eq + Hash>(mut a: HashSet<K>, b: &HashSet<K>) -> HashSet<K> {
    if b.len() < a.len() {
        // `take` hands back `a`'s own element, so no `Clone` bound is needed.
        b.iter().filter_map(|key| a.take(key)).collect()
    } else {
        a.retain(|key| b.contains(key));
        a
    }
}
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use iqdb_types::Metadata;

    /// Shorthand for a string-valued metadata literal.
    fn text(raw: &str) -> Value {
        Value::String(raw.into())
    }

    /// Builds a `Metadata` from borrowed `(field, value)` pairs.
    fn meta(pairs: &[(&str, Value)]) -> Metadata {
        let owned = pairs
            .iter()
            .map(|(name, value)| ((*name).to_string(), value.clone()));
        owned.collect()
    }

    /// Four records: three `rust` rows (one of them without a `year`) and one `go` row.
    fn corpus() -> Vec<(usize, Metadata)> {
        let row0 = meta(&[("lang", text("rust")), ("year", Value::Int(2026))]);
        let row1 = meta(&[("lang", text("go")), ("year", Value::Int(2024))]);
        let row2 = meta(&[("lang", text("rust")), ("year", Value::Int(2020))]);
        let row3 = meta(&[("lang", text("rust"))]);
        vec![(0, row0), (1, row1), (2, row2), (3, row3)]
    }

    /// Index over `lang` and `year` built from `corpus()`.
    fn index() -> MetadataIndex<usize> {
        let rows = corpus();
        let records = rows.iter().map(|(id, metadata)| (*id, Some(metadata)));
        MetadataIndex::build(&["lang", "year"], records)
    }

    /// Returns `ids` in ascending order so results can be compared exactly.
    fn sorted(mut ids: Vec<usize>) -> Vec<usize> {
        ids.sort_unstable();
        ids
    }

    /// Resolves `filter` against `index()` and sorts the candidates, if any.
    fn cands(filter: Filter) -> Option<Vec<usize>> {
        let evaluator = FilterEvaluator::new(filter).unwrap();
        let found = index().candidates(&evaluator)?;
        Some(sorted(found))
    }

    #[test]
    fn build_counts_all_records_and_fields() {
        let idx = index();
        assert_eq!(idx.len(), 4);
        assert!(!idx.is_empty());
        assert!(idx.is_indexed("lang"));
        assert!(idx.is_indexed("year"));
        assert!(!idx.is_indexed("missing"));
    }

    #[test]
    fn eq_on_indexed_field_resolves() {
        let found = cands(Filter::eq("lang", text("rust")));
        assert_eq!(found, Some(vec![0, 2, 3]));
    }

    #[test]
    fn eq_with_no_postings_is_empty_not_none() {
        let found = cands(Filter::eq("lang", text("zig")));
        assert_eq!(found, Some(Vec::new()));
    }

    #[test]
    fn eq_on_unindexed_field_is_none() {
        let found = cands(Filter::eq("author", text("ada")));
        assert_eq!(found, None);
    }

    #[test]
    fn float_literal_is_none() {
        let found = cands(Filter::eq("year", Value::Float(2026.0)));
        assert_eq!(found, None);
    }

    #[test]
    fn in_resolves_to_union() {
        let filter = Filter::is_in("lang", vec![text("go"), text("rust")]);
        assert_eq!(cands(filter), Some(vec![0, 1, 2, 3]));
    }

    #[test]
    fn and_intersects_resolvable_children() {
        let filter = Filter::and(vec![
            Filter::eq("lang", text("rust")),
            Filter::eq("year", Value::Int(2026)),
        ]);
        assert_eq!(cands(filter), Some(vec![0]));
    }

    #[test]
    fn and_narrows_using_only_the_resolvable_child() {
        let filter = Filter::and(vec![
            Filter::eq("lang", text("rust")),
            Filter::gt("year", Value::Int(2021)),
        ]);
        assert_eq!(cands(filter), Some(vec![0, 2, 3]));
    }

    #[test]
    fn or_with_unresolvable_child_is_none() {
        let filter = Filter::or(vec![
            Filter::eq("lang", text("rust")),
            Filter::gt("year", Value::Int(2021)),
        ]);
        assert_eq!(cands(filter), None);
    }

    #[test]
    fn not_and_ranges_are_none() {
        let negated = Filter::not(Filter::eq("lang", text("rust")));
        assert_eq!(cands(negated), None);
        let range = Filter::gt("year", Value::Int(2000));
        assert_eq!(cands(range), None);
    }

    #[test]
    fn candidates_are_a_superset_of_true_matches() {
        let rows = corpus();
        let evaluator = FilterEvaluator::new(Filter::and(vec![
            Filter::eq("lang", text("rust")),
            Filter::gt("year", Value::Int(2021)),
        ]))
        .unwrap();
        let resolved = index().candidates(&evaluator).unwrap();
        let candidates: std::collections::HashSet<usize> = resolved.into_iter().collect();
        // Every record the evaluator accepts must have survived index narrowing.
        for (k, m) in &rows {
            if !evaluator.evaluate(Some(m)) {
                continue;
            }
            assert!(
                candidates.contains(k),
                "true match {k} missing from candidates"
            );
        }
    }

    #[test]
    fn index_backed_selectivity_uses_real_counts() {
        let idx = index();
        let estimate =
            |filter: Filter| idx.estimate_selectivity(&FilterEvaluator::new(filter).unwrap());
        let rust_share = estimate(Filter::eq("lang", text("rust")));
        assert!((rust_share - 0.75).abs() < 1e-9);
        let year_share = estimate(Filter::eq("year", Value::Int(2026)));
        assert!((year_share - 0.25).abs() < 1e-9);
    }

    #[test]
    fn empty_index_falls_back_to_structural() {
        let empty: MetadataIndex<usize> = MetadataIndex::build(&["lang"], std::iter::empty());
        assert!(empty.is_empty());
        let evaluator = FilterEvaluator::new(Filter::eq("lang", text("rust"))).unwrap();
        let estimate = empty.estimate_selectivity(&evaluator);
        assert!((0.0..=1.0).contains(&estimate));
    }

    #[test]
    fn records_without_metadata_count_but_do_not_post() {
        let bare: Vec<(usize, Option<&Metadata>)> = (0..2).map(|id| (id, None)).collect();
        let idx: MetadataIndex<usize> = MetadataIndex::build(&["lang"], bare);
        assert_eq!(idx.len(), 2);
        let evaluator = FilterEvaluator::new(Filter::eq("lang", text("rust"))).unwrap();
        assert_eq!(idx.candidates(&evaluator), Some(vec![]));
    }
}