1use std::collections::{BTreeMap, BTreeSet};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, RwLock, RwLockWriteGuard};
8
9use lora_ast::Direction;
10
11use crate::{
12 LoraPoint, MutationEvent, MutationRecorder, NodeId, NodeRecord, Properties, PropertyValue,
13 RelationshipId, RelationshipRecord,
14};
15
16use super::constraint_catalog::{
17 ConstraintCatalog, ConstraintRequest, CreateConstraintError, CreateConstraintOutcome,
18 DropConstraintError, DropConstraintOutcome,
19};
20use super::entity_index_store::IndexBundle;
21use super::fulltext_index::FulltextRegistry;
22use super::hnsw::HnswParams;
23use super::index_catalog::{
24 CreateIndexError, CreateIndexOutcome, DropIndexError, DropIndexOutcome, IndexCatalog,
25 IndexDefinition, IndexRequest, StoredIndexEntity, StoredIndexKind, StoredIndexState,
26};
27use super::point_index::PointRegistry;
28#[cfg(test)]
29use super::property_index::PropertyIndexState;
30use super::property_index::{PropertyIndexKey, PropertyIndexRegistry};
31use super::secondary_index_maintenance::SecondaryIndexMutation;
32use super::sorted_property_index::SortedPropertyIndex;
33use super::stats::GraphStats;
34use super::text_index::TrigramRegistry;
35use super::vector_index::{VectorIndexProvider, VectorIndexRegistry, VectorSimilarity};
36
/// In-memory graph store: node and relationship slabs, per-node adjacency
/// lists, label/type buckets, secondary indexes, constraints and an optional
/// mutation recorder.
#[derive(Default)]
pub struct InMemoryGraph {
    /// Id handed out by the next call to `try_reserve_next_node_slot`.
    pub(super) next_node_id: NodeId,
    /// Id handed out by the next call to `try_reserve_next_rel_slot`.
    pub(super) next_rel_id: RelationshipId,

    /// Node slab indexed by node id; `None` marks an empty slot.
    pub(super) nodes: Vec<Option<Arc<NodeRecord>>>,
    /// Relationship slab indexed by relationship id; `None` marks an empty slot.
    pub(super) relationships: Vec<Option<Arc<RelationshipRecord>>>,
    /// Number of occupied slots in `nodes`, maintained by the put/take helpers.
    pub(super) live_node_count: usize,
    /// Number of occupied slots in `relationships`, maintained by the put/take helpers.
    pub(super) live_rel_count: usize,

    /// Outgoing relationship ids per node slot; grown together with `nodes`.
    pub(super) outgoing: Vec<Vec<RelationshipId>>,
    /// Incoming relationship ids per node slot; grown together with `nodes`.
    pub(super) incoming: Vec<Vec<RelationshipId>>,

    /// Label -> ids of nodes carrying that label; emptied buckets are removed.
    pub(super) nodes_by_label: BTreeMap<String, Vec<NodeId>>,
    /// Relationship type -> ids of relationships of that type; emptied buckets are removed.
    pub(super) relationships_by_type: BTreeMap<String, Vec<RelationshipId>>,

    /// Secondary index state (property, text and fulltext registries, the
    /// index catalog and the active-index counters are reached through it).
    pub(super) indexes: IndexBundle,

    /// Constraint definitions, behind a lock so `&self` methods can edit them.
    pub(super) constraint_catalog: RwLock<ConstraintCatalog>,
    /// Incremented by `register_constraint` and decremented by
    /// `drop_named_constraint`; read with relaxed ordering.
    pub(super) active_constraints: AtomicUsize,

