Skip to main content

reddb_server/storage/engine/
vector_metadata.rs

1//! Vector Metadata Storage
2//!
3//! Type-aware metadata storage for vectors, inspired by Chroma's design.
4//! Supports efficient filtering on metadata during vector search.
5//!
6//! # Design
7//!
8//! - Metadata values are stored by type for efficient comparisons
9//! - Inverted indexes enable fast filtering by metadata
10//! - Supports rich filter operators (eq, ne, gt, gte, lt, lte, in, contains)
11
12use std::collections::{BTreeMap, HashMap, HashSet};
13
14use super::hnsw::NodeId;
15use crate::storage::query::value_compare::partial_compare_values;
16use crate::storage::schema::{value_to_canonical_key, CanonicalKey, CanonicalKeyFamily, Value};
17
/// Dynamically typed scalar stored under a metadata key.
#[derive(Clone, Debug, PartialEq)]
pub enum MetadataValue {
    /// Owned UTF-8 text.
    String(String),
    /// Signed 64-bit integer.
    Integer(i64),
    /// 64-bit floating point number.
    Float(f64),
    /// Boolean flag.
    Bool(bool),
    /// The null value.
    Null,
}
32
33impl MetadataValue {
34    /// Check if this value matches another for equality
35    pub fn matches_eq(&self, other: &MetadataValue) -> bool {
36        compare_metadata_values(self, other)
37            .map(|ord| ord == std::cmp::Ordering::Equal)
38            .unwrap_or(false)
39    }
40
41    /// Compare for ordering (returns None for incompatible types)
42    pub fn compare(&self, other: &MetadataValue) -> Option<std::cmp::Ordering> {
43        compare_metadata_values(self, other)
44    }
45
46    /// Check if this string value contains a substring
47    pub fn contains_str(&self, needle: &str) -> bool {
48        match self {
49            MetadataValue::String(s) => s.contains(needle),
50            _ => false,
51        }
52    }
53
54    /// Check if this string value starts with a prefix
55    pub fn starts_with(&self, prefix: &str) -> bool {
56        match self {
57            MetadataValue::String(s) => s.starts_with(prefix),
58            _ => false,
59        }
60    }
61
62    /// Check if this string value ends with a suffix
63    pub fn ends_with(&self, suffix: &str) -> bool {
64        match self {
65            MetadataValue::String(s) => s.ends_with(suffix),
66            _ => false,
67        }
68    }
69}
70
71impl From<String> for MetadataValue {
72    fn from(s: String) -> Self {
73        MetadataValue::String(s)
74    }
75}
76
77impl From<&str> for MetadataValue {
78    fn from(s: &str) -> Self {
79        MetadataValue::String(s.to_string())
80    }
81}
82
83impl From<i64> for MetadataValue {
84    fn from(i: i64) -> Self {
85        MetadataValue::Integer(i)
86    }
87}
88
89impl From<i32> for MetadataValue {
90    fn from(i: i32) -> Self {
91        MetadataValue::Integer(i as i64)
92    }
93}
94
95impl From<f64> for MetadataValue {
96    fn from(f: f64) -> Self {
97        MetadataValue::Float(f)
98    }
99}
100
101impl From<f32> for MetadataValue {
102    fn from(f: f32) -> Self {
103        MetadataValue::Float(f as f64)
104    }
105}
106
107impl From<bool> for MetadataValue {
108    fn from(b: bool) -> Self {
109        MetadataValue::Bool(b)
110    }
111}
112
113fn metadata_value_to_storage_value(value: &MetadataValue) -> Value {
114    match value {
115        MetadataValue::String(s) => Value::text(s.clone()),
116        MetadataValue::Integer(i) => Value::Integer(*i),
117        MetadataValue::Float(f) => Value::Float(*f),
118        MetadataValue::Bool(b) => Value::Boolean(*b),
119        MetadataValue::Null => Value::Null,
120    }
121}
122
123fn metadata_value_to_canonical_key(value: &MetadataValue) -> Option<CanonicalKey> {
124    let storage_value = metadata_value_to_storage_value(value);
125    value_to_canonical_key(&storage_value)
126}
127
128fn compare_metadata_values(
129    left: &MetadataValue,
130    right: &MetadataValue,
131) -> Option<std::cmp::Ordering> {
132    let left_value = metadata_value_to_storage_value(left);
133    let right_value = metadata_value_to_storage_value(right);
134    partial_compare_values(&left_value, &right_value).or_else(|| {
135        let left_key = value_to_canonical_key(&left_value)?;
136        let right_key = value_to_canonical_key(&right_value)?;
137        (left_key.family() == right_key.family()).then(|| left_key.cmp(&right_key))
138    })
139}
140
/// Metadata attached to one vector, with values bucketed by their type.
#[derive(Clone, Debug, Default)]
pub struct MetadataEntry {
    /// Keys whose value is a string.
    pub strings: HashMap<String, String>,
    /// Keys whose value is an integer.
    pub integers: HashMap<String, i64>,
    /// Keys whose value is a float.
    pub floats: HashMap<String, f64>,
    /// Keys whose value is a boolean.
    pub bools: HashMap<String, bool>,
}
153
154impl MetadataEntry {
155    /// Create a new empty metadata entry
156    pub fn new() -> Self {
157        Self::default()
158    }
159
160    /// Insert a metadata value
161    pub fn insert(&mut self, key: impl Into<String>, value: MetadataValue) {
162        let key = key.into();
163        match value {
164            MetadataValue::String(s) => {
165                self.strings.insert(key, s);
166            }
167            MetadataValue::Integer(i) => {
168                self.integers.insert(key, i);
169            }
170            MetadataValue::Float(f) => {
171                self.floats.insert(key, f);
172            }
173            MetadataValue::Bool(b) => {
174                self.bools.insert(key, b);
175            }
176            MetadataValue::Null => {
177                // Remove from all maps
178                self.strings.remove(&key);
179                self.integers.remove(&key);
180                self.floats.remove(&key);
181                self.bools.remove(&key);
182            }
183        }
184    }
185
186    /// Get a metadata value by key
187    pub fn get(&self, key: &str) -> Option<MetadataValue> {
188        if let Some(s) = self.strings.get(key) {
189            return Some(MetadataValue::String(s.clone()));
190        }
191        if let Some(i) = self.integers.get(key) {
192            return Some(MetadataValue::Integer(*i));
193        }
194        if let Some(f) = self.floats.get(key) {
195            return Some(MetadataValue::Float(*f));
196        }
197        if let Some(b) = self.bools.get(key) {
198            return Some(MetadataValue::Bool(*b));
199        }
200        None
201    }
202
203    /// Check if a key exists
204    pub fn contains_key(&self, key: &str) -> bool {
205        self.strings.contains_key(key)
206            || self.integers.contains_key(key)
207            || self.floats.contains_key(key)
208            || self.bools.contains_key(key)
209    }
210
211    /// Get all keys
212    pub fn keys(&self) -> Vec<String> {
213        let mut keys: Vec<String> = Vec::new();
214        keys.extend(self.strings.keys().cloned());
215        keys.extend(self.integers.keys().cloned());
216        keys.extend(self.floats.keys().cloned());
217        keys.extend(self.bools.keys().cloned());
218        keys
219    }
220
221    /// Check if empty
222    pub fn is_empty(&self) -> bool {
223        self.strings.is_empty()
224            && self.integers.is_empty()
225            && self.floats.is_empty()
226            && self.bools.is_empty()
227    }
228}
229
230/// Metadata filter operators
231#[derive(Debug, Clone)]
232pub enum MetadataFilter {
233    /// Equal: key == value
234    Eq(String, MetadataValue),
235    /// Not equal: key != value
236    Ne(String, MetadataValue),
237    /// Greater than: key > value
238    Gt(String, MetadataValue),
239    /// Greater than or equal: key >= value
240    Gte(String, MetadataValue),
241    /// Less than: key < value
242    Lt(String, MetadataValue),
243    /// Less than or equal: key <= value
244    Lte(String, MetadataValue),
245    /// In set: key in [values]
246    In(String, Vec<MetadataValue>),
247    /// Not in set: key not in [values]
248    NotIn(String, Vec<MetadataValue>),
249    /// String contains: key contains substring
250    Contains(String, String),
251    /// String starts with: key starts with prefix
252    StartsWith(String, String),
253    /// String ends with: key ends with suffix
254    EndsWith(String, String),
255    /// Key exists
256    Exists(String),
257    /// Key does not exist
258    NotExists(String),
259    /// Logical AND of filters
260    And(Vec<MetadataFilter>),
261    /// Logical OR of filters
262    Or(Vec<MetadataFilter>),
263    /// Logical NOT of filter
264    Not(Box<MetadataFilter>),
265}
266
267impl MetadataFilter {
268    /// Create an equality filter
269    pub fn eq(key: impl Into<String>, value: impl Into<MetadataValue>) -> Self {
270        MetadataFilter::Eq(key.into(), value.into())
271    }
272
273    /// Create a not-equal filter
274    pub fn ne(key: impl Into<String>, value: impl Into<MetadataValue>) -> Self {
275        MetadataFilter::Ne(key.into(), value.into())
276    }
277
278    /// Create a greater-than filter
279    pub fn gt(key: impl Into<String>, value: impl Into<MetadataValue>) -> Self {
280        MetadataFilter::Gt(key.into(), value.into())
281    }
282
283    /// Create a greater-than-or-equal filter
284    pub fn gte(key: impl Into<String>, value: impl Into<MetadataValue>) -> Self {
285        MetadataFilter::Gte(key.into(), value.into())
286    }
287
288    /// Create a less-than filter
289    pub fn lt(key: impl Into<String>, value: impl Into<MetadataValue>) -> Self {
290        MetadataFilter::Lt(key.into(), value.into())
291    }
292
293    /// Create a less-than-or-equal filter
294    pub fn lte(key: impl Into<String>, value: impl Into<MetadataValue>) -> Self {
295        MetadataFilter::Lte(key.into(), value.into())
296    }
297
298    /// Create an AND filter
299    pub fn and(filters: Vec<MetadataFilter>) -> Self {
300        MetadataFilter::And(filters)
301    }
302
303    /// Create an OR filter
304    pub fn or(filters: Vec<MetadataFilter>) -> Self {
305        MetadataFilter::Or(filters)
306    }
307
308    /// Create a NOT filter
309    pub fn not(filter: MetadataFilter) -> Self {
310        MetadataFilter::Not(Box::new(filter))
311    }
312
313    /// Check if a metadata entry matches this filter
314    pub fn matches(&self, entry: &MetadataEntry) -> bool {
315        match self {
316            MetadataFilter::Eq(key, value) => {
317                entry.get(key).map(|v| v.matches_eq(value)).unwrap_or(false)
318            }
319            MetadataFilter::Ne(key, value) => {
320                entry.get(key).map(|v| !v.matches_eq(value)).unwrap_or(true)
321            }
322            MetadataFilter::Gt(key, value) => entry
323                .get(key)
324                .and_then(|v| v.compare(value))
325                .map(|ord| ord == std::cmp::Ordering::Greater)
326                .unwrap_or(false),
327            MetadataFilter::Gte(key, value) => entry
328                .get(key)
329                .and_then(|v| v.compare(value))
330                .map(|ord| ord != std::cmp::Ordering::Less)
331                .unwrap_or(false),
332            MetadataFilter::Lt(key, value) => entry
333                .get(key)
334                .and_then(|v| v.compare(value))
335                .map(|ord| ord == std::cmp::Ordering::Less)
336                .unwrap_or(false),
337            MetadataFilter::Lte(key, value) => entry
338                .get(key)
339                .and_then(|v| v.compare(value))
340                .map(|ord| ord != std::cmp::Ordering::Greater)
341                .unwrap_or(false),
342            MetadataFilter::In(key, values) => entry
343                .get(key)
344                .map(|v| values.iter().any(|val| v.matches_eq(val)))
345                .unwrap_or(false),
346            MetadataFilter::NotIn(key, values) => entry
347                .get(key)
348                .map(|v| !values.iter().any(|val| v.matches_eq(val)))
349                .unwrap_or(true),
350            MetadataFilter::Contains(key, needle) => entry
351                .get(key)
352                .map(|v| v.contains_str(needle))
353                .unwrap_or(false),
354            MetadataFilter::StartsWith(key, prefix) => entry
355                .get(key)
356                .map(|v| v.starts_with(prefix))
357                .unwrap_or(false),
358            MetadataFilter::EndsWith(key, suffix) => {
359                entry.get(key).map(|v| v.ends_with(suffix)).unwrap_or(false)
360            }
361            MetadataFilter::Exists(key) => entry.contains_key(key),
362            MetadataFilter::NotExists(key) => !entry.contains_key(key),
363            MetadataFilter::And(filters) => filters.iter().all(|f| f.matches(entry)),
364            MetadataFilter::Or(filters) => filters.iter().any(|f| f.matches(entry)),
365            MetadataFilter::Not(filter) => !filter.matches(entry),
366        }
367    }
368}
369
370/// Inverted index for a single metadata key
371#[derive(Debug, Clone, Default)]
372struct KeyIndex {
373    /// String value -> vector IDs
374    string_index: HashMap<String, HashSet<NodeId>>,
375    /// Integer value -> vector IDs
376    integer_index: HashMap<i64, HashSet<NodeId>>,
377    /// Boolean value -> vector IDs
378    bool_index: HashMap<bool, HashSet<NodeId>>,
379    /// Canonical ordered value -> vector IDs
380    ordered_index: BTreeMap<CanonicalKey, HashSet<NodeId>>,
381    /// Family seen in this key's metadata values. Mixed families disable range pushdown.
382    range_family: Option<CanonicalKeyFamily>,
383    has_mixed_families: bool,
384    /// All vector IDs that have this key
385    all_ids: HashSet<NodeId>,
386}
387
388impl KeyIndex {
389    fn new() -> Self {
390        Self::default()
391    }
392
393    fn insert(&mut self, id: NodeId, value: &MetadataValue) {
394        self.all_ids.insert(id);
395        match value {
396            MetadataValue::String(s) => {
397                self.string_index.entry(s.clone()).or_default().insert(id);
398            }
399            MetadataValue::Integer(i) => {
400                self.integer_index.entry(*i).or_default().insert(id);
401            }
402            MetadataValue::Bool(b) => {
403                self.bool_index.entry(*b).or_default().insert(id);
404            }
405            MetadataValue::Float(_) | MetadataValue::Null => {}
406        }
407
408        if let Some(key) = metadata_value_to_canonical_key(value) {
409            match self.range_family {
410                Some(existing) if existing != key.family() => self.has_mixed_families = true,
411                None => self.range_family = Some(key.family()),
412                _ => {}
413            }
414            self.ordered_index.entry(key).or_default().insert(id);
415        }
416    }
417
418    fn remove(&mut self, id: NodeId, value: &MetadataValue) {
419        self.all_ids.remove(&id);
420        match value {
421            MetadataValue::String(s) => {
422                if let Some(ids) = self.string_index.get_mut(s) {
423                    ids.remove(&id);
424                }
425            }
426            MetadataValue::Integer(i) => {
427                if let Some(ids) = self.integer_index.get_mut(i) {
428                    ids.remove(&id);
429                }
430            }
431            MetadataValue::Bool(b) => {
432                if let Some(ids) = self.bool_index.get_mut(b) {
433                    ids.remove(&id);
434                }
435            }
436            _ => {}
437        }
438
439        if let Some(key) = metadata_value_to_canonical_key(value) {
440            if let Some(ids) = self.ordered_index.get_mut(&key) {
441                ids.remove(&id);
442                if ids.is_empty() {
443                    self.ordered_index.remove(&key);
444                }
445            }
446        }
447    }
448
449    fn exact_match_ids(&self, value: &MetadataValue) -> Option<HashSet<NodeId>> {
450        match value {
451            MetadataValue::String(s) => Some(self.string_index.get(s).cloned().unwrap_or_default()),
452            MetadataValue::Integer(i) => {
453                Some(self.integer_index.get(i).cloned().unwrap_or_default())
454            }
455            MetadataValue::Bool(b) => Some(self.bool_index.get(b).cloned().unwrap_or_default()),
456            MetadataValue::Null => Some(HashSet::new()),
457            MetadataValue::Float(f) if f.is_nan() => Some(HashSet::new()),
458            MetadataValue::Float(_) => metadata_value_to_canonical_key(value)
459                .map(|key| self.ordered_index.get(&key).cloned().unwrap_or_default()),
460        }
461    }
462
463    fn supports_range_key(&self, key: &CanonicalKey) -> bool {
464        !self.has_mixed_families && self.range_family == Some(key.family())
465    }
466
467    fn range_match_ids(
468        &self,
469        value: &MetadataValue,
470        op: MetadataRangeOp,
471    ) -> Option<HashSet<NodeId>> {
472        let key = metadata_value_to_canonical_key(value)?;
473        if !self.supports_range_key(&key) {
474            return None;
475        }
476
477        let mut out = HashSet::new();
478        match op {
479            MetadataRangeOp::Gt => {
480                for ids in self
481                    .ordered_index
482                    .range((std::ops::Bound::Excluded(key), std::ops::Bound::Unbounded))
483                    .map(|(_, ids)| ids)
484                {
485                    out.extend(ids.iter().copied());
486                }
487            }
488            MetadataRangeOp::Gte => {
489                for ids in self
490                    .ordered_index
491                    .range((std::ops::Bound::Included(key), std::ops::Bound::Unbounded))
492                    .map(|(_, ids)| ids)
493                {
494                    out.extend(ids.iter().copied());
495                }
496            }
497            MetadataRangeOp::Lt => {
498                for ids in self
499                    .ordered_index
500                    .range((std::ops::Bound::Unbounded, std::ops::Bound::Excluded(key)))
501                    .map(|(_, ids)| ids)
502                {
503                    out.extend(ids.iter().copied());
504                }
505            }
506            MetadataRangeOp::Lte => {
507                for ids in self
508                    .ordered_index
509                    .range((std::ops::Bound::Unbounded, std::ops::Bound::Included(key)))
510                    .map(|(_, ids)| ids)
511                {
512                    out.extend(ids.iter().copied());
513                }
514            }
515        }
516        Some(out)
517    }
518}
519
/// Ordering operator of a range query against a key index.
#[derive(Clone, Copy, Debug)]
enum MetadataRangeOp {
    /// Strictly greater than the bound.
    Gt,
    /// Greater than or equal to the bound.
    Gte,
    /// Strictly less than the bound.
    Lt,
    /// Less than or equal to the bound.
    Lte,
}
527
528/// Metadata storage with inverted indexes for filtering
529pub struct MetadataStore {
530    /// Vector ID -> metadata entry
531    entries: HashMap<NodeId, MetadataEntry>,
532    /// Key -> inverted index
533    indexes: HashMap<String, KeyIndex>,
534}
535
536impl MetadataStore {
537    /// Create a new empty metadata store
538    pub fn new() -> Self {
539        Self {
540            entries: HashMap::new(),
541            indexes: HashMap::new(),
542        }
543    }
544
545    /// Get the number of entries
546    pub fn len(&self) -> usize {
547        self.entries.len()
548    }
549
550    /// Check if empty
551    pub fn is_empty(&self) -> bool {
552        self.entries.is_empty()
553    }
554
555    /// Insert or update metadata for a vector
556    pub fn insert(&mut self, id: NodeId, entry: MetadataEntry) {
557        // Remove old indexes
558        if let Some(old_entry) = self.entries.get(&id) {
559            for key in old_entry.keys() {
560                if let Some(value) = old_entry.get(&key) {
561                    if let Some(index) = self.indexes.get_mut(&key) {
562                        index.remove(id, &value);
563                    }
564                }
565            }
566        }
567
568        // Add new indexes
569        for key in entry.keys() {
570            if let Some(value) = entry.get(&key) {
571                self.indexes
572                    .entry(key.clone())
573                    .or_default()
574                    .insert(id, &value);
575            }
576        }
577
578        self.entries.insert(id, entry);
579    }
580
581    /// Get metadata for a vector
582    pub fn get(&self, id: NodeId) -> Option<&MetadataEntry> {
583        self.entries.get(&id)
584    }
585
586    /// Remove metadata for a vector
587    pub fn remove(&mut self, id: NodeId) -> Option<MetadataEntry> {
588        if let Some(entry) = self.entries.remove(&id) {
589            for key in entry.keys() {
590                if let Some(value) = entry.get(&key) {
591                    if let Some(index) = self.indexes.get_mut(&key) {
592                        index.remove(id, &value);
593                    }
594                }
595            }
596            Some(entry)
597        } else {
598            None
599        }
600    }
601
602    /// Filter entries and return matching vector IDs
603    pub fn filter(&self, filter: &MetadataFilter) -> HashSet<NodeId> {
604        self.filter_internal(filter)
605    }
606
607    fn filter_internal(&self, filter: &MetadataFilter) -> HashSet<NodeId> {
608        match filter {
609            MetadataFilter::Eq(key, value) => self
610                .indexes
611                .get(key)
612                .and_then(|idx| idx.exact_match_ids(value))
613                .unwrap_or_else(|| {
614                    self.entries
615                        .iter()
616                        .filter(|(_, entry)| {
617                            entry
618                                .get(key)
619                                .map(|candidate| candidate.matches_eq(value))
620                                .unwrap_or(false)
621                        })
622                        .map(|(id, _)| *id)
623                        .collect()
624                }),
625            MetadataFilter::Ne(key, value) => {
626                let all: HashSet<_> = self.entries.keys().copied().collect();
627                if let Some(index) = self.indexes.get(key) {
628                    if let Some(exact) = index.exact_match_ids(value) {
629                        return all.difference(&exact).copied().collect();
630                    }
631                }
632                self.entries
633                    .iter()
634                    .filter(|(_, entry)| {
635                        entry
636                            .get(key)
637                            .map(|candidate| !candidate.matches_eq(value))
638                            .unwrap_or(true)
639                    })
640                    .map(|(id, _)| *id)
641                    .collect()
642            }
643            MetadataFilter::Gt(key, value) => self
644                .indexes
645                .get(key)
646                .and_then(|idx| idx.range_match_ids(value, MetadataRangeOp::Gt))
647                .unwrap_or_else(|| {
648                    self.entries
649                        .iter()
650                        .filter(|(_, entry)| {
651                            entry
652                                .get(key)
653                                .and_then(|candidate| candidate.compare(value))
654                                .map(|ord| ord == std::cmp::Ordering::Greater)
655                                .unwrap_or(false)
656                        })
657                        .map(|(id, _)| *id)
658                        .collect()
659                }),
660            MetadataFilter::Gte(key, value) => self
661                .indexes
662                .get(key)
663                .and_then(|idx| idx.range_match_ids(value, MetadataRangeOp::Gte))
664                .unwrap_or_else(|| {
665                    self.entries
666                        .iter()
667                        .filter(|(_, entry)| {
668                            entry
669                                .get(key)
670                                .and_then(|candidate| candidate.compare(value))
671                                .map(|ord| ord != std::cmp::Ordering::Less)
672                                .unwrap_or(false)
673                        })
674                        .map(|(id, _)| *id)
675                        .collect()
676                }),
677            MetadataFilter::Lt(key, value) => self
678                .indexes
679                .get(key)
680                .and_then(|idx| idx.range_match_ids(value, MetadataRangeOp::Lt))
681                .unwrap_or_else(|| {
682                    self.entries
683                        .iter()
684                        .filter(|(_, entry)| {
685                            entry
686                                .get(key)
687                                .and_then(|candidate| candidate.compare(value))
688                                .map(|ord| ord == std::cmp::Ordering::Less)
689                                .unwrap_or(false)
690                        })
691                        .map(|(id, _)| *id)
692                        .collect()
693                }),
694            MetadataFilter::Lte(key, value) => self
695                .indexes
696                .get(key)
697                .and_then(|idx| idx.range_match_ids(value, MetadataRangeOp::Lte))
698                .unwrap_or_else(|| {
699                    self.entries
700                        .iter()
701                        .filter(|(_, entry)| {
702                            entry
703                                .get(key)
704                                .and_then(|candidate| candidate.compare(value))
705                                .map(|ord| ord != std::cmp::Ordering::Greater)
706                                .unwrap_or(false)
707                        })
708                        .map(|(id, _)| *id)
709                        .collect()
710                }),
711            MetadataFilter::In(key, values) => {
712                if let Some(index) = self.indexes.get(key) {
713                    if let Some(result) =
714                        values.iter().try_fold(HashSet::new(), |mut acc, value| {
715                            let ids = index.exact_match_ids(value)?;
716                            acc.extend(ids);
717                            Some(acc)
718                        })
719                    {
720                        return result;
721                    }
722                }
723                self.entries
724                    .iter()
725                    .filter(|(_, entry)| {
726                        entry
727                            .get(key)
728                            .map(|candidate| values.iter().any(|value| candidate.matches_eq(value)))
729                            .unwrap_or(false)
730                    })
731                    .map(|(id, _)| *id)
732                    .collect()
733            }
734            MetadataFilter::NotIn(key, values) => {
735                let all: HashSet<_> = self.entries.keys().copied().collect();
736                if let Some(index) = self.indexes.get(key) {
737                    if let Some(matched) =
738                        values.iter().try_fold(HashSet::new(), |mut acc, value| {
739                            let ids = index.exact_match_ids(value)?;
740                            acc.extend(ids);
741                            Some(acc)
742                        })
743                    {
744                        return all.difference(&matched).copied().collect();
745                    }
746                }
747                self.entries
748                    .iter()
749                    .filter(|(_, entry)| {
750                        entry
751                            .get(key)
752                            .map(|candidate| {
753                                !values.iter().any(|value| candidate.matches_eq(value))
754                            })
755                            .unwrap_or(true)
756                    })
757                    .map(|(id, _)| *id)
758                    .collect()
759            }
760            MetadataFilter::Exists(key) => self
761                .indexes
762                .get(key)
763                .map(|idx| idx.all_ids.clone())
764                .unwrap_or_default(),
765            MetadataFilter::And(filters) => {
766                if filters.is_empty() {
767                    return self.entries.keys().copied().collect();
768                }
769                let mut result = self.filter_internal(&filters[0]);
770                for filter in &filters[1..] {
771                    let other = self.filter_internal(filter);
772                    result = result.intersection(&other).copied().collect();
773                }
774                result
775            }
776            MetadataFilter::Or(filters) => {
777                let mut result = HashSet::new();
778                for filter in filters {
779                    result.extend(self.filter_internal(filter));
780                }
781                result
782            }
783            MetadataFilter::Not(inner) => {
784                let all: HashSet<_> = self.entries.keys().copied().collect();
785                let matched = self.filter_internal(inner);
786                all.difference(&matched).copied().collect()
787            }
788            // For complex filters, fall back to scanning
789            _ => self
790                .entries
791                .iter()
792                .filter(|(_, entry)| filter.matches(entry))
793                .map(|(id, _)| *id)
794                .collect(),
795        }
796    }
797}
798
799impl Default for MetadataStore {
800    fn default() -> Self {
801        Self::new()
802    }
803}
804
#[cfg(test)]
mod tests {
    use super::*;

