1use std::collections::{BTreeMap, BTreeSet};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, RwLock, RwLockWriteGuard};
8
9use lora_ast::Direction;
10
11use crate::{
12 MutationEvent, MutationRecorder, NodeId, NodeRecord, Properties, PropertyValue, RelationshipId,
13 RelationshipRecord,
14};
15
16#[cfg(test)]
17use super::property_index::PropertyIndexState;
18use super::property_index::{PropertyIndexKey, PropertyIndexRegistry};
19
/// In-memory graph store: node and relationship records plus the lookup
/// structures (adjacency lists, label/type buckets, property indexes) kept
/// alongside them.
#[derive(Default)]
pub struct InMemoryGraph {
    /// Id returned by the next `alloc_node_id` call.
    pub(super) next_node_id: NodeId,
    /// Id returned by the next `alloc_rel_id` call.
    pub(super) next_rel_id: RelationshipId,

    /// Node slots addressed by `id as usize`; `None` is a vacant slot.
    pub(super) nodes: Vec<Option<Arc<NodeRecord>>>,
    /// Relationship slots addressed by `id as usize`; `None` is a vacant slot.
    pub(super) relationships: Vec<Option<Arc<RelationshipRecord>>>,
    /// Number of occupied slots in `nodes` (maintained by `put_node` / `take_node`).
    pub(super) live_node_count: usize,
    /// Number of occupied slots in `relationships` (maintained by `put_rel` / `take_rel`).
    pub(super) live_rel_count: usize,

    /// Per-node list of relationship ids whose `src` is that node; indexed like `nodes`.
    pub(super) outgoing: Vec<Vec<RelationshipId>>,
    /// Per-node list of relationship ids whose `dst` is that node; indexed like `nodes`.
    pub(super) incoming: Vec<Vec<RelationshipId>>,

    /// Node ids bucketed by label.
    pub(super) nodes_by_label: BTreeMap<String, Vec<NodeId>>,
    /// Relationship ids bucketed by relationship type.
    pub(super) relationships_by_type: BTreeMap<String, Vec<RelationshipId>>,
    /// Property index registry. Behind a lock so the `ensure_*_property_index`
    /// methods can build an index through `&self`.
    pub(super) indexes: RwLock<PropertyIndexRegistry>,
    /// Number of active node property indexes, readable without taking the `indexes` lock.
    pub(super) active_node_property_indexes: AtomicUsize,
    /// Number of active relationship property indexes, readable without taking the `indexes` lock.
    pub(super) active_relationship_property_indexes: AtomicUsize,