    /// Recorder that `emit` forwards mutation events to, when installed.
    pub(super) recorder: Option<Arc<dyn MutationRecorder>>,
}
108
109impl std::fmt::Debug for InMemoryGraph {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 f.debug_struct("InMemoryGraph")
112 .field("next_node_id", &self.next_node_id)
113 .field("next_rel_id", &self.next_rel_id)
114 .field("nodes", &self.nodes)
115 .field("relationships", &self.relationships)
116 .field("outgoing", &self.outgoing)
117 .field("incoming", &self.incoming)
118 .field("nodes_by_label", &self.nodes_by_label)
119 .field("relationships_by_type", &self.relationships_by_type)
120 .field("indexes", &self.indexes)
121 .field(
122 "active_node_property_indexes",
123 &self.active_node_property_index_count(),
124 )
125 .field(
126 "active_relationship_property_indexes",
127 &self.active_relationship_property_index_count(),
128 )
129 .field(
130 "index_catalog_entries",
131 &self
132 .indexes
133 .catalog
134 .read()
135 .map(|c| c.list().len())
136 .unwrap_or(0),
137 )
138 .field("active_constraints", &self.active_constraint_count())
139 .field(
140 "active_fulltext_indexes",
141 &self.active_fulltext_index_count(),
142 )
143 .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
144 .finish()
145 }
146}
147
148impl Clone for InMemoryGraph {
149 fn clone(&self) -> Self {
150 Self {
153 next_node_id: self.next_node_id,
154 next_rel_id: self.next_rel_id,
155 nodes: self.nodes.clone(),
156 relationships: self.relationships.clone(),
157 live_node_count: self.live_node_count,
158 live_rel_count: self.live_rel_count,
159 outgoing: self.outgoing.clone(),
160 incoming: self.incoming.clone(),
161 nodes_by_label: self.nodes_by_label.clone(),
162 relationships_by_type: self.relationships_by_type.clone(),
163 indexes: self.indexes.clone(),
169 constraint_catalog: RwLock::new(self.constraint_catalog_read().clone()),
170 active_constraints: AtomicUsize::new(self.active_constraint_count()),
171 recorder: None,
172 }
173 }
174}
175
176impl InMemoryGraph {
177 pub fn new() -> Self {
178 Self::default()
179 }
180
181 pub fn with_capacity_hint(nodes: usize, relationships: usize) -> Self {
182 Self {
183 nodes: Vec::with_capacity(nodes),
184 relationships: Vec::with_capacity(relationships),
185 outgoing: Vec::with_capacity(nodes),
186 incoming: Vec::with_capacity(nodes),
187 ..Self::default()
188 }
189 }
190
191 pub fn contains_node(&self, node_id: NodeId) -> bool {
192 self.node_at(node_id).is_some()
193 }
194
195 pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
196 self.rel_at(rel_id).is_some()
197 }
198
199 pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
203 self.recorder = recorder;
204 }
205
206 pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
208 self.recorder.as_ref()
209 }
210
211 #[inline]
216 pub(super) fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
217 if let Some(rec) = &self.recorder {
218 rec.record(build());
219 }
220 }
221
222 fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
223 let next = id
224 .checked_add(1)
225 .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
226 self.next_node_id = self.next_node_id.max(next);
227 Ok(())
228 }
229
230 fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
231 let next = id
232 .checked_add(1)
233 .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
234 self.next_rel_id = self.next_rel_id.max(next);
235 Ok(())
236 }
237
238 pub(super) fn try_reserve_next_node_slot(&mut self) -> Option<(NodeId, usize)> {
239 let id = self.next_node_id;
240 let idx = self.ensure_node_slot_checked(id).ok()?;
241 self.bump_next_node_id_past(id).ok()?;
242 Some((id, idx))
243 }
244
245 pub(super) fn try_reserve_next_rel_slot(&mut self) -> Option<(RelationshipId, usize)> {
246 let id = self.next_rel_id;
247 let idx = self.ensure_rel_slot_checked(id).ok()?;
248 self.bump_next_rel_id_past(id).ok()?;
249 Some((id, idx))
250 }
251
252 #[inline]
258 pub(super) fn node_at(&self, id: NodeId) -> Option<&NodeRecord> {
259 self.nodes
260 .get(Self::slot_index(id)?)
261 .and_then(|s| s.as_ref())
262 .map(|arc| arc.as_ref())
263 }
264
265 #[inline]
270 pub(super) fn node_at_mut(&mut self, id: NodeId) -> Option<&mut NodeRecord> {
271 self.nodes
272 .get_mut(Self::slot_index(id)?)
273 .and_then(|s| s.as_mut())
274 .map(Arc::make_mut)
275 }
276
277 #[inline]
278 pub(super) fn rel_at(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
279 self.relationships
280 .get(Self::slot_index(id)?)
281 .and_then(|s| s.as_ref())
282 .map(|arc| arc.as_ref())
283 }
284
285 #[inline]
286 pub(super) fn rel_at_mut(&mut self, id: RelationshipId) -> Option<&mut RelationshipRecord> {
287 self.relationships
288 .get_mut(Self::slot_index(id)?)
289 .and_then(|s| s.as_mut())
290 .map(Arc::make_mut)
291 }
292
293 fn slot_len_for_id(id: u64, kind: &str) -> Result<usize, String> {
297 let idx = usize::try_from(id)
298 .map_err(|_| format!("{kind} id {id} does not fit in usize on this platform"))?;
299 idx.checked_add(1)
300 .ok_or_else(|| format!("{kind} id {id} leaves no valid slab slot"))
301 }
302
303 #[inline]
304 fn slot_index(id: u64) -> Option<usize> {
305 usize::try_from(id).ok()
306 }
307
308 fn ensure_node_slot_checked(&mut self, id: NodeId) -> Result<usize, String> {
309 let target = Self::slot_len_for_id(id, "node")?;
310 if self.nodes.len() < target {
311 let additional = target - self.nodes.len();
312 self.nodes.try_reserve_exact(additional).map_err(|e| {
313 format!("node id {id} requires {target} slots, but allocation failed: {e}")
314 })?;
315 self.outgoing.try_reserve_exact(additional).map_err(|e| {
316 format!(
317 "node id {id} requires {target} adjacency slots, but allocation failed: {e}"
318 )
319 })?;
320 self.incoming.try_reserve_exact(additional).map_err(|e| {
321 format!(
322 "node id {id} requires {target} adjacency slots, but allocation failed: {e}"
323 )
324 })?;
325 self.nodes.resize_with(target, || None);
326 self.outgoing.resize_with(target, Vec::new);
327 self.incoming.resize_with(target, Vec::new);
328 }
329 Ok(target - 1)
330 }
331
332 fn ensure_rel_slot_checked(&mut self, id: RelationshipId) -> Result<usize, String> {
333 let target = Self::slot_len_for_id(id, "relationship")?;
334 if self.relationships.len() < target {
335 self.relationships
336 .try_reserve_exact(target - self.relationships.len())
337 .map_err(|e| {
338 format!(
339 "relationship id {id} requires {target} slots, but allocation failed: {e}"
340 )
341 })?;
342 self.relationships.resize_with(target, || None);
343 }
344 Ok(target - 1)
345 }
346
347 pub(super) fn put_node_checked(&mut self, id: NodeId, node: NodeRecord) -> Result<(), String> {
348 let idx = self.ensure_node_slot_checked(id)?;
349 self.put_node_at_slot(idx, node);
350 Ok(())
351 }
352
353 pub(super) fn put_rel_checked(
354 &mut self,
355 id: RelationshipId,
356 rel: RelationshipRecord,
357 ) -> Result<(), String> {
358 let idx = self.ensure_rel_slot_checked(id)?;
359 self.put_rel_at_slot(idx, rel);
360 Ok(())
361 }
362
363 pub(super) fn put_node_at_slot(&mut self, idx: usize, node: NodeRecord) {
364 let was_present = self.nodes[idx].is_some();
365 self.nodes[idx] = Some(Arc::new(node));
366 if !was_present {
367 self.live_node_count += 1;
368 }
369 }
370
371 pub(super) fn put_rel_at_slot(&mut self, idx: usize, rel: RelationshipRecord) {
372 let was_present = self.relationships[idx].is_some();
373 self.relationships[idx] = Some(Arc::new(rel));
374 if !was_present {
375 self.live_rel_count += 1;
376 }
377 }
378
379 pub(super) fn take_node(&mut self, id: NodeId) -> Option<NodeRecord> {
380 let idx = Self::slot_index(id)?;
381 let removed = self.nodes.get_mut(idx).and_then(|s| s.take());
382 if removed.is_some() {
383 self.live_node_count -= 1;
384 if let Some(out) = self.outgoing.get_mut(idx) {
390 out.clear();
391 }
392 if let Some(inc) = self.incoming.get_mut(idx) {
393 inc.clear();
394 }
395 }
396 removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
401 }
402
403 pub(super) fn take_rel(&mut self, id: RelationshipId) -> Option<RelationshipRecord> {
404 let idx = Self::slot_index(id)?;
405 let removed = self.relationships.get_mut(idx).and_then(|s| s.take());
406 if removed.is_some() {
407 self.live_rel_count -= 1;
408 }
409 removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
410 }
411
412 #[inline]
413 pub(super) fn outgoing_at(&self, id: NodeId) -> Option<&[RelationshipId]> {
414 self.outgoing.get(Self::slot_index(id)?).map(Vec::as_slice)
415 }
416
417 #[inline]
418 pub(super) fn incoming_at(&self, id: NodeId) -> Option<&[RelationshipId]> {
419 self.incoming.get(Self::slot_index(id)?).map(Vec::as_slice)
420 }
421
422 #[inline]
423 fn try_for_each_adjacent_slice<F, E>(
424 &self,
425 node_id: NodeId,
426 types: &[String],
427 adj: &[RelationshipId],
428 skip_self_loops: bool,
429 visit: &mut F,
430 ) -> Result<(), E>
431 where
432 F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
433 {
434 let single_type = match types {
435 [single] => Some(single.as_str()),
436 _ => None,
437 };
438 let has_type_filter = !types.is_empty();
439
440 for &rel_id in adj {
441 let Some(rel) = self.rel_at(rel_id) else {
442 continue;
443 };
444 if skip_self_loops && rel.src == node_id && rel.dst == node_id {
445 continue;
446 }
447 if let Some(single) = single_type {
448 if rel.rel_type != single {
449 continue;
450 }
451 } else if has_type_filter && !types.iter().any(|t| t == &rel.rel_type) {
452 continue;
453 }
454 let Some(other_id) = Self::other_endpoint(rel, node_id) else {
455 continue;
456 };
457 visit(rel_id, other_id)?;
458 }
459 Ok(())
460 }
461
462 #[inline]
463 pub(super) fn try_for_each_adjacent_id_unchecked<F, E>(
464 &self,
465 node_id: NodeId,
466 direction: Direction,
467 types: &[String],
468 mut visit: F,
469 ) -> Result<(), E>
470 where
471 F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
472 {
473 match direction {
474 Direction::Right => {
475 if let Some(adj) = self.outgoing_at(node_id) {
476 self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
477 }
478 }
479 Direction::Left => {
480 if let Some(adj) = self.incoming_at(node_id) {
481 self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
482 }
483 }
484 Direction::Undirected => {
485 if let Some(adj) = self.outgoing_at(node_id) {
486 self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
487 }
488 if let Some(adj) = self.incoming_at(node_id) {
489 self.try_for_each_adjacent_slice(node_id, types, adj, true, &mut visit)?;
490 }
491 }
492 }
493
494 Ok(())
495 }
496
497 #[inline]
498 pub(super) fn try_for_each_adjacent_id<F, E>(
499 &self,
500 node_id: NodeId,
501 direction: Direction,
502 types: &[String],
503 visit: F,
504 ) -> Result<(), E>
505 where
506 F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
507 {
508 if self.node_at(node_id).is_none() {
509 return Ok(());
510 }
511 self.try_for_each_adjacent_id_unchecked(node_id, direction, types, visit)
512 }
513
514 pub(super) fn iter_node_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
515 self.nodes
516 .iter()
517 .enumerate()
518 .filter_map(|(i, slot)| slot.as_ref().map(|_| i as NodeId))
519 }
520
521 pub(super) fn iter_node_records(&self) -> impl Iterator<Item = &NodeRecord> + '_ {
522 self.nodes
523 .iter()
524 .filter_map(|s| s.as_ref())
525 .map(|arc| arc.as_ref())
526 }
527
528 pub(super) fn iter_rel_ids(&self) -> impl Iterator<Item = RelationshipId> + '_ {
529 self.relationships
530 .iter()
531 .enumerate()
532 .filter_map(|(i, slot)| slot.as_ref().map(|_| i as RelationshipId))
533 }
534
535 pub(super) fn iter_rel_records(&self) -> impl Iterator<Item = &RelationshipRecord> + '_ {
536 self.relationships
537 .iter()
538 .filter_map(|s| s.as_ref())
539 .map(|arc| arc.as_ref())
540 }
541
542 pub(super) fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &NodeRecord)> + '_ {
543 self.nodes
544 .iter()
545 .enumerate()
546 .filter_map(|(i, slot)| slot.as_ref().map(|n| (i as NodeId, n.as_ref())))
547 }
548
549 pub(super) fn iter_rels(
550 &self,
551 ) -> impl Iterator<Item = (RelationshipId, &RelationshipRecord)> + '_ {
552 self.relationships
553 .iter()
554 .enumerate()
555 .filter_map(|(i, slot)| slot.as_ref().map(|r| (i as RelationshipId, r.as_ref())))
556 }
557
558 fn outgoing_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
562 if let Ok(idx) = self.ensure_node_slot_checked(node_id) {
563 self.outgoing[idx].push(rel_id);
564 }
565 }
566
567 fn incoming_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
568 if let Ok(idx) = self.ensure_node_slot_checked(node_id) {
569 self.incoming[idx].push(rel_id);
570 }
571 }
572
573 fn outgoing_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
576 if let Some(v) = Self::slot_index(node_id).and_then(|idx| self.outgoing.get_mut(idx)) {
577 if let Some(pos) = v.iter().position(|&id| id == rel_id) {
578 v.swap_remove(pos);
579 }
580 }
581 }
582
583 fn incoming_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
584 if let Some(v) = Self::slot_index(node_id).and_then(|idx| self.incoming.get_mut(idx)) {
585 if let Some(pos) = v.iter().position(|&id| id == rel_id) {
586 v.swap_remove(pos);
587 }
588 }
589 }
590
591 pub(super) fn normalize_labels(labels: Vec<String>) -> Vec<String> {
592 let mut seen = BTreeSet::new();
593
594 labels
595 .into_iter()
596 .map(|s| s.trim().to_string())
597 .filter(|s| !s.is_empty())
598 .filter(|s| seen.insert(s.clone()))
599 .collect()
600 }
601
602 pub(super) fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
603 if let Some(bucket) = self.nodes_by_label.get_mut(label) {
608 bucket.push(node_id);
609 } else {
610 self.nodes_by_label.insert(label.to_string(), vec![node_id]);
611 }
612 }
613
614 fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
615 if let Some(ids) = self.nodes_by_label.get_mut(label) {
616 if let Some(pos) = ids.iter().position(|&id| id == node_id) {
617 ids.swap_remove(pos);
618 }
619 if ids.is_empty() {
620 self.nodes_by_label.remove(label);
621 }
622 }
623 }
624
625 fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
626 if let Some(bucket) = self.relationships_by_type.get_mut(rel_type) {
628 bucket.push(rel_id);
629 } else {
630 self.relationships_by_type
631 .insert(rel_type.to_string(), vec![rel_id]);
632 }
633 }
634
635 fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
636 if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
637 if let Some(pos) = ids.iter().position(|&id| id == rel_id) {
638 ids.swap_remove(pos);
639 }
640 if ids.is_empty() {
641 self.relationships_by_type.remove(rel_type);
642 }
643 }
644 }
645
646 pub(super) fn indexes_read(&self) -> std::sync::RwLockReadGuard<'_, PropertyIndexRegistry> {
647 self.indexes
648 .properties
649 .read()
650 .unwrap_or_else(|poisoned| poisoned.into_inner())
651 }
652
653 pub(super) fn indexes_write(&self) -> RwLockWriteGuard<'_, PropertyIndexRegistry> {
654 self.indexes
655 .properties
656 .write()
657 .unwrap_or_else(|poisoned| poisoned.into_inner())
658 }
659
660 pub(super) fn indexes_mut(&mut self) -> &mut PropertyIndexRegistry {
661 self.indexes
662 .properties
663 .get_mut()
664 .unwrap_or_else(|poisoned| poisoned.into_inner())
665 }
666
667 #[inline]
668 pub(super) fn active_node_property_index_count(&self) -> usize {
669 self.indexes
670 .active_node_property_indexes
671 .load(Ordering::Relaxed)
672 }
673
674 #[inline]
675 pub(super) fn active_relationship_property_index_count(&self) -> usize {
676 self.indexes
677 .active_relationship_property_indexes
678 .load(Ordering::Relaxed)
679 }
680
681 #[inline]
682 pub(super) fn active_constraint_count(&self) -> usize {
683 self.active_constraints.load(Ordering::Relaxed)
684 }
685
686 #[inline]
687 pub(super) fn has_active_constraints(&self) -> bool {
688 self.active_constraint_count() != 0
689 }
690
691 #[inline]
692 pub(super) fn active_fulltext_index_count(&self) -> usize {
693 self.indexes.active_fulltext_indexes.load(Ordering::Relaxed)
694 }
695
696 #[inline]
697 pub(super) fn has_active_fulltext_indexes(&self) -> bool {
698 self.active_fulltext_index_count() != 0
699 }
700
701 pub(super) fn node_property_index_is_active(&mut self, key: &str) -> bool {
702 self.active_node_property_index_count() != 0
703 && self.indexes_mut().node_properties.is_active(key)
704 }
705
706 pub(super) fn relationship_property_index_is_active(&mut self, key: &str) -> bool {
707 self.active_relationship_property_index_count() != 0
708 && self.indexes_mut().relationship_properties.is_active(key)
709 }
710
711 pub(super) fn ensure_node_property_index(&self, key: &str) {
712 {
713 let indexes = self.indexes_read();
714 if indexes.node_properties.is_active(key) {
715 return;
716 }
717 }
718
719 let mut indexes = self.indexes_write();
720 if indexes.node_properties.is_active(key) {
721 return;
722 }
723
724 for (id, node) in self.iter_nodes() {
725 if let Some(value) = node.properties.get(key) {
726 indexes.node_properties.insert_with_scopes(
727 id,
728 node.labels.iter().map(String::as_str),
729 key,
730 value,
731 );
732 }
733 }
734 if indexes.node_properties.activate(key) {
735 self.indexes
736 .active_node_property_indexes
737 .fetch_add(1, Ordering::Relaxed);
738 }
739 }
740
741 pub(super) fn ensure_relationship_property_index(&self, key: &str) {
742 {
743 let indexes = self.indexes_read();
744 if indexes.relationship_properties.is_active(key) {
745 return;
746 }
747 }
748
749 let mut indexes = self.indexes_write();
750 if indexes.relationship_properties.is_active(key) {
751 return;
752 }
753
754 for (id, rel) in self.iter_rels() {
755 if let Some(value) = rel.properties.get(key) {
756 indexes.relationship_properties.insert_with_scopes(
757 id,
758 [rel.rel_type.as_str()],
759 key,
760 value,
761 );
762 }
763 }
764 if indexes.relationship_properties.activate(key) {
765 self.indexes
766 .active_relationship_property_indexes
767 .fetch_add(1, Ordering::Relaxed);
768 }
769 }
770
771 pub(super) fn index_catalog_read(&self) -> std::sync::RwLockReadGuard<'_, IndexCatalog> {
772 self.indexes
773 .catalog
774 .read()
775 .unwrap_or_else(|poisoned| poisoned.into_inner())
776 }
777
778 pub(super) fn index_catalog_write(&self) -> RwLockWriteGuard<'_, IndexCatalog> {
779 self.indexes
780 .catalog
781 .write()
782 .unwrap_or_else(|poisoned| poisoned.into_inner())
783 }
784
785 pub(super) fn constraint_catalog_read(
786 &self,
787 ) -> std::sync::RwLockReadGuard<'_, ConstraintCatalog> {
788 self.constraint_catalog
789 .read()
790 .unwrap_or_else(|poisoned| poisoned.into_inner())
791 }
792
793 pub(super) fn constraint_catalog_write(&self) -> RwLockWriteGuard<'_, ConstraintCatalog> {
794 self.constraint_catalog
795 .write()
796 .unwrap_or_else(|poisoned| poisoned.into_inner())
797 }
798
799 #[allow(clippy::result_large_err)]
807 pub(super) fn register_index(
808 &self,
809 request: IndexRequest,
810 if_not_exists: bool,
811 ) -> Result<CreateIndexOutcome, CreateIndexError> {
812 self.register_index_with_recording(request, if_not_exists, true)
813 }
814
815 #[allow(clippy::result_large_err)]
816 fn register_index_with_recording(
817 &self,
818 request: IndexRequest,
819 if_not_exists: bool,
820 record_event: bool,
821 ) -> Result<CreateIndexOutcome, CreateIndexError> {
822 let request_for_event = record_event.then(|| request.clone());
823 let outcome = {
824 let mut catalog = self.index_catalog_write();
825 catalog.try_create(request, if_not_exists)?
826 };
827
828 if let CreateIndexOutcome::Created(def) = &outcome {
829 self.populate_index_data(def);
830 }
831
832 if matches!(outcome, CreateIndexOutcome::Created(_)) {
836 if let Some(request_for_event) = request_for_event {
837 self.emit(|| crate::MutationEvent::CreateIndex {
838 request: request_for_event,
839 if_not_exists,
840 });
841 }
842 }
843
844 Ok(outcome)
845 }
846
847 #[doc(hidden)]
851 pub fn replay_create_index(
852 &mut self,
853 request: IndexRequest,
854 if_not_exists: bool,
855 ) -> Result<(), String> {
856 if self.recorder.is_some() {
857 return Err("cannot replay create_index while a mutation recorder is installed".into());
858 }
859 self.register_index(request, if_not_exists)
860 .map(|_| ())
861 .map_err(|e| e.to_string())
862 }
863
864 #[doc(hidden)]
866 pub fn replay_drop_index(&mut self, name: &str, if_exists: bool) -> Result<(), String> {
867 if self.recorder.is_some() {
868 return Err("cannot replay drop_index while a mutation recorder is installed".into());
869 }
870 self.drop_named_index(name, if_exists)
871 .map(|_| ())
872 .map_err(|e| e.to_string())
873 }
874
    /// Registers a constraint, validating existing data and creating its
    /// backing range index when the constraint kind requires one.
    ///
    /// Steps, in order:
    /// 1. Under the constraint catalog read lock, look for an equivalent
    ///    constraint, a constraint with the same name, or a same-schema
    ///    constraint whose kind conflicts. Each hit returns `NoOpExists` when
    ///    `if_not_exists` is set and an error otherwise.
    /// 2. When a backing index is required, reject a name already used by an
    ///    index and a differently named range index over the same
    ///    entity/label/properties.
    /// 3. Insert into the constraint catalog.
    /// 4. Validate existing data; on violation the catalog entry is removed.
    /// 5. Create the backing index without recording an index event; on
    ///    failure the catalog entry is removed.
    /// 6. Emit `CreateConstraint` and bump the active-constraint counter.
    ///
    /// # Errors
    /// `EquivalentConstraintExists`, `DuplicateName`, `ConflictingConstraint`,
    /// `DuplicateIndexName`, `BackingIndexConflict`, `DataViolation`, or
    /// whatever the catalog's `try_create` returns.
    pub(super) fn register_constraint(
        &self,
        request: ConstraintRequest,
        if_not_exists: bool,
    ) -> Result<CreateConstraintOutcome, CreateConstraintError> {
        {
            // Pre-checks run under the read lock; each early return clones the
            // matching definition and releases the guard first.
            let constraint_catalog = self.constraint_catalog_read();
            if let Some(existing) = constraint_catalog.find_equivalent(&request) {
                let cloned = existing.clone();
                drop(constraint_catalog);
                if if_not_exists {
                    return Ok(CreateConstraintOutcome::NoOpExists(cloned));
                }
                return Err(CreateConstraintError::EquivalentConstraintExists(
                    cloned.name,
                ));
            }
            if let Some(existing) = constraint_catalog.get(&request.name) {
                let cloned = existing.clone();
                drop(constraint_catalog);
                if if_not_exists {
                    return Ok(CreateConstraintOutcome::NoOpExists(cloned));
                }
                return Err(CreateConstraintError::DuplicateName(cloned.name));
            }
            if let Some(existing) = constraint_catalog.find_same_schema(&request) {
                let cloned = existing.clone();
                drop(constraint_catalog);
                // A same-schema constraint only blocks the request when the
                // two kinds conflict; otherwise registration continues.
                if super::constraint_catalog::kinds_conflict_for_validation(
                    &cloned.kind,
                    &request.kind,
                ) {
                    if if_not_exists {
                        return Ok(CreateConstraintOutcome::NoOpExists(cloned));
                    }
                    return Err(CreateConstraintError::ConflictingConstraint(cloned.name));
                }
            }
        }

        if request.kind.requires_backing_index() {
            // The backing index reuses the constraint's name, so that name
            // must be free in the index catalog.
            let idx_catalog = self.index_catalog_read();
            if idx_catalog.get(&request.name).is_some() {
                return Err(CreateConstraintError::DuplicateIndexName(
                    request.name.clone(),
                ));
            }
            // A differently named range index over the same schema would
            // overlap with the backing index.
            let conflict = idx_catalog.list().into_iter().find(|def| {
                def.kind == StoredIndexKind::Range
                    && def.entity == request.entity
                    && def.label.as_deref() == Some(request.label.as_str())
                    && def.properties == request.properties
                    && def.name != request.name
            });
            drop(idx_catalog);
            if let Some(def) = conflict {
                return Err(CreateConstraintError::BackingIndexConflict(format!(
                    "(:{} {{{}}}) already covered by index `{}`",
                    request.label,
                    request.properties.join(", "),
                    def.name,
                )));
            }
        }

        let owns_backing = request.kind.requires_backing_index();
        // Cloned before `request` is moved into the catalog below.
        let request_for_event = request.clone();
        let outcome = {
            let mut catalog = self.constraint_catalog_write();
            catalog.try_create(request, if_not_exists)?
        };

        if let CreateConstraintOutcome::Created(def) = &outcome {
            if let Err(violation) = self.validate_existing_data_for_constraint(def) {
                // Roll back the catalog entry that was just inserted.
                let mut catalog = self.constraint_catalog_write();
                let _ = catalog.try_drop(&def.name, true);
                return Err(CreateConstraintError::DataViolation(violation.to_string()));
            }
        }

        if let CreateConstraintOutcome::Created(def) = &outcome {
            if owns_backing {
                let idx_request = IndexRequest {
                    explicit_name: Some(def.name.clone()),
                    kind: StoredIndexKind::Range,
                    entity: def.entity,
                    label: Some(def.label.clone()),
                    additional_labels: Vec::new(),
                    properties: def.properties.clone(),
                    options: Default::default(),
                };
                // `record_event = false`: only the constraint event below is
                // emitted, not a separate index event.
                if let Err(err) = self.register_index_with_recording(idx_request, true, false) {
                    // Roll back the catalog entry that was just inserted.
                    let mut catalog = self.constraint_catalog_write();
                    let _ = catalog.try_drop(&def.name, true);
                    return Err(CreateConstraintError::BackingIndexConflict(err.to_string()));
                }
            }
            self.emit(|| crate::MutationEvent::CreateConstraint {
                request: request_for_event,
                if_not_exists,
            });
            self.active_constraints.fetch_add(1, Ordering::Relaxed);
        }

        Ok(outcome)
    }