    // Filter results are compared against the exact expected id set rather
    // than only their size, so a filter that returns the right number of
    // wrong ids fails the test instead of slipping through.

    /// Builds an entry whose only field is the string `value` stored under `key`.
    fn string_entry(key: &str, value: &str) -> MetadataEntry {
        let mut entry = MetadataEntry::new();
        entry.insert(key, MetadataValue::String(value.to_string()));
        entry
    }

    /// Builds an entry with a string `type` field and a boolean `active` field.
    fn typed_entry(kind: &str, active: bool) -> MetadataEntry {
        let mut entry = string_entry("type", kind);
        entry.insert("active", MetadataValue::Bool(active));
        entry
    }

    /// Builds a store holding ids `0..count`, where every id carries its own
    /// value as the integer field `key`.
    fn integer_store(key: &str, count: i64) -> MetadataStore {
        let mut store = MetadataStore::new();
        for i in 0..count {
            let mut entry = MetadataEntry::new();
            entry.insert(key, MetadataValue::Integer(i));
            // `i` is never negative here, so the cast to the id type is lossless.
            store.insert(i as u64, entry);
        }
        store
    }

    /// Values of every supported type can be inserted into an entry and read
    /// back by key; unknown keys yield `None`.
    #[test]
    fn test_metadata_entry() {
        let mut entry = MetadataEntry::new();
        entry.insert("name", MetadataValue::String("test".to_string()));
        entry.insert("count", MetadataValue::Integer(42));
        entry.insert("score", MetadataValue::Float(2.5));
        entry.insert("active", MetadataValue::Bool(true));

        assert_eq!(
            entry.get("name"),
            Some(MetadataValue::String("test".to_string()))
        );
        assert_eq!(entry.get("count"), Some(MetadataValue::Integer(42)));
        // Only the presence of the float field is checked, not its value.
        assert!(entry.get("score").is_some());
        assert_eq!(entry.get("active"), Some(MetadataValue::Bool(true)));
        assert!(entry.get("nonexistent").is_none());
    }