    /// Optional recorder that `emit` forwards mutation events to.
    pub(super) recorder: Option<Arc<dyn MutationRecorder>>,
}
74
75impl std::fmt::Debug for InMemoryGraph {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("InMemoryGraph")
78 .field("next_node_id", &self.next_node_id)
79 .field("next_rel_id", &self.next_rel_id)
80 .field("nodes", &self.nodes)
81 .field("relationships", &self.relationships)
82 .field("outgoing", &self.outgoing)
83 .field("incoming", &self.incoming)
84 .field("nodes_by_label", &self.nodes_by_label)
85 .field("relationships_by_type", &self.relationships_by_type)
86 .field("indexes", &self.indexes)
87 .field(
88 "active_node_property_indexes",
89 &self.active_node_property_index_count(),
90 )
91 .field(
92 "active_relationship_property_indexes",
93 &self.active_relationship_property_index_count(),
94 )
95 .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
96 .finish()
97 }
98}
99
100impl Clone for InMemoryGraph {
101 fn clone(&self) -> Self {
102 Self {
105 next_node_id: self.next_node_id,
106 next_rel_id: self.next_rel_id,
107 nodes: self.nodes.clone(),
108 relationships: self.relationships.clone(),
109 live_node_count: self.live_node_count,
110 live_rel_count: self.live_rel_count,
111 outgoing: self.outgoing.clone(),
112 incoming: self.incoming.clone(),
113 nodes_by_label: self.nodes_by_label.clone(),
114 relationships_by_type: self.relationships_by_type.clone(),
115 indexes: RwLock::new(if self.has_active_property_indexes() {
116 self.indexes_read().clone()
117 } else {
118 PropertyIndexRegistry::default()
119 }),
120 active_node_property_indexes: AtomicUsize::new(self.active_node_property_index_count()),
121 active_relationship_property_indexes: AtomicUsize::new(
122 self.active_relationship_property_index_count(),
123 ),
124 recorder: None,
125 }
126 }
127}
128
129impl InMemoryGraph {
130 pub fn new() -> Self {
131 Self::default()
132 }
133
134 pub fn with_capacity_hint(_nodes: usize, _relationships: usize) -> Self {
135 Self::default()
137 }
138
139 pub fn contains_node(&self, node_id: NodeId) -> bool {
140 self.node_at(node_id).is_some()
141 }
142
143 pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
144 self.rel_at(rel_id).is_some()
145 }
146
147 pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
151 self.recorder = recorder;
152 }
153
154 pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
156 self.recorder.as_ref()
157 }
158
159 #[inline]
164 pub(super) fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
165 if let Some(rec) = &self.recorder {
166 rec.record(build());
167 }
168 }
169
170 pub(super) fn alloc_node_id(&mut self) -> NodeId {
171 let id = self.next_node_id;
172 self.next_node_id += 1;
173 id
174 }
175
176 pub(super) fn alloc_rel_id(&mut self) -> RelationshipId {
177 let id = self.next_rel_id;
178 self.next_rel_id += 1;
179 id
180 }
181
182 fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
183 let next = id
184 .checked_add(1)
185 .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
186 self.next_node_id = self.next_node_id.max(next);
187 Ok(())
188 }
189
190 fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
191 let next = id
192 .checked_add(1)
193 .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
194 self.next_rel_id = self.next_rel_id.max(next);
195 Ok(())
196 }
197
198 #[inline]
204 pub(super) fn node_at(&self, id: NodeId) -> Option<&NodeRecord> {
205 self.nodes
206 .get(id as usize)
207 .and_then(|s| s.as_ref())
208 .map(|arc| arc.as_ref())
209 }
210
211 #[inline]
216 pub(super) fn node_at_mut(&mut self, id: NodeId) -> Option<&mut NodeRecord> {
217 self.nodes
218 .get_mut(id as usize)
219 .and_then(|s| s.as_mut())
220 .map(Arc::make_mut)
221 }
222
223 #[inline]
224 pub(super) fn rel_at(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
225 self.relationships
226 .get(id as usize)
227 .and_then(|s| s.as_ref())
228 .map(|arc| arc.as_ref())
229 }
230
231 #[inline]
232 pub(super) fn rel_at_mut(&mut self, id: RelationshipId) -> Option<&mut RelationshipRecord> {
233 self.relationships
234 .get_mut(id as usize)
235 .and_then(|s| s.as_mut())
236 .map(Arc::make_mut)
237 }
238
239 fn ensure_node_slot(&mut self, id: NodeId) {
243 let target = id as usize + 1;
244 if self.nodes.len() < target {
245 self.nodes.resize_with(target, || None);
246 self.outgoing.resize_with(target, Vec::new);
247 self.incoming.resize_with(target, Vec::new);
248 }
249 }
250
251 fn ensure_rel_slot(&mut self, id: RelationshipId) {
252 let target = id as usize + 1;
253 if self.relationships.len() < target {
254 self.relationships.resize_with(target, || None);
255 }
256 }
257
258 pub(super) fn put_node(&mut self, id: NodeId, node: NodeRecord) {
259 self.ensure_node_slot(id);
260 let was_present = self.nodes[id as usize].is_some();
261 self.nodes[id as usize] = Some(Arc::new(node));
262 if !was_present {
263 self.live_node_count += 1;
264 }
265 }
266
267 pub(super) fn put_rel(&mut self, id: RelationshipId, rel: RelationshipRecord) {
268 self.ensure_rel_slot(id);
269 let was_present = self.relationships[id as usize].is_some();
270 self.relationships[id as usize] = Some(Arc::new(rel));
271 if !was_present {
272 self.live_rel_count += 1;
273 }
274 }
275
276 pub(super) fn take_node(&mut self, id: NodeId) -> Option<NodeRecord> {
277 let idx = id as usize;
278 let removed = self.nodes.get_mut(idx).and_then(|s| s.take());
279 if removed.is_some() {
280 self.live_node_count -= 1;
281 if let Some(out) = self.outgoing.get_mut(idx) {
287 out.clear();
288 }
289 if let Some(inc) = self.incoming.get_mut(idx) {
290 inc.clear();
291 }
292 }
293 removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
298 }
299
300 pub(super) fn take_rel(&mut self, id: RelationshipId) -> Option<RelationshipRecord> {
301 let idx = id as usize;
302 let removed = self.relationships.get_mut(idx).and_then(|s| s.take());
303 if removed.is_some() {
304 self.live_rel_count -= 1;
305 }
306 removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
307 }
308
309 #[inline]
310 pub(super) fn outgoing_at(&self, id: NodeId) -> Option<&Vec<RelationshipId>> {
311 self.outgoing.get(id as usize)
312 }
313
314 #[inline]
315 pub(super) fn incoming_at(&self, id: NodeId) -> Option<&Vec<RelationshipId>> {
316 self.incoming.get(id as usize)
317 }
318
319 pub(super) fn iter_node_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
320 self.nodes
321 .iter()
322 .enumerate()
323 .filter_map(|(i, slot)| slot.as_ref().map(|_| i as NodeId))
324 }
325
326 pub(super) fn iter_node_records(&self) -> impl Iterator<Item = &NodeRecord> + '_ {
327 self.nodes
328 .iter()
329 .filter_map(|s| s.as_ref())
330 .map(|arc| arc.as_ref())
331 }
332
333 pub(super) fn iter_rel_ids(&self) -> impl Iterator<Item = RelationshipId> + '_ {
334 self.relationships
335 .iter()
336 .enumerate()
337 .filter_map(|(i, slot)| slot.as_ref().map(|_| i as RelationshipId))
338 }
339
340 pub(super) fn iter_rel_records(&self) -> impl Iterator<Item = &RelationshipRecord> + '_ {
341 self.relationships
342 .iter()
343 .filter_map(|s| s.as_ref())
344 .map(|arc| arc.as_ref())
345 }
346
347 pub(super) fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &NodeRecord)> + '_ {
348 self.nodes
349 .iter()
350 .enumerate()
351 .filter_map(|(i, slot)| slot.as_ref().map(|n| (i as NodeId, n.as_ref())))
352 }
353
354 pub(super) fn iter_rels(
355 &self,
356 ) -> impl Iterator<Item = (RelationshipId, &RelationshipRecord)> + '_ {
357 self.relationships
358 .iter()
359 .enumerate()
360 .filter_map(|(i, slot)| slot.as_ref().map(|r| (i as RelationshipId, r.as_ref())))
361 }
362
363 fn outgoing_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
367 self.ensure_node_slot(node_id);
368 let v = &mut self.outgoing[node_id as usize];
369 if !v.contains(&rel_id) {
370 v.push(rel_id);
371 }
372 }
373
374 fn incoming_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
375 self.ensure_node_slot(node_id);
376 let v = &mut self.incoming[node_id as usize];
377 if !v.contains(&rel_id) {
378 v.push(rel_id);
379 }
380 }
381
382 fn outgoing_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
385 if let Some(v) = self.outgoing.get_mut(node_id as usize) {
386 if let Some(pos) = v.iter().position(|&id| id == rel_id) {
387 v.swap_remove(pos);
388 }
389 }
390 }
391
392 fn incoming_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
393 if let Some(v) = self.incoming.get_mut(node_id as usize) {
394 if let Some(pos) = v.iter().position(|&id| id == rel_id) {
395 v.swap_remove(pos);
396 }
397 }
398 }
399
400 pub(super) fn normalize_labels(labels: Vec<String>) -> Vec<String> {
401 let mut seen = BTreeSet::new();
402
403 labels
404 .into_iter()
405 .map(|s| s.trim().to_string())
406 .filter(|s| !s.is_empty())
407 .filter(|s| seen.insert(s.clone()))
408 .collect()
409 }
410
411 pub(super) fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
412 let bucket = self.nodes_by_label.entry(label.to_string()).or_default();
413 if !bucket.contains(&node_id) {
417 bucket.push(node_id);
418 }
419 }
420
421 fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
422 if let Some(ids) = self.nodes_by_label.get_mut(label) {
423 if let Some(pos) = ids.iter().position(|&id| id == node_id) {
424 ids.swap_remove(pos);
425 }
426 if ids.is_empty() {
427 self.nodes_by_label.remove(label);
428 }
429 }
430 }
431
432 fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
433 let bucket = self
434 .relationships_by_type
435 .entry(rel_type.to_string())
436 .or_default();
437 if !bucket.contains(&rel_id) {
438 bucket.push(rel_id);
439 }
440 }
441
442 fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
443 if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
444 if let Some(pos) = ids.iter().position(|&id| id == rel_id) {
445 ids.swap_remove(pos);
446 }
447 if ids.is_empty() {
448 self.relationships_by_type.remove(rel_type);
449 }
450 }
451 }
452
453 pub(super) fn indexes_read(&self) -> std::sync::RwLockReadGuard<'_, PropertyIndexRegistry> {
454 self.indexes
455 .read()
456 .unwrap_or_else(|poisoned| poisoned.into_inner())
457 }
458
459 pub(super) fn indexes_write(&self) -> RwLockWriteGuard<'_, PropertyIndexRegistry> {
460 self.indexes
461 .write()
462 .unwrap_or_else(|poisoned| poisoned.into_inner())
463 }
464
465 pub(super) fn indexes_mut(&mut self) -> &mut PropertyIndexRegistry {
466 self.indexes
467 .get_mut()
468 .unwrap_or_else(|poisoned| poisoned.into_inner())
469 }
470
471 #[inline]
472 pub(super) fn active_node_property_index_count(&self) -> usize {
473 self.active_node_property_indexes.load(Ordering::Relaxed)
474 }
475
476 #[inline]
477 pub(super) fn active_relationship_property_index_count(&self) -> usize {
478 self.active_relationship_property_indexes
479 .load(Ordering::Relaxed)
480 }
481
482 #[inline]
483 pub(super) fn has_active_property_indexes(&self) -> bool {
484 self.active_node_property_index_count() != 0
485 || self.active_relationship_property_index_count() != 0
486 }
487
488 pub(super) fn node_property_index_is_active(&mut self, key: &str) -> bool {
489 self.active_node_property_index_count() != 0
490 && self.indexes_mut().node_properties.is_active(key)
491 }
492
493 pub(super) fn relationship_property_index_is_active(&mut self, key: &str) -> bool {
494 self.active_relationship_property_index_count() != 0
495 && self.indexes_mut().relationship_properties.is_active(key)
496 }
497
498 pub(super) fn ensure_node_property_index(&self, key: &str) {
499 {
500 let indexes = self.indexes_read();
501 if indexes.node_properties.is_active(key) {
502 return;
503 }
504 }
505
506 let mut indexes = self.indexes_write();
507 if indexes.node_properties.is_active(key) {
508 return;
509 }
510
511 for (id, node) in self.iter_nodes() {
512 if let Some(value) = node.properties.get(key) {
513 indexes.node_properties.insert_with_scopes(
514 id,
515 node.labels.iter().map(String::as_str),
516 key,
517 value,
518 );
519 }
520 }
521 if indexes.node_properties.activate(key) {
522 self.active_node_property_indexes
523 .fetch_add(1, Ordering::Relaxed);
524 }
525 }
526
527 pub(super) fn ensure_relationship_property_index(&self, key: &str) {
528 {
529 let indexes = self.indexes_read();
530 if indexes.relationship_properties.is_active(key) {
531 return;
532 }
533 }
534
535 let mut indexes = self.indexes_write();
536 if indexes.relationship_properties.is_active(key) {
537 return;
538 }
539
540 for (id, rel) in self.iter_rels() {
541 if let Some(value) = rel.properties.get(key) {
542 indexes.relationship_properties.insert_with_scopes(
543 id,
544 [rel.rel_type.as_str()],
545 key,
546 value,
547 );
548 }
549 }
550 if indexes.relationship_properties.activate(key) {
551 self.active_relationship_property_indexes
552 .fetch_add(1, Ordering::Relaxed);
553 }
554 }
555
556 pub(super) fn rebuild_property_indexes(&mut self) {
557 let mut indexes = PropertyIndexRegistry::default();
558
559 for (id, node) in self.iter_nodes() {
560 for (key, value) in &node.properties {
561 if PropertyIndexKey::from_value(value).is_some() {
562 indexes.node_properties.activate(key);
563 indexes.node_properties.insert_with_scopes(
564 id,
565 node.labels.iter().map(String::as_str),
566 key,
567 value,
568 );
569 }
570 }
571 }
572
573 for (id, rel) in self.iter_rels() {
574 for (key, value) in &rel.properties {
575 if PropertyIndexKey::from_value(value).is_some() {
576 indexes.relationship_properties.activate(key);
577 indexes.relationship_properties.insert_with_scopes(
578 id,
579 [rel.rel_type.as_str()],
580 key,
581 value,
582 );
583 }
584 }
585 }
586
587 let node_index_count = indexes.node_properties.active_keys.len();
588 let relationship_index_count = indexes.relationship_properties.active_keys.len();
589 *self.indexes_mut() = indexes;
590 self.active_node_property_indexes
591 .store(node_index_count, Ordering::Relaxed);
592 self.active_relationship_property_indexes
593 .store(relationship_index_count, Ordering::Relaxed);
594 }
595
596 pub(super) fn on_node_created(&mut self, node: &NodeRecord) {
597 for label in &node.labels {
598 self.insert_node_label_index(node.id, label);
599 }
600 self.index_node_properties_if_active(
601 node.id,
602 node.labels.iter().map(String::as_str),
603 &node.properties,
604 );
605 }
606
607 pub(super) fn on_node_replayed(&mut self, node: &NodeRecord) {
608 for label in &node.labels {
609 self.insert_node_label_index(node.id, label);
610 }
611 self.index_node_properties_eager(
612 node.id,
613 node.labels.iter().map(String::as_str),
614 &node.properties,
615 );
616 }
617
618 pub(super) fn on_node_property_set(
619 &mut self,
620 node_id: NodeId,
621 key: &str,
622 old: Option<&PropertyValue>,
623 new: &PropertyValue,
624 ) {
625 if !self.node_property_index_is_active(key) {
626 return;
627 }
628
629 let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
630 return;
631 };
632
633 if let Some(old) = old {
634 self.unindex_node_property_if_active(
635 node_id,
636 labels.iter().map(String::as_str),
637 key,
638 old,
639 );
640 }
641 self.index_node_property_if_active(node_id, labels.iter().map(String::as_str), key, new);
642 }
643
644 pub(super) fn on_node_property_removed(
645 &mut self,
646 node_id: NodeId,
647 key: &str,
648 old: &PropertyValue,
649 ) {
650 if !self.node_property_index_is_active(key) {
651 return;
652 }
653
654 let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
655 return;
656 };
657 self.unindex_node_property_if_active(node_id, labels.iter().map(String::as_str), key, old);
658 }
659
660 pub(super) fn on_node_label_added(&mut self, node_id: NodeId, label: &str) {
661 self.insert_node_label_index(node_id, label);
662
663 if self.active_node_property_index_count() == 0 {
664 return;
665 }
666
667 let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
668 return;
669 };
670 self.index_node_scope_properties_if_active(node_id, label, &properties);
671 }
672
673 pub(super) fn on_node_label_removed(&mut self, node_id: NodeId, label: &str) {
674 self.remove_node_label_index(node_id, label);
675
676 if self.active_node_property_index_count() == 0 {
677 return;
678 }
679
680 let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
681 return;
682 };
683 self.unindex_node_scope_properties_if_active(node_id, label, &properties);
684 }
685
686 pub(super) fn on_node_deleted(&mut self, node: &NodeRecord) {
687 for label in &node.labels {
688 self.remove_node_label_index(node.id, label);
689 }
690 self.unindex_active_node_properties(
691 node.id,
692 node.labels.iter().map(String::as_str),
693 &node.properties,
694 );
695 }
696
697 pub(super) fn on_relationship_created(&mut self, rel: &RelationshipRecord) {
698 self.attach_relationship(rel);
699 self.index_relationship_properties_if_active(
700 rel.id,
701 [rel.rel_type.as_str()],
702 &rel.properties,
703 );
704 }
705
706 pub(super) fn on_relationship_replayed(&mut self, rel: &RelationshipRecord) {
707 self.attach_relationship(rel);
708 self.index_relationship_properties_eager(rel.id, [rel.rel_type.as_str()], &rel.properties);
709 }
710
711 pub(super) fn on_relationship_property_set(
712 &mut self,
713 rel_id: RelationshipId,
714 key: &str,
715 old: Option<&PropertyValue>,
716 new: &PropertyValue,
717 ) {
718 if !self.relationship_property_index_is_active(key) {
719 return;
720 }
721
722 let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
723 return;
724 };
725
726 if let Some(old) = old {
727 self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
728 }
729 self.index_relationship_property_if_active(rel_id, [rel_type.as_str()], key, new);
730 }
731
732 pub(super) fn on_relationship_property_removed(
733 &mut self,
734 rel_id: RelationshipId,
735 key: &str,
736 old: &PropertyValue,
737 ) {
738 if !self.relationship_property_index_is_active(key) {
739 return;
740 }
741
742 let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
743 return;
744 };
745 self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
746 }
747
748 pub(super) fn on_relationship_deleted(&mut self, rel: &RelationshipRecord) {
749 self.detach_relationship_indexes(rel);
750 self.unindex_active_relationship_properties(
751 rel.id,
752 [rel.rel_type.as_str()],
753 &rel.properties,
754 );
755 }
756
757 fn index_node_property_eager<'a>(
758 &mut self,
759 node_id: NodeId,
760 labels: impl IntoIterator<Item = &'a str>,
761 key: &str,
762 value: &PropertyValue,
763 ) {
764 if PropertyIndexKey::from_value(value).is_none() {
765 return;
766 }
767
768 let activated = {
769 let indexes = self.indexes_mut();
770 let activated = indexes.node_properties.activate(key);
771 indexes
772 .node_properties
773 .insert_with_scopes(node_id, labels, key, value);
774 activated
775 };
776 if activated {
777 self.active_node_property_indexes
778 .fetch_add(1, Ordering::Relaxed);
779 }
780 }
781
782 fn index_relationship_property_eager<'a>(
783 &mut self,
784 rel_id: RelationshipId,
785 scopes: impl IntoIterator<Item = &'a str>,
786 key: &str,
787 value: &PropertyValue,
788 ) {
789 if PropertyIndexKey::from_value(value).is_none() {
790 return;
791 }
792
793 let activated = {
794 let indexes = self.indexes_mut();
795 let activated = indexes.relationship_properties.activate(key);
796 indexes
797 .relationship_properties
798 .insert_with_scopes(rel_id, scopes, key, value);
799 activated
800 };
801 if activated {
802 self.active_relationship_property_indexes
803 .fetch_add(1, Ordering::Relaxed);
804 }
805 }
806
807 fn index_node_properties_eager<'a>(
808 &mut self,
809 node_id: NodeId,
810 labels: impl IntoIterator<Item = &'a str> + Clone,
811 properties: &Properties,
812 ) {
813 for (key, value) in properties {
814 self.index_node_property_eager(node_id, labels.clone(), key, value);
815 }
816 }
817
818 fn index_relationship_properties_eager<'a>(
819 &mut self,
820 rel_id: RelationshipId,
821 scopes: impl IntoIterator<Item = &'a str> + Clone,
822 properties: &Properties,
823 ) {
824 for (key, value) in properties {
825 self.index_relationship_property_eager(rel_id, scopes.clone(), key, value);
826 }
827 }
828
829 fn index_node_property_if_active<'a>(
830 &mut self,
831 node_id: NodeId,
832 labels: impl IntoIterator<Item = &'a str>,
833 key: &str,
834 value: &PropertyValue,
835 ) {
836 if self.active_node_property_index_count() == 0 {
837 return;
838 }
839 let indexes = self.indexes_mut();
840 if indexes.node_properties.is_active(key) {
841 indexes
842 .node_properties
843 .insert_with_scopes(node_id, labels, key, value);
844 }
845 }
846
847 fn index_node_properties_if_active<'a>(
848 &mut self,
849 node_id: NodeId,
850 labels: impl IntoIterator<Item = &'a str> + Clone,
851 properties: &Properties,
852 ) {
853 if self.active_node_property_index_count() == 0 {
854 return;
855 }
856 let indexes = self.indexes_mut();
857 for (key, value) in properties {
858 if indexes.node_properties.is_active(key) {
859 indexes
860 .node_properties
861 .insert_with_scopes(node_id, labels.clone(), key, value);
862 }
863 }
864 }
865
866 fn unindex_node_property_if_active<'a>(
867 &mut self,
868 node_id: NodeId,
869 labels: impl IntoIterator<Item = &'a str>,
870 key: &str,
871 value: &PropertyValue,
872 ) {
873 if self.active_node_property_index_count() == 0 {
874 return;
875 }
876 let indexes = self.indexes_mut();
877 if indexes.node_properties.is_active(key) {
878 indexes
879 .node_properties
880 .remove_with_scopes(node_id, labels, key, value);
881 }
882 }
883
884 fn index_node_scope_properties_if_active(
885 &mut self,
886 node_id: NodeId,
887 scope: &str,
888 properties: &Properties,
889 ) {
890 if self.active_node_property_index_count() == 0 {
891 return;
892 }
893 let indexes = self.indexes_mut();
894 for (key, value) in properties {
895 if indexes.node_properties.is_active(key) {
896 indexes
897 .node_properties
898 .insert_scoped(node_id, scope, key, value);
899 }
900 }
901 }
902
903 fn unindex_node_scope_properties_if_active(
904 &mut self,
905 node_id: NodeId,
906 scope: &str,
907 properties: &Properties,
908 ) {
909 if self.active_node_property_index_count() == 0 {
910 return;
911 }
912 let indexes = self.indexes_mut();
913 for (key, value) in properties {
914 if indexes.node_properties.is_active(key) {
915 indexes
916 .node_properties
917 .remove_scoped(node_id, scope, key, value);
918 }
919 }
920 }
921
922 fn unindex_active_node_properties<'a>(
923 &mut self,
924 node_id: NodeId,
925 labels: impl IntoIterator<Item = &'a str> + Clone,
926 properties: &Properties,
927 ) {
928 if self.active_node_property_index_count() == 0 {
929 return;
930 }
931 let indexes = self.indexes_mut();
932 for (key, value) in properties {
933 if indexes.node_properties.is_active(key) {
934 indexes
935 .node_properties
936 .remove_with_scopes(node_id, labels.clone(), key, value);
937 }
938 }
939 }
940
941 fn index_relationship_property_if_active<'a>(
942 &mut self,
943 rel_id: RelationshipId,
944 scopes: impl IntoIterator<Item = &'a str>,
945 key: &str,
946 value: &PropertyValue,
947 ) {
948 if self.active_relationship_property_index_count() == 0 {
949 return;
950 }
951 let indexes = self.indexes_mut();
952 if indexes.relationship_properties.is_active(key) {
953 indexes
954 .relationship_properties
955 .insert_with_scopes(rel_id, scopes, key, value);
956 }
957 }
958
959 fn index_relationship_properties_if_active<'a>(
960 &mut self,
961 rel_id: RelationshipId,
962 scopes: impl IntoIterator<Item = &'a str> + Clone,
963 properties: &Properties,
964 ) {
965 if self.active_relationship_property_index_count() == 0 {
966 return;
967 }
968 let indexes = self.indexes_mut();
969 for (key, value) in properties {
970 if indexes.relationship_properties.is_active(key) {
971 indexes.relationship_properties.insert_with_scopes(
972 rel_id,
973 scopes.clone(),
974 key,
975 value,
976 );
977 }
978 }
979 }
980
981 fn unindex_relationship_property_if_active<'a>(
982 &mut self,
983 rel_id: RelationshipId,
984 scopes: impl IntoIterator<Item = &'a str>,
985 key: &str,
986 value: &PropertyValue,
987 ) {
988 if self.active_relationship_property_index_count() == 0 {
989 return;
990 }
991 let indexes = self.indexes_mut();
992 if indexes.relationship_properties.is_active(key) {
993 indexes
994 .relationship_properties
995 .remove_with_scopes(rel_id, scopes, key, value);
996 }
997 }
998
999 fn unindex_active_relationship_properties<'a>(
1000 &mut self,
1001 rel_id: RelationshipId,
1002 scopes: impl IntoIterator<Item = &'a str> + Clone,
1003 properties: &Properties,
1004 ) {
1005 if self.active_relationship_property_index_count() == 0 {
1006 return;
1007 }
1008 let indexes = self.indexes_mut();
1009 for (key, value) in properties {
1010 if indexes.relationship_properties.is_active(key) {
1011 indexes.relationship_properties.remove_with_scopes(
1012 rel_id,
1013 scopes.clone(),
1014 key,
1015 value,
1016 );
1017 }
1018 }
1019 }
1020
1021 pub(super) fn scan_nodes_by_property(
1022 &self,
1023 label: Option<&str>,
1024 key: &str,
1025 value: &PropertyValue,
1026 ) -> Vec<NodeRecord> {
1027 let candidates: Box<dyn Iterator<Item = &NodeRecord> + '_> = match label {
1028 Some(label) => Box::new(
1029 self.nodes_by_label
1030 .get(label)
1031 .into_iter()
1032 .flat_map(|ids| ids.iter())
1033 .filter_map(|&id| self.node_at(id)),
1034 ),
1035 None => Box::new(self.iter_node_records()),
1036 };
1037
1038 candidates
1039 .filter(|node| node.properties.get(key) == Some(value))
1040 .cloned()
1041 .collect()
1042 }
1043
1044 pub(super) fn scan_relationships_by_property(
1045 &self,
1046 rel_type: Option<&str>,
1047 key: &str,
1048 value: &PropertyValue,
1049 ) -> Vec<RelationshipRecord> {
1050 let candidates: Box<dyn Iterator<Item = &RelationshipRecord> + '_> = match rel_type {
1051 Some(rel_type) => Box::new(
1052 self.relationships_by_type
1053 .get(rel_type)
1054 .into_iter()
1055 .flat_map(|ids| ids.iter())
1056 .filter_map(|&id| self.rel_at(id)),
1057 ),
1058 None => Box::new(self.iter_rel_records()),
1059 };
1060
1061 candidates
1062 .filter(|rel| rel.properties.get(key) == Some(value))
1063 .cloned()
1064 .collect()
1065 }
1066
1067 pub(super) fn attach_relationship(&mut self, rel: &RelationshipRecord) {
1068 self.outgoing_push(rel.src, rel.id);
1069 self.incoming_push(rel.dst, rel.id);
1070 self.insert_relationship_type_index(rel.id, &rel.rel_type);
1071 }
1072
1073 fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
1074 self.outgoing_remove(rel.src, rel.id);
1078 self.incoming_remove(rel.dst, rel.id);
1079
1080 self.remove_relationship_type_index(rel.id, &rel.rel_type);
1081 }
1082
1083 pub(super) fn relationship_ids_for_direction(
1084 &self,
1085 node_id: NodeId,
1086 direction: Direction,
1087 ) -> Vec<RelationshipId> {
1088 match direction {
1089 Direction::Left => self.incoming_at(node_id).cloned().unwrap_or_default(),
1090
1091 Direction::Right => self.outgoing_at(node_id).cloned().unwrap_or_default(),
1092
1093 Direction::Undirected => {
1094 let mut ids = BTreeSet::new();
1095
1096 if let Some(out) = self.outgoing_at(node_id) {
1097 ids.extend(out.iter().copied());
1098 }
1099 if let Some(inc) = self.incoming_at(node_id) {
1100 ids.extend(inc.iter().copied());
1101 }
1102
1103 ids.into_iter().collect()
1104 }
1105 }
1106 }
1107
1108 pub(super) fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
1109 if rel.src == node_id {
1110 Some(rel.dst)
1111 } else if rel.dst == node_id {
1112 Some(rel.src)
1113 } else {
1114 None
1115 }
1116 }
1117
1118 pub(super) fn has_incident_relationships(&self, node_id: NodeId) -> bool {
1119 self.outgoing_at(node_id)
1120 .map(|ids| !ids.is_empty())
1121 .unwrap_or(false)
1122 || self
1123 .incoming_at(node_id)
1124 .map(|ids| !ids.is_empty())
1125 .unwrap_or(false)
1126 }
1127
1128 pub(super) fn incident_relationship_ids(&self, node_id: NodeId) -> BTreeSet<RelationshipId> {
1129 let mut rel_ids = BTreeSet::new();
1130
1131 if let Some(ids) = self.outgoing_at(node_id) {
1132 rel_ids.extend(ids.iter().copied());
1133 }
1134 if let Some(ids) = self.incoming_at(node_id) {
1135 rel_ids.extend(ids.iter().copied());
1136 }
1137
1138 rel_ids
1139 }
1140
1141 #[doc(hidden)]
1145 pub fn replay_create_node(
1146 &mut self,
1147 id: NodeId,
1148 labels: Vec<String>,
1149 properties: Properties,
1150 ) -> Result<NodeRecord, String> {
1151 if self.recorder.is_some() {
1152 return Err(
1153 "cannot replay node creation while a mutation recorder is installed".into(),
1154 );
1155 }
1156 if self.node_at(id).is_some() {
1157 return Err(format!("node id {id} already exists"));
1158 }
1159
1160 let labels = Self::normalize_labels(labels);
1161 let node = NodeRecord {
1162 id,
1163 labels: labels.clone(),
1164 properties,
1165 };
1166
1167 self.on_node_replayed(&node);
1168 self.put_node(id, node.clone());
1169 self.bump_next_node_id_past(id)?;
1172
1173 Ok(node)
1174 }
1175
1176 #[doc(hidden)]
1180 pub fn replay_create_relationship(
1181 &mut self,
1182 id: RelationshipId,
1183 src: NodeId,
1184 dst: NodeId,
1185 rel_type: &str,
1186 properties: Properties,
1187 ) -> Result<RelationshipRecord, String> {
1188 if self.recorder.is_some() {
1189 return Err(
1190 "cannot replay relationship creation while a mutation recorder is installed".into(),
1191 );
1192 }
1193 if self.rel_at(id).is_some() {
1194 return Err(format!("relationship id {id} already exists"));
1195 }
1196 if self.node_at(src).is_none() {
1197 return Err(format!(
1198 "relationship {id} references missing source node {src}"
1199 ));
1200 }
1201 if self.node_at(dst).is_none() {
1202 return Err(format!(
1203 "relationship {id} references missing target node {dst}"
1204 ));
1205 }
1206
1207 let trimmed = rel_type.trim();
1208 if trimmed.is_empty() {
1209 return Err(format!("relationship {id} has an empty type"));
1210 }
1211
1212 let rel = RelationshipRecord {
1213 id,
1214 src,
1215 dst,
1216 rel_type: trimmed.to_string(),
1217 properties,
1218 };
1219
1220 self.on_relationship_replayed(&rel);
1221 self.put_rel(id, rel.clone());
1222 self.bump_next_rel_id_past(id)?;
1223
1224 Ok(rel)
1225 }
1226
    /// Test-only consistency check: panics unless the property index registry
    /// agrees with what a fresh scan of the stored records would produce.
    ///
    /// Checks, for nodes and relationships separately, that the atomic counter
    /// equals the number of active keys, and that `values` and `scoped_values`
    /// equal those of an index rebuilt from scratch using the same active keys.
    #[cfg(test)]
    pub(super) fn assert_property_indexes_match_scan(&self) {
        let indexes = self.indexes_read();
        // The lock-free counters must mirror the registry's active key sets.
        assert_eq!(
            indexes.node_properties.active_keys.len(),
            self.active_node_property_index_count(),
            "node property index counter diverged from active key set"
        );
        assert_eq!(
            indexes.relationship_properties.active_keys.len(),
            self.active_relationship_property_index_count(),
            "relationship property index counter diverged from active key set"
        );

        // Rebuild the expected node index from the records, restricted to the
        // keys that are currently active.
        let mut expected_nodes = PropertyIndexState {
            active_keys: indexes.node_properties.active_keys.clone(),
            ..PropertyIndexState::default()
        };
        for (id, node) in self.iter_nodes() {
            for (key, value) in &node.properties {
                if expected_nodes.is_active(key) {
                    expected_nodes.insert_with_scopes(
                        id,
                        node.labels.iter().map(String::as_str),
                        key,
                        value,
                    );
                }
            }
        }
        assert_eq!(
            indexes.node_properties.values, expected_nodes.values,
            "node property index values diverged from scan"
        );
        assert_eq!(
            indexes.node_properties.scoped_values, expected_nodes.scoped_values,
            "node property scoped index values diverged from scan"
        );

        // Same rebuild-and-compare for relationships, scoped by type.
        let mut expected_relationships = PropertyIndexState {
            active_keys: indexes.relationship_properties.active_keys.clone(),
            ..PropertyIndexState::default()
        };
        for (id, rel) in self.iter_rels() {
            for (key, value) in &rel.properties {
                if expected_relationships.is_active(key) {
                    expected_relationships.insert_with_scopes(
                        id,
                        [rel.rel_type.as_str()],
                        key,
                        value,
                    );
                }
            }
        }
        assert_eq!(
            indexes.relationship_properties.values, expected_relationships.values,
            "relationship property index values diverged from scan"
        );
        assert_eq!(
            indexes.relationship_properties.scoped_values, expected_relationships.scoped_values,
            "relationship property scoped index values diverged from scan"
        );
    }
1291}