1004
1005 #[doc(hidden)]
1007 pub fn replay_create_constraint(
1008 &mut self,
1009 request: ConstraintRequest,
1010 if_not_exists: bool,
1011 ) -> Result<(), String> {
1012 if self.recorder.is_some() {
1013 return Err(
1014 "cannot replay create_constraint while a mutation recorder is installed".into(),
1015 );
1016 }
1017 self.register_constraint(request, if_not_exists)
1018 .map(|_| ())
1019 .map_err(|e| e.to_string())
1020 }
1021
1022 #[doc(hidden)]
1024 pub fn replay_drop_constraint(&mut self, name: &str, if_exists: bool) -> Result<(), String> {
1025 if self.recorder.is_some() {
1026 return Err(
1027 "cannot replay drop_constraint while a mutation recorder is installed".into(),
1028 );
1029 }
1030 self.drop_named_constraint(name, if_exists)
1031 .map(|_| ())
1032 .map_err(|e| e.to_string())
1033 }
1034
1035 pub(super) fn drop_named_constraint(
1038 &self,
1039 name: &str,
1040 if_exists: bool,
1041 ) -> Result<DropConstraintOutcome, DropConstraintError> {
1042 let outcome = {
1043 let mut catalog = self.constraint_catalog_write();
1044 catalog.try_drop(name, if_exists)?
1045 };
1046 if let DropConstraintOutcome::Dropped(def) = &outcome {
1047 if let Some(index_name) = def.owned_index.as_deref() {
1048 let _ = self.drop_named_index_inner(index_name, true, false);
1051 }
1052 self.active_constraints.fetch_sub(1, Ordering::Relaxed);
1053 self.emit(|| crate::MutationEvent::DropConstraint {
1054 name: name.to_string(),
1055 if_exists,
1056 });
1057 }
1058 Ok(outcome)
1059 }
1060
1061 pub(super) fn drop_named_index(
1066 &self,
1067 name: &str,
1068 if_exists: bool,
1069 ) -> Result<DropIndexOutcome, DropIndexError> {
1070 self.drop_named_index_inner(name, if_exists, true)
1071 }
1072
1073 fn drop_named_index_inner(
1074 &self,
1075 name: &str,
1076 if_exists: bool,
1077 emit_event: bool,
1078 ) -> Result<DropIndexOutcome, DropIndexError> {
1079 if let Some(owner) = self
1080 .constraint_catalog_read()
1081 .constraint_owning_index(name)
1082 .cloned()
1083 {
1084 return Err(DropIndexError::ConstraintOwned {
1085 index: name.to_string(),
1086 constraint: owner.name,
1087 });
1088 }
1089
1090 let outcome = {
1091 let mut catalog = self.index_catalog_write();
1092 catalog.try_drop(name, if_exists)?
1093 };
1094 if let DropIndexOutcome::Dropped(def) = &outcome {
1095 match def.kind {
1097 StoredIndexKind::Text => {
1098 if let Some(label) = def.label.as_deref() {
1099 for prop in &def.properties {
1100 self.deactivate_text_scope(def.entity, label, prop);
1101 }
1102 }
1103 }
1104 StoredIndexKind::Range => {
1105 if let Some(label) = def.label.as_deref() {
1106 for prop in &def.properties {
1107 self.deactivate_sorted_scope(def.entity, label, prop);
1108 }
1109 }
1110 }
1111 StoredIndexKind::Point => {
1112 if let Some(label) = def.label.as_deref() {
1113 for prop in &def.properties {
1114 self.deactivate_point_scope(def.entity, label, prop);
1115 }
1116 }
1117 }
1118 StoredIndexKind::Lookup => {
1119 }
1122 StoredIndexKind::Vector => {
1123 self.deactivate_vector_index(def.entity, &def.name);
1124 }
1125 StoredIndexKind::Fulltext => {
1126 self.deactivate_fulltext_index(def.entity, &def.name);
1127 }
1128 }
1129 if emit_event {
1130 self.emit(|| crate::MutationEvent::DropIndex {
1131 name: name.to_string(),
1132 if_exists,
1133 });
1134 }
1135 }
1136 Ok(outcome)
1137 }
1138
1139 fn populate_index_data(&self, def: &IndexDefinition) {
1140 match def.kind {
1148 StoredIndexKind::Range => {
1149 for key in &def.properties {
1150 match def.entity {
1151 StoredIndexEntity::Node => self.ensure_node_property_index(key),
1152 StoredIndexEntity::Relationship => {
1153 self.ensure_relationship_property_index(key)
1154 }
1155 }
1156 if let Some(label) = def.label.as_deref() {
1157 self.activate_sorted_scope(def.entity, label, key);
1158 }
1159 }
1160 }
1161 StoredIndexKind::Text => {
1162 let label = match def.label.as_deref() {
1163 Some(l) => l,
1164 None => return,
1165 };
1166 for property in &def.properties {
1167 self.activate_text_scope(def.entity, label, property);
1168 }
1169 }
1170 StoredIndexKind::Point => {
1171 let label = match def.label.as_deref() {
1172 Some(l) => l,
1173 None => return,
1174 };
1175 let cell_size = PointRegistry::cell_size_from_options(&def.options);
1176 for property in &def.properties {
1177 self.activate_point_scope(def.entity, label, property, cell_size);
1178 }
1179 }
1180 StoredIndexKind::Fulltext => {
1181 let labels: Vec<String> = def.all_labels().map(String::from).collect();
1182 if labels.is_empty() {
1183 return;
1184 }
1185 self.activate_fulltext_index(def.entity, &def.name, &labels, &def.properties);
1186 }
1187 StoredIndexKind::Vector => {
1188 let label = match def.label.as_deref() {
1189 Some(l) => l,
1190 None => return,
1191 };
1192 let property = match def.properties.first() {
1193 Some(p) => p.as_str(),
1194 None => return,
1195 };
1196 let similarity = VectorSimilarity::from_options(&def.options)
1197 .unwrap_or(VectorSimilarity::Cosine);
1198 let provider = VectorIndexProvider::from_options(&def.options)
1199 .unwrap_or(VectorIndexProvider::Flat);
1200 let hnsw = HnswParams::from_options(&def.options);
1201 let lazy = matches!(
1202 def.options.get("vector.populate.async"),
1203 Some(super::index_catalog::IndexConfigValue::Bool(true))
1204 );
1205 self.activate_vector_index(
1206 def.entity, &def.name, label, property, similarity, provider, hnsw, lazy,
1207 );
1208 if lazy {
1209 self.index_catalog_write()
1210 .set_state(&def.name, StoredIndexState::Populating);
1211 }
1212 }
1213 StoredIndexKind::Lookup => {}
1215 }
1216 }
1217
1218 pub(super) fn text_indexes_read(
1219 &self,
1220 entity: StoredIndexEntity,
1221 ) -> std::sync::RwLockReadGuard<'_, TrigramRegistry> {
1222 self.indexes.text.read(entity)
1223 }
1224
1225 pub(super) fn text_indexes_write(
1226 &self,
1227 entity: StoredIndexEntity,
1228 ) -> RwLockWriteGuard<'_, TrigramRegistry> {
1229 self.indexes.text.write(entity)
1230 }
1231
1232 pub(super) fn fulltext_indexes_read(
1233 &self,
1234 entity: StoredIndexEntity,
1235 ) -> std::sync::RwLockReadGuard<'_, FulltextRegistry> {
1236 self.indexes.fulltext.read(entity)
1237 }
1238
1239 #[allow(dead_code)]
1240 pub(super) fn fulltext_indexes_write(
1241 &self,
1242 entity: StoredIndexEntity,
1243 ) -> RwLockWriteGuard<'_, FulltextRegistry> {
1244 self.indexes.fulltext.write(entity)
1245 }
1246
1247 fn activate_text_scope(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1248 if !self.text_indexes_write(entity).add_scope(label, property) {
1249 return;
1250 }
1251
1252 let backfill: Vec<(u64, String)> = match entity {
1253 StoredIndexEntity::Node => self
1254 .iter_nodes()
1255 .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1256 .filter_map(|(id, node)| match node.properties.get(property) {
1257 Some(PropertyValue::String(value)) => Some((id, value.clone())),
1258 _ => None,
1259 })
1260 .collect(),
1261 StoredIndexEntity::Relationship => self
1262 .iter_rels()
1263 .filter(|(_, rel)| rel.rel_type == label)
1264 .filter_map(|(id, rel)| match rel.properties.get(property) {
1265 Some(PropertyValue::String(value)) => Some((id, value.clone())),
1266 _ => None,
1267 })
1268 .collect(),
1269 };
1270
1271 let mut registry = self.text_indexes_write(entity);
1272 for (id, value) in backfill {
1273 registry.insert(label, property, id, &value);
1274 }
1275 }
1276
1277 pub(super) fn deactivate_text_scope(
1279 &self,
1280 entity: StoredIndexEntity,
1281 label: &str,
1282 property: &str,
1283 ) {
1284 self.text_indexes_write(entity)
1285 .remove_scope(label, property);
1286 }
1287
1288 fn activate_fulltext_index(
1289 &self,
1290 entity: StoredIndexEntity,
1291 name: &str,
1292 labels: &[String],
1293 properties: &[String],
1294 ) {
1295 use super::fulltext_index::{term_counts_for_properties, TermCounts};
1296
1297 {
1298 let mut registry = self.fulltext_indexes_write(entity);
1299 registry.register(name.to_string(), labels.to_vec(), properties.to_vec());
1300 }
1301 self.indexes
1302 .active_fulltext_indexes
1303 .fetch_add(1, Ordering::Relaxed);
1304
1305 let backfill: Vec<(u64, TermCounts)> = match entity {
1308 StoredIndexEntity::Node => self
1309 .iter_nodes()
1310 .filter(|(_, node)| {
1311 labels
1312 .iter()
1313 .any(|wanted| node.labels.iter().any(|l| l == wanted))
1314 })
1315 .map(|(id, node)| {
1316 let counts = term_counts_for_properties(&node.properties, properties);
1317 (id, counts)
1318 })
1319 .filter(|(_, c)| !c.is_empty())
1320 .collect(),
1321 StoredIndexEntity::Relationship => self
1322 .iter_rels()
1323 .filter(|(_, rel)| labels.iter().any(|wanted| wanted == &rel.rel_type))
1324 .map(|(id, rel)| {
1325 let counts = term_counts_for_properties(&rel.properties, properties);
1326 (id, counts)
1327 })
1328 .filter(|(_, c)| !c.is_empty())
1329 .collect(),
1330 };
1331
1332 let mut registry = self.fulltext_indexes_write(entity);
1333 if let Some(index) = registry.get_mut(name) {
1334 for (id, counts) in backfill {
1335 index.reindex_entity(id, counts);
1336 }
1337 }
1338 }
1339
1340 pub(super) fn deactivate_fulltext_index(&self, entity: StoredIndexEntity, name: &str) {
1341 self.fulltext_indexes_write(entity).deregister(name);
1342 self.indexes
1343 .active_fulltext_indexes
1344 .fetch_sub(1, Ordering::Relaxed);
1345 }
1346
1347 pub(super) fn sorted_indexes_read(
1348 &self,
1349 entity: StoredIndexEntity,
1350 ) -> std::sync::RwLockReadGuard<'_, SortedPropertyIndex> {
1351 self.indexes.sorted.read(entity)
1352 }
1353
1354 pub(super) fn sorted_indexes_write(
1355 &self,
1356 entity: StoredIndexEntity,
1357 ) -> RwLockWriteGuard<'_, SortedPropertyIndex> {
1358 self.indexes.sorted.write(entity)
1359 }
1360
1361 fn activate_sorted_scope(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1362 if !self.sorted_indexes_write(entity).add_scope(label, property) {
1363 return;
1364 }
1365
1366 let backfill: Vec<(u64, PropertyValue)> = match entity {
1367 StoredIndexEntity::Node => self
1368 .iter_nodes()
1369 .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1370 .filter_map(|(id, node)| {
1371 node.properties
1372 .get(property)
1373 .map(|value| (id, value.clone()))
1374 })
1375 .collect(),
1376 StoredIndexEntity::Relationship => self
1377 .iter_rels()
1378 .filter(|(_, rel)| rel.rel_type == label)
1379 .filter_map(|(id, rel)| {
1380 rel.properties
1381 .get(property)
1382 .map(|value| (id, value.clone()))
1383 })
1384 .collect(),
1385 };
1386
1387 let mut registry = self.sorted_indexes_write(entity);
1388 for (id, value) in backfill {
1389 registry.insert(label, property, id, &value);
1390 }
1391 }
1392
1393 pub(super) fn deactivate_sorted_scope(
1394 &self,
1395 entity: StoredIndexEntity,
1396 label: &str,
1397 property: &str,
1398 ) {
1399 self.sorted_indexes_write(entity)
1400 .remove_scope(label, property);
1401 }
1402
1403 pub(super) fn point_indexes_read(
1404 &self,
1405 entity: StoredIndexEntity,
1406 ) -> std::sync::RwLockReadGuard<'_, PointRegistry> {
1407 self.indexes.point.read(entity)
1408 }
1409
1410 pub(super) fn point_indexes_write(
1411 &self,
1412 entity: StoredIndexEntity,
1413 ) -> RwLockWriteGuard<'_, PointRegistry> {
1414 self.indexes.point.write(entity)
1415 }
1416
1417 fn activate_point_scope(
1418 &self,
1419 entity: StoredIndexEntity,
1420 label: &str,
1421 property: &str,
1422 cell_size: Option<f64>,
1423 ) {
1424 if !self
1425 .point_indexes_write(entity)
1426 .add_scope(label, property, cell_size)
1427 {
1428 return;
1429 }
1430
1431 let backfill: Vec<(u64, LoraPoint)> = match entity {
1432 StoredIndexEntity::Node => self
1433 .iter_nodes()
1434 .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1435 .filter_map(|(id, node)| match node.properties.get(property) {
1436 Some(PropertyValue::Point(point)) => Some((id, point.clone())),
1437 _ => None,
1438 })
1439 .collect(),
1440 StoredIndexEntity::Relationship => self
1441 .iter_rels()
1442 .filter(|(_, rel)| rel.rel_type == label)
1443 .filter_map(|(id, rel)| match rel.properties.get(property) {
1444 Some(PropertyValue::Point(point)) => Some((id, point.clone())),
1445 _ => None,
1446 })
1447 .collect(),
1448 };
1449
1450 let mut registry = self.point_indexes_write(entity);
1451 for (id, point) in backfill {
1452 registry.insert(label, property, id, point);
1453 }
1454 }
1455
1456 pub(super) fn deactivate_point_scope(
1457 &self,
1458 entity: StoredIndexEntity,
1459 label: &str,
1460 property: &str,
1461 ) {
1462 self.point_indexes_write(entity)
1463 .remove_scope(label, property);
1464 }
1465
1466 pub(super) fn vector_indexes_read(
1467 &self,
1468 entity: StoredIndexEntity,
1469 ) -> std::sync::RwLockReadGuard<'_, VectorIndexRegistry> {
1470 self.indexes.vector.read(entity)
1471 }
1472
1473 pub(super) fn vector_indexes_write(
1474 &self,
1475 entity: StoredIndexEntity,
1476 ) -> RwLockWriteGuard<'_, VectorIndexRegistry> {
1477 self.indexes.vector.write(entity)
1478 }
1479
1480 #[allow(clippy::too_many_arguments)]
1481 fn activate_vector_index(
1482 &self,
1483 entity: StoredIndexEntity,
1484 name: &str,
1485 label: &str,
1486 property: &str,
1487 similarity: VectorSimilarity,
1488 provider: VectorIndexProvider,
1489 hnsw: HnswParams,
1490 lazy: bool,
1491 ) {
1492 {
1493 let mut registry = self.vector_indexes_write(entity);
1494 registry.register(
1495 name.to_string(),
1496 label.to_string(),
1497 property.to_string(),
1498 similarity,
1499 provider,
1500 hnsw,
1501 );
1502 }
1503
1504 if lazy {
1505 return;
1512 }
1513
1514 self.backfill_vector_index(entity, label, property);
1515 }
1516
1517 fn backfill_vector_index(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1521 let backfill: Vec<(u64, crate::LoraVector)> = match entity {
1522 StoredIndexEntity::Node => self
1523 .iter_nodes()
1524 .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1525 .filter_map(|(id, node)| match node.properties.get(property) {
1526 Some(PropertyValue::Vector(v)) => Some((id, v.clone())),
1527 _ => None,
1528 })
1529 .collect(),
1530 StoredIndexEntity::Relationship => self
1531 .iter_rels()
1532 .filter(|(_, rel)| rel.rel_type == label)
1533 .filter_map(|(id, rel)| match rel.properties.get(property) {
1534 Some(PropertyValue::Vector(v)) => Some((id, v.clone())),
1535 _ => None,
1536 })
1537 .collect(),
1538 };
1539
1540 let mut registry = self.vector_indexes_write(entity);
1541 for (id, vector) in backfill {
1542 registry.insert_for(label, property, id, &vector);
1543 }
1544 }
1545
1546 pub(super) fn lazy_populate_vector_index(&self, name: &str) {
1552 let def = match self.index_catalog_read().get(name).cloned() {
1553 Some(d) => d,
1554 None => return,
1555 };
1556 if def.state != StoredIndexState::Populating || def.kind != StoredIndexKind::Vector {
1557 return;
1558 }
1559 let label = match def.label.as_deref() {
1560 Some(l) => l,
1561 None => return,
1562 };
1563 let property = match def.properties.first() {
1564 Some(p) => p.as_str(),
1565 None => return,
1566 };
1567 self.backfill_vector_index(def.entity, label, property);
1568 self.index_catalog_write()
1569 .set_state(name, StoredIndexState::Online);
1570 }
1571
1572 pub(super) fn deactivate_vector_index(&self, entity: StoredIndexEntity, name: &str) {
1573 self.vector_indexes_write(entity).deregister(name);
1574 }
1575
1576 pub fn graph_stats(&self) -> GraphStats {
1581 let mut stats = GraphStats {
1582 node_count: self.live_node_count,
1583 relationship_count: self.live_rel_count,
1584 ..Default::default()
1585 };
1586 for (label, ids) in &self.nodes_by_label {
1587 stats.nodes_by_label.insert(label.clone(), ids.len());
1588 }
1589 for (rel_type, ids) in &self.relationships_by_type {
1590 stats
1591 .relationships_by_type
1592 .insert(rel_type.clone(), ids.len());
1593 }
1594 let prop_indexes = self.indexes_read();
1600 for (scope, props) in &prop_indexes.node_properties.scoped_values {
1601 for (key, values) in props {
1602 stats
1603 .node_distinct_values
1604 .insert((scope.clone(), key.clone()), values.len());
1605 }
1606 }
1607 for (scope, props) in &prop_indexes.relationship_properties.scoped_values {
1608 for (key, values) in props {
1609 stats
1610 .relationship_distinct_values
1611 .insert((scope.clone(), key.clone()), values.len());
1612 }
1613 }
1614
1615 for def in self.index_catalog_read().list() {
1616 if def.state != StoredIndexState::Online {
1617 continue;
1618 }
1619 let Some(label) = def.label else {
1620 continue;
1621 };
1622 for property in def.properties {
1623 let scope = (label.clone(), property);
1624 match (def.entity, def.kind) {
1625 (StoredIndexEntity::Node, StoredIndexKind::Range) => {
1626 stats.node_range_indexes.insert(scope);
1627 }
1628 (StoredIndexEntity::Node, StoredIndexKind::Text) => {
1629 stats.node_text_indexes.insert(scope);
1630 }
1631 (StoredIndexEntity::Node, StoredIndexKind::Point) => {
1632 stats.node_point_indexes.insert(scope);
1633 }
1634 (StoredIndexEntity::Relationship, StoredIndexKind::Range) => {
1635 stats.relationship_range_indexes.insert(scope);
1636 }
1637 (StoredIndexEntity::Relationship, StoredIndexKind::Text) => {
1638 stats.relationship_text_indexes.insert(scope);
1639 }
1640 (StoredIndexEntity::Relationship, StoredIndexKind::Point) => {
1641 stats.relationship_point_indexes.insert(scope);
1642 }
1643 (StoredIndexEntity::Node, StoredIndexKind::Vector) => {
1644 stats.node_vector_indexes.insert(scope);
1645 }
1646 (StoredIndexEntity::Relationship, StoredIndexKind::Vector) => {
1647 stats.relationship_vector_indexes.insert(scope);
1648 }
1649 (_, StoredIndexKind::Lookup | StoredIndexKind::Fulltext) => {}
1650 }
1651 }
1652 }
1653 stats
1654 }
1655
1656 pub fn memory_estimate(&self) -> super::MemoryReport {
1661 super::mem_report::estimate(self)
1662 }
1663
1664 pub(super) fn rebuild_property_indexes(&mut self) {
1665 let mut indexes = PropertyIndexRegistry::default();
1666
1667 for (id, node) in self.iter_nodes() {
1668 for (key, value) in &node.properties {
1669 if PropertyIndexKey::from_value(value).is_some() {
1670 indexes.node_properties.activate(key);
1671 indexes.node_properties.insert_with_scopes(
1672 id,
1673 node.labels.iter().map(String::as_str),
1674 key,
1675 value,
1676 );
1677 }
1678 }
1679 }
1680
1681 for (id, rel) in self.iter_rels() {
1682 for (key, value) in &rel.properties {
1683 if PropertyIndexKey::from_value(value).is_some() {
1684 indexes.relationship_properties.activate(key);
1685 indexes.relationship_properties.insert_with_scopes(
1686 id,
1687 [rel.rel_type.as_str()],
1688 key,
1689 value,
1690 );
1691 }
1692 }
1693 }
1694
1695 let node_index_count = indexes.node_properties.active_keys.len();
1696 let relationship_index_count = indexes.relationship_properties.active_keys.len();
1697 *self.indexes_mut() = indexes;
1698 self.indexes
1699 .active_node_property_indexes
1700 .store(node_index_count, Ordering::Relaxed);
1701 self.indexes
1702 .active_relationship_property_indexes
1703 .store(relationship_index_count, Ordering::Relaxed);
1704 }
1705
1706 pub(super) fn on_node_created(&mut self, node: &NodeRecord) {
1707 for label in &node.labels {
1708 self.insert_node_label_index(node.id, label);
1709 }
1710 self.index_node_properties_if_active(
1711 node.id,
1712 node.labels.iter().map(String::as_str),
1713 &node.properties,
1714 );
1715 self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Insert);
1716 }
1717
1718 pub(super) fn on_node_replayed(&mut self, node: &NodeRecord) {
1719 for label in &node.labels {
1720 self.insert_node_label_index(node.id, label);
1721 }
1722 self.index_node_properties_eager(
1723 node.id,
1724 node.labels.iter().map(String::as_str),
1725 &node.properties,
1726 );
1727 self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Insert);
1728 }
1729
1730 pub(super) fn on_node_property_set(
1731 &mut self,
1732 node_id: NodeId,
1733 key: &str,
1734 old: Option<&PropertyValue>,
1735 new: &PropertyValue,
1736 ) {
1737 let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
1738 return;
1739 };
1740
1741 if self.node_property_index_is_active(key) {
1742 if let Some(old) = old {
1743 self.unindex_node_property_if_active(
1744 node_id,
1745 labels.iter().map(String::as_str),
1746 key,
1747 old,
1748 );
1749 }
1750 self.index_node_property_if_active(
1751 node_id,
1752 labels.iter().map(String::as_str),
1753 key,
1754 new,
1755 );
1756 }
1757
1758 self.update_secondary_property(
1759 StoredIndexEntity::Node,
1760 labels.iter().map(String::as_str),
1761 node_id,
1762 key,
1763 old,
1764 Some(new),
1765 );
1766 }
1767
1768 pub(super) fn on_node_property_removed(
1769 &mut self,
1770 node_id: NodeId,
1771 key: &str,
1772 old: &PropertyValue,
1773 ) {
1774 let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
1775 return;
1776 };
1777 if self.node_property_index_is_active(key) {
1778 self.unindex_node_property_if_active(
1779 node_id,
1780 labels.iter().map(String::as_str),
1781 key,
1782 old,
1783 );
1784 }
1785 self.update_secondary_property(
1786 StoredIndexEntity::Node,
1787 labels.iter().map(String::as_str),
1788 node_id,
1789 key,
1790 Some(old),
1791 None,
1792 );
1793 }
1794
1795 pub(super) fn on_node_label_added(&mut self, node_id: NodeId, label: &str) {
1796 self.insert_node_label_index(node_id, label);
1797
1798 let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
1799 return;
1800 };
1801 if self.active_node_property_index_count() != 0 {
1802 self.index_node_scope_properties_if_active(node_id, label, &properties);
1803 }
1804 for (key, value) in &properties {
1805 self.update_secondary_property(
1806 StoredIndexEntity::Node,
1807 [label],
1808 node_id,
1809 key,
1810 None,
1811 Some(value),
1812 );
1813 }
1814 }
1815
1816 pub(super) fn on_node_label_removed(&mut self, node_id: NodeId, label: &str) {
1817 self.remove_node_label_index(node_id, label);
1818
1819 let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
1820 return;
1821 };
1822 if self.active_node_property_index_count() != 0 {
1823 self.unindex_node_scope_properties_if_active(node_id, label, &properties);
1824 }
1825 for (key, value) in &properties {
1826 self.update_secondary_property(
1827 StoredIndexEntity::Node,
1828 [label],
1829 node_id,
1830 key,
1831 Some(value),
1832 None,
1833 );
1834 }
1835 }
1836
1837 pub(super) fn on_node_deleted(&mut self, node: &NodeRecord) {
1838 for label in &node.labels {
1839 self.remove_node_label_index(node.id, label);
1840 }
1841 self.unindex_active_node_properties(
1842 node.id,
1843 node.labels.iter().map(String::as_str),
1844 &node.properties,
1845 );
1846 self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Remove);
1847 }
1848
1849 pub(super) fn on_relationship_created(&mut self, rel: &RelationshipRecord) {
1850 self.attach_relationship(rel);
1851 self.index_relationship_properties_if_active(
1852 rel.id,
1853 [rel.rel_type.as_str()],
1854 &rel.properties,
1855 );
1856 self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Insert);
1857 }
1858
1859 pub(super) fn on_relationship_replayed(&mut self, rel: &RelationshipRecord) {
1860 self.attach_relationship(rel);
1861 self.index_relationship_properties_eager(rel.id, [rel.rel_type.as_str()], &rel.properties);
1862 self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Insert);
1863 }
1864
1865 pub(super) fn on_relationship_property_set(
1866 &mut self,
1867 rel_id: RelationshipId,
1868 key: &str,
1869 old: Option<&PropertyValue>,
1870 new: &PropertyValue,
1871 ) {
1872 let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
1873 return;
1874 };
1875
1876 if self.relationship_property_index_is_active(key) {
1877 if let Some(old) = old {
1878 self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
1879 }
1880 self.index_relationship_property_if_active(rel_id, [rel_type.as_str()], key, new);
1881 }
1882
1883 self.update_secondary_property(
1884 StoredIndexEntity::Relationship,
1885 [rel_type.as_str()],
1886 rel_id,
1887 key,
1888 old,
1889 Some(new),
1890 );
1891 }
1892
1893 pub(super) fn on_relationship_property_removed(
1894 &mut self,
1895 rel_id: RelationshipId,
1896 key: &str,
1897 old: &PropertyValue,
1898 ) {
1899 let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
1900 return;
1901 };
1902 if self.relationship_property_index_is_active(key) {
1903 self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
1904 }
1905 self.update_secondary_property(
1906 StoredIndexEntity::Relationship,
1907 [rel_type.as_str()],
1908 rel_id,
1909 key,
1910 Some(old),
1911 None,
1912 );
1913 }
1914
1915 pub(super) fn on_relationship_deleted(&mut self, rel: &RelationshipRecord) {
1916 self.detach_relationship_indexes(rel);
1917 self.unindex_active_relationship_properties(
1918 rel.id,
1919 [rel.rel_type.as_str()],
1920 &rel.properties,
1921 );
1922 self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Remove);
1923 }
1924
1925 fn index_node_property_eager<'a>(
1926 &mut self,
1927 node_id: NodeId,
1928 labels: impl IntoIterator<Item = &'a str>,
1929 key: &str,
1930 value: &PropertyValue,
1931 ) {
1932 if PropertyIndexKey::from_value(value).is_none() {
1933 return;
1934 }
1935
1936 let activated = {
1937 let indexes = self.indexes_mut();
1938 let activated = indexes.node_properties.activate(key);
1939 indexes
1940 .node_properties
1941 .insert_with_scopes(node_id, labels, key, value);
1942 activated
1943 };
1944 if activated {
1945 self.indexes
1946 .active_node_property_indexes
1947 .fetch_add(1, Ordering::Relaxed);
1948 }
1949 }
1950
1951 fn index_relationship_property_eager<'a>(
1952 &mut self,
1953 rel_id: RelationshipId,
1954 scopes: impl IntoIterator<Item = &'a str>,
1955 key: &str,
1956 value: &PropertyValue,
1957 ) {
1958 if PropertyIndexKey::from_value(value).is_none() {
1959 return;
1960 }
1961
1962 let activated = {
1963 let indexes = self.indexes_mut();
1964 let activated = indexes.relationship_properties.activate(key);
1965 indexes
1966 .relationship_properties
1967 .insert_with_scopes(rel_id, scopes, key, value);
1968 activated
1969 };
1970 if activated {
1971 self.indexes
1972 .active_relationship_property_indexes
1973 .fetch_add(1, Ordering::Relaxed);
1974 }
1975 }
1976
1977 fn index_node_properties_eager<'a>(
1978 &mut self,
1979 node_id: NodeId,
1980 labels: impl IntoIterator<Item = &'a str> + Clone,
1981 properties: &Properties,
1982 ) {
1983 for (key, value) in properties {
1984 self.index_node_property_eager(node_id, labels.clone(), key, value);
1985 }
1986 }
1987
1988 fn index_relationship_properties_eager<'a>(
1989 &mut self,
1990 rel_id: RelationshipId,
1991 scopes: impl IntoIterator<Item = &'a str> + Clone,
1992 properties: &Properties,
1993 ) {
1994 for (key, value) in properties {
1995 self.index_relationship_property_eager(rel_id, scopes.clone(), key, value);
1996 }
1997 }
1998
1999 fn index_node_property_if_active<'a>(
2000 &mut self,
2001 node_id: NodeId,
2002 labels: impl IntoIterator<Item = &'a str>,
2003 key: &str,
2004 value: &PropertyValue,
2005 ) {
2006 if self.active_node_property_index_count() == 0 {
2007 return;
2008 }
2009 let indexes = self.indexes_mut();
2010 if indexes.node_properties.is_active(key) {
2011 indexes
2012 .node_properties
2013 .insert_with_scopes(node_id, labels, key, value);
2014 }
2015 }
2016
2017 fn index_node_properties_if_active<'a>(
2018 &mut self,
2019 node_id: NodeId,
2020 labels: impl IntoIterator<Item = &'a str> + Clone,
2021 properties: &Properties,
2022 ) {
2023 if self.active_node_property_index_count() == 0 {
2024 return;
2025 }
2026 let indexes = self.indexes_mut();
2027 for (key, value) in properties {
2028 if indexes.node_properties.is_active(key) {
2029 indexes
2030 .node_properties
2031 .insert_with_scopes(node_id, labels.clone(), key, value);
2032 }
2033 }
2034 }
2035
2036 fn unindex_node_property_if_active<'a>(
2037 &mut self,
2038 node_id: NodeId,
2039 labels: impl IntoIterator<Item = &'a str>,
2040 key: &str,
2041 value: &PropertyValue,
2042 ) {
2043 if self.active_node_property_index_count() == 0 {
2044 return;
2045 }
2046 let indexes = self.indexes_mut();
2047 if indexes.node_properties.is_active(key) {
2048 indexes
2049 .node_properties
2050 .remove_with_scopes(node_id, labels, key, value);
2051 }
2052 }
2053
2054 fn index_node_scope_properties_if_active(
2055 &mut self,
2056 node_id: NodeId,
2057 scope: &str,
2058 properties: &Properties,
2059 ) {
2060 if self.active_node_property_index_count() == 0 {
2061 return;
2062 }
2063 let indexes = self.indexes_mut();
2064 for (key, value) in properties {
2065 if indexes.node_properties.is_active(key) {
2066 indexes
2067 .node_properties
2068 .insert_scoped(node_id, scope, key, value);
2069 }
2070 }
2071 }
2072
2073 fn unindex_node_scope_properties_if_active(
2074 &mut self,
2075 node_id: NodeId,
2076 scope: &str,
2077 properties: &Properties,
2078 ) {
2079 if self.active_node_property_index_count() == 0 {
2080 return;
2081 }
2082 let indexes = self.indexes_mut();
2083 for (key, value) in properties {
2084 if indexes.node_properties.is_active(key) {
2085 indexes
2086 .node_properties
2087 .remove_scoped(node_id, scope, key, value);
2088 }
2089 }
2090 }
2091
2092 fn unindex_active_node_properties<'a>(
2093 &mut self,
2094 node_id: NodeId,
2095 labels: impl IntoIterator<Item = &'a str> + Clone,
2096 properties: &Properties,
2097 ) {
2098 if self.active_node_property_index_count() == 0 {
2099 return;
2100 }
2101 let indexes = self.indexes_mut();
2102 for (key, value) in properties {
2103 if indexes.node_properties.is_active(key) {
2104 indexes
2105 .node_properties
2106 .remove_with_scopes(node_id, labels.clone(), key, value);
2107 }
2108 }
2109 }
2110
2111 fn index_relationship_property_if_active<'a>(
2112 &mut self,
2113 rel_id: RelationshipId,
2114 scopes: impl IntoIterator<Item = &'a str>,
2115 key: &str,
2116 value: &PropertyValue,
2117 ) {
2118 if self.active_relationship_property_index_count() == 0 {
2119 return;
2120 }
2121 let indexes = self.indexes_mut();
2122 if indexes.relationship_properties.is_active(key) {
2123 indexes
2124 .relationship_properties
2125 .insert_with_scopes(rel_id, scopes, key, value);
2126 }
2127 }
2128
2129 fn index_relationship_properties_if_active<'a>(
2130 &mut self,
2131 rel_id: RelationshipId,
2132 scopes: impl IntoIterator<Item = &'a str> + Clone,
2133 properties: &Properties,
2134 ) {
2135 if self.active_relationship_property_index_count() == 0 {
2136 return;
2137 }
2138 let indexes = self.indexes_mut();
2139 for (key, value) in properties {
2140 if indexes.relationship_properties.is_active(key) {
2141 indexes.relationship_properties.insert_with_scopes(
2142 rel_id,
2143 scopes.clone(),
2144 key,
2145 value,
2146 );
2147 }
2148 }
2149 }
2150
2151 fn unindex_relationship_property_if_active<'a>(
2152 &mut self,
2153 rel_id: RelationshipId,
2154 scopes: impl IntoIterator<Item = &'a str>,
2155 key: &str,
2156 value: &PropertyValue,
2157 ) {
2158 if self.active_relationship_property_index_count() == 0 {
2159 return;
2160 }
2161 let indexes = self.indexes_mut();
2162 if indexes.relationship_properties.is_active(key) {
2163 indexes
2164 .relationship_properties
2165 .remove_with_scopes(rel_id, scopes, key, value);
2166 }
2167 }
2168
2169 fn unindex_active_relationship_properties<'a>(
2170 &mut self,
2171 rel_id: RelationshipId,
2172 scopes: impl IntoIterator<Item = &'a str> + Clone,
2173 properties: &Properties,
2174 ) {
2175 if self.active_relationship_property_index_count() == 0 {
2176 return;
2177 }
2178 let indexes = self.indexes_mut();
2179 for (key, value) in properties {
2180 if indexes.relationship_properties.is_active(key) {
2181 indexes.relationship_properties.remove_with_scopes(
2182 rel_id,
2183 scopes.clone(),
2184 key,
2185 value,
2186 );
2187 }
2188 }
2189 }
2190
2191 pub(super) fn scan_nodes_by_property(
2192 &self,
2193 label: Option<&str>,
2194 key: &str,
2195 value: &PropertyValue,
2196 ) -> Vec<NodeRecord> {
2197 match label {
2198 Some(label) => self
2199 .nodes_by_label
2200 .get(label)
2201 .into_iter()
2202 .flat_map(|ids| ids.iter())
2203 .filter_map(|&id| self.node_at(id))
2204 .filter(|node| node.properties.get(key) == Some(value))
2205 .cloned()
2206 .collect(),
2207 None => self
2208 .iter_node_records()
2209 .filter(|node| node.properties.get(key) == Some(value))
2210 .cloned()
2211 .collect(),
2212 }
2213 }
2214
2215 pub(super) fn scan_node_ids_by_property(
2216 &self,
2217 label: Option<&str>,
2218 key: &str,
2219 value: &PropertyValue,
2220 ) -> Vec<NodeId> {
2221 match label {
2222 Some(label) => self
2223 .nodes_by_label
2224 .get(label)
2225 .into_iter()
2226 .flat_map(|ids| ids.iter())
2227 .filter_map(|&id| {
2228 (self.node_at(id)?.properties.get(key) == Some(value)).then_some(id)
2229 })
2230 .collect(),
2231 None => self
2232 .iter_nodes()
2233 .filter_map(|(id, node)| (node.properties.get(key) == Some(value)).then_some(id))
2234 .collect(),
2235 }
2236 }
2237
2238 pub(super) fn any_node_by_property(
2239 &self,
2240 label: &str,
2241 key: &str,
2242 value: &PropertyValue,
2243 ) -> bool {
2244 self.nodes_by_label
2245 .get(label)
2246 .into_iter()
2247 .flat_map(|ids| ids.iter())
2248 .filter_map(|&id| self.node_at(id))
2249 .any(|node| node.properties.get(key) == Some(value))
2250 }
2251
2252 pub(super) fn scan_relationships_by_property(
2253 &self,
2254 rel_type: Option<&str>,
2255 key: &str,
2256 value: &PropertyValue,
2257 ) -> Vec<RelationshipRecord> {
2258 match rel_type {
2259 Some(rel_type) => self
2260 .relationships_by_type
2261 .get(rel_type)
2262 .into_iter()
2263 .flat_map(|ids| ids.iter())
2264 .filter_map(|&id| self.rel_at(id))
2265 .filter(|rel| rel.properties.get(key) == Some(value))
2266 .cloned()
2267 .collect(),
2268 None => self
2269 .iter_rel_records()
2270 .filter(|rel| rel.properties.get(key) == Some(value))
2271 .cloned()
2272 .collect(),
2273 }
2274 }
2275
2276 pub(super) fn scan_relationship_ids_by_property(
2277 &self,
2278 rel_type: Option<&str>,
2279 key: &str,
2280 value: &PropertyValue,
2281 ) -> Vec<RelationshipId> {
2282 match rel_type {
2283 Some(rel_type) => self
2284 .relationships_by_type
2285 .get(rel_type)
2286 .into_iter()
2287 .flat_map(|ids| ids.iter())
2288 .filter_map(|&id| {
2289 (self.rel_at(id)?.properties.get(key) == Some(value)).then_some(id)
2290 })
2291 .collect(),
2292 None => self
2293 .iter_rels()
2294 .filter_map(|(id, rel)| (rel.properties.get(key) == Some(value)).then_some(id))
2295 .collect(),
2296 }
2297 }
2298
2299 pub(super) fn any_relationship_by_property(
2300 &self,
2301 rel_type: &str,
2302 key: &str,
2303 value: &PropertyValue,
2304 ) -> bool {
2305 self.relationships_by_type
2306 .get(rel_type)
2307 .into_iter()
2308 .flat_map(|ids| ids.iter())
2309 .filter_map(|&id| self.rel_at(id))
2310 .any(|rel| rel.properties.get(key) == Some(value))
2311 }
2312
2313 pub(super) fn attach_relationship(&mut self, rel: &RelationshipRecord) {
2314 self.outgoing_push(rel.src, rel.id);
2315 self.incoming_push(rel.dst, rel.id);
2316 self.insert_relationship_type_index(rel.id, &rel.rel_type);
2317 }
2318
2319 fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
2320 self.outgoing_remove(rel.src, rel.id);
2324 self.incoming_remove(rel.dst, rel.id);
2325
2326 self.remove_relationship_type_index(rel.id, &rel.rel_type);
2327 }
2328
2329 pub(super) fn relationship_ids_for_direction(
2330 &self,
2331 node_id: NodeId,
2332 direction: Direction,
2333 ) -> Vec<RelationshipId> {
2334 match direction {
2335 Direction::Left => self
2336 .incoming_at(node_id)
2337 .map(<[_]>::to_vec)
2338 .unwrap_or_default(),
2339
2340 Direction::Right => self
2341 .outgoing_at(node_id)
2342 .map(<[_]>::to_vec)
2343 .unwrap_or_default(),
2344
2345 Direction::Undirected => {
2346 let out = self.outgoing_at(node_id);
2347 let inc = self.incoming_at(node_id);
2348 let mut ids = Vec::with_capacity(
2349 out.map(<[_]>::len).unwrap_or(0) + inc.map(<[_]>::len).unwrap_or(0),
2350 );
2351
2352 if let Some(out) = out {
2353 ids.extend(out.iter().copied());
2354 }
2355 if let Some(inc) = inc {
2356 for &rel_id in inc {
2357 let Some(rel) = self.rel_at(rel_id) else {
2358 continue;
2359 };
2360 if rel.src == node_id && rel.dst == node_id {
2361 continue;
2362 }
2363 ids.push(rel_id);
2364 }
2365 }
2366
2367 ids
2368 }
2369 }
2370 }
2371
2372 pub(super) fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
2373 if rel.src == node_id {
2374 Some(rel.dst)
2375 } else if rel.dst == node_id {
2376 Some(rel.src)
2377 } else {
2378 None
2379 }
2380 }
2381
2382 pub(super) fn has_incident_relationships(&self, node_id: NodeId) -> bool {
2383 self.outgoing_at(node_id)
2384 .map(|ids| !ids.is_empty())
2385 .unwrap_or(false)
2386 || self
2387 .incoming_at(node_id)
2388 .map(|ids| !ids.is_empty())
2389 .unwrap_or(false)
2390 }
2391
2392 pub(super) fn incident_relationship_ids(&self, node_id: NodeId) -> Vec<RelationshipId> {
2393 let out = self.outgoing_at(node_id);
2394 let inc = self.incoming_at(node_id);
2395 let mut rel_ids =
2396 Vec::with_capacity(out.map(<[_]>::len).unwrap_or(0) + inc.map(<[_]>::len).unwrap_or(0));
2397
2398 if let Some(ids) = out {
2399 rel_ids.extend(ids.iter().copied());
2400 }
2401 if let Some(ids) = inc {
2402 for &rel_id in ids {
2403 let Some(rel) = self.rel_at(rel_id) else {
2404 continue;
2405 };
2406 if rel.src == node_id && rel.dst == node_id {
2407 continue;
2408 }
2409 rel_ids.push(rel_id);
2410 }
2411 }
2412
2413 rel_ids
2414 }
2415
2416 #[doc(hidden)]
2420 pub fn replay_create_node(
2421 &mut self,
2422 id: NodeId,
2423 labels: Vec<String>,
2424 properties: Properties,
2425 ) -> Result<NodeRecord, String> {
2426 if self.recorder.is_some() {
2427 return Err(
2428 "cannot replay node creation while a mutation recorder is installed".into(),
2429 );
2430 }
2431 if self.node_at(id).is_some() {
2432 return Err(format!("node id {id} already exists"));
2433 }
2434 let idx = self.ensure_node_slot_checked(id)?;
2435 self.bump_next_node_id_past(id)?;
2436
2437 let labels = Self::normalize_labels(labels);
2438 let node = NodeRecord {
2439 id,
2440 labels: labels.clone(),
2441 properties,
2442 };
2443
2444 self.put_node_at_slot(idx, node.clone());
2445 self.on_node_replayed(&node);
2446
2447 Ok(node)
2448 }
2449
2450 #[doc(hidden)]
2454 pub fn replay_create_relationship(
2455 &mut self,
2456 id: RelationshipId,
2457 src: NodeId,
2458 dst: NodeId,
2459 rel_type: &str,
2460 properties: Properties,
2461 ) -> Result<RelationshipRecord, String> {
2462 if self.recorder.is_some() {
2463 return Err(
2464 "cannot replay relationship creation while a mutation recorder is installed".into(),
2465 );
2466 }
2467 if self.rel_at(id).is_some() {
2468 return Err(format!("relationship id {id} already exists"));
2469 }
2470 if self.node_at(src).is_none() {
2471 return Err(format!(
2472 "relationship {id} references missing source node {src}"
2473 ));
2474 }
2475 if self.node_at(dst).is_none() {
2476 return Err(format!(
2477 "relationship {id} references missing target node {dst}"
2478 ));
2479 }
2480
2481 let trimmed = rel_type.trim();
2482 if trimmed.is_empty() {
2483 return Err(format!("relationship {id} has an empty type"));
2484 }
2485 let idx = self.ensure_rel_slot_checked(id)?;
2486 self.bump_next_rel_id_past(id)?;
2487
2488 let rel = RelationshipRecord {
2489 id,
2490 src,
2491 dst,
2492 rel_type: trimmed.to_string(),
2493 properties,
2494 };
2495
2496 self.put_rel_at_slot(idx, rel.clone());
2497 self.on_relationship_replayed(&rel);
2498
2499 Ok(rel)
2500 }
2501
#[cfg(test)]
/// Test-only consistency check for the node and relationship property indexes.
///
/// First verifies that the active-index counters agree with the size of each
/// active key set. Then rebuilds the expected index contents by scanning every
/// node and relationship, and asserts that the stored `values` and
/// `scoped_values` match that rebuild. Panics on the first mismatch.
pub(super) fn assert_property_indexes_match_scan(&self) {
    let indexes = self.indexes_read();
    let node_index = &indexes.node_properties;
    let rel_index = &indexes.relationship_properties;

    assert_eq!(
        node_index.active_keys.len(),
        self.active_node_property_index_count(),
        "node property index counter diverged from active key set"
    );
    assert_eq!(
        rel_index.active_keys.len(),
        self.active_relationship_property_index_count(),
        "relationship property index counter diverged from active key set"
    );

    // Rebuild the node index from scratch, seeded with the same active keys.
    let mut scanned_nodes = PropertyIndexState {
        active_keys: node_index.active_keys.clone(),
        ..Default::default()
    };
    for (id, node) in self.iter_nodes() {
        for (key, value) in &node.properties {
            if !scanned_nodes.is_active(key) {
                continue;
            }
            // Nodes are scoped by each of their labels.
            scanned_nodes.insert_with_scopes(
                id,
                node.labels.iter().map(String::as_str),
                key,
                value,
            );
        }
    }
    assert_eq!(
        node_index.values, scanned_nodes.values,
        "node property index values diverged from scan"
    );
    assert_eq!(
        node_index.scoped_values, scanned_nodes.scoped_values,
        "node property scoped index values diverged from scan"
    );

    // Same rebuild for relationships.
    let mut scanned_rels = PropertyIndexState {
        active_keys: rel_index.active_keys.clone(),
        ..Default::default()
    };
    for (id, rel) in self.iter_rels() {
        for (key, value) in &rel.properties {
            if !scanned_rels.is_active(key) {
                continue;
            }
            // Relationships are scoped by their single type.
            scanned_rels.insert_with_scopes(id, [rel.rel_type.as_str()], key, value);
        }
    }
    assert_eq!(
        rel_index.values, scanned_rels.values,
        "relationship property index values diverged from scan"
    );
    assert_eq!(
        rel_index.scoped_values, scanned_rels.scoped_values,
        "relationship property scoped index values diverged from scan"
    );
}
2566}