    /// An equality filter on a string field returns only the matching id.
    #[test]
    fn test_filter_eq() {
        let mut store = MetadataStore::new();
        store.insert(1, string_entry("type", "host"));
        store.insert(2, string_entry("type", "service"));

        let results = store.filter(&MetadataFilter::eq("type", "host"));

        assert_eq!(results, HashSet::from([1]));
    }

    /// `gt`, `gte` and `lt` on an integer field return exactly the ids whose
    /// value lies inside the requested range.
    #[test]
    fn test_filter_comparison() {
        let store = integer_store("score", 10);

        // score > 5
        let results = store.filter(&MetadataFilter::gt("score", MetadataValue::Integer(5)));
        assert_eq!(results, HashSet::from([6, 7, 8, 9]));

        // score >= 5
        let results = store.filter(&MetadataFilter::gte("score", MetadataValue::Integer(5)));
        assert_eq!(results, HashSet::from([5, 6, 7, 8, 9]));

        // score < 3
        let results = store.filter(&MetadataFilter::lt("score", MetadataValue::Integer(3)));
        assert_eq!(results, HashSet::from([0, 1, 2]));
    }

    /// An `and` filter keeps only the ids that satisfy every sub-filter.
    #[test]
    fn test_filter_and() {
        let mut store = MetadataStore::new();
        store.insert(1, typed_entry("host", true));
        store.insert(2, typed_entry("host", false));
        store.insert(3, typed_entry("service", true));

        let filter = MetadataFilter::and(vec![
            MetadataFilter::eq("type", "host"),
            MetadataFilter::eq("active", true),
        ]);
        let results = store.filter(&filter);

        assert_eq!(results, HashSet::from([1]));
    }

