1use std::collections::{BTreeMap, HashMap};
54use std::sync::RwLock;
55
56use crate::candidate_gate::AllowedSet;
57use crate::filter_ir::{FilterAtom, FilterIR, FilterValue};
58
59#[derive(Debug, Clone)]
65pub struct PostingSet {
66 doc_ids: Vec<u64>,
68}
69
70impl PostingSet {
71 pub fn new() -> Self {
73 Self { doc_ids: Vec::new() }
74 }
75
76 pub fn from_vec(mut ids: Vec<u64>) -> Self {
78 ids.sort_unstable();
79 ids.dedup();
80 Self { doc_ids: ids }
81 }
82
83 pub fn add(&mut self, doc_id: u64) {
85 match self.doc_ids.binary_search(&doc_id) {
87 Ok(_) => {} Err(pos) => self.doc_ids.insert(pos, doc_id),
89 }
90 }
91
92 pub fn remove(&mut self, doc_id: u64) {
94 if let Ok(pos) = self.doc_ids.binary_search(&doc_id) {
95 self.doc_ids.remove(pos);
96 }
97 }
98
99 pub fn contains(&self, doc_id: u64) -> bool {
101 self.doc_ids.binary_search(&doc_id).is_ok()
102 }
103
104 pub fn len(&self) -> usize {
106 self.doc_ids.len()
107 }
108
109 pub fn is_empty(&self) -> bool {
111 self.doc_ids.is_empty()
112 }
113
114 pub fn to_allowed_set(&self) -> AllowedSet {
116 if self.doc_ids.is_empty() {
117 AllowedSet::None
118 } else {
119 AllowedSet::from_sorted_vec(self.doc_ids.clone())
120 }
121 }
122
123 pub fn intersect(&self, other: &PostingSet) -> PostingSet {
125 let mut result = Vec::with_capacity(self.doc_ids.len().min(other.doc_ids.len()));
126 let mut i = 0;
127 let mut j = 0;
128
129 while i < self.doc_ids.len() && j < other.doc_ids.len() {
130 match self.doc_ids[i].cmp(&other.doc_ids[j]) {
131 std::cmp::Ordering::Less => i += 1,
132 std::cmp::Ordering::Greater => j += 1,
133 std::cmp::Ordering::Equal => {
134 result.push(self.doc_ids[i]);
135 i += 1;
136 j += 1;
137 }
138 }
139 }
140
141 PostingSet { doc_ids: result }
142 }
143
144 pub fn union(&self, other: &PostingSet) -> PostingSet {
146 let mut result = Vec::with_capacity(self.doc_ids.len() + other.doc_ids.len());
147 let mut i = 0;
148 let mut j = 0;
149
150 while i < self.doc_ids.len() && j < other.doc_ids.len() {
151 match self.doc_ids[i].cmp(&other.doc_ids[j]) {
152 std::cmp::Ordering::Less => {
153 result.push(self.doc_ids[i]);
154 i += 1;
155 }
156 std::cmp::Ordering::Greater => {
157 result.push(other.doc_ids[j]);
158 j += 1;
159 }
160 std::cmp::Ordering::Equal => {
161 result.push(self.doc_ids[i]);
162 i += 1;
163 j += 1;
164 }
165 }
166 }
167
168 result.extend_from_slice(&self.doc_ids[i..]);
169 result.extend_from_slice(&other.doc_ids[j..]);
170
171 PostingSet { doc_ids: result }
172 }
173
174 pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
176 self.doc_ids.iter().copied()
177 }
178}
179
180impl Default for PostingSet {
181 fn default() -> Self {
182 Self::new()
183 }
184}
185
186#[derive(Debug, Default)]
192pub struct EqualityIndex {
193 string_postings: HashMap<String, PostingSet>,
195 int_postings: HashMap<i64, PostingSet>,
197 uint_postings: HashMap<u64, PostingSet>,
199}
200
201impl EqualityIndex {
202 pub fn new() -> Self {
204 Self::default()
205 }
206
207 pub fn add_string(&mut self, value: &str, doc_id: u64) {
209 self.string_postings
210 .entry(value.to_string())
211 .or_default()
212 .add(doc_id);
213 }
214
215 pub fn add_int(&mut self, value: i64, doc_id: u64) {
217 self.int_postings
218 .entry(value)
219 .or_default()
220 .add(doc_id);
221 }
222
223 pub fn add_uint(&mut self, value: u64, doc_id: u64) {
225 self.uint_postings
226 .entry(value)
227 .or_default()
228 .add(doc_id);
229 }
230
231 pub fn remove_string(&mut self, value: &str, doc_id: u64) {
233 if let Some(posting) = self.string_postings.get_mut(value) {
234 posting.remove(doc_id);
235 if posting.is_empty() {
236 self.string_postings.remove(value);
237 }
238 }
239 }
240
241 pub fn lookup_string(&self, value: &str) -> AllowedSet {
243 self.string_postings
244 .get(value)
245 .map(|p| p.to_allowed_set())
246 .unwrap_or(AllowedSet::None)
247 }
248
249 pub fn lookup_int(&self, value: i64) -> AllowedSet {
251 self.int_postings
252 .get(&value)
253 .map(|p| p.to_allowed_set())
254 .unwrap_or(AllowedSet::None)
255 }
256
257 pub fn lookup_uint(&self, value: u64) -> AllowedSet {
259 self.uint_postings
260 .get(&value)
261 .map(|p| p.to_allowed_set())
262 .unwrap_or(AllowedSet::None)
263 }
264
265 pub fn lookup_string_in(&self, values: &[String]) -> AllowedSet {
267 let sets: Vec<_> = values.iter()
268 .filter_map(|v| self.string_postings.get(v))
269 .collect();
270
271 if sets.is_empty() {
272 return AllowedSet::None;
273 }
274
275 let mut result = sets[0].clone();
277 for set in &sets[1..] {
278 result = result.union(set);
279 }
280
281 result.to_allowed_set()
282 }
283
284 pub fn lookup_uint_in(&self, values: &[u64]) -> AllowedSet {
286 let sets: Vec<_> = values.iter()
287 .filter_map(|v| self.uint_postings.get(v))
288 .collect();
289
290 if sets.is_empty() {
291 return AllowedSet::None;
292 }
293
294 let mut result = sets[0].clone();
295 for set in &sets[1..] {
296 result = result.union(set);
297 }
298
299 result.to_allowed_set()
300 }
301
302 pub fn string_values(&self) -> impl Iterator<Item = &str> {
304 self.string_postings.keys().map(|s| s.as_str())
305 }
306
307 pub fn stats(&self) -> EqualityIndexStats {
309 EqualityIndexStats {
310 unique_string_values: self.string_postings.len(),
311 unique_int_values: self.int_postings.len(),
312 unique_uint_values: self.uint_postings.len(),
313 total_postings: self.string_postings.values().map(|p| p.len()).sum::<usize>()
314 + self.int_postings.values().map(|p| p.len()).sum::<usize>()
315 + self.uint_postings.values().map(|p| p.len()).sum::<usize>(),
316 }
317 }
318}
319
/// Size snapshot of an equality index.
///
/// Implements `PartialEq`/`Eq` so snapshots can be compared directly (e.g. in
/// tests) and `Default` so an all-zero snapshot is available for empty
/// indexes.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct EqualityIndexStats {
    /// Number of distinct string values indexed.
    pub unique_string_values: usize,
    /// Number of distinct signed-integer values indexed.
    pub unique_int_values: usize,
    /// Number of distinct unsigned-integer values indexed.
    pub unique_uint_values: usize,
    /// Total number of (value, document) postings across all value types.
    pub total_postings: usize,
}
328
329#[derive(Debug, Default)]
335pub struct RangeIndex {
336 entries: BTreeMap<i64, PostingSet>,
339 doc_count: usize,
341}
342
343impl RangeIndex {
344 pub fn new() -> Self {
346 Self::default()
347 }
348
349 pub fn add(&mut self, value: i64, doc_id: u64) {
351 self.entries.entry(value).or_default().add(doc_id);
352 self.doc_count += 1;
353 }
354
355 pub fn add_uint(&mut self, value: u64, doc_id: u64) {
357 self.add(value as i64, doc_id);
358 }
359
360 pub fn remove(&mut self, value: i64, doc_id: u64) {
362 if let Some(posting) = self.entries.get_mut(&value) {
363 posting.remove(doc_id);
364 if posting.is_empty() {
365 self.entries.remove(&value);
366 }
367 self.doc_count -= 1;
368 }
369 }
370
371 pub fn range_query(
373 &self,
374 min: Option<i64>,
375 max: Option<i64>,
376 min_inclusive: bool,
377 max_inclusive: bool,
378 ) -> AllowedSet {
379 use std::ops::Bound;
380
381 let start = match min {
382 Some(v) if min_inclusive => Bound::Included(v),
383 Some(v) => Bound::Excluded(v),
384 None => Bound::Unbounded,
385 };
386
387 let end = match max {
388 Some(v) if max_inclusive => Bound::Included(v),
389 Some(v) => Bound::Excluded(v),
390 None => Bound::Unbounded,
391 };
392
393 let mut result = PostingSet::new();
395 for (_, posting) in self.entries.range((start, end)) {
396 result = result.union(posting);
397 }
398
399 result.to_allowed_set()
400 }
401
402 pub fn greater_than(&self, value: i64, inclusive: bool) -> AllowedSet {
404 self.range_query(Some(value), None, inclusive, true)
405 }
406
407 pub fn less_than(&self, value: i64, inclusive: bool) -> AllowedSet {
409 self.range_query(None, Some(value), true, inclusive)
410 }
411
412 pub fn stats(&self) -> RangeIndexStats {
414 let values: Vec<_> = self.entries.keys().collect();
415 RangeIndexStats {
416 unique_values: self.entries.len(),
417 total_docs: self.doc_count,
418 min_value: values.first().copied().copied(),
419 max_value: values.last().copied().copied(),
420 }
421 }
422}
423
/// Size and extent snapshot of a range index.
///
/// Implements `PartialEq`/`Eq` so snapshots can be compared directly and
/// `Default` so an empty snapshot (zero counts, no extremes) is available.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RangeIndexStats {
    /// Number of distinct values indexed.
    pub unique_values: usize,
    /// Document count as tracked by the index.
    pub total_docs: usize,
    /// Smallest indexed value, or `None` when the index is empty.
    pub min_value: Option<i64>,
    /// Largest indexed value, or `None` when the index is empty.
    pub max_value: Option<i64>,
}
432
433#[derive(Debug, Default)]
439pub struct MetadataIndex {
440 equality_indexes: HashMap<String, EqualityIndex>,
442 range_indexes: HashMap<String, RangeIndex>,
444 doc_count: usize,
446}
447
448impl MetadataIndex {
449 pub fn new() -> Self {
451 Self::default()
452 }
453
454 pub fn add_equality(&mut self, field: &str, value: &FilterValue, doc_id: u64) {
456 let index = self.equality_indexes
457 .entry(field.to_string())
458 .or_default();
459
460 match value {
461 FilterValue::String(s) => index.add_string(s, doc_id),
462 FilterValue::Int64(i) => index.add_int(*i, doc_id),
463 FilterValue::Uint64(u) => index.add_uint(*u, doc_id),
464 _ => {} }
466 }
467
468 pub fn add_string(&mut self, field: &str, value: &str, doc_id: u64) {
470 self.equality_indexes
471 .entry(field.to_string())
472 .or_default()
473 .add_string(value, doc_id);
474 }
475
476 pub fn add_range(&mut self, field: &str, value: i64, doc_id: u64) {
478 self.range_indexes
479 .entry(field.to_string())
480 .or_default()
481 .add(value, doc_id);
482 }
483
484 pub fn add_timestamp(&mut self, field: &str, timestamp: u64, doc_id: u64) {
486 self.add_range(field, timestamp as i64, doc_id);
487 }
488
489 pub fn set_doc_count(&mut self, count: usize) {
491 self.doc_count = count;
492 }
493
494 pub fn inc_doc_count(&mut self) {
496 self.doc_count += 1;
497 }
498
499 pub fn doc_count(&self) -> usize {
501 self.doc_count
502 }
503
504 pub fn evaluate_atom(&self, atom: &FilterAtom) -> AllowedSet {
506 match atom {
507 FilterAtom::Eq { field, value } => {
508 if let Some(index) = self.equality_indexes.get(field) {
509 match value {
510 FilterValue::String(s) => index.lookup_string(s),
511 FilterValue::Int64(i) => index.lookup_int(*i),
512 FilterValue::Uint64(u) => index.lookup_uint(*u),
513 _ => AllowedSet::All, }
515 } else {
516 AllowedSet::All }
518 }
519
520 FilterAtom::In { field, values } => {
521 if let Some(index) = self.equality_indexes.get(field) {
522 let strings: Vec<String> = values.iter()
524 .filter_map(|v| match v {
525 FilterValue::String(s) => Some(s.clone()),
526 _ => None,
527 })
528 .collect();
529
530 if strings.len() == values.len() {
531 return index.lookup_string_in(&strings);
532 }
533
534 let uints: Vec<u64> = values.iter()
536 .filter_map(|v| match v {
537 FilterValue::Uint64(u) => Some(*u),
538 _ => None,
539 })
540 .collect();
541
542 if uints.len() == values.len() {
543 return index.lookup_uint_in(&uints);
544 }
545 }
546 AllowedSet::All }
548
549 FilterAtom::Range { field, min, max, min_inclusive, max_inclusive } => {
550 if let Some(index) = self.range_indexes.get(field) {
551 let min_val = min.as_ref().and_then(|v| match v {
552 FilterValue::Int64(i) => Some(*i),
553 FilterValue::Uint64(u) => Some(*u as i64),
554 _ => None,
555 });
556 let max_val = max.as_ref().and_then(|v| match v {
557 FilterValue::Int64(i) => Some(*i),
558 FilterValue::Uint64(u) => Some(*u as i64),
559 _ => None,
560 });
561
562 index.range_query(min_val, max_val, *min_inclusive, *max_inclusive)
563 } else {
564 AllowedSet::All
565 }
566 }
567
568 FilterAtom::True => AllowedSet::All,
569 FilterAtom::False => AllowedSet::None,
570
571 _ => AllowedSet::All,
573 }
574 }
575
576 pub fn evaluate(&self, filter: &FilterIR) -> AllowedSet {
580 if filter.is_all() {
581 return AllowedSet::All;
582 }
583 if filter.is_none() {
584 return AllowedSet::None;
585 }
586
587 let mut result = AllowedSet::All;
589
590 for clause in &filter.clauses {
591 let clause_result = self.evaluate_disjunction(clause);
593
594 result = result.intersect(&clause_result);
596
597 if result.is_empty() {
599 return AllowedSet::None;
600 }
601 }
602
603 result
604 }
605
606 fn evaluate_disjunction(&self, clause: &crate::filter_ir::Disjunction) -> AllowedSet {
608 if clause.atoms.len() == 1 {
609 return self.evaluate_atom(&clause.atoms[0]);
610 }
611
612 let mut result = AllowedSet::None;
614 for atom in &clause.atoms {
615 let atom_result = self.evaluate_atom(atom);
616 result = result.union(&atom_result);
617
618 if result.is_all() {
620 return AllowedSet::All;
621 }
622 }
623
624 result
625 }
626
627 pub fn estimate_selectivity(&self, filter: &FilterIR) -> f64 {
629 if self.doc_count == 0 {
630 return 1.0;
631 }
632
633 let allowed = self.evaluate(filter);
634 allowed.selectivity(self.doc_count)
635 }
636}
637
638pub struct ConcurrentMetadataIndex {
644 inner: RwLock<MetadataIndex>,
645}
646
647impl ConcurrentMetadataIndex {
648 pub fn new() -> Self {
650 Self {
651 inner: RwLock::new(MetadataIndex::new()),
652 }
653 }
654
655 pub fn add_string(&self, field: &str, value: &str, doc_id: u64) {
657 self.inner.write().unwrap().add_string(field, value, doc_id);
658 }
659
660 pub fn add_timestamp(&self, field: &str, timestamp: u64, doc_id: u64) {
662 self.inner.write().unwrap().add_timestamp(field, timestamp, doc_id);
663 }
664
665 pub fn evaluate(&self, filter: &FilterIR) -> AllowedSet {
667 self.inner.read().unwrap().evaluate(filter)
668 }
669
670 pub fn set_doc_count(&self, count: usize) {
672 self.inner.write().unwrap().set_doc_count(count);
673 }
674}
675
676impl Default for ConcurrentMetadataIndex {
677 fn default() -> Self {
678 Self::new()
679 }
680}
681
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter_ir::FilterBuilder;

    /// Ids added out of order are all retrievable and counted once each.
    #[test]
    fn test_posting_set_basic() {
        let mut postings = PostingSet::new();
        for doc_id in [1, 5, 3] {
            postings.add(doc_id);
        }

        for present in [1, 3, 5] {
            assert!(postings.contains(present));
        }
        assert!(!postings.contains(2));
        assert_eq!(postings.len(), 3);
    }

    /// Intersection keeps exactly the ids shared by both operands.
    #[test]
    fn test_posting_set_intersection() {
        let left = PostingSet::from_vec(vec![1, 2, 3, 4, 5]);
        let right = PostingSet::from_vec(vec![3, 4, 5, 6, 7]);

        let shared = left.intersect(&right);
        assert_eq!(shared.len(), 3);
        for doc_id in [3, 4, 5] {
            assert!(shared.contains(doc_id));
        }
    }

    /// String lookups return the documents of the value, or nothing for an
    /// unknown value.
    #[test]
    fn test_equality_index() {
        let mut index = EqualityIndex::new();
        for (value, doc_id) in [("production", 1), ("production", 2), ("staging", 3)] {
            index.add_string(value, doc_id);
        }

        assert_eq!(index.lookup_string("production").cardinality(), Some(2));
        assert_eq!(index.lookup_string("staging").cardinality(), Some(1));
        assert!(index.lookup_string("dev").is_empty());
    }

    /// Closed, open-ended and half-bounded range queries.
    #[test]
    fn test_range_index() {
        let mut index = RangeIndex::new();
        for (value, doc_id) in [(100, 1), (200, 2), (300, 3), (400, 4), (500, 5)] {
            index.add(value, doc_id);
        }

        // 200..=400 covers three values.
        let closed = index.range_query(Some(200), Some(400), true, true);
        assert_eq!(closed.cardinality(), Some(3));

        // Strictly greater than 300 leaves 400 and 500.
        let above = index.greater_than(300, false);
        assert_eq!(above.cardinality(), Some(2));

        // Up to and including 300 covers 100, 200 and 300.
        let up_to = index.less_than(300, true);
        assert_eq!(up_to.cardinality(), Some(3));
    }

    /// A namespace filter alone, then combined with a timestamp lower bound.
    #[test]
    fn test_metadata_index_evaluation() {
        let mut index = MetadataIndex::new();

        // Docs 0..10 are "production", docs 10..20 are "staging"; every doc
        // gets a timestamp of 1000 + 100 * id.
        for doc_id in 0..20 {
            let namespace = if doc_id < 10 { "production" } else { "staging" };
            index.add_string("namespace", namespace, doc_id);
            index.add_timestamp("created_at", 1000 + doc_id * 100, doc_id);
        }
        index.set_doc_count(20);

        let namespace_only = FilterBuilder::new().namespace("production").build();
        assert_eq!(index.evaluate(&namespace_only).cardinality(), Some(10));

        let namespace_and_time = FilterBuilder::new()
            .namespace("production")
            .gte("created_at", 1500i64)
            .build();
        assert_eq!(index.evaluate(&namespace_and_time).cardinality(), Some(5));
    }

    /// Selectivity tracks the share of documents a filter admits.
    #[test]
    fn test_selectivity_estimate() {
        let mut index = MetadataIndex::new();

        // One document in ten is "rare"; the rest are "common".
        for doc_id in 0..100 {
            let namespace = if doc_id % 10 == 0 { "rare" } else { "common" };
            index.add_string("namespace", namespace, doc_id);
        }
        index.set_doc_count(100);

        let common_selectivity =
            index.estimate_selectivity(&FilterBuilder::new().namespace("common").build());
        let rare_selectivity =
            index.estimate_selectivity(&FilterBuilder::new().namespace("rare").build());

        assert!(common_selectivity > rare_selectivity);
        assert!((common_selectivity - 0.9).abs() < 0.01);
        assert!((rare_selectivity - 0.1).abs() < 0.01);
    }
}