1use std::collections::{BTreeMap, HashMap};
57use std::sync::RwLock;
58
59use crate::candidate_gate::AllowedSet;
60use crate::filter_ir::{FilterAtom, FilterIR, FilterValue};
61
62#[derive(Debug, Clone)]
68pub struct PostingSet {
69 doc_ids: Vec<u64>,
71}
72
73impl PostingSet {
74 pub fn new() -> Self {
76 Self { doc_ids: Vec::new() }
77 }
78
79 pub fn from_vec(mut ids: Vec<u64>) -> Self {
81 ids.sort_unstable();
82 ids.dedup();
83 Self { doc_ids: ids }
84 }
85
86 pub fn add(&mut self, doc_id: u64) {
88 match self.doc_ids.binary_search(&doc_id) {
90 Ok(_) => {} Err(pos) => self.doc_ids.insert(pos, doc_id),
92 }
93 }
94
95 pub fn remove(&mut self, doc_id: u64) {
97 if let Ok(pos) = self.doc_ids.binary_search(&doc_id) {
98 self.doc_ids.remove(pos);
99 }
100 }
101
102 pub fn contains(&self, doc_id: u64) -> bool {
104 self.doc_ids.binary_search(&doc_id).is_ok()
105 }
106
107 pub fn len(&self) -> usize {
109 self.doc_ids.len()
110 }
111
112 pub fn is_empty(&self) -> bool {
114 self.doc_ids.is_empty()
115 }
116
117 pub fn to_allowed_set(&self) -> AllowedSet {
119 if self.doc_ids.is_empty() {
120 AllowedSet::None
121 } else {
122 AllowedSet::from_sorted_vec(self.doc_ids.clone())
123 }
124 }
125
126 pub fn intersect(&self, other: &PostingSet) -> PostingSet {
128 let mut result = Vec::with_capacity(self.doc_ids.len().min(other.doc_ids.len()));
129 let mut i = 0;
130 let mut j = 0;
131
132 while i < self.doc_ids.len() && j < other.doc_ids.len() {
133 match self.doc_ids[i].cmp(&other.doc_ids[j]) {
134 std::cmp::Ordering::Less => i += 1,
135 std::cmp::Ordering::Greater => j += 1,
136 std::cmp::Ordering::Equal => {
137 result.push(self.doc_ids[i]);
138 i += 1;
139 j += 1;
140 }
141 }
142 }
143
144 PostingSet { doc_ids: result }
145 }
146
147 pub fn union(&self, other: &PostingSet) -> PostingSet {
149 let mut result = Vec::with_capacity(self.doc_ids.len() + other.doc_ids.len());
150 let mut i = 0;
151 let mut j = 0;
152
153 while i < self.doc_ids.len() && j < other.doc_ids.len() {
154 match self.doc_ids[i].cmp(&other.doc_ids[j]) {
155 std::cmp::Ordering::Less => {
156 result.push(self.doc_ids[i]);
157 i += 1;
158 }
159 std::cmp::Ordering::Greater => {
160 result.push(other.doc_ids[j]);
161 j += 1;
162 }
163 std::cmp::Ordering::Equal => {
164 result.push(self.doc_ids[i]);
165 i += 1;
166 j += 1;
167 }
168 }
169 }
170
171 result.extend_from_slice(&self.doc_ids[i..]);
172 result.extend_from_slice(&other.doc_ids[j..]);
173
174 PostingSet { doc_ids: result }
175 }
176
177 pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
179 self.doc_ids.iter().copied()
180 }
181}
182
183impl Default for PostingSet {
184 fn default() -> Self {
185 Self::new()
186 }
187}
188
189#[derive(Debug, Default)]
195pub struct EqualityIndex {
196 string_postings: HashMap<String, PostingSet>,
198 int_postings: HashMap<i64, PostingSet>,
200 uint_postings: HashMap<u64, PostingSet>,
202}
203
204impl EqualityIndex {
205 pub fn new() -> Self {
207 Self::default()
208 }
209
210 pub fn add_string(&mut self, value: &str, doc_id: u64) {
212 self.string_postings
213 .entry(value.to_string())
214 .or_default()
215 .add(doc_id);
216 }
217
218 pub fn add_int(&mut self, value: i64, doc_id: u64) {
220 self.int_postings
221 .entry(value)
222 .or_default()
223 .add(doc_id);
224 }
225
226 pub fn add_uint(&mut self, value: u64, doc_id: u64) {
228 self.uint_postings
229 .entry(value)
230 .or_default()
231 .add(doc_id);
232 }
233
234 pub fn remove_string(&mut self, value: &str, doc_id: u64) {
236 if let Some(posting) = self.string_postings.get_mut(value) {
237 posting.remove(doc_id);
238 if posting.is_empty() {
239 self.string_postings.remove(value);
240 }
241 }
242 }
243
244 pub fn lookup_string(&self, value: &str) -> AllowedSet {
246 self.string_postings
247 .get(value)
248 .map(|p| p.to_allowed_set())
249 .unwrap_or(AllowedSet::None)
250 }
251
252 pub fn lookup_int(&self, value: i64) -> AllowedSet {
254 self.int_postings
255 .get(&value)
256 .map(|p| p.to_allowed_set())
257 .unwrap_or(AllowedSet::None)
258 }
259
260 pub fn lookup_uint(&self, value: u64) -> AllowedSet {
262 self.uint_postings
263 .get(&value)
264 .map(|p| p.to_allowed_set())
265 .unwrap_or(AllowedSet::None)
266 }
267
268 pub fn lookup_string_in(&self, values: &[String]) -> AllowedSet {
270 let sets: Vec<_> = values.iter()
271 .filter_map(|v| self.string_postings.get(v))
272 .collect();
273
274 if sets.is_empty() {
275 return AllowedSet::None;
276 }
277
278 let mut result = sets[0].clone();
280 for set in &sets[1..] {
281 result = result.union(set);
282 }
283
284 result.to_allowed_set()
285 }
286
287 pub fn lookup_uint_in(&self, values: &[u64]) -> AllowedSet {
289 let sets: Vec<_> = values.iter()
290 .filter_map(|v| self.uint_postings.get(v))
291 .collect();
292
293 if sets.is_empty() {
294 return AllowedSet::None;
295 }
296
297 let mut result = sets[0].clone();
298 for set in &sets[1..] {
299 result = result.union(set);
300 }
301
302 result.to_allowed_set()
303 }
304
305 pub fn string_values(&self) -> impl Iterator<Item = &str> {
307 self.string_postings.keys().map(|s| s.as_str())
308 }
309
310 pub fn stats(&self) -> EqualityIndexStats {
312 EqualityIndexStats {
313 unique_string_values: self.string_postings.len(),
314 unique_int_values: self.int_postings.len(),
315 unique_uint_values: self.uint_postings.len(),
316 total_postings: self.string_postings.values().map(|p| p.len()).sum::<usize>()
317 + self.int_postings.values().map(|p| p.len()).sum::<usize>()
318 + self.uint_postings.values().map(|p| p.len()).sum::<usize>(),
319 }
320 }
321}
322
/// Snapshot of an `EqualityIndex`'s size, as returned by `EqualityIndex::stats`.
///
/// `PartialEq`/`Eq` allow snapshots to be compared directly. `Default` is the
/// all-zero snapshot of an empty index.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct EqualityIndexStats {
    /// Number of distinct string values indexed.
    pub unique_string_values: usize,
    /// Number of distinct signed integer values indexed.
    pub unique_int_values: usize,
    /// Number of distinct unsigned integer values indexed.
    pub unique_uint_values: usize,
    /// Sum of posting-set lengths across all three value kinds.
    pub total_postings: usize,
}
331
332#[derive(Debug, Default)]
338pub struct RangeIndex {
339 entries: BTreeMap<i64, PostingSet>,
342 doc_count: usize,
344}
345
346impl RangeIndex {
347 pub fn new() -> Self {
349 Self::default()
350 }
351
352 pub fn add(&mut self, value: i64, doc_id: u64) {
354 self.entries.entry(value).or_default().add(doc_id);
355 self.doc_count += 1;
356 }
357
358 pub fn add_uint(&mut self, value: u64, doc_id: u64) {
360 self.add(value as i64, doc_id);
361 }
362
363 pub fn remove(&mut self, value: i64, doc_id: u64) {
365 if let Some(posting) = self.entries.get_mut(&value) {
366 posting.remove(doc_id);
367 if posting.is_empty() {
368 self.entries.remove(&value);
369 }
370 self.doc_count -= 1;
371 }
372 }
373
374 pub fn range_query(
376 &self,
377 min: Option<i64>,
378 max: Option<i64>,
379 min_inclusive: bool,
380 max_inclusive: bool,
381 ) -> AllowedSet {
382 use std::ops::Bound;
383
384 let start = match min {
385 Some(v) if min_inclusive => Bound::Included(v),
386 Some(v) => Bound::Excluded(v),
387 None => Bound::Unbounded,
388 };
389
390 let end = match max {
391 Some(v) if max_inclusive => Bound::Included(v),
392 Some(v) => Bound::Excluded(v),
393 None => Bound::Unbounded,
394 };
395
396 let mut result = PostingSet::new();
398 for (_, posting) in self.entries.range((start, end)) {
399 result = result.union(posting);
400 }
401
402 result.to_allowed_set()
403 }
404
405 pub fn greater_than(&self, value: i64, inclusive: bool) -> AllowedSet {
407 self.range_query(Some(value), None, inclusive, true)
408 }
409
410 pub fn less_than(&self, value: i64, inclusive: bool) -> AllowedSet {
412 self.range_query(None, Some(value), true, inclusive)
413 }
414
415 pub fn stats(&self) -> RangeIndexStats {
417 let values: Vec<_> = self.entries.keys().collect();
418 RangeIndexStats {
419 unique_values: self.entries.len(),
420 total_docs: self.doc_count,
421 min_value: values.first().copied().copied(),
422 max_value: values.last().copied().copied(),
423 }
424 }
425}
426
/// Snapshot of a `RangeIndex`'s size and key span, as returned by `RangeIndex::stats`.
///
/// `PartialEq`/`Eq` allow snapshots to be compared directly. `Default` is the
/// snapshot of an empty index: zero counts and no min/max.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RangeIndexStats {
    /// Number of distinct values indexed.
    pub unique_values: usize,
    /// Number of stored `(value, doc_id)` pairs.
    pub total_docs: usize,
    /// Smallest indexed value, or `None` if the index is empty.
    pub min_value: Option<i64>,
    /// Largest indexed value, or `None` if the index is empty.
    pub max_value: Option<i64>,
}
435
436#[derive(Debug, Default)]
442pub struct MetadataIndex {
443 equality_indexes: HashMap<String, EqualityIndex>,
445 range_indexes: HashMap<String, RangeIndex>,
447 doc_count: usize,
449}
450
451impl MetadataIndex {
452 pub fn new() -> Self {
454 Self::default()
455 }
456
457 pub fn add_equality(&mut self, field: &str, value: &FilterValue, doc_id: u64) {
459 let index = self.equality_indexes
460 .entry(field.to_string())
461 .or_default();
462
463 match value {
464 FilterValue::String(s) => index.add_string(s, doc_id),
465 FilterValue::Int64(i) => index.add_int(*i, doc_id),
466 FilterValue::Uint64(u) => index.add_uint(*u, doc_id),
467 _ => {} }
469 }
470
471 pub fn add_string(&mut self, field: &str, value: &str, doc_id: u64) {
473 self.equality_indexes
474 .entry(field.to_string())
475 .or_default()
476 .add_string(value, doc_id);
477 }
478
479 pub fn add_range(&mut self, field: &str, value: i64, doc_id: u64) {
481 self.range_indexes
482 .entry(field.to_string())
483 .or_default()
484 .add(value, doc_id);
485 }
486
487 pub fn add_timestamp(&mut self, field: &str, timestamp: u64, doc_id: u64) {
489 self.add_range(field, timestamp as i64, doc_id);
490 }
491
492 pub fn set_doc_count(&mut self, count: usize) {
494 self.doc_count = count;
495 }
496
497 pub fn inc_doc_count(&mut self) {
499 self.doc_count += 1;
500 }
501
502 pub fn doc_count(&self) -> usize {
504 self.doc_count
505 }
506
507 pub fn evaluate_atom(&self, atom: &FilterAtom) -> AllowedSet {
509 match atom {
510 FilterAtom::Eq { field, value } => {
511 if let Some(index) = self.equality_indexes.get(field) {
512 match value {
513 FilterValue::String(s) => index.lookup_string(s),
514 FilterValue::Int64(i) => index.lookup_int(*i),
515 FilterValue::Uint64(u) => index.lookup_uint(*u),
516 _ => AllowedSet::All, }
518 } else {
519 AllowedSet::All }
521 }
522
523 FilterAtom::In { field, values } => {
524 if let Some(index) = self.equality_indexes.get(field) {
525 let strings: Vec<String> = values.iter()
527 .filter_map(|v| match v {
528 FilterValue::String(s) => Some(s.clone()),
529 _ => None,
530 })
531 .collect();
532
533 if strings.len() == values.len() {
534 return index.lookup_string_in(&strings);
535 }
536
537 let uints: Vec<u64> = values.iter()
539 .filter_map(|v| match v {
540 FilterValue::Uint64(u) => Some(*u),
541 _ => None,
542 })
543 .collect();
544
545 if uints.len() == values.len() {
546 return index.lookup_uint_in(&uints);
547 }
548 }
549 AllowedSet::All }
551
552 FilterAtom::Range { field, min, max, min_inclusive, max_inclusive } => {
553 if let Some(index) = self.range_indexes.get(field) {
554 let min_val = min.as_ref().and_then(|v| match v {
555 FilterValue::Int64(i) => Some(*i),
556 FilterValue::Uint64(u) => Some(*u as i64),
557 _ => None,
558 });
559 let max_val = max.as_ref().and_then(|v| match v {
560 FilterValue::Int64(i) => Some(*i),
561 FilterValue::Uint64(u) => Some(*u as i64),
562 _ => None,
563 });
564
565 index.range_query(min_val, max_val, *min_inclusive, *max_inclusive)
566 } else {
567 AllowedSet::All
568 }
569 }
570
571 FilterAtom::True => AllowedSet::All,
572 FilterAtom::False => AllowedSet::None,
573
574 _ => AllowedSet::All,
576 }
577 }
578
579 pub fn evaluate(&self, filter: &FilterIR) -> AllowedSet {
583 if filter.is_all() {
584 return AllowedSet::All;
585 }
586 if filter.is_none() {
587 return AllowedSet::None;
588 }
589
590 let mut result = AllowedSet::All;
592
593 for clause in &filter.clauses {
594 let clause_result = self.evaluate_disjunction(clause);
596
597 result = result.intersect(&clause_result);
599
600 if result.is_empty() {
602 return AllowedSet::None;
603 }
604 }
605
606 result
607 }
608
609 fn evaluate_disjunction(&self, clause: &crate::filter_ir::Disjunction) -> AllowedSet {
611 if clause.atoms.len() == 1 {
612 return self.evaluate_atom(&clause.atoms[0]);
613 }
614
615 let mut result = AllowedSet::None;
617 for atom in &clause.atoms {
618 let atom_result = self.evaluate_atom(atom);
619 result = result.union(&atom_result);
620
621 if result.is_all() {
623 return AllowedSet::All;
624 }
625 }
626
627 result
628 }
629
630 pub fn estimate_selectivity(&self, filter: &FilterIR) -> f64 {
632 if self.doc_count == 0 {
633 return 1.0;
634 }
635
636 let allowed = self.evaluate(filter);
637 allowed.selectivity(self.doc_count)
638 }
639}
640
641pub struct ConcurrentMetadataIndex {
647 inner: RwLock<MetadataIndex>,
648}
649
650impl ConcurrentMetadataIndex {
651 pub fn new() -> Self {
653 Self {
654 inner: RwLock::new(MetadataIndex::new()),
655 }
656 }
657
658 pub fn add_string(&self, field: &str, value: &str, doc_id: u64) {
660 self.inner.write().unwrap().add_string(field, value, doc_id);
661 }
662
663 pub fn add_timestamp(&self, field: &str, timestamp: u64, doc_id: u64) {
665 self.inner.write().unwrap().add_timestamp(field, timestamp, doc_id);
666 }
667
668 pub fn evaluate(&self, filter: &FilterIR) -> AllowedSet {
670 self.inner.read().unwrap().evaluate(filter)
671 }
672
673 pub fn set_doc_count(&self, count: usize) {
675 self.inner.write().unwrap().set_doc_count(count);
676 }
677}
678
679impl Default for ConcurrentMetadataIndex {
680 fn default() -> Self {
681 Self::new()
682 }
683}
684
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter_ir::FilterBuilder;

    #[test]
    fn test_posting_set_basic() {
        let mut set = PostingSet::new();
        // Insert out of order; the set keeps ids sorted internally.
        for &id in [1u64, 5, 3].iter() {
            set.add(id);
        }

        for &id in [1u64, 3, 5].iter() {
            assert!(set.contains(id));
        }
        assert!(!set.contains(2));
        assert_eq!(set.len(), 3);
    }

    #[test]
    fn test_posting_set_intersection() {
        let left = PostingSet::from_vec(vec![1, 2, 3, 4, 5]);
        let right = PostingSet::from_vec(vec![3, 4, 5, 6, 7]);

        let shared = left.intersect(&right);
        assert_eq!(shared.len(), 3);
        for id in 3..=5 {
            assert!(shared.contains(id));
        }
    }

    #[test]
    fn test_equality_index() {
        let mut index = EqualityIndex::new();
        for &(env, doc) in [("production", 1u64), ("production", 2), ("staging", 3)].iter() {
            index.add_string(env, doc);
        }

        assert_eq!(index.lookup_string("production").cardinality(), Some(2));
        assert_eq!(index.lookup_string("staging").cardinality(), Some(1));
        // A value that was never indexed matches nothing.
        assert!(index.lookup_string("dev").is_empty());
    }

    #[test]
    fn test_range_index() {
        let mut index = RangeIndex::new();
        // doc n carries value n * 100 for n in 1..=5.
        for doc in 1..=5u64 {
            index.add(doc as i64 * 100, doc);
        }

        // [200, 400] -> docs 2, 3, 4
        assert_eq!(
            index.range_query(Some(200), Some(400), true, true).cardinality(),
            Some(3)
        );
        // (300, inf) -> docs 4, 5
        assert_eq!(index.greater_than(300, false).cardinality(), Some(2));
        // (-inf, 300] -> docs 1, 2, 3
        assert_eq!(index.less_than(300, true).cardinality(), Some(3));
    }

    #[test]
    fn test_metadata_index_evaluation() {
        let mut index = MetadataIndex::new();

        // docs 0..10 are "production", docs 10..20 are "staging";
        // doc n has created_at = 1000 + n * 100.
        for doc in 0..20u64 {
            let namespace = if doc < 10 { "production" } else { "staging" };
            index.add_string("namespace", namespace, doc);
            index.add_timestamp("created_at", 1000 + doc * 100, doc);
        }
        index.set_doc_count(20);

        let production_only = FilterBuilder::new().namespace("production").build();
        assert_eq!(index.evaluate(&production_only).cardinality(), Some(10));

        // created_at >= 1500 means doc >= 5; combined with production (doc < 10): docs 5..=9.
        let recent_production = FilterBuilder::new()
            .namespace("production")
            .gte("created_at", 1500i64)
            .build();
        assert_eq!(index.evaluate(&recent_production).cardinality(), Some(5));
    }

    #[test]
    fn test_selectivity_estimate() {
        let mut index = MetadataIndex::new();

        // Every tenth document is "rare", the rest are "common".
        for doc in 0..100u64 {
            let namespace = match doc % 10 {
                0 => "rare",
                _ => "common",
            };
            index.add_string("namespace", namespace, doc);
        }
        index.set_doc_count(100);

        let common_filter = FilterBuilder::new().namespace("common").build();
        let rare_filter = FilterBuilder::new().namespace("rare").build();

        let common = index.estimate_selectivity(&common_filter);
        let rare = index.estimate_selectivity(&rare_filter);

        assert!(common > rare);
        assert!((common - 0.9).abs() < 0.01);
        assert!((rare - 0.1).abs() < 0.01);
    }
}