1use std::collections::{BTreeMap, HashMap};
57use std::sync::RwLock;
58
59use crate::candidate_gate::AllowedSet;
60use crate::filter_ir::{FilterAtom, FilterIR, FilterValue};
61
/// A posting list: the ids of the documents that carry some indexed value.
///
/// Invariant: `doc_ids` is sorted ascending with no duplicates. Every
/// constructor and mutator preserves this, which is what allows membership
/// tests via binary search and set operations via a linear merge.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostingSet {
    doc_ids: Vec<u64>,
}
72
73impl PostingSet {
74 pub fn new() -> Self {
76 Self {
77 doc_ids: Vec::new(),
78 }
79 }
80
81 pub fn from_vec(mut ids: Vec<u64>) -> Self {
83 ids.sort_unstable();
84 ids.dedup();
85 Self { doc_ids: ids }
86 }
87
88 pub fn add(&mut self, doc_id: u64) {
90 match self.doc_ids.binary_search(&doc_id) {
92 Ok(_) => {} Err(pos) => self.doc_ids.insert(pos, doc_id),
94 }
95 }
96
97 pub fn remove(&mut self, doc_id: u64) {
99 if let Ok(pos) = self.doc_ids.binary_search(&doc_id) {
100 self.doc_ids.remove(pos);
101 }
102 }
103
104 pub fn contains(&self, doc_id: u64) -> bool {
106 self.doc_ids.binary_search(&doc_id).is_ok()
107 }
108
109 pub fn len(&self) -> usize {
111 self.doc_ids.len()
112 }
113
114 pub fn is_empty(&self) -> bool {
116 self.doc_ids.is_empty()
117 }
118
119 pub fn to_allowed_set(&self) -> AllowedSet {
121 if self.doc_ids.is_empty() {
122 AllowedSet::None
123 } else {
124 AllowedSet::from_sorted_vec(self.doc_ids.clone())
125 }
126 }
127
128 pub fn intersect(&self, other: &PostingSet) -> PostingSet {
130 let mut result = Vec::with_capacity(self.doc_ids.len().min(other.doc_ids.len()));
131 let mut i = 0;
132 let mut j = 0;
133
134 while i < self.doc_ids.len() && j < other.doc_ids.len() {
135 match self.doc_ids[i].cmp(&other.doc_ids[j]) {
136 std::cmp::Ordering::Less => i += 1,
137 std::cmp::Ordering::Greater => j += 1,
138 std::cmp::Ordering::Equal => {
139 result.push(self.doc_ids[i]);
140 i += 1;
141 j += 1;
142 }
143 }
144 }
145
146 PostingSet { doc_ids: result }
147 }
148
149 pub fn union(&self, other: &PostingSet) -> PostingSet {
151 let mut result = Vec::with_capacity(self.doc_ids.len() + other.doc_ids.len());
152 let mut i = 0;
153 let mut j = 0;
154
155 while i < self.doc_ids.len() && j < other.doc_ids.len() {
156 match self.doc_ids[i].cmp(&other.doc_ids[j]) {
157 std::cmp::Ordering::Less => {
158 result.push(self.doc_ids[i]);
159 i += 1;
160 }
161 std::cmp::Ordering::Greater => {
162 result.push(other.doc_ids[j]);
163 j += 1;
164 }
165 std::cmp::Ordering::Equal => {
166 result.push(self.doc_ids[i]);
167 i += 1;
168 j += 1;
169 }
170 }
171 }
172
173 result.extend_from_slice(&self.doc_ids[i..]);
174 result.extend_from_slice(&other.doc_ids[j..]);
175
176 PostingSet { doc_ids: result }
177 }
178
179 pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
181 self.doc_ids.iter().copied()
182 }
183}
184
185impl Default for PostingSet {
186 fn default() -> Self {
187 Self::new()
188 }
189}
190
191#[derive(Debug, Default)]
197pub struct EqualityIndex {
198 string_postings: HashMap<String, PostingSet>,
200 int_postings: HashMap<i64, PostingSet>,
202 uint_postings: HashMap<u64, PostingSet>,
204}
205
206impl EqualityIndex {
207 pub fn new() -> Self {
209 Self::default()
210 }
211
212 pub fn add_string(&mut self, value: &str, doc_id: u64) {
214 self.string_postings
215 .entry(value.to_string())
216 .or_default()
217 .add(doc_id);
218 }
219
220 pub fn add_int(&mut self, value: i64, doc_id: u64) {
222 self.int_postings.entry(value).or_default().add(doc_id);
223 }
224
225 pub fn add_uint(&mut self, value: u64, doc_id: u64) {
227 self.uint_postings.entry(value).or_default().add(doc_id);
228 }
229
230 pub fn remove_string(&mut self, value: &str, doc_id: u64) {
232 if let Some(posting) = self.string_postings.get_mut(value) {
233 posting.remove(doc_id);
234 if posting.is_empty() {
235 self.string_postings.remove(value);
236 }
237 }
238 }
239
240 pub fn lookup_string(&self, value: &str) -> AllowedSet {
242 self.string_postings
243 .get(value)
244 .map(|p| p.to_allowed_set())
245 .unwrap_or(AllowedSet::None)
246 }
247
248 pub fn lookup_int(&self, value: i64) -> AllowedSet {
250 self.int_postings
251 .get(&value)
252 .map(|p| p.to_allowed_set())
253 .unwrap_or(AllowedSet::None)
254 }
255
256 pub fn lookup_uint(&self, value: u64) -> AllowedSet {
258 self.uint_postings
259 .get(&value)
260 .map(|p| p.to_allowed_set())
261 .unwrap_or(AllowedSet::None)
262 }
263
264 pub fn lookup_string_in(&self, values: &[String]) -> AllowedSet {
266 let sets: Vec<_> = values
267 .iter()
268 .filter_map(|v| self.string_postings.get(v))
269 .collect();
270
271 if sets.is_empty() {
272 return AllowedSet::None;
273 }
274
275 let mut result = sets[0].clone();
277 for set in &sets[1..] {
278 result = result.union(set);
279 }
280
281 result.to_allowed_set()
282 }
283
284 pub fn lookup_uint_in(&self, values: &[u64]) -> AllowedSet {
286 let sets: Vec<_> = values
287 .iter()
288 .filter_map(|v| self.uint_postings.get(v))
289 .collect();
290
291 if sets.is_empty() {
292 return AllowedSet::None;
293 }
294
295 let mut result = sets[0].clone();
296 for set in &sets[1..] {
297 result = result.union(set);
298 }
299
300 result.to_allowed_set()
301 }
302
303 pub fn string_values(&self) -> impl Iterator<Item = &str> {
305 self.string_postings.keys().map(|s| s.as_str())
306 }
307
308 pub fn stats(&self) -> EqualityIndexStats {
310 EqualityIndexStats {
311 unique_string_values: self.string_postings.len(),
312 unique_int_values: self.int_postings.len(),
313 unique_uint_values: self.uint_postings.len(),
314 total_postings: self
315 .string_postings
316 .values()
317 .map(|p| p.len())
318 .sum::<usize>()
319 + self.int_postings.values().map(|p| p.len()).sum::<usize>()
320 + self.uint_postings.values().map(|p| p.len()).sum::<usize>(),
321 }
322 }
323}
324
/// Summary counts for an `EqualityIndex`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EqualityIndexStats {
    /// Number of distinct string values with at least one posting.
    pub unique_string_values: usize,
    /// Number of distinct signed integer values with at least one posting.
    pub unique_int_values: usize,
    /// Number of distinct unsigned integer values with at least one posting.
    pub unique_uint_values: usize,
    /// Sum of posting-set sizes across all value types.
    pub total_postings: usize,
}
333
334#[derive(Debug, Default)]
340pub struct RangeIndex {
341 entries: BTreeMap<i64, PostingSet>,
344 doc_count: usize,
346}
347
348impl RangeIndex {
349 pub fn new() -> Self {
351 Self::default()
352 }
353
354 pub fn add(&mut self, value: i64, doc_id: u64) {
356 self.entries.entry(value).or_default().add(doc_id);
357 self.doc_count += 1;
358 }
359
360 pub fn add_uint(&mut self, value: u64, doc_id: u64) {
362 self.add(value as i64, doc_id);
363 }
364
365 pub fn remove(&mut self, value: i64, doc_id: u64) {
367 if let Some(posting) = self.entries.get_mut(&value) {
368 posting.remove(doc_id);
369 if posting.is_empty() {
370 self.entries.remove(&value);
371 }
372 self.doc_count -= 1;
373 }
374 }
375
376 pub fn range_query(
378 &self,
379 min: Option<i64>,
380 max: Option<i64>,
381 min_inclusive: bool,
382 max_inclusive: bool,
383 ) -> AllowedSet {
384 use std::ops::Bound;
385
386 let start = match min {
387 Some(v) if min_inclusive => Bound::Included(v),
388 Some(v) => Bound::Excluded(v),
389 None => Bound::Unbounded,
390 };
391
392 let end = match max {
393 Some(v) if max_inclusive => Bound::Included(v),
394 Some(v) => Bound::Excluded(v),
395 None => Bound::Unbounded,
396 };
397
398 let mut result = PostingSet::new();
400 for (_, posting) in self.entries.range((start, end)) {
401 result = result.union(posting);
402 }
403
404 result.to_allowed_set()
405 }
406
407 pub fn greater_than(&self, value: i64, inclusive: bool) -> AllowedSet {
409 self.range_query(Some(value), None, inclusive, true)
410 }
411
412 pub fn less_than(&self, value: i64, inclusive: bool) -> AllowedSet {
414 self.range_query(None, Some(value), true, inclusive)
415 }
416
417 pub fn stats(&self) -> RangeIndexStats {
419 let values: Vec<_> = self.entries.keys().collect();
420 RangeIndexStats {
421 unique_values: self.entries.len(),
422 total_docs: self.doc_count,
423 min_value: values.first().copied().copied(),
424 max_value: values.last().copied().copied(),
425 }
426 }
427}
428
/// Summary counts for a `RangeIndex`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeIndexStats {
    /// Number of distinct indexed values.
    pub unique_values: usize,
    /// The index's running posting count.
    pub total_docs: usize,
    /// Smallest indexed value, if any.
    pub min_value: Option<i64>,
    /// Largest indexed value, if any.
    pub max_value: Option<i64>,
}
437
438#[derive(Debug, Default)]
444pub struct MetadataIndex {
445 equality_indexes: HashMap<String, EqualityIndex>,
447 range_indexes: HashMap<String, RangeIndex>,
449 doc_count: usize,
451}
452
453impl MetadataIndex {
454 pub fn new() -> Self {
456 Self::default()
457 }
458
459 pub fn add_equality(&mut self, field: &str, value: &FilterValue, doc_id: u64) {
461 let index = self.equality_indexes.entry(field.to_string()).or_default();
462
463 match value {
464 FilterValue::String(s) => index.add_string(s, doc_id),
465 FilterValue::Int64(i) => index.add_int(*i, doc_id),
466 FilterValue::Uint64(u) => index.add_uint(*u, doc_id),
467 _ => {} }
469 }
470
471 pub fn add_string(&mut self, field: &str, value: &str, doc_id: u64) {
473 self.equality_indexes
474 .entry(field.to_string())
475 .or_default()
476 .add_string(value, doc_id);
477 }
478
479 pub fn add_range(&mut self, field: &str, value: i64, doc_id: u64) {
481 self.range_indexes
482 .entry(field.to_string())
483 .or_default()
484 .add(value, doc_id);
485 }
486
487 pub fn add_timestamp(&mut self, field: &str, timestamp: u64, doc_id: u64) {
489 self.add_range(field, timestamp as i64, doc_id);
490 }
491
492 pub fn set_doc_count(&mut self, count: usize) {
494 self.doc_count = count;
495 }
496
497 pub fn inc_doc_count(&mut self) {
499 self.doc_count += 1;
500 }
501
502 pub fn doc_count(&self) -> usize {
504 self.doc_count
505 }
506
507 pub fn evaluate_atom(&self, atom: &FilterAtom) -> AllowedSet {
509 match atom {
510 FilterAtom::Eq { field, value } => {
511 if let Some(index) = self.equality_indexes.get(field) {
512 match value {
513 FilterValue::String(s) => index.lookup_string(s),
514 FilterValue::Int64(i) => index.lookup_int(*i),
515 FilterValue::Uint64(u) => index.lookup_uint(*u),
516 _ => AllowedSet::All, }
518 } else {
519 AllowedSet::All }
521 }
522
523 FilterAtom::In { field, values } => {
524 if let Some(index) = self.equality_indexes.get(field) {
525 let strings: Vec<String> = values
527 .iter()
528 .filter_map(|v| match v {
529 FilterValue::String(s) => Some(s.clone()),
530 _ => None,
531 })
532 .collect();
533
534 if strings.len() == values.len() {
535 return index.lookup_string_in(&strings);
536 }
537
538 let uints: Vec<u64> = values
540 .iter()
541 .filter_map(|v| match v {
542 FilterValue::Uint64(u) => Some(*u),
543 _ => None,
544 })
545 .collect();
546
547 if uints.len() == values.len() {
548 return index.lookup_uint_in(&uints);
549 }
550 }
551 AllowedSet::All }
553
554 FilterAtom::Range {
555 field,
556 min,
557 max,
558 min_inclusive,
559 max_inclusive,
560 } => {
561 if let Some(index) = self.range_indexes.get(field) {
562 let min_val = min.as_ref().and_then(|v| match v {
563 FilterValue::Int64(i) => Some(*i),
564 FilterValue::Uint64(u) => Some(*u as i64),
565 _ => None,
566 });
567 let max_val = max.as_ref().and_then(|v| match v {
568 FilterValue::Int64(i) => Some(*i),
569 FilterValue::Uint64(u) => Some(*u as i64),
570 _ => None,
571 });
572
573 index.range_query(min_val, max_val, *min_inclusive, *max_inclusive)
574 } else {
575 AllowedSet::All
576 }
577 }
578
579 FilterAtom::True => AllowedSet::All,
580 FilterAtom::False => AllowedSet::None,
581
582 _ => AllowedSet::All,
584 }
585 }
586
587 pub fn evaluate(&self, filter: &FilterIR) -> AllowedSet {
591 if filter.is_all() {
592 return AllowedSet::All;
593 }
594 if filter.is_none() {
595 return AllowedSet::None;
596 }
597
598 let mut result = AllowedSet::All;
600
601 for clause in &filter.clauses {
602 let clause_result = self.evaluate_disjunction(clause);
604
605 result = result.intersect(&clause_result);
607
608 if result.is_empty() {
610 return AllowedSet::None;
611 }
612 }
613
614 result
615 }
616
617 fn evaluate_disjunction(&self, clause: &crate::filter_ir::Disjunction) -> AllowedSet {
619 if clause.atoms.len() == 1 {
620 return self.evaluate_atom(&clause.atoms[0]);
621 }
622
623 let mut result = AllowedSet::None;
625 for atom in &clause.atoms {
626 let atom_result = self.evaluate_atom(atom);
627 result = result.union(&atom_result);
628
629 if result.is_all() {
631 return AllowedSet::All;
632 }
633 }
634
635 result
636 }
637
638 pub fn estimate_selectivity(&self, filter: &FilterIR) -> f64 {
640 if self.doc_count == 0 {
641 return 1.0;
642 }
643
644 let allowed = self.evaluate(filter);
645 allowed.selectivity(self.doc_count)
646 }
647}
648
649pub struct ConcurrentMetadataIndex {
655 inner: RwLock<MetadataIndex>,
656}
657
658impl ConcurrentMetadataIndex {
659 pub fn new() -> Self {
661 Self {
662 inner: RwLock::new(MetadataIndex::new()),
663 }
664 }
665
666 pub fn add_string(&self, field: &str, value: &str, doc_id: u64) {
668 self.inner.write().unwrap().add_string(field, value, doc_id);
669 }
670
671 pub fn add_timestamp(&self, field: &str, timestamp: u64, doc_id: u64) {
673 self.inner
674 .write()
675 .unwrap()
676 .add_timestamp(field, timestamp, doc_id);
677 }
678
679 pub fn evaluate(&self, filter: &FilterIR) -> AllowedSet {
681 self.inner.read().unwrap().evaluate(filter)
682 }
683
684 pub fn set_doc_count(&self, count: usize) {
686 self.inner.write().unwrap().set_doc_count(count);
687 }
688}
689
690impl Default for ConcurrentMetadataIndex {
691 fn default() -> Self {
692 Self::new()
693 }
694}
695
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter_ir::FilterBuilder;

    #[test]
    fn test_posting_set_basic() {
        let mut ps = PostingSet::new();
        ps.add(1);
        ps.add(5);
        ps.add(3);

        assert!(ps.contains(1));
        assert!(ps.contains(3));
        assert!(ps.contains(5));
        assert!(!ps.contains(2));
        assert_eq!(ps.len(), 3);
    }

    #[test]
    fn test_posting_set_intersection() {
        let a = PostingSet::from_vec(vec![1, 2, 3, 4, 5]);
        let b = PostingSet::from_vec(vec![3, 4, 5, 6, 7]);

        let c = a.intersect(&b);
        assert_eq!(c.len(), 3);
        assert!(c.contains(3));
        assert!(c.contains(4));
        assert!(c.contains(5));
    }

    #[test]
    fn test_posting_set_union_and_remove() {
        // from_vec must sort and dedup its input.
        let a = PostingSet::from_vec(vec![5, 1, 3, 3]);
        let b = PostingSet::from_vec(vec![2, 3, 6]);

        let u = a.union(&b);
        assert_eq!(u.iter().collect::<Vec<_>>(), vec![1, 2, 3, 5, 6]);

        let mut c = u.clone();
        c.remove(3);
        c.remove(42); // absent id: no-op
        assert_eq!(c.iter().collect::<Vec<_>>(), vec![1, 2, 5, 6]);

        assert!(PostingSet::new().is_empty());
        assert!(PostingSet::new().to_allowed_set().is_empty());
    }

    #[test]
    fn test_equality_index() {
        let mut idx = EqualityIndex::new();
        idx.add_string("production", 1);
        idx.add_string("production", 2);
        idx.add_string("staging", 3);

        let result = idx.lookup_string("production");
        assert_eq!(result.cardinality(), Some(2));

        let result2 = idx.lookup_string("staging");
        assert_eq!(result2.cardinality(), Some(1));

        let result3 = idx.lookup_string("dev");
        assert!(result3.is_empty());
    }

    #[test]
    fn test_equality_index_in_and_remove() {
        let mut idx = EqualityIndex::new();
        idx.add_string("a", 1);
        idx.add_string("b", 2);
        idx.add_string("b", 3);
        idx.add_uint(7, 10);
        idx.add_uint(8, 11);

        let hits = idx.lookup_string_in(&[
            "a".to_string(),
            "b".to_string(),
            "missing".to_string(),
        ]);
        assert_eq!(hits.cardinality(), Some(3));
        assert!(idx.lookup_string_in(&[]).is_empty());
        assert_eq!(idx.lookup_uint_in(&[7, 8, 9]).cardinality(), Some(2));

        idx.remove_string("a", 1);
        assert!(idx.lookup_string("a").is_empty());
        assert_eq!(idx.stats().unique_string_values, 1);
    }

    #[test]
    fn test_range_index() {
        let mut idx = RangeIndex::new();
        idx.add(100, 1);
        idx.add(200, 2);
        idx.add(300, 3);
        idx.add(400, 4);
        idx.add(500, 5);

        // 200..=400 -> docs 2, 3, 4
        let result = idx.range_query(Some(200), Some(400), true, true);
        assert_eq!(result.cardinality(), Some(3));

        // > 300 -> docs 4, 5
        let result2 = idx.greater_than(300, false);
        assert_eq!(result2.cardinality(), Some(2));

        // <= 300 -> docs 1, 2, 3
        let result3 = idx.less_than(300, true);
        assert_eq!(result3.cardinality(), Some(3));
    }

    #[test]
    fn test_range_index_exclusive_bounds_and_stats() {
        let mut idx = RangeIndex::new();
        idx.add(100, 1);
        idx.add(200, 2);
        idx.add(300, 3);
        idx.add(400, 4);
        idx.add(500, 5);

        // (200, 400) exclusive on both ends -> only doc 3
        let open = idx.range_query(Some(200), Some(400), false, false);
        assert_eq!(open.cardinality(), Some(1));

        let stats = idx.stats();
        assert_eq!(stats.unique_values, 5);
        assert_eq!(stats.min_value, Some(100));
        assert_eq!(stats.max_value, Some(500));
    }

    #[test]
    fn test_metadata_index_evaluation() {
        let mut idx = MetadataIndex::new();

        // Docs 0..10 are production, 10..20 staging; created_at = 1000 + 100 * id.
        for i in 0..10 {
            idx.add_string("namespace", "production", i);
            idx.add_timestamp("created_at", 1000 + i * 100, i);
        }
        for i in 10..20 {
            idx.add_string("namespace", "staging", i);
            idx.add_timestamp("created_at", 1000 + i * 100, i);
        }
        idx.set_doc_count(20);

        let filter = FilterBuilder::new().namespace("production").build();

        let result = idx.evaluate(&filter);
        assert_eq!(result.cardinality(), Some(10));

        // production AND created_at >= 1500 -> docs 5..10
        let filter2 = FilterBuilder::new()
            .namespace("production")
            .gte("created_at", 1500i64)
            .build();

        let result2 = idx.evaluate(&filter2);
        assert_eq!(result2.cardinality(), Some(5));
    }

    #[test]
    fn test_selectivity_estimate() {
        let mut idx = MetadataIndex::new();

        for i in 0..100 {
            let ns = if i % 10 == 0 { "rare" } else { "common" };
            idx.add_string("namespace", ns, i);
        }
        idx.set_doc_count(100);

        let common_filter = FilterBuilder::new().namespace("common").build();
        let rare_filter = FilterBuilder::new().namespace("rare").build();

        let common_selectivity = idx.estimate_selectivity(&common_filter);
        let rare_selectivity = idx.estimate_selectivity(&rare_filter);

        assert!(common_selectivity > rare_selectivity);
        assert!((common_selectivity - 0.9).abs() < 0.01);
        assert!((rare_selectivity - 0.1).abs() < 0.01);
    }
}