1use std::collections::{BTreeMap, BTreeSet};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, RwLock, RwLockWriteGuard};
8
9use lora_ast::Direction;
10
11use crate::{
12 LoraPoint, MutationEvent, MutationRecorder, NodeId, NodeRecord, Properties, PropertyValue,
13 RelationshipId, RelationshipRecord,
14};
15
16use super::constraint_catalog::{
17 ConstraintCatalog, ConstraintRequest, CreateConstraintError, CreateConstraintOutcome,
18 DropConstraintError, DropConstraintOutcome,
19};
20use super::entity_index_store::IndexBundle;
21use super::fulltext_index::FulltextRegistry;
22use super::hnsw::HnswParams;
23use super::index_catalog::{
24 CreateIndexError, CreateIndexOutcome, DropIndexError, DropIndexOutcome, IndexCatalog,
25 IndexDefinition, IndexRequest, StoredIndexEntity, StoredIndexKind, StoredIndexState,
26};
27use super::point_index::PointRegistry;
28#[cfg(test)]
29use super::property_index::PropertyIndexState;
30use super::property_index::{PropertyIndexKey, PropertyIndexRegistry};
31use super::secondary_index_maintenance::SecondaryIndexMutation;
32use super::sorted_property_index::SortedPropertyIndex;
33use super::stats::GraphStats;
34use super::text_index::TrigramRegistry;
35use super::vector_index::{VectorIndexProvider, VectorIndexRegistry, VectorSimilarity};
36
37#[derive(Default)]
38pub struct InMemoryGraph {
39 pub(super) next_node_id: NodeId,
40 pub(super) next_rel_id: RelationshipId,
41
42 pub(super) nodes: Vec<Option<Arc<NodeRecord>>>,
58 pub(super) relationships: Vec<Option<Arc<RelationshipRecord>>>,
59 pub(super) live_node_count: usize,
63 pub(super) live_rel_count: usize,
64
65 pub(super) outgoing: Vec<Vec<RelationshipId>>,
71 pub(super) incoming: Vec<Vec<RelationshipId>>,
72
73 pub(super) nodes_by_label: BTreeMap<String, Vec<NodeId>>,
80 pub(super) relationships_by_type: BTreeMap<String, Vec<RelationshipId>>,
81
82 pub(super) indexes: IndexBundle,
90
91 pub(super) constraint_catalog: RwLock<ConstraintCatalog>,
97 pub(super) active_constraints: AtomicUsize,
101
102 pub(super) recorder: Option<Arc<dyn MutationRecorder>>,
107}
108
109impl std::fmt::Debug for InMemoryGraph {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 f.debug_struct("InMemoryGraph")
112 .field("next_node_id", &self.next_node_id)
113 .field("next_rel_id", &self.next_rel_id)
114 .field("nodes", &self.nodes)
115 .field("relationships", &self.relationships)
116 .field("outgoing", &self.outgoing)
117 .field("incoming", &self.incoming)
118 .field("nodes_by_label", &self.nodes_by_label)
119 .field("relationships_by_type", &self.relationships_by_type)
120 .field("indexes", &self.indexes)
121 .field(
122 "active_node_property_indexes",
123 &self.active_node_property_index_count(),
124 )
125 .field(
126 "active_relationship_property_indexes",
127 &self.active_relationship_property_index_count(),
128 )
129 .field(
130 "index_catalog_entries",
131 &self
132 .indexes
133 .catalog
134 .read()
135 .map(|c| c.list().len())
136 .unwrap_or(0),
137 )
138 .field("active_constraints", &self.active_constraint_count())
139 .field(
140 "active_fulltext_indexes",
141 &self.active_fulltext_index_count(),
142 )
143 .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
144 .finish()
145 }
146}
147
148impl Clone for InMemoryGraph {
149 fn clone(&self) -> Self {
150 Self {
153 next_node_id: self.next_node_id,
154 next_rel_id: self.next_rel_id,
155 nodes: self.nodes.clone(),
156 relationships: self.relationships.clone(),
157 live_node_count: self.live_node_count,
158 live_rel_count: self.live_rel_count,
159 outgoing: self.outgoing.clone(),
160 incoming: self.incoming.clone(),
161 nodes_by_label: self.nodes_by_label.clone(),
162 relationships_by_type: self.relationships_by_type.clone(),
163 indexes: self.indexes.clone(),
169 constraint_catalog: RwLock::new(self.constraint_catalog_read().clone()),
170 active_constraints: AtomicUsize::new(self.active_constraint_count()),
171 recorder: None,
172 }
173 }
174}
175
176impl InMemoryGraph {
177 pub fn new() -> Self {
178 Self::default()
179 }
180
181 pub fn with_capacity_hint(nodes: usize, relationships: usize) -> Self {
182 Self {
183 nodes: Vec::with_capacity(nodes),
184 relationships: Vec::with_capacity(relationships),
185 outgoing: Vec::with_capacity(nodes),
186 incoming: Vec::with_capacity(nodes),
187 ..Self::default()
188 }
189 }
190
191 pub fn contains_node(&self, node_id: NodeId) -> bool {
192 self.node_at(node_id).is_some()
193 }
194
195 pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
196 self.rel_at(rel_id).is_some()
197 }
198
199 pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
203 self.recorder = recorder;
204 }
205
206 pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
208 self.recorder.as_ref()
209 }
210
211 #[inline]
216 pub(super) fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
217 if let Some(rec) = &self.recorder {
218 rec.record(build());
219 }
220 }
221
222 fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
223 let next = id
224 .checked_add(1)
225 .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
226 self.next_node_id = self.next_node_id.max(next);
227 Ok(())
228 }
229
230 fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
231 let next = id
232 .checked_add(1)
233 .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
234 self.next_rel_id = self.next_rel_id.max(next);
235 Ok(())
236 }
237
238 pub(super) fn reserve_next_node_slot(&mut self) -> (NodeId, usize) {
239 let id = self.next_node_id;
240 let idx = self
241 .ensure_node_slot_checked(id)
242 .expect("next node id should fit in memory-backed slab");
243 self.bump_next_node_id_past(id)
244 .expect("next node id should leave a valid successor");
245 (id, idx)
246 }
247
248 pub(super) fn try_reserve_next_rel_slot(&mut self) -> Option<(RelationshipId, usize)> {
249 let id = self.next_rel_id;
250 let idx = self.ensure_rel_slot_checked(id).ok()?;
251 self.bump_next_rel_id_past(id).ok()?;
252 Some((id, idx))
253 }
254
255 #[inline]
261 pub(super) fn node_at(&self, id: NodeId) -> Option<&NodeRecord> {
262 self.nodes
263 .get(Self::slot_index(id)?)
264 .and_then(|s| s.as_ref())
265 .map(|arc| arc.as_ref())
266 }
267
268 #[inline]
273 pub(super) fn node_at_mut(&mut self, id: NodeId) -> Option<&mut NodeRecord> {
274 self.nodes
275 .get_mut(Self::slot_index(id)?)
276 .and_then(|s| s.as_mut())
277 .map(Arc::make_mut)
278 }
279
280 #[inline]
281 pub(super) fn rel_at(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
282 self.relationships
283 .get(Self::slot_index(id)?)
284 .and_then(|s| s.as_ref())
285 .map(|arc| arc.as_ref())
286 }
287
288 #[inline]
289 pub(super) fn rel_at_mut(&mut self, id: RelationshipId) -> Option<&mut RelationshipRecord> {
290 self.relationships
291 .get_mut(Self::slot_index(id)?)
292 .and_then(|s| s.as_mut())
293 .map(Arc::make_mut)
294 }
295
296 fn slot_len_for_id(id: u64, kind: &str) -> Result<usize, String> {
300 let idx = usize::try_from(id)
301 .map_err(|_| format!("{kind} id {id} does not fit in usize on this platform"))?;
302 idx.checked_add(1)
303 .ok_or_else(|| format!("{kind} id {id} leaves no valid slab slot"))
304 }
305
306 #[inline]
307 fn slot_index(id: u64) -> Option<usize> {
308 usize::try_from(id).ok()
309 }
310
311 fn ensure_node_slot_checked(&mut self, id: NodeId) -> Result<usize, String> {
312 let target = Self::slot_len_for_id(id, "node")?;
313 if self.nodes.len() < target {
314 let additional = target - self.nodes.len();
315 self.nodes.try_reserve_exact(additional).map_err(|e| {
316 format!("node id {id} requires {target} slots, but allocation failed: {e}")
317 })?;
318 self.outgoing.try_reserve_exact(additional).map_err(|e| {
319 format!(
320 "node id {id} requires {target} adjacency slots, but allocation failed: {e}"
321 )
322 })?;
323 self.incoming.try_reserve_exact(additional).map_err(|e| {
324 format!(
325 "node id {id} requires {target} adjacency slots, but allocation failed: {e}"
326 )
327 })?;
328 self.nodes.resize_with(target, || None);
329 self.outgoing.resize_with(target, Vec::new);
330 self.incoming.resize_with(target, Vec::new);
331 }
332 Ok(target - 1)
333 }
334
335 fn ensure_rel_slot_checked(&mut self, id: RelationshipId) -> Result<usize, String> {
336 let target = Self::slot_len_for_id(id, "relationship")?;
337 if self.relationships.len() < target {
338 self.relationships
339 .try_reserve_exact(target - self.relationships.len())
340 .map_err(|e| {
341 format!(
342 "relationship id {id} requires {target} slots, but allocation failed: {e}"
343 )
344 })?;
345 self.relationships.resize_with(target, || None);
346 }
347 Ok(target - 1)
348 }
349
350 fn ensure_node_slot(&mut self, id: NodeId) -> usize {
351 self.ensure_node_slot_checked(id)
352 .expect("node id should fit in memory-backed slab")
353 }
354
355 pub(super) fn put_node_checked(&mut self, id: NodeId, node: NodeRecord) -> Result<(), String> {
356 let idx = self.ensure_node_slot_checked(id)?;
357 self.put_node_at_slot(idx, node);
358 Ok(())
359 }
360
361 pub(super) fn put_rel_checked(
362 &mut self,
363 id: RelationshipId,
364 rel: RelationshipRecord,
365 ) -> Result<(), String> {
366 let idx = self.ensure_rel_slot_checked(id)?;
367 self.put_rel_at_slot(idx, rel);
368 Ok(())
369 }
370
371 pub(super) fn put_node_at_slot(&mut self, idx: usize, node: NodeRecord) {
372 let was_present = self.nodes[idx].is_some();
373 self.nodes[idx] = Some(Arc::new(node));
374 if !was_present {
375 self.live_node_count += 1;
376 }
377 }
378
379 pub(super) fn put_rel_at_slot(&mut self, idx: usize, rel: RelationshipRecord) {
380 let was_present = self.relationships[idx].is_some();
381 self.relationships[idx] = Some(Arc::new(rel));
382 if !was_present {
383 self.live_rel_count += 1;
384 }
385 }
386
387 pub(super) fn take_node(&mut self, id: NodeId) -> Option<NodeRecord> {
388 let idx = Self::slot_index(id)?;
389 let removed = self.nodes.get_mut(idx).and_then(|s| s.take());
390 if removed.is_some() {
391 self.live_node_count -= 1;
392 if let Some(out) = self.outgoing.get_mut(idx) {
398 out.clear();
399 }
400 if let Some(inc) = self.incoming.get_mut(idx) {
401 inc.clear();
402 }
403 }
404 removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
409 }
410
411 pub(super) fn take_rel(&mut self, id: RelationshipId) -> Option<RelationshipRecord> {
412 let idx = Self::slot_index(id)?;
413 let removed = self.relationships.get_mut(idx).and_then(|s| s.take());
414 if removed.is_some() {
415 self.live_rel_count -= 1;
416 }
417 removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
418 }
419
420 #[inline]
421 pub(super) fn outgoing_at(&self, id: NodeId) -> Option<&[RelationshipId]> {
422 self.outgoing.get(Self::slot_index(id)?).map(Vec::as_slice)
423 }
424
425 #[inline]
426 pub(super) fn incoming_at(&self, id: NodeId) -> Option<&[RelationshipId]> {
427 self.incoming.get(Self::slot_index(id)?).map(Vec::as_slice)
428 }
429
430 #[inline]
431 fn try_for_each_adjacent_slice<F, E>(
432 &self,
433 node_id: NodeId,
434 types: &[String],
435 adj: &[RelationshipId],
436 skip_self_loops: bool,
437 visit: &mut F,
438 ) -> Result<(), E>
439 where
440 F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
441 {
442 let single_type = match types {
443 [single] => Some(single.as_str()),
444 _ => None,
445 };
446 let has_type_filter = !types.is_empty();
447
448 for &rel_id in adj {
449 let Some(rel) = self.rel_at(rel_id) else {
450 continue;
451 };
452 if skip_self_loops && rel.src == node_id && rel.dst == node_id {
453 continue;
454 }
455 if let Some(single) = single_type {
456 if rel.rel_type != single {
457 continue;
458 }
459 } else if has_type_filter && !types.iter().any(|t| t == &rel.rel_type) {
460 continue;
461 }
462 let Some(other_id) = Self::other_endpoint(rel, node_id) else {
463 continue;
464 };
465 visit(rel_id, other_id)?;
466 }
467 Ok(())
468 }
469
470 #[inline]
471 pub(super) fn try_for_each_adjacent_id_unchecked<F, E>(
472 &self,
473 node_id: NodeId,
474 direction: Direction,
475 types: &[String],
476 mut visit: F,
477 ) -> Result<(), E>
478 where
479 F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
480 {
481 match direction {
482 Direction::Right => {
483 if let Some(adj) = self.outgoing_at(node_id) {
484 self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
485 }
486 }
487 Direction::Left => {
488 if let Some(adj) = self.incoming_at(node_id) {
489 self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
490 }
491 }
492 Direction::Undirected => {
493 if let Some(adj) = self.outgoing_at(node_id) {
494 self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
495 }
496 if let Some(adj) = self.incoming_at(node_id) {
497 self.try_for_each_adjacent_slice(node_id, types, adj, true, &mut visit)?;
498 }
499 }
500 }
501
502 Ok(())
503 }
504
505 #[inline]
506 pub(super) fn try_for_each_adjacent_id<F, E>(
507 &self,
508 node_id: NodeId,
509 direction: Direction,
510 types: &[String],
511 visit: F,
512 ) -> Result<(), E>
513 where
514 F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
515 {
516 if self.node_at(node_id).is_none() {
517 return Ok(());
518 }
519 self.try_for_each_adjacent_id_unchecked(node_id, direction, types, visit)
520 }
521
522 pub(super) fn iter_node_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
523 self.nodes
524 .iter()
525 .enumerate()
526 .filter_map(|(i, slot)| slot.as_ref().map(|_| i as NodeId))
527 }
528
529 pub(super) fn iter_node_records(&self) -> impl Iterator<Item = &NodeRecord> + '_ {
530 self.nodes
531 .iter()
532 .filter_map(|s| s.as_ref())
533 .map(|arc| arc.as_ref())
534 }
535
536 pub(super) fn iter_rel_ids(&self) -> impl Iterator<Item = RelationshipId> + '_ {
537 self.relationships
538 .iter()
539 .enumerate()
540 .filter_map(|(i, slot)| slot.as_ref().map(|_| i as RelationshipId))
541 }
542
543 pub(super) fn iter_rel_records(&self) -> impl Iterator<Item = &RelationshipRecord> + '_ {
544 self.relationships
545 .iter()
546 .filter_map(|s| s.as_ref())
547 .map(|arc| arc.as_ref())
548 }
549
550 pub(super) fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &NodeRecord)> + '_ {
551 self.nodes
552 .iter()
553 .enumerate()
554 .filter_map(|(i, slot)| slot.as_ref().map(|n| (i as NodeId, n.as_ref())))
555 }
556
557 pub(super) fn iter_rels(
558 &self,
559 ) -> impl Iterator<Item = (RelationshipId, &RelationshipRecord)> + '_ {
560 self.relationships
561 .iter()
562 .enumerate()
563 .filter_map(|(i, slot)| slot.as_ref().map(|r| (i as RelationshipId, r.as_ref())))
564 }
565
566 fn outgoing_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
570 let idx = self.ensure_node_slot(node_id);
571 self.outgoing[idx].push(rel_id);
572 }
573
574 fn incoming_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
575 let idx = self.ensure_node_slot(node_id);
576 self.incoming[idx].push(rel_id);
577 }
578
579 fn outgoing_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
582 if let Some(v) = Self::slot_index(node_id).and_then(|idx| self.outgoing.get_mut(idx)) {
583 if let Some(pos) = v.iter().position(|&id| id == rel_id) {
584 v.swap_remove(pos);
585 }
586 }
587 }
588
589 fn incoming_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
590 if let Some(v) = Self::slot_index(node_id).and_then(|idx| self.incoming.get_mut(idx)) {
591 if let Some(pos) = v.iter().position(|&id| id == rel_id) {
592 v.swap_remove(pos);
593 }
594 }
595 }
596
597 pub(super) fn normalize_labels(labels: Vec<String>) -> Vec<String> {
598 let mut seen = BTreeSet::new();
599
600 labels
601 .into_iter()
602 .map(|s| s.trim().to_string())
603 .filter(|s| !s.is_empty())
604 .filter(|s| seen.insert(s.clone()))
605 .collect()
606 }
607
608 pub(super) fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
609 if let Some(bucket) = self.nodes_by_label.get_mut(label) {
614 bucket.push(node_id);
615 } else {
616 self.nodes_by_label.insert(label.to_string(), vec![node_id]);
617 }
618 }
619
620 fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
621 if let Some(ids) = self.nodes_by_label.get_mut(label) {
622 if let Some(pos) = ids.iter().position(|&id| id == node_id) {
623 ids.swap_remove(pos);
624 }
625 if ids.is_empty() {
626 self.nodes_by_label.remove(label);
627 }
628 }
629 }
630
631 fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
632 if let Some(bucket) = self.relationships_by_type.get_mut(rel_type) {
634 bucket.push(rel_id);
635 } else {
636 self.relationships_by_type
637 .insert(rel_type.to_string(), vec![rel_id]);
638 }
639 }
640
641 fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
642 if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
643 if let Some(pos) = ids.iter().position(|&id| id == rel_id) {
644 ids.swap_remove(pos);
645 }
646 if ids.is_empty() {
647 self.relationships_by_type.remove(rel_type);
648 }
649 }
650 }
651
652 pub(super) fn indexes_read(&self) -> std::sync::RwLockReadGuard<'_, PropertyIndexRegistry> {
653 self.indexes
654 .properties
655 .read()
656 .unwrap_or_else(|poisoned| poisoned.into_inner())
657 }
658
659 pub(super) fn indexes_write(&self) -> RwLockWriteGuard<'_, PropertyIndexRegistry> {
660 self.indexes
661 .properties
662 .write()
663 .unwrap_or_else(|poisoned| poisoned.into_inner())
664 }
665
666 pub(super) fn indexes_mut(&mut self) -> &mut PropertyIndexRegistry {
667 self.indexes
668 .properties
669 .get_mut()
670 .unwrap_or_else(|poisoned| poisoned.into_inner())
671 }
672
673 #[inline]
674 pub(super) fn active_node_property_index_count(&self) -> usize {
675 self.indexes
676 .active_node_property_indexes
677 .load(Ordering::Relaxed)
678 }
679
680 #[inline]
681 pub(super) fn active_relationship_property_index_count(&self) -> usize {
682 self.indexes
683 .active_relationship_property_indexes
684 .load(Ordering::Relaxed)
685 }
686
687 #[inline]
688 pub(super) fn active_constraint_count(&self) -> usize {
689 self.active_constraints.load(Ordering::Relaxed)
690 }
691
692 #[inline]
693 pub(super) fn has_active_constraints(&self) -> bool {
694 self.active_constraint_count() != 0
695 }
696
697 #[inline]
698 pub(super) fn active_fulltext_index_count(&self) -> usize {
699 self.indexes.active_fulltext_indexes.load(Ordering::Relaxed)
700 }
701
702 #[inline]
703 pub(super) fn has_active_fulltext_indexes(&self) -> bool {
704 self.active_fulltext_index_count() != 0
705 }
706
707 pub(super) fn node_property_index_is_active(&mut self, key: &str) -> bool {
708 self.active_node_property_index_count() != 0
709 && self.indexes_mut().node_properties.is_active(key)
710 }
711
712 pub(super) fn relationship_property_index_is_active(&mut self, key: &str) -> bool {
713 self.active_relationship_property_index_count() != 0
714 && self.indexes_mut().relationship_properties.is_active(key)
715 }
716
717 pub(super) fn ensure_node_property_index(&self, key: &str) {
718 {
719 let indexes = self.indexes_read();
720 if indexes.node_properties.is_active(key) {
721 return;
722 }
723 }
724
725 let mut indexes = self.indexes_write();
726 if indexes.node_properties.is_active(key) {
727 return;
728 }
729
730 for (id, node) in self.iter_nodes() {
731 if let Some(value) = node.properties.get(key) {
732 indexes.node_properties.insert_with_scopes(
733 id,
734 node.labels.iter().map(String::as_str),
735 key,
736 value,
737 );
738 }
739 }
740 if indexes.node_properties.activate(key) {
741 self.indexes
742 .active_node_property_indexes
743 .fetch_add(1, Ordering::Relaxed);
744 }
745 }
746
747 pub(super) fn ensure_relationship_property_index(&self, key: &str) {
748 {
749 let indexes = self.indexes_read();
750 if indexes.relationship_properties.is_active(key) {
751 return;
752 }
753 }
754
755 let mut indexes = self.indexes_write();
756 if indexes.relationship_properties.is_active(key) {
757 return;
758 }
759
760 for (id, rel) in self.iter_rels() {
761 if let Some(value) = rel.properties.get(key) {
762 indexes.relationship_properties.insert_with_scopes(
763 id,
764 [rel.rel_type.as_str()],
765 key,
766 value,
767 );
768 }
769 }
770 if indexes.relationship_properties.activate(key) {
771 self.indexes
772 .active_relationship_property_indexes
773 .fetch_add(1, Ordering::Relaxed);
774 }
775 }
776
777 pub(super) fn index_catalog_read(&self) -> std::sync::RwLockReadGuard<'_, IndexCatalog> {
778 self.indexes
779 .catalog
780 .read()
781 .unwrap_or_else(|poisoned| poisoned.into_inner())
782 }
783
784 pub(super) fn index_catalog_write(&self) -> RwLockWriteGuard<'_, IndexCatalog> {
785 self.indexes
786 .catalog
787 .write()
788 .unwrap_or_else(|poisoned| poisoned.into_inner())
789 }
790
791 pub(super) fn constraint_catalog_read(
792 &self,
793 ) -> std::sync::RwLockReadGuard<'_, ConstraintCatalog> {
794 self.constraint_catalog
795 .read()
796 .unwrap_or_else(|poisoned| poisoned.into_inner())
797 }
798
799 pub(super) fn constraint_catalog_write(&self) -> RwLockWriteGuard<'_, ConstraintCatalog> {
800 self.constraint_catalog
801 .write()
802 .unwrap_or_else(|poisoned| poisoned.into_inner())
803 }
804
805 #[allow(clippy::result_large_err)]
813 pub(super) fn register_index(
814 &self,
815 request: IndexRequest,
816 if_not_exists: bool,
817 ) -> Result<CreateIndexOutcome, CreateIndexError> {
818 self.register_index_with_recording(request, if_not_exists, true)
819 }
820
821 #[allow(clippy::result_large_err)]
822 fn register_index_with_recording(
823 &self,
824 request: IndexRequest,
825 if_not_exists: bool,
826 record_event: bool,
827 ) -> Result<CreateIndexOutcome, CreateIndexError> {
828 let request_for_event = record_event.then(|| request.clone());
829 let outcome = {
830 let mut catalog = self.index_catalog_write();
831 catalog.try_create(request, if_not_exists)?
832 };
833
834 if let CreateIndexOutcome::Created(def) = &outcome {
835 self.populate_index_data(def);
836 }
837
838 if matches!(outcome, CreateIndexOutcome::Created(_)) {
842 if let Some(request_for_event) = request_for_event {
843 self.emit(|| crate::MutationEvent::CreateIndex {
844 request: request_for_event,
845 if_not_exists,
846 });
847 }
848 }
849
850 Ok(outcome)
851 }
852
853 #[doc(hidden)]
857 pub fn replay_create_index(
858 &mut self,
859 request: IndexRequest,
860 if_not_exists: bool,
861 ) -> Result<(), String> {
862 if self.recorder.is_some() {
863 return Err("cannot replay create_index while a mutation recorder is installed".into());
864 }
865 self.register_index(request, if_not_exists)
866 .map(|_| ())
867 .map_err(|e| e.to_string())
868 }
869
870 #[doc(hidden)]
872 pub fn replay_drop_index(&mut self, name: &str, if_exists: bool) -> Result<(), String> {
873 if self.recorder.is_some() {
874 return Err("cannot replay drop_index while a mutation recorder is installed".into());
875 }
876 self.drop_named_index(name, if_exists)
877 .map(|_| ())
878 .map_err(|e| e.to_string())
879 }
880
881 pub(super) fn register_constraint(
887 &self,
888 request: ConstraintRequest,
889 if_not_exists: bool,
890 ) -> Result<CreateConstraintOutcome, CreateConstraintError> {
891 {
896 let constraint_catalog = self.constraint_catalog_read();
897 if let Some(existing) = constraint_catalog.find_equivalent(&request) {
898 let cloned = existing.clone();
899 drop(constraint_catalog);
900 if if_not_exists {
901 return Ok(CreateConstraintOutcome::NoOpExists(cloned));
902 }
903 return Err(CreateConstraintError::EquivalentConstraintExists(
904 cloned.name,
905 ));
906 }
907 if let Some(existing) = constraint_catalog.get(&request.name) {
908 let cloned = existing.clone();
909 drop(constraint_catalog);
910 if if_not_exists {
911 return Ok(CreateConstraintOutcome::NoOpExists(cloned));
912 }
913 return Err(CreateConstraintError::DuplicateName(cloned.name));
914 }
915 if let Some(existing) = constraint_catalog.find_same_schema(&request) {
916 let cloned = existing.clone();
917 drop(constraint_catalog);
918 if super::constraint_catalog::kinds_conflict_for_validation(
919 &cloned.kind,
920 &request.kind,
921 ) {
922 if if_not_exists {
923 return Ok(CreateConstraintOutcome::NoOpExists(cloned));
924 }
925 return Err(CreateConstraintError::ConflictingConstraint(cloned.name));
926 }
927 }
928 }
929
930 if request.kind.requires_backing_index() {
935 let idx_catalog = self.index_catalog_read();
936 if idx_catalog.get(&request.name).is_some() {
937 return Err(CreateConstraintError::DuplicateIndexName(
938 request.name.clone(),
939 ));
940 }
941 let conflict = idx_catalog.list().into_iter().find(|def| {
942 def.kind == StoredIndexKind::Range
943 && def.entity == request.entity
944 && def.label.as_deref() == Some(request.label.as_str())
945 && def.properties == request.properties
946 && def.name != request.name
947 });
948 drop(idx_catalog);
949 if let Some(def) = conflict {
950 return Err(CreateConstraintError::BackingIndexConflict(format!(
951 "(:{} {{{}}}) already covered by index `{}`",
952 request.label,
953 request.properties.join(", "),
954 def.name,
955 )));
956 }
957 }
958
959 let owns_backing = request.kind.requires_backing_index();
960 let request_for_event = request.clone();
961 let outcome = {
962 let mut catalog = self.constraint_catalog_write();
963 catalog.try_create(request, if_not_exists)?
964 };
965
966 if let CreateConstraintOutcome::Created(def) = &outcome {
967 if let Err(violation) = self.validate_existing_data_for_constraint(def) {
972 let mut catalog = self.constraint_catalog_write();
973 let _ = catalog.try_drop(&def.name, true);
974 return Err(CreateConstraintError::DataViolation(violation.to_string()));
975 }
976 }
977
978 if let CreateConstraintOutcome::Created(def) = &outcome {
979 if owns_backing {
980 let idx_request = IndexRequest {
984 explicit_name: Some(def.name.clone()),
985 kind: StoredIndexKind::Range,
986 entity: def.entity,
987 label: Some(def.label.clone()),
988 additional_labels: Vec::new(),
989 properties: def.properties.clone(),
990 options: Default::default(),
991 };
992 if let Err(err) = self.register_index_with_recording(idx_request, true, false) {
996 let mut catalog = self.constraint_catalog_write();
997 let _ = catalog.try_drop(&def.name, true);
998 return Err(CreateConstraintError::BackingIndexConflict(err.to_string()));
999 }
1000 }
1001 self.emit(|| crate::MutationEvent::CreateConstraint {
1002 request: request_for_event,
1003 if_not_exists,
1004 });
1005 self.active_constraints.fetch_add(1, Ordering::Relaxed);
1006 }
1007
1008 Ok(outcome)
1009 }
1010
1011 #[doc(hidden)]
1013 pub fn replay_create_constraint(
1014 &mut self,
1015 request: ConstraintRequest,
1016 if_not_exists: bool,
1017 ) -> Result<(), String> {
1018 if self.recorder.is_some() {
1019 return Err(
1020 "cannot replay create_constraint while a mutation recorder is installed".into(),
1021 );
1022 }
1023 self.register_constraint(request, if_not_exists)
1024 .map(|_| ())
1025 .map_err(|e| e.to_string())
1026 }
1027
1028 #[doc(hidden)]
1030 pub fn replay_drop_constraint(&mut self, name: &str, if_exists: bool) -> Result<(), String> {
1031 if self.recorder.is_some() {
1032 return Err(
1033 "cannot replay drop_constraint while a mutation recorder is installed".into(),
1034 );
1035 }
1036 self.drop_named_constraint(name, if_exists)
1037 .map(|_| ())
1038 .map_err(|e| e.to_string())
1039 }
1040
1041 pub(super) fn drop_named_constraint(
1044 &self,
1045 name: &str,
1046 if_exists: bool,
1047 ) -> Result<DropConstraintOutcome, DropConstraintError> {
1048 let outcome = {
1049 let mut catalog = self.constraint_catalog_write();
1050 catalog.try_drop(name, if_exists)?
1051 };
1052 if let DropConstraintOutcome::Dropped(def) = &outcome {
1053 if let Some(index_name) = def.owned_index.as_deref() {
1054 let _ = self.drop_named_index_inner(index_name, true, false);
1057 }
1058 self.active_constraints.fetch_sub(1, Ordering::Relaxed);
1059 self.emit(|| crate::MutationEvent::DropConstraint {
1060 name: name.to_string(),
1061 if_exists,
1062 });
1063 }
1064 Ok(outcome)
1065 }
1066
1067 pub(super) fn drop_named_index(
1072 &self,
1073 name: &str,
1074 if_exists: bool,
1075 ) -> Result<DropIndexOutcome, DropIndexError> {
1076 self.drop_named_index_inner(name, if_exists, true)
1077 }
1078
1079 fn drop_named_index_inner(
1080 &self,
1081 name: &str,
1082 if_exists: bool,
1083 emit_event: bool,
1084 ) -> Result<DropIndexOutcome, DropIndexError> {
1085 if let Some(owner) = self
1086 .constraint_catalog_read()
1087 .constraint_owning_index(name)
1088 .cloned()
1089 {
1090 return Err(DropIndexError::ConstraintOwned {
1091 index: name.to_string(),
1092 constraint: owner.name,
1093 });
1094 }
1095
1096 let outcome = {
1097 let mut catalog = self.index_catalog_write();
1098 catalog.try_drop(name, if_exists)?
1099 };
1100 if let DropIndexOutcome::Dropped(def) = &outcome {
1101 match def.kind {
1103 StoredIndexKind::Text => {
1104 if let Some(label) = def.label.as_deref() {
1105 for prop in &def.properties {
1106 self.deactivate_text_scope(def.entity, label, prop);
1107 }
1108 }
1109 }
1110 StoredIndexKind::Range => {
1111 if let Some(label) = def.label.as_deref() {
1112 for prop in &def.properties {
1113 self.deactivate_sorted_scope(def.entity, label, prop);
1114 }
1115 }
1116 }
1117 StoredIndexKind::Point => {
1118 if let Some(label) = def.label.as_deref() {
1119 for prop in &def.properties {
1120 self.deactivate_point_scope(def.entity, label, prop);
1121 }
1122 }
1123 }
1124 StoredIndexKind::Lookup => {
1125 }
1128 StoredIndexKind::Vector => {
1129 self.deactivate_vector_index(def.entity, &def.name);
1130 }
1131 StoredIndexKind::Fulltext => {
1132 self.deactivate_fulltext_index(def.entity, &def.name);
1133 }
1134 }
1135 if emit_event {
1136 self.emit(|| crate::MutationEvent::DropIndex {
1137 name: name.to_string(),
1138 if_exists,
1139 });
1140 }
1141 }
1142 Ok(outcome)
1143 }
1144
1145 fn populate_index_data(&self, def: &IndexDefinition) {
1146 match def.kind {
1154 StoredIndexKind::Range => {
1155 for key in &def.properties {
1156 match def.entity {
1157 StoredIndexEntity::Node => self.ensure_node_property_index(key),
1158 StoredIndexEntity::Relationship => {
1159 self.ensure_relationship_property_index(key)
1160 }
1161 }
1162 if let Some(label) = def.label.as_deref() {
1163 self.activate_sorted_scope(def.entity, label, key);
1164 }
1165 }
1166 }
1167 StoredIndexKind::Text => {
1168 let label = match def.label.as_deref() {
1169 Some(l) => l,
1170 None => return,
1171 };
1172 for property in &def.properties {
1173 self.activate_text_scope(def.entity, label, property);
1174 }
1175 }
1176 StoredIndexKind::Point => {
1177 let label = match def.label.as_deref() {
1178 Some(l) => l,
1179 None => return,
1180 };
1181 let cell_size = PointRegistry::cell_size_from_options(&def.options);
1182 for property in &def.properties {
1183 self.activate_point_scope(def.entity, label, property, cell_size);
1184 }
1185 }
1186 StoredIndexKind::Fulltext => {
1187 let labels: Vec<String> = def.all_labels().map(String::from).collect();
1188 if labels.is_empty() {
1189 return;
1190 }
1191 self.activate_fulltext_index(def.entity, &def.name, &labels, &def.properties);
1192 }
1193 StoredIndexKind::Vector => {
1194 let label = match def.label.as_deref() {
1195 Some(l) => l,
1196 None => return,
1197 };
1198 let property = match def.properties.first() {
1199 Some(p) => p.as_str(),
1200 None => return,
1201 };
1202 let similarity = VectorSimilarity::from_options(&def.options)
1203 .unwrap_or(VectorSimilarity::Cosine);
1204 let provider = VectorIndexProvider::from_options(&def.options)
1205 .unwrap_or(VectorIndexProvider::Flat);
1206 let hnsw = HnswParams::from_options(&def.options);
1207 let lazy = matches!(
1208 def.options.get("vector.populate.async"),
1209 Some(super::index_catalog::IndexConfigValue::Bool(true))
1210 );
1211 self.activate_vector_index(
1212 def.entity, &def.name, label, property, similarity, provider, hnsw, lazy,
1213 );
1214 if lazy {
1215 self.index_catalog_write()
1216 .set_state(&def.name, StoredIndexState::Populating);
1217 }
1218 }
1219 StoredIndexKind::Lookup => {}
1221 }
1222 }
1223
1224 pub(super) fn text_indexes_read(
1225 &self,
1226 entity: StoredIndexEntity,
1227 ) -> std::sync::RwLockReadGuard<'_, TrigramRegistry> {
1228 self.indexes.text.read(entity)
1229 }
1230
1231 pub(super) fn text_indexes_write(
1232 &self,
1233 entity: StoredIndexEntity,
1234 ) -> RwLockWriteGuard<'_, TrigramRegistry> {
1235 self.indexes.text.write(entity)
1236 }
1237
1238 pub(super) fn fulltext_indexes_read(
1239 &self,
1240 entity: StoredIndexEntity,
1241 ) -> std::sync::RwLockReadGuard<'_, FulltextRegistry> {
1242 self.indexes.fulltext.read(entity)
1243 }
1244
1245 #[allow(dead_code)]
1246 pub(super) fn fulltext_indexes_write(
1247 &self,
1248 entity: StoredIndexEntity,
1249 ) -> RwLockWriteGuard<'_, FulltextRegistry> {
1250 self.indexes.fulltext.write(entity)
1251 }
1252
1253 fn activate_text_scope(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1254 if !self.text_indexes_write(entity).add_scope(label, property) {
1255 return;
1256 }
1257
1258 let backfill: Vec<(u64, String)> = match entity {
1259 StoredIndexEntity::Node => self
1260 .iter_nodes()
1261 .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1262 .filter_map(|(id, node)| match node.properties.get(property) {
1263 Some(PropertyValue::String(value)) => Some((id, value.clone())),
1264 _ => None,
1265 })
1266 .collect(),
1267 StoredIndexEntity::Relationship => self
1268 .iter_rels()
1269 .filter(|(_, rel)| rel.rel_type == label)
1270 .filter_map(|(id, rel)| match rel.properties.get(property) {
1271 Some(PropertyValue::String(value)) => Some((id, value.clone())),
1272 _ => None,
1273 })
1274 .collect(),
1275 };
1276
1277 let mut registry = self.text_indexes_write(entity);
1278 for (id, value) in backfill {
1279 registry.insert(label, property, id, &value);
1280 }
1281 }
1282
1283 pub(super) fn deactivate_text_scope(
1285 &self,
1286 entity: StoredIndexEntity,
1287 label: &str,
1288 property: &str,
1289 ) {
1290 self.text_indexes_write(entity)
1291 .remove_scope(label, property);
1292 }
1293
1294 fn activate_fulltext_index(
1295 &self,
1296 entity: StoredIndexEntity,
1297 name: &str,
1298 labels: &[String],
1299 properties: &[String],
1300 ) {
1301 use super::fulltext_index::{term_counts_for_properties, TermCounts};
1302
1303 {
1304 let mut registry = self.fulltext_indexes_write(entity);
1305 registry.register(name.to_string(), labels.to_vec(), properties.to_vec());
1306 }
1307 self.indexes
1308 .active_fulltext_indexes
1309 .fetch_add(1, Ordering::Relaxed);
1310
1311 let backfill: Vec<(u64, TermCounts)> = match entity {
1314 StoredIndexEntity::Node => self
1315 .iter_nodes()
1316 .filter(|(_, node)| {
1317 labels
1318 .iter()
1319 .any(|wanted| node.labels.iter().any(|l| l == wanted))
1320 })
1321 .map(|(id, node)| {
1322 let counts = term_counts_for_properties(&node.properties, properties);
1323 (id, counts)
1324 })
1325 .filter(|(_, c)| !c.is_empty())
1326 .collect(),
1327 StoredIndexEntity::Relationship => self
1328 .iter_rels()
1329 .filter(|(_, rel)| labels.iter().any(|wanted| wanted == &rel.rel_type))
1330 .map(|(id, rel)| {
1331 let counts = term_counts_for_properties(&rel.properties, properties);
1332 (id, counts)
1333 })
1334 .filter(|(_, c)| !c.is_empty())
1335 .collect(),
1336 };
1337
1338 let mut registry = self.fulltext_indexes_write(entity);
1339 if let Some(index) = registry.get_mut(name) {
1340 for (id, counts) in backfill {
1341 index.reindex_entity(id, counts);
1342 }
1343 }
1344 }
1345
1346 pub(super) fn deactivate_fulltext_index(&self, entity: StoredIndexEntity, name: &str) {
1347 self.fulltext_indexes_write(entity).deregister(name);
1348 self.indexes
1349 .active_fulltext_indexes
1350 .fetch_sub(1, Ordering::Relaxed);
1351 }
1352
1353 pub(super) fn sorted_indexes_read(
1354 &self,
1355 entity: StoredIndexEntity,
1356 ) -> std::sync::RwLockReadGuard<'_, SortedPropertyIndex> {
1357 self.indexes.sorted.read(entity)
1358 }
1359
1360 pub(super) fn sorted_indexes_write(
1361 &self,
1362 entity: StoredIndexEntity,
1363 ) -> RwLockWriteGuard<'_, SortedPropertyIndex> {
1364 self.indexes.sorted.write(entity)
1365 }
1366
1367 fn activate_sorted_scope(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1368 if !self.sorted_indexes_write(entity).add_scope(label, property) {
1369 return;
1370 }
1371
1372 let backfill: Vec<(u64, PropertyValue)> = match entity {
1373 StoredIndexEntity::Node => self
1374 .iter_nodes()
1375 .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1376 .filter_map(|(id, node)| {
1377 node.properties
1378 .get(property)
1379 .map(|value| (id, value.clone()))
1380 })
1381 .collect(),
1382 StoredIndexEntity::Relationship => self
1383 .iter_rels()
1384 .filter(|(_, rel)| rel.rel_type == label)
1385 .filter_map(|(id, rel)| {
1386 rel.properties
1387 .get(property)
1388 .map(|value| (id, value.clone()))
1389 })
1390 .collect(),
1391 };
1392
1393 let mut registry = self.sorted_indexes_write(entity);
1394 for (id, value) in backfill {
1395 registry.insert(label, property, id, &value);
1396 }
1397 }
1398
1399 pub(super) fn deactivate_sorted_scope(
1400 &self,
1401 entity: StoredIndexEntity,
1402 label: &str,
1403 property: &str,
1404 ) {
1405 self.sorted_indexes_write(entity)
1406 .remove_scope(label, property);
1407 }
1408
1409 pub(super) fn point_indexes_read(
1410 &self,
1411 entity: StoredIndexEntity,
1412 ) -> std::sync::RwLockReadGuard<'_, PointRegistry> {
1413 self.indexes.point.read(entity)
1414 }
1415
1416 pub(super) fn point_indexes_write(
1417 &self,
1418 entity: StoredIndexEntity,
1419 ) -> RwLockWriteGuard<'_, PointRegistry> {
1420 self.indexes.point.write(entity)
1421 }
1422
1423 fn activate_point_scope(
1424 &self,
1425 entity: StoredIndexEntity,
1426 label: &str,
1427 property: &str,
1428 cell_size: Option<f64>,
1429 ) {
1430 if !self
1431 .point_indexes_write(entity)
1432 .add_scope(label, property, cell_size)
1433 {
1434 return;
1435 }
1436
1437 let backfill: Vec<(u64, LoraPoint)> = match entity {
1438 StoredIndexEntity::Node => self
1439 .iter_nodes()
1440 .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1441 .filter_map(|(id, node)| match node.properties.get(property) {
1442 Some(PropertyValue::Point(point)) => Some((id, point.clone())),
1443 _ => None,
1444 })
1445 .collect(),
1446 StoredIndexEntity::Relationship => self
1447 .iter_rels()
1448 .filter(|(_, rel)| rel.rel_type == label)
1449 .filter_map(|(id, rel)| match rel.properties.get(property) {
1450 Some(PropertyValue::Point(point)) => Some((id, point.clone())),
1451 _ => None,
1452 })
1453 .collect(),
1454 };
1455
1456 let mut registry = self.point_indexes_write(entity);
1457 for (id, point) in backfill {
1458 registry.insert(label, property, id, point);
1459 }
1460 }
1461
1462 pub(super) fn deactivate_point_scope(
1463 &self,
1464 entity: StoredIndexEntity,
1465 label: &str,
1466 property: &str,
1467 ) {
1468 self.point_indexes_write(entity)
1469 .remove_scope(label, property);
1470 }
1471
1472 pub(super) fn vector_indexes_read(
1473 &self,
1474 entity: StoredIndexEntity,
1475 ) -> std::sync::RwLockReadGuard<'_, VectorIndexRegistry> {
1476 self.indexes.vector.read(entity)
1477 }
1478
1479 pub(super) fn vector_indexes_write(
1480 &self,
1481 entity: StoredIndexEntity,
1482 ) -> RwLockWriteGuard<'_, VectorIndexRegistry> {
1483 self.indexes.vector.write(entity)
1484 }
1485
1486 #[allow(clippy::too_many_arguments)]
1487 fn activate_vector_index(
1488 &self,
1489 entity: StoredIndexEntity,
1490 name: &str,
1491 label: &str,
1492 property: &str,
1493 similarity: VectorSimilarity,
1494 provider: VectorIndexProvider,
1495 hnsw: HnswParams,
1496 lazy: bool,
1497 ) {
1498 {
1499 let mut registry = self.vector_indexes_write(entity);
1500 registry.register(
1501 name.to_string(),
1502 label.to_string(),
1503 property.to_string(),
1504 similarity,
1505 provider,
1506 hnsw,
1507 );
1508 }
1509
1510 if lazy {
1511 return;
1518 }
1519
1520 self.backfill_vector_index(entity, label, property);
1521 }
1522
1523 fn backfill_vector_index(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1527 let backfill: Vec<(u64, crate::LoraVector)> = match entity {
1528 StoredIndexEntity::Node => self
1529 .iter_nodes()
1530 .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1531 .filter_map(|(id, node)| match node.properties.get(property) {
1532 Some(PropertyValue::Vector(v)) => Some((id, v.clone())),
1533 _ => None,
1534 })
1535 .collect(),
1536 StoredIndexEntity::Relationship => self
1537 .iter_rels()
1538 .filter(|(_, rel)| rel.rel_type == label)
1539 .filter_map(|(id, rel)| match rel.properties.get(property) {
1540 Some(PropertyValue::Vector(v)) => Some((id, v.clone())),
1541 _ => None,
1542 })
1543 .collect(),
1544 };
1545
1546 let mut registry = self.vector_indexes_write(entity);
1547 for (id, vector) in backfill {
1548 registry.insert_for(label, property, id, &vector);
1549 }
1550 }
1551
1552 pub(super) fn lazy_populate_vector_index(&self, name: &str) {
1558 let def = match self.index_catalog_read().get(name).cloned() {
1559 Some(d) => d,
1560 None => return,
1561 };
1562 if def.state != StoredIndexState::Populating || def.kind != StoredIndexKind::Vector {
1563 return;
1564 }
1565 let label = match def.label.as_deref() {
1566 Some(l) => l,
1567 None => return,
1568 };
1569 let property = match def.properties.first() {
1570 Some(p) => p.as_str(),
1571 None => return,
1572 };
1573 self.backfill_vector_index(def.entity, label, property);
1574 self.index_catalog_write()
1575 .set_state(name, StoredIndexState::Online);
1576 }
1577
1578 pub(super) fn deactivate_vector_index(&self, entity: StoredIndexEntity, name: &str) {
1579 self.vector_indexes_write(entity).deregister(name);
1580 }
1581
1582 pub fn graph_stats(&self) -> GraphStats {
1587 let mut stats = GraphStats {
1588 node_count: self.live_node_count,
1589 relationship_count: self.live_rel_count,
1590 ..Default::default()
1591 };
1592 for (label, ids) in &self.nodes_by_label {
1593 stats.nodes_by_label.insert(label.clone(), ids.len());
1594 }
1595 for (rel_type, ids) in &self.relationships_by_type {
1596 stats
1597 .relationships_by_type
1598 .insert(rel_type.clone(), ids.len());
1599 }
1600 let prop_indexes = self.indexes_read();
1606 for (scope, props) in &prop_indexes.node_properties.scoped_values {
1607 for (key, values) in props {
1608 stats
1609 .node_distinct_values
1610 .insert((scope.clone(), key.clone()), values.len());
1611 }
1612 }
1613 for (scope, props) in &prop_indexes.relationship_properties.scoped_values {
1614 for (key, values) in props {
1615 stats
1616 .relationship_distinct_values
1617 .insert((scope.clone(), key.clone()), values.len());
1618 }
1619 }
1620
1621 for def in self.index_catalog_read().list() {
1622 if def.state != StoredIndexState::Online {
1623 continue;
1624 }
1625 let Some(label) = def.label else {
1626 continue;
1627 };
1628 for property in def.properties {
1629 let scope = (label.clone(), property);
1630 match (def.entity, def.kind) {
1631 (StoredIndexEntity::Node, StoredIndexKind::Range) => {
1632 stats.node_range_indexes.insert(scope);
1633 }
1634 (StoredIndexEntity::Node, StoredIndexKind::Text) => {
1635 stats.node_text_indexes.insert(scope);
1636 }
1637 (StoredIndexEntity::Node, StoredIndexKind::Point) => {
1638 stats.node_point_indexes.insert(scope);
1639 }
1640 (StoredIndexEntity::Relationship, StoredIndexKind::Range) => {
1641 stats.relationship_range_indexes.insert(scope);
1642 }
1643 (StoredIndexEntity::Relationship, StoredIndexKind::Text) => {
1644 stats.relationship_text_indexes.insert(scope);
1645 }
1646 (StoredIndexEntity::Relationship, StoredIndexKind::Point) => {
1647 stats.relationship_point_indexes.insert(scope);
1648 }
1649 (StoredIndexEntity::Node, StoredIndexKind::Vector) => {
1650 stats.node_vector_indexes.insert(scope);
1651 }
1652 (StoredIndexEntity::Relationship, StoredIndexKind::Vector) => {
1653 stats.relationship_vector_indexes.insert(scope);
1654 }
1655 (_, StoredIndexKind::Lookup | StoredIndexKind::Fulltext) => {}
1656 }
1657 }
1658 }
1659 stats
1660 }
1661
1662 pub(super) fn rebuild_property_indexes(&mut self) {
1663 let mut indexes = PropertyIndexRegistry::default();
1664
1665 for (id, node) in self.iter_nodes() {
1666 for (key, value) in &node.properties {
1667 if PropertyIndexKey::from_value(value).is_some() {
1668 indexes.node_properties.activate(key);
1669 indexes.node_properties.insert_with_scopes(
1670 id,
1671 node.labels.iter().map(String::as_str),
1672 key,
1673 value,
1674 );
1675 }
1676 }
1677 }
1678
1679 for (id, rel) in self.iter_rels() {
1680 for (key, value) in &rel.properties {
1681 if PropertyIndexKey::from_value(value).is_some() {
1682 indexes.relationship_properties.activate(key);
1683 indexes.relationship_properties.insert_with_scopes(
1684 id,
1685 [rel.rel_type.as_str()],
1686 key,
1687 value,
1688 );
1689 }
1690 }
1691 }
1692
1693 let node_index_count = indexes.node_properties.active_keys.len();
1694 let relationship_index_count = indexes.relationship_properties.active_keys.len();
1695 *self.indexes_mut() = indexes;
1696 self.indexes
1697 .active_node_property_indexes
1698 .store(node_index_count, Ordering::Relaxed);
1699 self.indexes
1700 .active_relationship_property_indexes
1701 .store(relationship_index_count, Ordering::Relaxed);
1702 }
1703
1704 pub(super) fn on_node_created(&mut self, node: &NodeRecord) {
1705 for label in &node.labels {
1706 self.insert_node_label_index(node.id, label);
1707 }
1708 self.index_node_properties_if_active(
1709 node.id,
1710 node.labels.iter().map(String::as_str),
1711 &node.properties,
1712 );
1713 self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Insert);
1714 }
1715
1716 pub(super) fn on_node_replayed(&mut self, node: &NodeRecord) {
1717 for label in &node.labels {
1718 self.insert_node_label_index(node.id, label);
1719 }
1720 self.index_node_properties_eager(
1721 node.id,
1722 node.labels.iter().map(String::as_str),
1723 &node.properties,
1724 );
1725 self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Insert);
1726 }
1727
1728 pub(super) fn on_node_property_set(
1729 &mut self,
1730 node_id: NodeId,
1731 key: &str,
1732 old: Option<&PropertyValue>,
1733 new: &PropertyValue,
1734 ) {
1735 let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
1736 return;
1737 };
1738
1739 if self.node_property_index_is_active(key) {
1740 if let Some(old) = old {
1741 self.unindex_node_property_if_active(
1742 node_id,
1743 labels.iter().map(String::as_str),
1744 key,
1745 old,
1746 );
1747 }
1748 self.index_node_property_if_active(
1749 node_id,
1750 labels.iter().map(String::as_str),
1751 key,
1752 new,
1753 );
1754 }
1755
1756 self.update_secondary_property(
1757 StoredIndexEntity::Node,
1758 labels.iter().map(String::as_str),
1759 node_id,
1760 key,
1761 old,
1762 Some(new),
1763 );
1764 }
1765
1766 pub(super) fn on_node_property_removed(
1767 &mut self,
1768 node_id: NodeId,
1769 key: &str,
1770 old: &PropertyValue,
1771 ) {
1772 let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
1773 return;
1774 };
1775 if self.node_property_index_is_active(key) {
1776 self.unindex_node_property_if_active(
1777 node_id,
1778 labels.iter().map(String::as_str),
1779 key,
1780 old,
1781 );
1782 }
1783 self.update_secondary_property(
1784 StoredIndexEntity::Node,
1785 labels.iter().map(String::as_str),
1786 node_id,
1787 key,
1788 Some(old),
1789 None,
1790 );
1791 }
1792
1793 pub(super) fn on_node_label_added(&mut self, node_id: NodeId, label: &str) {
1794 self.insert_node_label_index(node_id, label);
1795
1796 let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
1797 return;
1798 };
1799 if self.active_node_property_index_count() != 0 {
1800 self.index_node_scope_properties_if_active(node_id, label, &properties);
1801 }
1802 for (key, value) in &properties {
1803 self.update_secondary_property(
1804 StoredIndexEntity::Node,
1805 [label],
1806 node_id,
1807 key,
1808 None,
1809 Some(value),
1810 );
1811 }
1812 }
1813
1814 pub(super) fn on_node_label_removed(&mut self, node_id: NodeId, label: &str) {
1815 self.remove_node_label_index(node_id, label);
1816
1817 let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
1818 return;
1819 };
1820 if self.active_node_property_index_count() != 0 {
1821 self.unindex_node_scope_properties_if_active(node_id, label, &properties);
1822 }
1823 for (key, value) in &properties {
1824 self.update_secondary_property(
1825 StoredIndexEntity::Node,
1826 [label],
1827 node_id,
1828 key,
1829 Some(value),
1830 None,
1831 );
1832 }
1833 }
1834
1835 pub(super) fn on_node_deleted(&mut self, node: &NodeRecord) {
1836 for label in &node.labels {
1837 self.remove_node_label_index(node.id, label);
1838 }
1839 self.unindex_active_node_properties(
1840 node.id,
1841 node.labels.iter().map(String::as_str),
1842 &node.properties,
1843 );
1844 self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Remove);
1845 }
1846
1847 pub(super) fn on_relationship_created(&mut self, rel: &RelationshipRecord) {
1848 self.attach_relationship(rel);
1849 self.index_relationship_properties_if_active(
1850 rel.id,
1851 [rel.rel_type.as_str()],
1852 &rel.properties,
1853 );
1854 self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Insert);
1855 }
1856
1857 pub(super) fn on_relationship_replayed(&mut self, rel: &RelationshipRecord) {
1858 self.attach_relationship(rel);
1859 self.index_relationship_properties_eager(rel.id, [rel.rel_type.as_str()], &rel.properties);
1860 self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Insert);
1861 }
1862
1863 pub(super) fn on_relationship_property_set(
1864 &mut self,
1865 rel_id: RelationshipId,
1866 key: &str,
1867 old: Option<&PropertyValue>,
1868 new: &PropertyValue,
1869 ) {
1870 let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
1871 return;
1872 };
1873
1874 if self.relationship_property_index_is_active(key) {
1875 if let Some(old) = old {
1876 self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
1877 }
1878 self.index_relationship_property_if_active(rel_id, [rel_type.as_str()], key, new);
1879 }
1880
1881 self.update_secondary_property(
1882 StoredIndexEntity::Relationship,
1883 [rel_type.as_str()],
1884 rel_id,
1885 key,
1886 old,
1887 Some(new),
1888 );
1889 }
1890
1891 pub(super) fn on_relationship_property_removed(
1892 &mut self,
1893 rel_id: RelationshipId,
1894 key: &str,
1895 old: &PropertyValue,
1896 ) {
1897 let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
1898 return;
1899 };
1900 if self.relationship_property_index_is_active(key) {
1901 self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
1902 }
1903 self.update_secondary_property(
1904 StoredIndexEntity::Relationship,
1905 [rel_type.as_str()],
1906 rel_id,
1907 key,
1908 Some(old),
1909 None,
1910 );
1911 }
1912
1913 pub(super) fn on_relationship_deleted(&mut self, rel: &RelationshipRecord) {
1914 self.detach_relationship_indexes(rel);
1915 self.unindex_active_relationship_properties(
1916 rel.id,
1917 [rel.rel_type.as_str()],
1918 &rel.properties,
1919 );
1920 self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Remove);
1921 }
1922
1923 fn index_node_property_eager<'a>(
1924 &mut self,
1925 node_id: NodeId,
1926 labels: impl IntoIterator<Item = &'a str>,
1927 key: &str,
1928 value: &PropertyValue,
1929 ) {
1930 if PropertyIndexKey::from_value(value).is_none() {
1931 return;
1932 }
1933
1934 let activated = {
1935 let indexes = self.indexes_mut();
1936 let activated = indexes.node_properties.activate(key);
1937 indexes
1938 .node_properties
1939 .insert_with_scopes(node_id, labels, key, value);
1940 activated
1941 };
1942 if activated {
1943 self.indexes
1944 .active_node_property_indexes
1945 .fetch_add(1, Ordering::Relaxed);
1946 }
1947 }
1948
1949 fn index_relationship_property_eager<'a>(
1950 &mut self,
1951 rel_id: RelationshipId,
1952 scopes: impl IntoIterator<Item = &'a str>,
1953 key: &str,
1954 value: &PropertyValue,
1955 ) {
1956 if PropertyIndexKey::from_value(value).is_none() {
1957 return;
1958 }
1959
1960 let activated = {
1961 let indexes = self.indexes_mut();
1962 let activated = indexes.relationship_properties.activate(key);
1963 indexes
1964 .relationship_properties
1965 .insert_with_scopes(rel_id, scopes, key, value);
1966 activated
1967 };
1968 if activated {
1969 self.indexes
1970 .active_relationship_property_indexes
1971 .fetch_add(1, Ordering::Relaxed);
1972 }
1973 }
1974
1975 fn index_node_properties_eager<'a>(
1976 &mut self,
1977 node_id: NodeId,
1978 labels: impl IntoIterator<Item = &'a str> + Clone,
1979 properties: &Properties,
1980 ) {
1981 for (key, value) in properties {
1982 self.index_node_property_eager(node_id, labels.clone(), key, value);
1983 }
1984 }
1985
1986 fn index_relationship_properties_eager<'a>(
1987 &mut self,
1988 rel_id: RelationshipId,
1989 scopes: impl IntoIterator<Item = &'a str> + Clone,
1990 properties: &Properties,
1991 ) {
1992 for (key, value) in properties {
1993 self.index_relationship_property_eager(rel_id, scopes.clone(), key, value);
1994 }
1995 }
1996
1997 fn index_node_property_if_active<'a>(
1998 &mut self,
1999 node_id: NodeId,
2000 labels: impl IntoIterator<Item = &'a str>,
2001 key: &str,
2002 value: &PropertyValue,
2003 ) {
2004 if self.active_node_property_index_count() == 0 {
2005 return;
2006 }
2007 let indexes = self.indexes_mut();
2008 if indexes.node_properties.is_active(key) {
2009 indexes
2010 .node_properties
2011 .insert_with_scopes(node_id, labels, key, value);
2012 }
2013 }
2014
2015 fn index_node_properties_if_active<'a>(
2016 &mut self,
2017 node_id: NodeId,
2018 labels: impl IntoIterator<Item = &'a str> + Clone,
2019 properties: &Properties,
2020 ) {
2021 if self.active_node_property_index_count() == 0 {
2022 return;
2023 }
2024 let indexes = self.indexes_mut();
2025 for (key, value) in properties {
2026 if indexes.node_properties.is_active(key) {
2027 indexes
2028 .node_properties
2029 .insert_with_scopes(node_id, labels.clone(), key, value);
2030 }
2031 }
2032 }
2033
2034 fn unindex_node_property_if_active<'a>(
2035 &mut self,
2036 node_id: NodeId,
2037 labels: impl IntoIterator<Item = &'a str>,
2038 key: &str,
2039 value: &PropertyValue,
2040 ) {
2041 if self.active_node_property_index_count() == 0 {
2042 return;
2043 }
2044 let indexes = self.indexes_mut();
2045 if indexes.node_properties.is_active(key) {
2046 indexes
2047 .node_properties
2048 .remove_with_scopes(node_id, labels, key, value);
2049 }
2050 }
2051
2052 fn index_node_scope_properties_if_active(
2053 &mut self,
2054 node_id: NodeId,
2055 scope: &str,
2056 properties: &Properties,
2057 ) {
2058 if self.active_node_property_index_count() == 0 {
2059 return;
2060 }
2061 let indexes = self.indexes_mut();
2062 for (key, value) in properties {
2063 if indexes.node_properties.is_active(key) {
2064 indexes
2065 .node_properties
2066 .insert_scoped(node_id, scope, key, value);
2067 }
2068 }
2069 }
2070
2071 fn unindex_node_scope_properties_if_active(
2072 &mut self,
2073 node_id: NodeId,
2074 scope: &str,
2075 properties: &Properties,
2076 ) {
2077 if self.active_node_property_index_count() == 0 {
2078 return;
2079 }
2080 let indexes = self.indexes_mut();
2081 for (key, value) in properties {
2082 if indexes.node_properties.is_active(key) {
2083 indexes
2084 .node_properties
2085 .remove_scoped(node_id, scope, key, value);
2086 }
2087 }
2088 }
2089
2090 fn unindex_active_node_properties<'a>(
2091 &mut self,
2092 node_id: NodeId,
2093 labels: impl IntoIterator<Item = &'a str> + Clone,
2094 properties: &Properties,
2095 ) {
2096 if self.active_node_property_index_count() == 0 {
2097 return;
2098 }
2099 let indexes = self.indexes_mut();
2100 for (key, value) in properties {
2101 if indexes.node_properties.is_active(key) {
2102 indexes
2103 .node_properties
2104 .remove_with_scopes(node_id, labels.clone(), key, value);
2105 }
2106 }
2107 }
2108
2109 fn index_relationship_property_if_active<'a>(
2110 &mut self,
2111 rel_id: RelationshipId,
2112 scopes: impl IntoIterator<Item = &'a str>,
2113 key: &str,
2114 value: &PropertyValue,
2115 ) {
2116 if self.active_relationship_property_index_count() == 0 {
2117 return;
2118 }
2119 let indexes = self.indexes_mut();
2120 if indexes.relationship_properties.is_active(key) {
2121 indexes
2122 .relationship_properties
2123 .insert_with_scopes(rel_id, scopes, key, value);
2124 }
2125 }
2126
2127 fn index_relationship_properties_if_active<'a>(
2128 &mut self,
2129 rel_id: RelationshipId,
2130 scopes: impl IntoIterator<Item = &'a str> + Clone,
2131 properties: &Properties,
2132 ) {
2133 if self.active_relationship_property_index_count() == 0 {
2134 return;
2135 }
2136 let indexes = self.indexes_mut();
2137 for (key, value) in properties {
2138 if indexes.relationship_properties.is_active(key) {
2139 indexes.relationship_properties.insert_with_scopes(
2140 rel_id,
2141 scopes.clone(),
2142 key,
2143 value,
2144 );
2145 }
2146 }
2147 }
2148
2149 fn unindex_relationship_property_if_active<'a>(
2150 &mut self,
2151 rel_id: RelationshipId,
2152 scopes: impl IntoIterator<Item = &'a str>,
2153 key: &str,
2154 value: &PropertyValue,
2155 ) {
2156 if self.active_relationship_property_index_count() == 0 {
2157 return;
2158 }
2159 let indexes = self.indexes_mut();
2160 if indexes.relationship_properties.is_active(key) {
2161 indexes
2162 .relationship_properties
2163 .remove_with_scopes(rel_id, scopes, key, value);
2164 }
2165 }
2166
2167 fn unindex_active_relationship_properties<'a>(
2168 &mut self,
2169 rel_id: RelationshipId,
2170 scopes: impl IntoIterator<Item = &'a str> + Clone,
2171 properties: &Properties,
2172 ) {
2173 if self.active_relationship_property_index_count() == 0 {
2174 return;
2175 }
2176 let indexes = self.indexes_mut();
2177 for (key, value) in properties {
2178 if indexes.relationship_properties.is_active(key) {
2179 indexes.relationship_properties.remove_with_scopes(
2180 rel_id,
2181 scopes.clone(),
2182 key,
2183 value,
2184 );
2185 }
2186 }
2187 }
2188
2189 pub(super) fn scan_nodes_by_property(
2190 &self,
2191 label: Option<&str>,
2192 key: &str,
2193 value: &PropertyValue,
2194 ) -> Vec<NodeRecord> {
2195 match label {
2196 Some(label) => self
2197 .nodes_by_label
2198 .get(label)
2199 .into_iter()
2200 .flat_map(|ids| ids.iter())
2201 .filter_map(|&id| self.node_at(id))
2202 .filter(|node| node.properties.get(key) == Some(value))
2203 .cloned()
2204 .collect(),
2205 None => self
2206 .iter_node_records()
2207 .filter(|node| node.properties.get(key) == Some(value))
2208 .cloned()
2209 .collect(),
2210 }
2211 }
2212
2213 pub(super) fn scan_node_ids_by_property(
2214 &self,
2215 label: Option<&str>,
2216 key: &str,
2217 value: &PropertyValue,
2218 ) -> Vec<NodeId> {
2219 match label {
2220 Some(label) => self
2221 .nodes_by_label
2222 .get(label)
2223 .into_iter()
2224 .flat_map(|ids| ids.iter())
2225 .filter_map(|&id| {
2226 (self.node_at(id)?.properties.get(key) == Some(value)).then_some(id)
2227 })
2228 .collect(),
2229 None => self
2230 .iter_nodes()
2231 .filter_map(|(id, node)| (node.properties.get(key) == Some(value)).then_some(id))
2232 .collect(),
2233 }
2234 }
2235
2236 pub(super) fn any_node_by_property(
2237 &self,
2238 label: &str,
2239 key: &str,
2240 value: &PropertyValue,
2241 ) -> bool {
2242 self.nodes_by_label
2243 .get(label)
2244 .into_iter()
2245 .flat_map(|ids| ids.iter())
2246 .filter_map(|&id| self.node_at(id))
2247 .any(|node| node.properties.get(key) == Some(value))
2248 }
2249
2250 pub(super) fn scan_relationships_by_property(
2251 &self,
2252 rel_type: Option<&str>,
2253 key: &str,
2254 value: &PropertyValue,
2255 ) -> Vec<RelationshipRecord> {
2256 match rel_type {
2257 Some(rel_type) => self
2258 .relationships_by_type
2259 .get(rel_type)
2260 .into_iter()
2261 .flat_map(|ids| ids.iter())
2262 .filter_map(|&id| self.rel_at(id))
2263 .filter(|rel| rel.properties.get(key) == Some(value))
2264 .cloned()
2265 .collect(),
2266 None => self
2267 .iter_rel_records()
2268 .filter(|rel| rel.properties.get(key) == Some(value))
2269 .cloned()
2270 .collect(),
2271 }
2272 }
2273
2274 pub(super) fn scan_relationship_ids_by_property(
2275 &self,
2276 rel_type: Option<&str>,
2277 key: &str,
2278 value: &PropertyValue,
2279 ) -> Vec<RelationshipId> {
2280 match rel_type {
2281 Some(rel_type) => self
2282 .relationships_by_type
2283 .get(rel_type)
2284 .into_iter()
2285 .flat_map(|ids| ids.iter())
2286 .filter_map(|&id| {
2287 (self.rel_at(id)?.properties.get(key) == Some(value)).then_some(id)
2288 })
2289 .collect(),
2290 None => self
2291 .iter_rels()
2292 .filter_map(|(id, rel)| (rel.properties.get(key) == Some(value)).then_some(id))
2293 .collect(),
2294 }
2295 }
2296
2297 pub(super) fn any_relationship_by_property(
2298 &self,
2299 rel_type: &str,
2300 key: &str,
2301 value: &PropertyValue,
2302 ) -> bool {
2303 self.relationships_by_type
2304 .get(rel_type)
2305 .into_iter()
2306 .flat_map(|ids| ids.iter())
2307 .filter_map(|&id| self.rel_at(id))
2308 .any(|rel| rel.properties.get(key) == Some(value))
2309 }
2310
2311 pub(super) fn attach_relationship(&mut self, rel: &RelationshipRecord) {
2312 self.outgoing_push(rel.src, rel.id);
2313 self.incoming_push(rel.dst, rel.id);
2314 self.insert_relationship_type_index(rel.id, &rel.rel_type);
2315 }
2316
2317 fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
2318 self.outgoing_remove(rel.src, rel.id);
2322 self.incoming_remove(rel.dst, rel.id);
2323
2324 self.remove_relationship_type_index(rel.id, &rel.rel_type);
2325 }
2326
2327 pub(super) fn relationship_ids_for_direction(
2328 &self,
2329 node_id: NodeId,
2330 direction: Direction,
2331 ) -> Vec<RelationshipId> {
2332 match direction {
2333 Direction::Left => self
2334 .incoming_at(node_id)
2335 .map(<[_]>::to_vec)
2336 .unwrap_or_default(),
2337
2338 Direction::Right => self
2339 .outgoing_at(node_id)
2340 .map(<[_]>::to_vec)
2341 .unwrap_or_default(),
2342
2343 Direction::Undirected => {
2344 let out = self.outgoing_at(node_id);
2345 let inc = self.incoming_at(node_id);
2346 let mut ids = Vec::with_capacity(
2347 out.map(<[_]>::len).unwrap_or(0) + inc.map(<[_]>::len).unwrap_or(0),
2348 );
2349
2350 if let Some(out) = out {
2351 ids.extend(out.iter().copied());
2352 }
2353 if let Some(inc) = inc {
2354 for &rel_id in inc {
2355 let Some(rel) = self.rel_at(rel_id) else {
2356 continue;
2357 };
2358 if rel.src == node_id && rel.dst == node_id {
2359 continue;
2360 }
2361 ids.push(rel_id);
2362 }
2363 }
2364
2365 ids
2366 }
2367 }
2368 }
2369
2370 pub(super) fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
2371 if rel.src == node_id {
2372 Some(rel.dst)
2373 } else if rel.dst == node_id {
2374 Some(rel.src)
2375 } else {
2376 None
2377 }
2378 }
2379
2380 pub(super) fn has_incident_relationships(&self, node_id: NodeId) -> bool {
2381 self.outgoing_at(node_id)
2382 .map(|ids| !ids.is_empty())
2383 .unwrap_or(false)
2384 || self
2385 .incoming_at(node_id)
2386 .map(|ids| !ids.is_empty())
2387 .unwrap_or(false)
2388 }
2389
2390 pub(super) fn incident_relationship_ids(&self, node_id: NodeId) -> Vec<RelationshipId> {
2391 let out = self.outgoing_at(node_id);
2392 let inc = self.incoming_at(node_id);
2393 let mut rel_ids =
2394 Vec::with_capacity(out.map(<[_]>::len).unwrap_or(0) + inc.map(<[_]>::len).unwrap_or(0));
2395
2396 if let Some(ids) = out {
2397 rel_ids.extend(ids.iter().copied());
2398 }
2399 if let Some(ids) = inc {
2400 for &rel_id in ids {
2401 let Some(rel) = self.rel_at(rel_id) else {
2402 continue;
2403 };
2404 if rel.src == node_id && rel.dst == node_id {
2405 continue;
2406 }
2407 rel_ids.push(rel_id);
2408 }
2409 }
2410
2411 rel_ids
2412 }
2413
2414 #[doc(hidden)]
2418 pub fn replay_create_node(
2419 &mut self,
2420 id: NodeId,
2421 labels: Vec<String>,
2422 properties: Properties,
2423 ) -> Result<NodeRecord, String> {
2424 if self.recorder.is_some() {
2425 return Err(
2426 "cannot replay node creation while a mutation recorder is installed".into(),
2427 );
2428 }
2429 if self.node_at(id).is_some() {
2430 return Err(format!("node id {id} already exists"));
2431 }
2432 let idx = self.ensure_node_slot_checked(id)?;
2433 self.bump_next_node_id_past(id)?;
2434
2435 let labels = Self::normalize_labels(labels);
2436 let node = NodeRecord {
2437 id,
2438 labels: labels.clone(),
2439 properties,
2440 };
2441
2442 self.put_node_at_slot(idx, node.clone());
2443 self.on_node_replayed(&node);
2444
2445 Ok(node)
2446 }
2447
2448 #[doc(hidden)]
2452 pub fn replay_create_relationship(
2453 &mut self,
2454 id: RelationshipId,
2455 src: NodeId,
2456 dst: NodeId,
2457 rel_type: &str,
2458 properties: Properties,
2459 ) -> Result<RelationshipRecord, String> {
2460 if self.recorder.is_some() {
2461 return Err(
2462 "cannot replay relationship creation while a mutation recorder is installed".into(),
2463 );
2464 }
2465 if self.rel_at(id).is_some() {
2466 return Err(format!("relationship id {id} already exists"));
2467 }
2468 if self.node_at(src).is_none() {
2469 return Err(format!(
2470 "relationship {id} references missing source node {src}"
2471 ));
2472 }
2473 if self.node_at(dst).is_none() {
2474 return Err(format!(
2475 "relationship {id} references missing target node {dst}"
2476 ));
2477 }
2478
2479 let trimmed = rel_type.trim();
2480 if trimmed.is_empty() {
2481 return Err(format!("relationship {id} has an empty type"));
2482 }
2483 let idx = self.ensure_rel_slot_checked(id)?;
2484 self.bump_next_rel_id_past(id)?;
2485
2486 let rel = RelationshipRecord {
2487 id,
2488 src,
2489 dst,
2490 rel_type: trimmed.to_string(),
2491 properties,
2492 };
2493
2494 self.put_rel_at_slot(idx, rel.clone());
2495 self.on_relationship_replayed(&rel);
2496
2497 Ok(rel)
2498 }
2499
2500 #[cfg(test)]
2501 pub(super) fn assert_property_indexes_match_scan(&self) {
2502 let indexes = self.indexes_read();
2503 assert_eq!(
2504 indexes.node_properties.active_keys.len(),
2505 self.active_node_property_index_count(),
2506 "node property index counter diverged from active key set"
2507 );
2508 assert_eq!(
2509 indexes.relationship_properties.active_keys.len(),
2510 self.active_relationship_property_index_count(),
2511 "relationship property index counter diverged from active key set"
2512 );
2513
2514 let mut expected_nodes = PropertyIndexState {
2515 active_keys: indexes.node_properties.active_keys.clone(),
2516 ..PropertyIndexState::default()
2517 };
2518 for (id, node) in self.iter_nodes() {
2519 for (key, value) in &node.properties {
2520 if expected_nodes.is_active(key) {
2521 expected_nodes.insert_with_scopes(
2522 id,
2523 node.labels.iter().map(String::as_str),
2524 key,
2525 value,
2526 );
2527 }
2528 }
2529 }
2530 assert_eq!(
2531 indexes.node_properties.values, expected_nodes.values,
2532 "node property index values diverged from scan"
2533 );
2534 assert_eq!(
2535 indexes.node_properties.scoped_values, expected_nodes.scoped_values,
2536 "node property scoped index values diverged from scan"
2537 );
2538
2539 let mut expected_relationships = PropertyIndexState {
2540 active_keys: indexes.relationship_properties.active_keys.clone(),
2541 ..PropertyIndexState::default()
2542 };
2543 for (id, rel) in self.iter_rels() {
2544 for (key, value) in &rel.properties {
2545 if expected_relationships.is_active(key) {
2546 expected_relationships.insert_with_scopes(
2547 id,
2548 [rel.rel_type.as_str()],
2549 key,
2550 value,
2551 );
2552 }
2553 }
2554 }
2555 assert_eq!(
2556 indexes.relationship_properties.values, expected_relationships.values,
2557 "relationship property index values diverged from scan"
2558 );
2559 assert_eq!(
2560 indexes.relationship_properties.scoped_values, expected_relationships.scoped_values,
2561 "relationship property scoped index values diverged from scan"
2562 );
2563 }
2564}