1use std::collections::HashMap;
17
18use roaring::RoaringBitmap;
19
20use nodedb_types::Value;
21
22#[derive(
30 Debug,
31 Clone,
32 PartialEq,
33 Eq,
34 PartialOrd,
35 Ord,
36 Hash,
37 serde::Serialize,
38 serde::Deserialize,
39 zerompk::ToMessagePack,
40 zerompk::FromMessagePack,
41)]
42#[non_exhaustive]
43pub enum PayloadKey {
44 Null,
45 Bool(bool),
46 Integer(i64),
47 Float(u64),
49 String(String),
50 Bytes(Vec<u8>),
51 DateTime(String),
53 Uuid(String),
54}
55
56impl PayloadKey {
57 pub fn from_value(v: &Value) -> Option<PayloadKey> {
63 match v {
64 Value::Null => Some(PayloadKey::Null),
65 Value::Bool(b) => Some(PayloadKey::Bool(*b)),
66 Value::Integer(i) => Some(PayloadKey::Integer(*i)),
67 Value::Float(f) => {
68 let bits = if f.is_nan() { u64::MAX } else { f.to_bits() };
70 Some(PayloadKey::Float(bits))
71 }
72 Value::String(s) => Some(PayloadKey::String(s.clone())),
73 Value::Bytes(b) => Some(PayloadKey::Bytes(b.clone())),
74 Value::Uuid(s) | Value::Ulid(s) | Value::Regex(s) => Some(PayloadKey::Uuid(s.clone())),
75 Value::DateTime(dt) | Value::NaiveDateTime(dt) => {
76 Some(PayloadKey::DateTime(format!("{dt:?}")))
77 }
78 Value::Decimal(d) => Some(PayloadKey::String(d.to_string())),
79 Value::Array(_)
81 | Value::Object(_)
82 | Value::Set(_)
83 | Value::Geometry(_)
84 | Value::Duration(_)
85 | Value::Range { .. }
86 | Value::Record { .. }
87 | Value::ArrayCell(_) => None,
88 _ => None,
91 }
92 }
93}
94
95pub use nodedb_types::PayloadIndexKind;
100
/// Storage behind a single payload index: one bitmap of node ids per key.
///
/// The variant decides which lookups are possible; see the accessors on this
/// type and `PayloadIndex::range`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum PayloadIndexBitmaps {
    /// Hash-map storage; supports exact-key lookups.
    Equality(HashMap<PayloadKey, RoaringBitmap>),
    /// Ordered storage; supports exact-key lookups and ordered key ranges.
    Range(std::collections::BTreeMap<PayloadKey, RoaringBitmap>),
}
112
113impl PayloadIndexBitmaps {
114 fn for_kind(kind: PayloadIndexKind) -> Self {
115 match kind {
116 PayloadIndexKind::Range => Self::Range(std::collections::BTreeMap::new()),
117 _ => Self::Equality(HashMap::new()),
120 }
121 }
122
123 fn get(&self, key: &PayloadKey) -> Option<&RoaringBitmap> {
124 match self {
125 Self::Equality(m) => m.get(key),
126 Self::Range(m) => m.get(key),
127 }
128 }
129
130 fn entry_or_default(&mut self, key: PayloadKey) -> &mut RoaringBitmap {
131 match self {
132 Self::Equality(m) => m.entry(key).or_default(),
133 Self::Range(m) => m.entry(key).or_default(),
134 }
135 }
136
137 fn get_mut(&mut self, key: &PayloadKey) -> Option<&mut RoaringBitmap> {
138 match self {
139 Self::Equality(m) => m.get_mut(key),
140 Self::Range(m) => m.get_mut(key),
141 }
142 }
143
144 fn remove(&mut self, key: &PayloadKey) -> Option<RoaringBitmap> {
145 match self {
146 Self::Equality(m) => m.remove(key),
147 Self::Range(m) => m.remove(key),
148 }
149 }
150
151 fn iter(&self) -> Box<dyn Iterator<Item = (&PayloadKey, &RoaringBitmap)> + '_> {
152 match self {
153 Self::Equality(m) => Box::new(m.iter()),
154 Self::Range(m) => Box::new(m.iter()),
155 }
156 }
157}
158
/// Index over one payload field: maps each distinct key to the bitmap of node
/// ids whose value for the field produced that key.
#[derive(Debug, Clone)]
pub struct PayloadIndex {
    /// Name of the indexed payload field.
    pub field: String,
    /// Kind the index was created with; it selects the storage in `bitmaps`.
    pub kind: PayloadIndexKind,
    /// Per-key bitmaps of node ids.
    pub bitmaps: PayloadIndexBitmaps,
}
169
170impl Default for PayloadIndex {
171 fn default() -> Self {
172 Self::new(String::new(), PayloadIndexKind::Equality)
173 }
174}
175
176impl PayloadIndex {
177 pub fn new(field: impl Into<String>, kind: PayloadIndexKind) -> Self {
179 Self {
180 field: field.into(),
181 kind,
182 bitmaps: PayloadIndexBitmaps::for_kind(kind),
183 }
184 }
185
186 pub fn insert(&mut self, node_id: u32, value: &Value) {
188 let Some(key) = PayloadKey::from_value(value) else {
189 return;
190 };
191 self.bitmaps.entry_or_default(key).insert(node_id);
192 }
193
194 pub fn delete(&mut self, node_id: u32, value: &Value) {
196 let Some(key) = PayloadKey::from_value(value) else {
197 return;
198 };
199 if let Some(bm) = self.bitmaps.get_mut(&key) {
200 bm.remove(node_id);
201 if bm.is_empty() {
202 self.bitmaps.remove(&key);
203 }
204 }
205 }
206
207 pub fn equality(&self, value: &Value) -> Option<&RoaringBitmap> {
210 let key = PayloadKey::from_value(value)?;
211 self.bitmaps.get(&key)
212 }
213
214 pub fn range(
220 &self,
221 low: Option<&Value>,
222 low_inclusive: bool,
223 high: Option<&Value>,
224 high_inclusive: bool,
225 ) -> Option<RoaringBitmap> {
226 let PayloadIndexBitmaps::Range(map) = &self.bitmaps else {
227 return None;
228 };
229 use std::ops::Bound;
230 let low_key = low.and_then(PayloadKey::from_value);
231 let high_key = high.and_then(PayloadKey::from_value);
232 let low_bound = match (&low_key, low_inclusive) {
233 (Some(k), true) => Bound::Included(k.clone()),
234 (Some(k), false) => Bound::Excluded(k.clone()),
235 (None, _) => Bound::Unbounded,
236 };
237 let high_bound = match (&high_key, high_inclusive) {
238 (Some(k), true) => Bound::Included(k.clone()),
239 (Some(k), false) => Bound::Excluded(k.clone()),
240 (None, _) => Bound::Unbounded,
241 };
242 let mut acc = RoaringBitmap::new();
243 for (_, bm) in map.range((low_bound, high_bound)) {
244 acc |= bm;
245 }
246 Some(acc)
247 }
248}
249
/// Collection of payload indexes, at most one per field name.
#[derive(Debug, Default)]
pub struct PayloadIndexSet {
    /// Indexes keyed by the field name they were registered under.
    indexes: HashMap<String, PayloadIndex>,
}
257
258impl PayloadIndexSet {
259 pub fn add_index(&mut self, field: impl Into<String>, kind: PayloadIndexKind) {
262 let field = field.into();
263 self.indexes
264 .entry(field.clone())
265 .or_insert_with(|| PayloadIndex::new(field, kind));
266 }
267
268 pub fn is_empty(&self) -> bool {
270 self.indexes.is_empty()
271 }
272
273 pub fn insert_row(&mut self, node_id: u32, fields: &HashMap<String, Value>) {
275 for (field, idx) in &mut self.indexes {
276 if let Some(value) = fields.get(field) {
277 idx.insert(node_id, value);
278 }
279 }
280 }
281
282 pub fn delete_row(&mut self, node_id: u32, fields: &HashMap<String, Value>) {
284 for (field, idx) in &mut self.indexes {
285 if let Some(value) = fields.get(field) {
286 idx.delete(node_id, value);
287 }
288 }
289 }
290
291 pub fn pre_filter(&self, predicate: &FilterPredicate) -> Option<RoaringBitmap> {
299 match predicate {
300 FilterPredicate::Eq { field, value } => {
301 let idx = self.indexes.get(field)?;
302 Some(
303 idx.equality(value)
304 .cloned()
305 .unwrap_or_else(RoaringBitmap::new),
306 )
307 }
308 FilterPredicate::In { field, values } => {
309 let idx = self.indexes.get(field)?;
310 let mut acc = RoaringBitmap::new();
311 for v in values {
312 if let Some(bm) = idx.equality(v) {
313 acc |= bm;
314 }
315 }
316 Some(acc)
317 }
318 FilterPredicate::Range {
319 field,
320 low,
321 low_inclusive,
322 high,
323 high_inclusive,
324 } => {
325 let idx = self.indexes.get(field)?;
326 idx.range(low.as_ref(), *low_inclusive, high.as_ref(), *high_inclusive)
327 }
328 FilterPredicate::And(preds) => {
329 let mut result: Option<RoaringBitmap> = None;
330 for pred in preds {
331 let bm = self.pre_filter(pred)?;
332 result = Some(match result {
333 None => bm,
334 Some(acc) => acc & bm,
335 });
336 }
337 Some(result.unwrap_or_default())
338 }
339 FilterPredicate::Or(preds) => {
340 let mut result = RoaringBitmap::new();
341 for pred in preds {
342 let bm = self.pre_filter(pred)?;
343 result |= bm;
344 }
345 Some(result)
346 }
347 FilterPredicate::Not(pred) => {
348 let _ = pred;
353 None
354 }
355 }
356 }
357
358 pub fn field_names(&self) -> impl Iterator<Item = &str> {
360 self.indexes.keys().map(String::as_str)
361 }
362}
363
/// Filter expression over payload fields that `PayloadIndexSet::pre_filter`
/// tries to answer from the indexes.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum FilterPredicate {
    /// `field` equals `value`.
    Eq { field: String, value: Value },
    /// `field` equals any one of `values`.
    In { field: String, values: Vec<Value> },
    /// `field` lies between `low` and `high`.
    Range {
        /// Field the range applies to.
        field: String,
        /// Lower bound; `None` means unbounded below.
        low: Option<Value>,
        /// Whether `low` itself is part of the range.
        low_inclusive: bool,
        /// Upper bound; `None` means unbounded above.
        high: Option<Value>,
        /// Whether `high` itself is part of the range.
        high_inclusive: bool,
    },
    /// All of the nested predicates hold (bitmap intersection).
    And(Vec<FilterPredicate>),
    /// At least one of the nested predicates holds (bitmap union).
    Or(Vec<FilterPredicate>),
    /// The nested predicate does not hold. `pre_filter` returns `None` for
    /// this variant.
    Not(Box<FilterPredicate>),
}
392
/// Serializable form of a single payload index.
#[derive(
    serde::Serialize, serde::Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
