1use std::collections::{BTreeMap, BTreeSet};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, RwLock, RwLockWriteGuard};
8
9use lora_ast::Direction;
10
11use crate::{
12 MutationEvent, MutationRecorder, NodeId, NodeRecord, Properties, PropertyValue, RelationshipId,
13 RelationshipRecord,
14};
15
16#[cfg(test)]
17use super::property_index::PropertyIndexState;
18use super::property_index::{PropertyIndexKey, PropertyIndexRegistry};
19
/// In-memory graph store: node and relationship records plus the lookup
/// structures that are maintained next to them.
#[derive(Default)]
pub struct InMemoryGraph {
    /// Id returned by the next call to `alloc_node_id`.
    pub(super) next_node_id: NodeId,
    /// Id returned by the next call to `alloc_rel_id`.
    pub(super) next_rel_id: RelationshipId,

    /// Node slots addressed by `id as usize`; `None` marks an empty slot.
    pub(super) nodes: Vec<Option<Arc<NodeRecord>>>,
    /// Relationship slots addressed by `id as usize`; `None` marks an empty slot.
    pub(super) relationships: Vec<Option<Arc<RelationshipRecord>>>,
    /// Number of occupied entries in `nodes` (adjusted by `put_node` / `take_node`).
    pub(super) live_node_count: usize,
    /// Number of occupied entries in `relationships` (adjusted by `put_rel` / `take_rel`).
    pub(super) live_rel_count: usize,

    /// Per-node list of relationship ids whose `src` is that node, addressed by node id.
    pub(super) outgoing: Vec<Vec<RelationshipId>>,
    /// Per-node list of relationship ids whose `dst` is that node, addressed by node id.
    pub(super) incoming: Vec<Vec<RelationshipId>>,

    /// Label -> ids of the nodes carrying that label.
    pub(super) nodes_by_label: BTreeMap<String, Vec<NodeId>>,
    /// Relationship type -> ids of the relationships of that type.
    pub(super) relationships_by_type: BTreeMap<String, Vec<RelationshipId>>,
    /// Property index registry; behind a lock so `&self` methods can build indexes.
    pub(super) indexes: RwLock<PropertyIndexRegistry>,
    /// Count of active node property index keys.
    pub(super) active_node_property_indexes: AtomicUsize,
    /// Count of active relationship property index keys.
    pub(super) active_relationship_property_indexes: AtomicUsize,