    /// An `or` filter keeps the ids that satisfy at least one sub-filter.
    #[test]
    fn test_filter_or() {
        let mut store = MetadataStore::new();
        store.insert(1, string_entry("type", "host"));
        store.insert(2, string_entry("type", "service"));
        store.insert(3, string_entry("type", "network"));

        let filter = MetadataFilter::or(vec![
            MetadataFilter::eq("type", "host"),
            MetadataFilter::eq("type", "service"),
        ]);
        let results = store.filter(&filter);

        assert_eq!(results, HashSet::from([1, 2]));
    }

    /// A `Contains` filter matches string fields that include the substring.
    #[test]
    fn test_filter_contains() {
        let mut store = MetadataStore::new();
        store.insert(1, string_entry("description", "SSH vulnerability"));
        store.insert(2, string_entry("description", "HTTP server"));

        let filter =
            MetadataFilter::Contains("description".to_string(), "vulnerability".to_string());
        let results = store.filter(&filter);

        assert_eq!(results, HashSet::from([1]));
    }

    /// An `In` filter matches the ids whose field equals any listed value.
    #[test]
    fn test_filter_in() {
        let store = integer_store("id", 5);

        let filter = MetadataFilter::In(
            "id".to_string(),
            vec![MetadataValue::Integer(1), MetadataValue::Integer(3)],
        );
        let results = store.filter(&filter);

        assert_eq!(results, HashSet::from([1, 3]));
    }