pub struct PayloadIndexSnapshot {
    /// Name of the indexed field.
    pub field: String,
    /// Kind of the index; selects the storage when the index is rebuilt.
    pub kind: PayloadIndexKind,
    /// One `(key, bitmap)` pair per stored key: the MessagePack encoding of
    /// the `PayloadKey`, followed by the bytes written by
    /// `RoaringBitmap::serialize_into`.
    pub entries: Vec<(Vec<u8>, Vec<u8>)>,
}
406
/// Serializable form of a whole `PayloadIndexSet`.
#[derive(
    serde::Serialize, serde::Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
pub struct PayloadIndexSetSnapshot {
    /// One snapshot per index in the set.
    pub indexes: Vec<PayloadIndexSnapshot>,
}
414
415impl PayloadIndexSet {
416 pub fn to_snapshot(&self) -> PayloadIndexSetSnapshot {
418 let indexes = self
419 .indexes
420 .values()
421 .map(|idx| {
422 let entries = idx
423 .bitmaps
424 .iter()
425 .filter_map(|(key, bm)| {
426 let key_bytes = zerompk::to_msgpack_vec(key).ok()?;
427 let mut bm_bytes = Vec::new();
428 bm.serialize_into(&mut bm_bytes).ok()?;
429 Some((key_bytes, bm_bytes))
430 })
431 .collect();
432 PayloadIndexSnapshot {
433 field: idx.field.clone(),
434 kind: idx.kind,
435 entries,
436 }
437 })
438 .collect();
439 PayloadIndexSetSnapshot { indexes }
440 }
441
442 pub fn from_snapshot(snap: PayloadIndexSetSnapshot) -> Self {
444 let mut indexes = HashMap::new();
445 for idx_snap in snap.indexes {
446 let mut bitmaps = PayloadIndexBitmaps::for_kind(idx_snap.kind);
447 for (key_bytes, bm_bytes) in idx_snap.entries {
448 let Ok(key) = zerompk::from_msgpack::<PayloadKey>(&key_bytes) else {
449 continue;
450 };
451 let Ok(bm) = RoaringBitmap::deserialize_from(bm_bytes.as_slice()) else {
452 continue;
453 };
454 *bitmaps.entry_or_default(key) = bm;
455 }
456 let idx = PayloadIndex {
457 field: idx_snap.field.clone(),
458 kind: idx_snap.kind,
459 bitmaps,
460 };
461 indexes.insert(idx_snap.field, idx);
462 }
463 Self { indexes }
464 }
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a string payload value.
    fn string_val(s: &str) -> Value {
        Value::String(s.to_owned())
    }

    /// Shorthand for an integer payload value.
    fn int_val(i: i64) -> Value {
        Value::Integer(i)
    }

    /// Builds an equality predicate on `field`.
    fn eq_pred(field: &str, value: Value) -> FilterPredicate {
        FilterPredicate::Eq {
            field: field.to_owned(),
            value,
        }
    }

    /// Builds a payload row from `(field, value)` pairs.
    fn row(pairs: Vec<(&str, Value)>) -> HashMap<String, Value> {
        pairs
            .into_iter()
            .map(|(name, value)| (name.to_owned(), value))
            .collect()
    }

    /// An index set with a single equality index on `category`.
    fn category_set() -> PayloadIndexSet {
        let mut set = PayloadIndexSet::default();
        set.add_index("category", PayloadIndexKind::Equality);
        set
    }

    #[test]
    fn insert_and_equality_lookup() {
        let mut idx = PayloadIndex::new("category", PayloadIndexKind::Equality);
        for (node, label) in [(0u32, "A"), (1, "B"), (2, "A")] {
            idx.insert(node, &string_val(label));
        }

        let a_bm = idx.equality(&string_val("A")).unwrap();
        assert!(a_bm.contains(0));
        assert!(a_bm.contains(2));
        assert!(!a_bm.contains(1));

        let b_bm = idx.equality(&string_val("B")).unwrap();
        assert!(b_bm.contains(1));
    }

    #[test]
    fn delete_removes_from_bitmap() {
        let a = string_val("A");
        let mut idx = PayloadIndex::new("category", PayloadIndexKind::Equality);
        idx.insert(0, &a);
        idx.insert(1, &a);
        idx.delete(0, &a);

        let bm = idx.equality(&a).unwrap();
        assert!(!bm.contains(0));
        assert!(bm.contains(1));
    }

    #[test]
    fn delete_empty_bitmap_removed() {
        let v = string_val("v");
        let mut idx = PayloadIndex::new("x", PayloadIndexKind::Equality);
        idx.insert(5, &v);
        idx.delete(5, &v);
        // The last member is gone, so the key itself must be gone too.
        assert!(idx.equality(&v).is_none());
    }

    #[test]
    fn pre_filter_equality() {
        let mut set = category_set();
        for i in 0u32..10 {
            let label = if i < 5 { "A" } else { "B" };
            set.insert_row(i, &row(vec![("category", string_val(label))]));
        }

        let bm = set.pre_filter(&eq_pred("category", string_val("A"))).unwrap();
        assert_eq!(bm.len(), 5);
        for i in 0..5u32 {
            assert!(bm.contains(i));
        }
    }

    #[test]
    fn pre_filter_and() {
        let mut set = category_set();
        set.add_index("active", PayloadIndexKind::Boolean);

        // node 0: A / active, node 1: A / inactive, node 2: B / active.
        let rows = [(0u32, "A", true), (1, "A", false), (2, "B", true)];
        for (node, category, active) in rows {
            let fields = row(vec![
                ("category", string_val(category)),
                ("active", Value::Bool(active)),
            ]);
            set.insert_row(node, &fields);
        }

        let pred = FilterPredicate::And(vec![
            eq_pred("category", string_val("A")),
            eq_pred("active", Value::Bool(true)),
        ]);
        let bm = set.pre_filter(&pred).unwrap();
        assert_eq!(bm.len(), 1);
        assert!(bm.contains(0));
    }

    #[test]
    fn pre_filter_or() {
        let mut set = category_set();
        let labels = ["A", "B", "C"];
        for i in 0u32..9 {
            let cat = labels[(i % 3) as usize];
            set.insert_row(i, &row(vec![("category", string_val(cat))]));
        }

        let pred = FilterPredicate::Or(vec![
            eq_pred("category", string_val("A")),
            eq_pred("category", string_val("B")),
        ]);
        let bm = set.pre_filter(&pred).unwrap();
        assert_eq!(bm.len(), 6);
    }

    #[test]
    fn pre_filter_missing_index_returns_none() {
        // No index registered at all.
        let set = PayloadIndexSet::default();
        let pred = eq_pred("category", string_val("A"));
        assert!(set.pre_filter(&pred).is_none());
    }

    #[test]
    fn pre_filter_not_returns_none() {
        let set = category_set();
        let pred = FilterPredicate::Not(Box::new(eq_pred("category", string_val("A"))));
        assert!(set.pre_filter(&pred).is_none());
    }

    #[test]
    fn pre_filter_and_partial_index_returns_none() {
        // Only `category` is indexed; the second branch cannot be answered.
        let set = category_set();
        let pred = FilterPredicate::And(vec![
            eq_pred("category", string_val("A")),
            eq_pred("unindexed_field", int_val(42)),
        ]);
        assert!(set.pre_filter(&pred).is_none());
    }

    #[test]
    fn nan_equality() {
        let mut idx = PayloadIndex::new("score", PayloadIndexKind::Equality);
        for (node, score) in [(0u32, f64::NAN), (1, f64::NAN), (2, 1.0)] {
            idx.insert(node, &Value::Float(score));
        }

        // Every NaN shares one key, so both NaN rows are found together.
        let bm = idx.equality(&Value::Float(f64::NAN)).unwrap();
        assert_eq!(bm.len(), 2);
        assert!(bm.contains(0));
        assert!(bm.contains(1));
    }

    #[test]
    fn null_value_indexed() {
        let mut idx = PayloadIndex::new("f", PayloadIndexKind::Equality);
        for node in 0u32..2 {
            idx.insert(node, &Value::Null);
        }
        let bm = idx.equality(&Value::Null).unwrap();
        assert_eq!(bm.len(), 2);
    }

    #[test]
    fn snapshot_roundtrip() {
        let mut set = category_set();
        for i in 0u32..6 {
            let label = if i < 3 { "A" } else { "B" };
            set.insert_row(i, &row(vec![("category", string_val(label))]));
        }

        let snap = set.to_snapshot();
        let snap_bytes = zerompk::to_msgpack_vec(&snap).unwrap();
        let restored_snap: PayloadIndexSetSnapshot = zerompk::from_msgpack(&snap_bytes).unwrap();
        let restored = PayloadIndexSet::from_snapshot(restored_snap);

        let bm = restored
            .pre_filter(&eq_pred("category", string_val("A")))
            .unwrap();
        assert_eq!(bm.len(), 3);
    }
}