    /// Optional sink that `emit` forwards mutation events to.
    pub(super) recorder: Option<Arc<dyn MutationRecorder>>,
}
74
75impl std::fmt::Debug for InMemoryGraph {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("InMemoryGraph")
78 .field("next_node_id", &self.next_node_id)
79 .field("next_rel_id", &self.next_rel_id)
80 .field("nodes", &self.nodes)
81 .field("relationships", &self.relationships)
82 .field("outgoing", &self.outgoing)
83 .field("incoming", &self.incoming)
84 .field("nodes_by_label", &self.nodes_by_label)
85 .field("relationships_by_type", &self.relationships_by_type)
86 .field("indexes", &self.indexes)
87 .field(
88 "active_node_property_indexes",
89 &self.active_node_property_index_count(),
90 )
91 .field(
92 "active_relationship_property_indexes",
93 &self.active_relationship_property_index_count(),
94 )
95 .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
96 .finish()
97 }
98}
99
100impl Clone for InMemoryGraph {
101 fn clone(&self) -> Self {
102 Self {
105 next_node_id: self.next_node_id,
106 next_rel_id: self.next_rel_id,
107 nodes: self.nodes.clone(),
108 relationships: self.relationships.clone(),
109 live_node_count: self.live_node_count,
110 live_rel_count: self.live_rel_count,
111 outgoing: self.outgoing.clone(),
112 incoming: self.incoming.clone(),
113 nodes_by_label: self.nodes_by_label.clone(),
114 relationships_by_type: self.relationships_by_type.clone(),
115 indexes: RwLock::new(if self.has_active_property_indexes() {
116 self.indexes_read().clone()
117 } else {
118 PropertyIndexRegistry::default()
119 }),
120 active_node_property_indexes: AtomicUsize::new(self.active_node_property_index_count()),
121 active_relationship_property_indexes: AtomicUsize::new(
122 self.active_relationship_property_index_count(),
123 ),
124 recorder: None,
125 }
126 }
127}
128
129impl InMemoryGraph {
130 pub fn new() -> Self {
131 Self::default()
132 }
133
134 pub fn with_capacity_hint(_nodes: usize, _relationships: usize) -> Self {
135 Self::default()
137 }
138
139 pub fn contains_node(&self, node_id: NodeId) -> bool {
140 self.node_at(node_id).is_some()
141 }
142
143 pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
144 self.rel_at(rel_id).is_some()
145 }
146
147 pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
151 self.recorder = recorder;
152 }
153
154 pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
156 self.recorder.as_ref()
157 }
158
159 #[inline]
164 pub(super) fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
165 if let Some(rec) = &self.recorder {
166 rec.record(build());
167 }
168 }
169
170 pub(super) fn alloc_node_id(&mut self) -> NodeId {
171 let id = self.next_node_id;
172 self.next_node_id += 1;
173 id
174 }
175
176 pub(super) fn alloc_rel_id(&mut self) -> RelationshipId {
177 let id = self.next_rel_id;
178 self.next_rel_id += 1;
179 id
180 }
181
182 fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
183 let next = id
184 .checked_add(1)
185 .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
186 self.next_node_id = self.next_node_id.max(next);
187 Ok(())
188 }
189
190 fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
191 let next = id
192 .checked_add(1)
193 .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
194 self.next_rel_id = self.next_rel_id.max(next);
195 Ok(())
196 }
197
198 #[inline]
204 pub(super) fn node_at(&self, id: NodeId) -> Option<&NodeRecord> {
205 self.nodes
206 .get(id as usize)
207 .and_then(|s| s.as_ref())
208 .map(|arc| arc.as_ref())
209 }
210
211 #[inline]
216 pub(super) fn node_at_mut(&mut self, id: NodeId) -> Option<&mut NodeRecord> {
217 self.nodes
218 .get_mut(id as usize)
219 .and_then(|s| s.as_mut())
220 .map(Arc::make_mut)
221 }
222
223 #[inline]
224 pub(super) fn rel_at(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
225 self.relationships
226 .get(id as usize)
227 .and_then(|s| s.as_ref())
228 .map(|arc| arc.as_ref())
229 }
230
231 #[inline]
232 pub(super) fn rel_at_mut(&mut self, id: RelationshipId) -> Option<&mut RelationshipRecord> {
233 self.relationships
234 .get_mut(id as usize)
235 .and_then(|s| s.as_mut())
236 .map(Arc::make_mut)
237 }
238
239 fn ensure_node_slot(&mut self, id: NodeId) {
243 let target = id as usize + 1;
244 if self.nodes.len() < target {
245 self.nodes.resize_with(target, || None);
246 self.outgoing.resize_with(target, Vec::new);
247 self.incoming.resize_with(target, Vec::new);
248 }
249 }
250
251 fn ensure_rel_slot(&mut self, id: RelationshipId) {
252 let target = id as usize + 1;
253 if self.relationships.len() < target {
254 self.relationships.resize_with(target, || None);
255 }
256 }
257
258 pub(super) fn put_node(&mut self, id: NodeId, node: NodeRecord) {
259 self.ensure_node_slot(id);
260 let was_present = self.nodes[id as usize].is_some();
261 self.nodes[id as usize] = Some(Arc::new(node));
262 if !was_present {
263 self.live_node_count += 1;
264 }
265 }
266
267 pub(super) fn put_rel(&mut self, id: RelationshipId, rel: RelationshipRecord) {
268 self.ensure_rel_slot(id);
269 let was_present = self.relationships[id as usize].is_some();
270 self.relationships[id as usize] = Some(Arc::new(rel));
271 if !was_present {
272 self.live_rel_count += 1;
273 }
274 }
275
276 pub(super) fn take_node(&mut self, id: NodeId) -> Option<NodeRecord> {
277 let idx = id as usize;
278 let removed = self.nodes.get_mut(idx).and_then(|s| s.take());
279 if removed.is_some() {
280 self.live_node_count -= 1;
281 if let Some(out) = self.outgoing.get_mut(idx) {
287 out.clear();
288 }
289 if let Some(inc) = self.incoming.get_mut(idx) {
290 inc.clear();
291 }
292 }
293 removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
298 }
299
300 pub(super) fn take_rel(&mut self, id: RelationshipId) -> Option<RelationshipRecord> {
301 let idx = id as usize;
302 let removed = self.relationships.get_mut(idx).and_then(|s| s.take());
303 if removed.is_some() {
304 self.live_rel_count -= 1;
305 }
306 removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
307 }
308
309 #[inline]
310 pub(super) fn outgoing_at(&self, id: NodeId) -> Option<&Vec<RelationshipId>> {
311 self.outgoing.get(id as usize)
312 }
313
314 #[inline]
315 pub(super) fn incoming_at(&self, id: NodeId) -> Option<&Vec<RelationshipId>> {
316 self.incoming.get(id as usize)
317 }
318
319 pub(super) fn iter_node_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
320 self.nodes
321 .iter()
322 .enumerate()
323 .filter_map(|(i, slot)| slot.as_ref().map(|_| i as NodeId))
324 }
325
326 pub(super) fn iter_node_records(&self) -> impl Iterator<Item = &NodeRecord> + '_ {
327 self.nodes
328 .iter()
329 .filter_map(|s| s.as_ref())
330 .map(|arc| arc.as_ref())
331 }
332
333 pub(super) fn iter_rel_ids(&self) -> impl Iterator<Item = RelationshipId> + '_ {
334 self.relationships
335 .iter()
336 .enumerate()
337 .filter_map(|(i, slot)| slot.as_ref().map(|_| i as RelationshipId))
338 }
339
340 pub(super) fn iter_rel_records(&self) -> impl Iterator<Item = &RelationshipRecord> + '_ {
341 self.relationships
342 .iter()
343 .filter_map(|s| s.as_ref())
344 .map(|arc| arc.as_ref())
345 }
346
347 pub(super) fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &NodeRecord)> + '_ {
348 self.nodes
349 .iter()
350 .enumerate()
351 .filter_map(|(i, slot)| slot.as_ref().map(|n| (i as NodeId, n.as_ref())))
352 }
353
354 pub(super) fn iter_rels(
355 &self,
356 ) -> impl Iterator<Item = (RelationshipId, &RelationshipRecord)> + '_ {
357 self.relationships
358 .iter()
359 .enumerate()
360 .filter_map(|(i, slot)| slot.as_ref().map(|r| (i as RelationshipId, r.as_ref())))
361 }
362
363 fn outgoing_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
367 self.ensure_node_slot(node_id);
368 self.outgoing[node_id as usize].push(rel_id);
369 }
370
371 fn incoming_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
372 self.ensure_node_slot(node_id);
373 self.incoming[node_id as usize].push(rel_id);
374 }
375
376 fn outgoing_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
379 if let Some(v) = self.outgoing.get_mut(node_id as usize) {
380 if let Some(pos) = v.iter().position(|&id| id == rel_id) {
381 v.swap_remove(pos);
382 }
383 }
384 }
385
386 fn incoming_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
387 if let Some(v) = self.incoming.get_mut(node_id as usize) {
388 if let Some(pos) = v.iter().position(|&id| id == rel_id) {
389 v.swap_remove(pos);
390 }
391 }
392 }
393
394 pub(super) fn normalize_labels(labels: Vec<String>) -> Vec<String> {
395 let mut seen = BTreeSet::new();
396
397 labels
398 .into_iter()
399 .map(|s| s.trim().to_string())
400 .filter(|s| !s.is_empty())
401 .filter(|s| seen.insert(s.clone()))
402 .collect()
403 }
404
405 pub(super) fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
406 if let Some(bucket) = self.nodes_by_label.get_mut(label) {
411 bucket.push(node_id);
412 } else {
413 self.nodes_by_label.insert(label.to_string(), vec![node_id]);
414 }
415 }
416
417 fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
418 if let Some(ids) = self.nodes_by_label.get_mut(label) {
419 if let Some(pos) = ids.iter().position(|&id| id == node_id) {
420 ids.swap_remove(pos);
421 }
422 if ids.is_empty() {
423 self.nodes_by_label.remove(label);
424 }
425 }
426 }
427
428 fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
429 if let Some(bucket) = self.relationships_by_type.get_mut(rel_type) {
431 bucket.push(rel_id);
432 } else {
433 self.relationships_by_type
434 .insert(rel_type.to_string(), vec![rel_id]);
435 }
436 }
437
438 fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
439 if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
440 if let Some(pos) = ids.iter().position(|&id| id == rel_id) {
441 ids.swap_remove(pos);
442 }
443 if ids.is_empty() {
444 self.relationships_by_type.remove(rel_type);
445 }
446 }
447 }
448
449 pub(super) fn indexes_read(&self) -> std::sync::RwLockReadGuard<'_, PropertyIndexRegistry> {
450 self.indexes
451 .read()
452 .unwrap_or_else(|poisoned| poisoned.into_inner())
453 }
454
455 pub(super) fn indexes_write(&self) -> RwLockWriteGuard<'_, PropertyIndexRegistry> {
456 self.indexes
457 .write()
458 .unwrap_or_else(|poisoned| poisoned.into_inner())
459 }
460
461 pub(super) fn indexes_mut(&mut self) -> &mut PropertyIndexRegistry {
462 self.indexes
463 .get_mut()
464 .unwrap_or_else(|poisoned| poisoned.into_inner())
465 }
466
467 #[inline]
468 pub(super) fn active_node_property_index_count(&self) -> usize {
469 self.active_node_property_indexes.load(Ordering::Relaxed)
470 }
471
472 #[inline]
473 pub(super) fn active_relationship_property_index_count(&self) -> usize {
474 self.active_relationship_property_indexes
475 .load(Ordering::Relaxed)
476 }
477
478 #[inline]
479 pub(super) fn has_active_property_indexes(&self) -> bool {
480 self.active_node_property_index_count() != 0
481 || self.active_relationship_property_index_count() != 0
482 }
483
484 pub(super) fn node_property_index_is_active(&mut self, key: &str) -> bool {
485 self.active_node_property_index_count() != 0
486 && self.indexes_mut().node_properties.is_active(key)
487 }
488
489 pub(super) fn relationship_property_index_is_active(&mut self, key: &str) -> bool {
490 self.active_relationship_property_index_count() != 0
491 && self.indexes_mut().relationship_properties.is_active(key)
492 }
493
494 pub(super) fn ensure_node_property_index(&self, key: &str) {
495 {
496 let indexes = self.indexes_read();
497 if indexes.node_properties.is_active(key) {
498 return;
499 }
500 }
501
502 let mut indexes = self.indexes_write();
503 if indexes.node_properties.is_active(key) {
504 return;
505 }
506
507 for (id, node) in self.iter_nodes() {
508 if let Some(value) = node.properties.get(key) {
509 indexes.node_properties.insert_with_scopes(
510 id,
511 node.labels.iter().map(String::as_str),
512 key,
513 value,
514 );
515 }
516 }
517 if indexes.node_properties.activate(key) {
518 self.active_node_property_indexes
519 .fetch_add(1, Ordering::Relaxed);
520 }
521 }
522
523 pub(super) fn ensure_relationship_property_index(&self, key: &str) {
524 {
525 let indexes = self.indexes_read();
526 if indexes.relationship_properties.is_active(key) {
527 return;
528 }
529 }
530
531 let mut indexes = self.indexes_write();
532 if indexes.relationship_properties.is_active(key) {
533 return;
534 }
535
536 for (id, rel) in self.iter_rels() {
537 if let Some(value) = rel.properties.get(key) {
538 indexes.relationship_properties.insert_with_scopes(
539 id,
540 [rel.rel_type.as_str()],
541 key,
542 value,
543 );
544 }
545 }
546 if indexes.relationship_properties.activate(key) {
547 self.active_relationship_property_indexes
548 .fetch_add(1, Ordering::Relaxed);
549 }
550 }
551
552 pub(super) fn rebuild_property_indexes(&mut self) {
553 let mut indexes = PropertyIndexRegistry::default();
554
555 for (id, node) in self.iter_nodes() {
556 for (key, value) in &node.properties {
557 if PropertyIndexKey::from_value(value).is_some() {
558 indexes.node_properties.activate(key);
559 indexes.node_properties.insert_with_scopes(
560 id,
561 node.labels.iter().map(String::as_str),
562 key,
563 value,
564 );
565 }
566 }
567 }
568
569 for (id, rel) in self.iter_rels() {
570 for (key, value) in &rel.properties {
571 if PropertyIndexKey::from_value(value).is_some() {
572 indexes.relationship_properties.activate(key);
573 indexes.relationship_properties.insert_with_scopes(
574 id,
575 [rel.rel_type.as_str()],
576 key,
577 value,
578 );
579 }
580 }
581 }
582
583 let node_index_count = indexes.node_properties.active_keys.len();
584 let relationship_index_count = indexes.relationship_properties.active_keys.len();
585 *self.indexes_mut() = indexes;
586 self.active_node_property_indexes
587 .store(node_index_count, Ordering::Relaxed);
588 self.active_relationship_property_indexes
589 .store(relationship_index_count, Ordering::Relaxed);
590 }
591
592 pub(super) fn on_node_created(&mut self, node: &NodeRecord) {
593 for label in &node.labels {
594 self.insert_node_label_index(node.id, label);
595 }
596 self.index_node_properties_if_active(
597 node.id,
598 node.labels.iter().map(String::as_str),
599 &node.properties,
600 );
601 }
602
603 pub(super) fn on_node_replayed(&mut self, node: &NodeRecord) {
604 for label in &node.labels {
605 self.insert_node_label_index(node.id, label);
606 }
607 self.index_node_properties_eager(
608 node.id,
609 node.labels.iter().map(String::as_str),
610 &node.properties,
611 );
612 }
613
614 pub(super) fn on_node_property_set(
615 &mut self,
616 node_id: NodeId,
617 key: &str,
618 old: Option<&PropertyValue>,
619 new: &PropertyValue,
620 ) {
621 if !self.node_property_index_is_active(key) {
622 return;
623 }
624
625 let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
626 return;
627 };
628
629 if let Some(old) = old {
630 self.unindex_node_property_if_active(
631 node_id,
632 labels.iter().map(String::as_str),
633 key,
634 old,
635 );
636 }
637 self.index_node_property_if_active(node_id, labels.iter().map(String::as_str), key, new);
638 }
639
640 pub(super) fn on_node_property_removed(
641 &mut self,
642 node_id: NodeId,
643 key: &str,
644 old: &PropertyValue,
645 ) {
646 if !self.node_property_index_is_active(key) {
647 return;
648 }
649
650 let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
651 return;
652 };
653 self.unindex_node_property_if_active(node_id, labels.iter().map(String::as_str), key, old);
654 }
655
656 pub(super) fn on_node_label_added(&mut self, node_id: NodeId, label: &str) {
657 self.insert_node_label_index(node_id, label);
658
659 if self.active_node_property_index_count() == 0 {
660 return;
661 }
662
663 let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
664 return;
665 };
666 self.index_node_scope_properties_if_active(node_id, label, &properties);
667 }
668
669 pub(super) fn on_node_label_removed(&mut self, node_id: NodeId, label: &str) {
670 self.remove_node_label_index(node_id, label);
671
672 if self.active_node_property_index_count() == 0 {
673 return;
674 }
675
676 let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
677 return;
678 };
679 self.unindex_node_scope_properties_if_active(node_id, label, &properties);
680 }
681
682 pub(super) fn on_node_deleted(&mut self, node: &NodeRecord) {
683 for label in &node.labels {
684 self.remove_node_label_index(node.id, label);
685 }
686 self.unindex_active_node_properties(
687 node.id,
688 node.labels.iter().map(String::as_str),
689 &node.properties,
690 );
691 }
692
693 pub(super) fn on_relationship_created(&mut self, rel: &RelationshipRecord) {
694 self.attach_relationship(rel);
695 self.index_relationship_properties_if_active(
696 rel.id,
697 [rel.rel_type.as_str()],
698 &rel.properties,
699 );
700 }
701
702 pub(super) fn on_relationship_replayed(&mut self, rel: &RelationshipRecord) {
703 self.attach_relationship(rel);
704 self.index_relationship_properties_eager(rel.id, [rel.rel_type.as_str()], &rel.properties);
705 }
706
707 pub(super) fn on_relationship_property_set(
708 &mut self,
709 rel_id: RelationshipId,
710 key: &str,
711 old: Option<&PropertyValue>,
712 new: &PropertyValue,
713 ) {
714 if !self.relationship_property_index_is_active(key) {
715 return;
716 }
717
718 let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
719 return;
720 };
721
722 if let Some(old) = old {
723 self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
724 }
725 self.index_relationship_property_if_active(rel_id, [rel_type.as_str()], key, new);
726 }
727
728 pub(super) fn on_relationship_property_removed(
729 &mut self,
730 rel_id: RelationshipId,
731 key: &str,
732 old: &PropertyValue,
733 ) {
734 if !self.relationship_property_index_is_active(key) {
735 return;
736 }
737
738 let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
739 return;
740 };
741 self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
742 }
743
744 pub(super) fn on_relationship_deleted(&mut self, rel: &RelationshipRecord) {
745 self.detach_relationship_indexes(rel);
746 self.unindex_active_relationship_properties(
747 rel.id,
748 [rel.rel_type.as_str()],
749 &rel.properties,
750 );
751 }
752
753 fn index_node_property_eager<'a>(
754 &mut self,
755 node_id: NodeId,
756 labels: impl IntoIterator<Item = &'a str>,
757 key: &str,
758 value: &PropertyValue,
759 ) {
760 if PropertyIndexKey::from_value(value).is_none() {
761 return;
762 }
763
764 let activated = {
765 let indexes = self.indexes_mut();
766 let activated = indexes.node_properties.activate(key);
767 indexes
768 .node_properties
769 .insert_with_scopes(node_id, labels, key, value);
770 activated
771 };
772 if activated {
773 self.active_node_property_indexes
774 .fetch_add(1, Ordering::Relaxed);
775 }
776 }
777
778 fn index_relationship_property_eager<'a>(
779 &mut self,
780 rel_id: RelationshipId,
781 scopes: impl IntoIterator<Item = &'a str>,
782 key: &str,
783 value: &PropertyValue,
784 ) {
785 if PropertyIndexKey::from_value(value).is_none() {
786 return;
787 }
788
789 let activated = {
790 let indexes = self.indexes_mut();
791 let activated = indexes.relationship_properties.activate(key);
792 indexes
793 .relationship_properties
794 .insert_with_scopes(rel_id, scopes, key, value);
795 activated
796 };
797 if activated {
798 self.active_relationship_property_indexes
799 .fetch_add(1, Ordering::Relaxed);
800 }
801 }
802
803 fn index_node_properties_eager<'a>(
804 &mut self,
805 node_id: NodeId,
806 labels: impl IntoIterator<Item = &'a str> + Clone,
807 properties: &Properties,
808 ) {
809 for (key, value) in properties {
810 self.index_node_property_eager(node_id, labels.clone(), key, value);
811 }
812 }
813
814 fn index_relationship_properties_eager<'a>(
815 &mut self,
816 rel_id: RelationshipId,
817 scopes: impl IntoIterator<Item = &'a str> + Clone,
818 properties: &Properties,
819 ) {
820 for (key, value) in properties {
821 self.index_relationship_property_eager(rel_id, scopes.clone(), key, value);
822 }
823 }
824
825 fn index_node_property_if_active<'a>(
826 &mut self,
827 node_id: NodeId,
828 labels: impl IntoIterator<Item = &'a str>,
829 key: &str,
830 value: &PropertyValue,
831 ) {
832 if self.active_node_property_index_count() == 0 {
833 return;
834 }
835 let indexes = self.indexes_mut();
836 if indexes.node_properties.is_active(key) {
837 indexes
838 .node_properties
839 .insert_with_scopes(node_id, labels, key, value);
840 }
841 }
842
843 fn index_node_properties_if_active<'a>(
844 &mut self,
845 node_id: NodeId,
846 labels: impl IntoIterator<Item = &'a str> + Clone,
847 properties: &Properties,
848 ) {
849 if self.active_node_property_index_count() == 0 {
850 return;
851 }
852 let indexes = self.indexes_mut();
853 for (key, value) in properties {
854 if indexes.node_properties.is_active(key) {
855 indexes
856 .node_properties
857 .insert_with_scopes(node_id, labels.clone(), key, value);
858 }
859 }
860 }
861
862 fn unindex_node_property_if_active<'a>(
863 &mut self,
864 node_id: NodeId,
865 labels: impl IntoIterator<Item = &'a str>,
866 key: &str,
867 value: &PropertyValue,
868 ) {
869 if self.active_node_property_index_count() == 0 {
870 return;
871 }
872 let indexes = self.indexes_mut();
873 if indexes.node_properties.is_active(key) {
874 indexes
875 .node_properties
876 .remove_with_scopes(node_id, labels, key, value);
877 }
878 }
879
880 fn index_node_scope_properties_if_active(
881 &mut self,
882 node_id: NodeId,
883 scope: &str,
884 properties: &Properties,
885 ) {
886 if self.active_node_property_index_count() == 0 {
887 return;
888 }
889 let indexes = self.indexes_mut();
890 for (key, value) in properties {
891 if indexes.node_properties.is_active(key) {
892 indexes
893 .node_properties
894 .insert_scoped(node_id, scope, key, value);
895 }
896 }
897 }
898
899 fn unindex_node_scope_properties_if_active(
900 &mut self,
901 node_id: NodeId,
902 scope: &str,
903 properties: &Properties,
904 ) {
905 if self.active_node_property_index_count() == 0 {
906 return;
907 }
908 let indexes = self.indexes_mut();
909 for (key, value) in properties {
910 if indexes.node_properties.is_active(key) {
911 indexes
912 .node_properties
913 .remove_scoped(node_id, scope, key, value);
914 }
915 }
916 }
917
918 fn unindex_active_node_properties<'a>(
919 &mut self,
920 node_id: NodeId,
921 labels: impl IntoIterator<Item = &'a str> + Clone,
922 properties: &Properties,
923 ) {
924 if self.active_node_property_index_count() == 0 {
925 return;
926 }
927 let indexes = self.indexes_mut();
928 for (key, value) in properties {
929 if indexes.node_properties.is_active(key) {
930 indexes
931 .node_properties
932 .remove_with_scopes(node_id, labels.clone(), key, value);
933 }
934 }
935 }
936
937 fn index_relationship_property_if_active<'a>(
938 &mut self,
939 rel_id: RelationshipId,
940 scopes: impl IntoIterator<Item = &'a str>,
941 key: &str,
942 value: &PropertyValue,
943 ) {
944 if self.active_relationship_property_index_count() == 0 {
945 return;
946 }
947 let indexes = self.indexes_mut();
948 if indexes.relationship_properties.is_active(key) {
949 indexes
950 .relationship_properties
951 .insert_with_scopes(rel_id, scopes, key, value);
952 }
953 }
954
955 fn index_relationship_properties_if_active<'a>(
956 &mut self,
957 rel_id: RelationshipId,
958 scopes: impl IntoIterator<Item = &'a str> + Clone,
959 properties: &Properties,
960 ) {
961 if self.active_relationship_property_index_count() == 0 {
962 return;
963 }
964 let indexes = self.indexes_mut();
965 for (key, value) in properties {
966 if indexes.relationship_properties.is_active(key) {
967 indexes.relationship_properties.insert_with_scopes(
968 rel_id,
969 scopes.clone(),
970 key,
971 value,
972 );
973 }
974 }
975 }
976
977 fn unindex_relationship_property_if_active<'a>(
978 &mut self,
979 rel_id: RelationshipId,
980 scopes: impl IntoIterator<Item = &'a str>,
981 key: &str,
982 value: &PropertyValue,
983 ) {
984 if self.active_relationship_property_index_count() == 0 {
985 return;
986 }
987 let indexes = self.indexes_mut();
988 if indexes.relationship_properties.is_active(key) {
989 indexes
990 .relationship_properties
991 .remove_with_scopes(rel_id, scopes, key, value);
992 }
993 }
994
995 fn unindex_active_relationship_properties<'a>(
996 &mut self,
997 rel_id: RelationshipId,
998 scopes: impl IntoIterator<Item = &'a str> + Clone,
999 properties: &Properties,
1000 ) {
1001 if self.active_relationship_property_index_count() == 0 {
1002 return;
1003 }
1004 let indexes = self.indexes_mut();
1005 for (key, value) in properties {
1006 if indexes.relationship_properties.is_active(key) {
1007 indexes.relationship_properties.remove_with_scopes(
1008 rel_id,
1009 scopes.clone(),
1010 key,
1011 value,
1012 );
1013 }
1014 }
1015 }
1016
1017 pub(super) fn scan_nodes_by_property(
1018 &self,
1019 label: Option<&str>,
1020 key: &str,
1021 value: &PropertyValue,
1022 ) -> Vec<NodeRecord> {
1023 let candidates: Box<dyn Iterator<Item = &NodeRecord> + '_> = match label {
1024 Some(label) => Box::new(
1025 self.nodes_by_label
1026 .get(label)
1027 .into_iter()
1028 .flat_map(|ids| ids.iter())
1029 .filter_map(|&id| self.node_at(id)),
1030 ),
1031 None => Box::new(self.iter_node_records()),
1032 };
1033
1034 candidates
1035 .filter(|node| node.properties.get(key) == Some(value))
1036 .cloned()
1037 .collect()
1038 }
1039
1040 pub(super) fn scan_relationships_by_property(
1041 &self,
1042 rel_type: Option<&str>,
1043 key: &str,
1044 value: &PropertyValue,
1045 ) -> Vec<RelationshipRecord> {
1046 let candidates: Box<dyn Iterator<Item = &RelationshipRecord> + '_> = match rel_type {
1047 Some(rel_type) => Box::new(
1048 self.relationships_by_type
1049 .get(rel_type)
1050 .into_iter()
1051 .flat_map(|ids| ids.iter())
1052 .filter_map(|&id| self.rel_at(id)),
1053 ),
1054 None => Box::new(self.iter_rel_records()),
1055 };
1056
1057 candidates
1058 .filter(|rel| rel.properties.get(key) == Some(value))
1059 .cloned()
1060 .collect()
1061 }
1062
1063 pub(super) fn attach_relationship(&mut self, rel: &RelationshipRecord) {
1064 self.outgoing_push(rel.src, rel.id);
1065 self.incoming_push(rel.dst, rel.id);
1066 self.insert_relationship_type_index(rel.id, &rel.rel_type);
1067 }
1068
1069 fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
1070 self.outgoing_remove(rel.src, rel.id);
1074 self.incoming_remove(rel.dst, rel.id);
1075
1076 self.remove_relationship_type_index(rel.id, &rel.rel_type);
1077 }
1078
1079 pub(super) fn relationship_ids_for_direction(
1080 &self,
1081 node_id: NodeId,
1082 direction: Direction,
1083 ) -> Vec<RelationshipId> {
1084 match direction {
1085 Direction::Left => self.incoming_at(node_id).cloned().unwrap_or_default(),
1086
1087 Direction::Right => self.outgoing_at(node_id).cloned().unwrap_or_default(),
1088
1089 Direction::Undirected => {
1090 let mut ids = BTreeSet::new();
1091
1092 if let Some(out) = self.outgoing_at(node_id) {
1093 ids.extend(out.iter().copied());
1094 }
1095 if let Some(inc) = self.incoming_at(node_id) {
1096 ids.extend(inc.iter().copied());
1097 }
1098
1099 ids.into_iter().collect()
1100 }
1101 }
1102 }
1103
1104 pub(super) fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
1105 if rel.src == node_id {
1106 Some(rel.dst)
1107 } else if rel.dst == node_id {
1108 Some(rel.src)
1109 } else {
1110 None
1111 }
1112 }
1113
1114 pub(super) fn has_incident_relationships(&self, node_id: NodeId) -> bool {
1115 self.outgoing_at(node_id)
1116 .map(|ids| !ids.is_empty())
1117 .unwrap_or(false)
1118 || self
1119 .incoming_at(node_id)
1120 .map(|ids| !ids.is_empty())
1121 .unwrap_or(false)
1122 }
1123
1124 pub(super) fn incident_relationship_ids(&self, node_id: NodeId) -> BTreeSet<RelationshipId> {
1125 let mut rel_ids = BTreeSet::new();
1126
1127 if let Some(ids) = self.outgoing_at(node_id) {
1128 rel_ids.extend(ids.iter().copied());
1129 }
1130 if let Some(ids) = self.incoming_at(node_id) {
1131 rel_ids.extend(ids.iter().copied());
1132 }
1133
1134 rel_ids
1135 }
1136
1137 #[doc(hidden)]
1141 pub fn replay_create_node(
1142 &mut self,
1143 id: NodeId,
1144 labels: Vec<String>,
1145 properties: Properties,
1146 ) -> Result<NodeRecord, String> {
1147 if self.recorder.is_some() {
1148 return Err(
1149 "cannot replay node creation while a mutation recorder is installed".into(),
1150 );
1151 }
1152 if self.node_at(id).is_some() {
1153 return Err(format!("node id {id} already exists"));
1154 }
1155
1156 let labels = Self::normalize_labels(labels);
1157 let node = NodeRecord {
1158 id,
1159 labels: labels.clone(),
1160 properties,
1161 };
1162
1163 self.on_node_replayed(&node);
1164 self.put_node(id, node.clone());
1165 self.bump_next_node_id_past(id)?;
1168
1169 Ok(node)
1170 }
1171
1172 #[doc(hidden)]
1176 pub fn replay_create_relationship(
1177 &mut self,
1178 id: RelationshipId,
1179 src: NodeId,
1180 dst: NodeId,
1181 rel_type: &str,
1182 properties: Properties,
1183 ) -> Result<RelationshipRecord, String> {
1184 if self.recorder.is_some() {
1185 return Err(
1186 "cannot replay relationship creation while a mutation recorder is installed".into(),
1187 );
1188 }
1189 if self.rel_at(id).is_some() {
1190 return Err(format!("relationship id {id} already exists"));
1191 }
1192 if self.node_at(src).is_none() {
1193 return Err(format!(
1194 "relationship {id} references missing source node {src}"
1195 ));
1196 }
1197 if self.node_at(dst).is_none() {
1198 return Err(format!(
1199 "relationship {id} references missing target node {dst}"
1200 ));
1201 }
1202
1203 let trimmed = rel_type.trim();
1204 if trimmed.is_empty() {
1205 return Err(format!("relationship {id} has an empty type"));
1206 }
1207
1208 let rel = RelationshipRecord {
1209 id,
1210 src,
1211 dst,
1212 rel_type: trimmed.to_string(),
1213 properties,
1214 };
1215
1216 self.on_relationship_replayed(&rel);
1217 self.put_rel(id, rel.clone());
1218 self.bump_next_rel_id_past(id)?;
1219
1220 Ok(rel)
1221 }
1222
1223 #[cfg(test)]
1224 pub(super) fn assert_property_indexes_match_scan(&self) {
1225 let indexes = self.indexes_read();
1226 assert_eq!(
1227 indexes.node_properties.active_keys.len(),
1228 self.active_node_property_index_count(),
1229 "node property index counter diverged from active key set"
1230 );
1231 assert_eq!(
1232 indexes.relationship_properties.active_keys.len(),
1233 self.active_relationship_property_index_count(),
1234 "relationship property index counter diverged from active key set"
1235 );
1236
1237 let mut expected_nodes = PropertyIndexState {
1238 active_keys: indexes.node_properties.active_keys.clone(),
1239 ..PropertyIndexState::default()
1240 };
1241 for (id, node) in self.iter_nodes() {
1242 for (key, value) in &node.properties {
1243 if expected_nodes.is_active(key) {
1244 expected_nodes.insert_with_scopes(
1245 id,
1246 node.labels.iter().map(String::as_str),
1247 key,
1248 value,
1249 );
1250 }
1251 }
1252 }
1253 assert_eq!(
1254 indexes.node_properties.values, expected_nodes.values,
1255 "node property index values diverged from scan"
1256 );
1257 assert_eq!(
1258 indexes.node_properties.scoped_values, expected_nodes.scoped_values,
1259 "node property scoped index values diverged from scan"
1260 );
1261
1262 let mut expected_relationships = PropertyIndexState {
1263 active_keys: indexes.relationship_properties.active_keys.clone(),
1264 ..PropertyIndexState::default()
1265 };
1266 for (id, rel) in self.iter_rels() {
1267 for (key, value) in &rel.properties {
1268 if expected_relationships.is_active(key) {
1269 expected_relationships.insert_with_scopes(
1270 id,
1271 [rel.rel_type.as_str()],
1272 key,
1273 value,
1274 );
1275 }
1276 }
1277 }
1278 assert_eq!(
1279 indexes.relationship_properties.values, expected_relationships.values,
1280 "relationship property index values diverged from scan"
1281 );
1282 assert_eq!(
1283 indexes.relationship_properties.scoped_values, expected_relationships.scoped_values,
1284 "relationship property scoped index values diverged from scan"
1285 );
1286 }
1287}