    /// After an id is removed from the store, filters no longer return it.
    #[test]
    fn test_remove_updates_index() {
        let mut store = MetadataStore::new();
        store.insert(1, string_entry("type", "host"));

        let filter = MetadataFilter::eq("type", "host");
        assert_eq!(store.filter(&filter), HashSet::from([1]));

        store.remove(1);

        assert!(store.filter(&filter).is_empty());
    }

    /// Equality on a float field returns only the id holding that exact value.
    #[test]
    fn test_filter_float_eq_uses_canonical_index() {
        let mut store = MetadataStore::new();
        for (id, score) in [(1, 1.5), (2, 2.5)] {
            let mut entry = MetadataEntry::new();
            entry.insert("score", MetadataValue::Float(score));
            store.insert(id, entry);
        }

        let results = store.filter(&MetadataFilter::eq("score", MetadataValue::Float(2.5)));
        assert_eq!(results, HashSet::from([2]));
    }

    /// A `gte` range filter on a string field returns the ids whose value
    /// sorts at or after the bound.
    #[test]
    fn test_filter_string_range_uses_ordered_index() {
        let mut store = MetadataStore::new();
        for (id, tier) in [(1, "alpha"), (2, "bravo"), (3, "delta")] {
            store.insert(id, string_entry("tier", tier));
        }

        let results = store.filter(&MetadataFilter::gte(
            "tier",
            MetadataValue::String("bravo".to_string()),
        ));
        assert_eq!(results, HashSet::from([2, 3]));
    }
}