Skip to main content

lora_store/memory/
impls.rs

1//! `GraphStorage` / `BorrowedGraphStorage` / `GraphStorageMut` impls
2//! for [`InMemoryGraph`]. The trait surfaces — read, borrow, mutate —
3//! all delegate into the inherent helpers defined in `super`.
4
5use std::collections::BTreeSet;
6
7use lora_ast::Direction;
8
9use crate::{
10    BorrowedGraphStorage, ConstraintDefinition, ConstraintRequest, CreateConstraintError,
11    CreateConstraintOutcome, CreateIndexError, CreateIndexOutcome, DropConstraintError,
12    DropConstraintOutcome, DropIndexError, DropIndexOutcome, GraphStats, GraphStorage,
13    GraphStorageMut, IndexDefinition, IndexRequest, LoraVector, MutationEvent, NodeId, NodeRecord,
14    Properties, PropertyValue, RelationshipId, RelationshipRecord, StoredIndexEntity,
15};
16
17use super::property_index::PropertyIndexKey;
18use super::InMemoryGraph;
19
20impl GraphStorage for InMemoryGraph {
21    // ---------- Required primitives ----------
22
23    fn list_indexes(&self) -> Vec<IndexDefinition> {
24        self.index_catalog_read().list()
25    }
26
27    fn get_index(&self, name: &str) -> Option<IndexDefinition> {
28        self.index_catalog_read().get(name).cloned()
29    }
30
31    fn fulltext_search(&self, name: &str, query: &str) -> Vec<(u64, f64)> {
32        // The index lives on exactly one entity scope; check both.
33        let node_hits = self
34            .fulltext_indexes_read(StoredIndexEntity::Node)
35            .get(name)
36            .map(|idx| idx.query(query));
37        if let Some(hits) = node_hits {
38            if !hits.is_empty() {
39                return hits;
40            }
41        }
42        let rel_hits = self
43            .fulltext_indexes_read(StoredIndexEntity::Relationship)
44            .get(name)
45            .map(|idx| idx.query(query));
46        rel_hits.unwrap_or_default()
47    }
48
49    fn vector_search(
50        &self,
51        name: &str,
52        query: &LoraVector,
53        k: usize,
54        restrict_to: Option<&std::collections::BTreeSet<u64>>,
55    ) -> Vec<(u64, f64)> {
56        // Lazy populate: an index created with
57        // `vector.populate.async: true` stays Populating until the
58        // first query forces the backfill. The check is cheap (one
59        // catalog read), and the populate itself takes the write
60        // lock — concurrent queries pile up on the read lock and
61        // proceed once the writer drops.
62        self.lazy_populate_vector_index(name);
63
64        // Like fulltext_search, an index lives on exactly one entity
65        // scope. Probe both registries; the catalog has already
66        // enforced kind/entity agreement so only one will hit.
67        if let Some(hits) =
68            self.vector_indexes_read(StoredIndexEntity::Node)
69                .query(name, query, k, restrict_to)
70        {
71            if !hits.is_empty() {
72                return hits;
73            }
74        }
75        self.vector_indexes_read(StoredIndexEntity::Relationship)
76            .query(name, query, k, restrict_to)
77            .unwrap_or_default()
78    }
79
80    fn list_constraints(&self) -> Vec<ConstraintDefinition> {
81        self.constraint_catalog_read().list()
82    }
83
84    fn get_constraint(&self, name: &str) -> Option<ConstraintDefinition> {
85        self.constraint_catalog_read().get(name).cloned()
86    }
87
88    fn check_node_create_against_constraints(
89        &self,
90        labels: &[String],
91        properties: &Properties,
92    ) -> Result<(), String> {
93        if !self.has_active_constraints() {
94            return Ok(());
95        }
96        let catalog = self.constraint_catalog_read();
97        crate::memory::constraint_enforce::check_node_create(&catalog, self, labels, properties)
98            .map_err(|e| format!("[{}] {e}", e.gql_status()))
99    }
100
101    fn check_relationship_create_against_constraints(
102        &self,
103        rel_type: &str,
104        properties: &Properties,
105    ) -> Result<(), String> {
106        if !self.has_active_constraints() {
107            return Ok(());
108        }
109        let catalog = self.constraint_catalog_read();
110        crate::memory::constraint_enforce::check_relationship_create(
111            &catalog, self, rel_type, properties,
112        )
113        .map_err(|e| format!("[{}] {e}", e.gql_status()))
114    }
115
116    fn check_node_set_property_against_constraints(
117        &self,
118        node_id: NodeId,
119        key: &str,
120        value: &PropertyValue,
121    ) -> Result<(), String> {
122        if !self.has_active_constraints() {
123            return Ok(());
124        }
125        let catalog = self.constraint_catalog_read();
126        crate::memory::constraint_enforce::check_node_set_property(
127            &catalog, self, node_id, key, value,
128        )
129        .map_err(|e| format!("[{}] {e}", e.gql_status()))
130    }
131
132    fn check_node_remove_property_against_constraints(
133        &self,
134        node_id: NodeId,
135        key: &str,
136    ) -> Result<(), String> {
137        if !self.has_active_constraints() {
138            return Ok(());
139        }
140        let catalog = self.constraint_catalog_read();
141        crate::memory::constraint_enforce::check_node_remove_property(&catalog, self, node_id, key)
142            .map_err(|e| format!("[{}] {e}", e.gql_status()))
143    }
144
145    fn check_node_replace_properties_against_constraints(
146        &self,
147        node_id: NodeId,
148        properties: &Properties,
149    ) -> Result<(), String> {
150        if !self.has_active_constraints() {
151            return Ok(());
152        }
153        let catalog = self.constraint_catalog_read();
154        crate::memory::constraint_enforce::check_node_replace_properties(
155            &catalog, self, node_id, properties,
156        )
157        .map_err(|e| format!("[{}] {e}", e.gql_status()))
158    }
159
160    fn check_relationship_set_property_against_constraints(
161        &self,
162        rel_id: RelationshipId,
163        key: &str,
164        value: &PropertyValue,
165    ) -> Result<(), String> {
166        if !self.has_active_constraints() {
167            return Ok(());
168        }
169        let catalog = self.constraint_catalog_read();
170        crate::memory::constraint_enforce::check_relationship_set_property(
171            &catalog, self, rel_id, key, value,
172        )
173        .map_err(|e| format!("[{}] {e}", e.gql_status()))
174    }
175
176    fn check_relationship_remove_property_against_constraints(
177        &self,
178        rel_id: RelationshipId,
179        key: &str,
180    ) -> Result<(), String> {
181        if !self.has_active_constraints() {
182            return Ok(());
183        }
184        let catalog = self.constraint_catalog_read();
185        crate::memory::constraint_enforce::check_relationship_remove_property(
186            &catalog, self, rel_id, key,
187        )
188        .map_err(|e| format!("[{}] {e}", e.gql_status()))
189    }
190
191    fn check_relationship_replace_properties_against_constraints(
192        &self,
193        rel_id: RelationshipId,
194        properties: &Properties,
195    ) -> Result<(), String> {
196        if !self.has_active_constraints() {
197            return Ok(());
198        }
199        let catalog = self.constraint_catalog_read();
200        crate::memory::constraint_enforce::check_relationship_replace_properties(
201            &catalog, self, rel_id, properties,
202        )
203        .map_err(|e| format!("[{}] {e}", e.gql_status()))
204    }
205
206    fn check_node_add_label_against_constraints(
207        &self,
208        node_id: NodeId,
209        label: &str,
210    ) -> Result<(), String> {
211        if !self.has_active_constraints() {
212            return Ok(());
213        }
214        let catalog = self.constraint_catalog_read();
215        crate::memory::constraint_enforce::check_node_add_label(&catalog, self, node_id, label)
216            .map_err(|e| format!("[{}] {e}", e.gql_status()))
217    }
218
219    fn graph_stats(&self) -> GraphStats {
220        InMemoryGraph::graph_stats(self)
221    }
222
223    fn node_text_candidates(
224        &self,
225        label: &str,
226        property: &str,
227        query: &str,
228    ) -> Option<Vec<NodeId>> {
229        let registry = self.text_indexes_read(crate::StoredIndexEntity::Node);
230        let candidates = registry.candidates(label, property, query)?;
231        Some(candidates.into_iter().collect())
232    }
233
234    fn node_range_candidates(
235        &self,
236        label: &str,
237        property: &str,
238        lo: Option<&PropertyValue>,
239        hi: Option<&PropertyValue>,
240    ) -> Option<Vec<NodeId>> {
241        let registry = self.sorted_indexes_read(crate::StoredIndexEntity::Node);
242        // The sorted index returns a [lo, hi] inclusive candidate set.
243        // The executor refilters each id against the precise predicate
244        // (handling `>` vs `>=`, `<` vs `<=`).
245        let candidates = registry.range_candidates(label, property, lo, hi)?;
246        Some(candidates.into_iter().collect())
247    }
248
249    fn node_point_within_bbox(
250        &self,
251        label: &str,
252        property: &str,
253        ll: (f64, f64),
254        ur: (f64, f64),
255    ) -> Option<Vec<NodeId>> {
256        let registry = self.point_indexes_read(crate::StoredIndexEntity::Node);
257        let candidates = registry.within_bbox(label, property, ll, ur)?;
258        Some(candidates.into_iter().collect())
259    }
260
261    fn node_point_within_distance(
262        &self,
263        label: &str,
264        property: &str,
265        center: (f64, f64),
266        max_distance: f64,
267    ) -> Option<Vec<NodeId>> {
268        let registry = self.point_indexes_read(crate::StoredIndexEntity::Node);
269        let candidates = registry.within_distance(label, property, center, max_distance)?;
270        Some(candidates.into_iter().collect())
271    }
272
273    fn relationship_text_candidates(
274        &self,
275        rel_type: &str,
276        property: &str,
277        query: &str,
278    ) -> Option<Vec<RelationshipId>> {
279        let registry = self.text_indexes_read(crate::StoredIndexEntity::Relationship);
280        let candidates = registry.candidates(rel_type, property, query)?;
281        Some(candidates.into_iter().collect())
282    }
283
284    fn relationship_range_candidates(
285        &self,
286        rel_type: &str,
287        property: &str,
288        lo: Option<&PropertyValue>,
289        hi: Option<&PropertyValue>,
290    ) -> Option<Vec<RelationshipId>> {
291        let registry = self.sorted_indexes_read(crate::StoredIndexEntity::Relationship);
292        let candidates = registry.range_candidates(rel_type, property, lo, hi)?;
293        Some(candidates.into_iter().collect())
294    }
295
296    fn relationship_point_within_bbox(
297        &self,
298        rel_type: &str,
299        property: &str,
300        ll: (f64, f64),
301        ur: (f64, f64),
302    ) -> Option<Vec<RelationshipId>> {
303        let registry = self.point_indexes_read(crate::StoredIndexEntity::Relationship);
304        let candidates = registry.within_bbox(rel_type, property, ll, ur)?;
305        Some(candidates.into_iter().collect())
306    }
307
308    fn relationship_point_within_distance(
309        &self,
310        rel_type: &str,
311        property: &str,
312        center: (f64, f64),
313        max_distance: f64,
314    ) -> Option<Vec<RelationshipId>> {
315        let registry = self.point_indexes_read(crate::StoredIndexEntity::Relationship);
316        let candidates = registry.within_distance(rel_type, property, center, max_distance)?;
317        Some(candidates.into_iter().collect())
318    }
319
320    fn contains_node(&self, id: NodeId) -> bool {
321        self.node_at(id).is_some()
322    }
323
324    fn node(&self, id: NodeId) -> Option<NodeRecord> {
325        self.node_at(id).cloned()
326    }
327
328    fn all_node_ids(&self) -> Vec<NodeId> {
329        self.iter_node_ids().collect()
330    }
331
332    fn node_ids_by_label(&self, label: &str) -> Vec<NodeId> {
333        match self.nodes_by_label.get(label) {
334            Some(ids) => ids.clone(),
335            None => Vec::new(),
336        }
337    }
338
339    fn contains_relationship(&self, id: RelationshipId) -> bool {
340        self.rel_at(id).is_some()
341    }
342
343    fn relationship(&self, id: RelationshipId) -> Option<RelationshipRecord> {
344        self.rel_at(id).cloned()
345    }
346
347    fn all_rel_ids(&self) -> Vec<RelationshipId> {
348        self.iter_rel_ids().collect()
349    }
350
351    fn rel_ids_by_type(&self, rel_type: &str) -> Vec<RelationshipId> {
352        match self.relationships_by_type.get(rel_type) {
353            Some(ids) => ids.clone(),
354            None => Vec::new(),
355        }
356    }
357
358    fn relationship_endpoints(&self, id: RelationshipId) -> Option<(NodeId, NodeId)> {
359        self.rel_at(id).map(|r| (r.src, r.dst))
360    }
361
362    fn expand_ids(
363        &self,
364        node_id: NodeId,
365        direction: Direction,
366        types: &[String],
367    ) -> Vec<(RelationshipId, NodeId)> {
368        if self.node_at(node_id).is_none() {
369            return Vec::new();
370        }
371
372        // Walk the adjacency Vec(s) directly into a single output Vec,
373        // skipping the previous intermediate `Vec<RelationshipId>`
374        // allocation that `relationship_ids_for_direction` produced.
375        // For type-filtered traversal we read `rel.rel_type` once per
376        // edge against the (typically tiny) `types` slice.
377        let mut out: Vec<(RelationshipId, NodeId)> = Vec::new();
378
379        let single_type = match types {
380            [single] => Some(single.as_str()),
381            _ => None,
382        };
383        let has_type_filter = !types.is_empty();
384
385        let push_from = |adj: &[RelationshipId],
386                         skip_self_loops: bool,
387                         out: &mut Vec<(RelationshipId, NodeId)>| {
388            for &rel_id in adj {
389                let Some(rel) = self.rel_at(rel_id) else {
390                    continue;
391                };
392                if skip_self_loops && rel.src == node_id && rel.dst == node_id {
393                    continue;
394                }
395                if let Some(single) = single_type {
396                    if rel.rel_type != single {
397                        continue;
398                    }
399                } else if has_type_filter && !types.iter().any(|t| t == &rel.rel_type) {
400                    continue;
401                }
402                let Some(other_id) = Self::other_endpoint(rel, node_id) else {
403                    continue;
404                };
405                out.push((rel_id, other_id));
406            }
407        };
408
409        match direction {
410            Direction::Right => {
411                if let Some(adj) = self.outgoing_at(node_id) {
412                    out.reserve(adj.len());
413                    push_from(adj, false, &mut out);
414                }
415            }
416            Direction::Left => {
417                if let Some(adj) = self.incoming_at(node_id) {
418                    out.reserve(adj.len());
419                    push_from(adj, false, &mut out);
420                }
421            }
422            Direction::Undirected => {
423                let out_len = self.outgoing_at(node_id).map(<[_]>::len).unwrap_or(0);
424                let in_len = self.incoming_at(node_id).map(<[_]>::len).unwrap_or(0);
425                out.reserve(out_len + in_len);
426                if let Some(adj) = self.outgoing_at(node_id) {
427                    push_from(adj, false, &mut out);
428                }
429                if let Some(adj) = self.incoming_at(node_id) {
430                    push_from(adj, true, &mut out);
431                }
432            }
433        }
434
435        out
436    }
437
438    fn try_for_each_expand_id<F, E>(
439        &self,
440        node_id: NodeId,
441        direction: Direction,
442        types: &[String],
443        visit: F,
444    ) -> Result<(), E>
445    where
446        F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
447        Self: Sized,
448    {
449        self.try_for_each_adjacent_id(node_id, direction, types, visit)
450    }
451
452    fn all_labels(&self) -> Vec<String> {
453        self.nodes_by_label.keys().cloned().collect()
454    }
455
456    fn all_relationship_types(&self) -> Vec<String> {
457        self.relationships_by_type.keys().cloned().collect()
458    }
459
460    // ---------- Optimization hooks: zero-clone borrow access ----------
461
462    fn with_node<F, R>(&self, id: NodeId, f: F) -> Option<R>
463    where
464        F: FnOnce(&NodeRecord) -> R,
465        Self: Sized,
466    {
467        self.node_at(id).map(f)
468    }
469
470    fn with_relationship<F, R>(&self, id: RelationshipId, f: F) -> Option<R>
471    where
472        F: FnOnce(&RelationshipRecord) -> R,
473        Self: Sized,
474    {
475        self.rel_at(id).map(f)
476    }
477
478    // ---------- Overrides: counts + existence ----------
479
480    fn has_node(&self, id: NodeId) -> bool {
481        self.node_at(id).is_some()
482    }
483
484    fn has_relationship(&self, id: RelationshipId) -> bool {
485        self.rel_at(id).is_some()
486    }
487
488    fn node_count(&self) -> usize {
489        self.live_node_count
490    }
491
492    fn relationship_count(&self) -> usize {
493        self.live_rel_count
494    }
495
496    // ---------- Overrides: record-returning scans (direct iteration) ----------
497
498    fn all_nodes(&self) -> Vec<NodeRecord> {
499        self.iter_node_records().cloned().collect()
500    }
501
502    fn nodes_by_label(&self, label: &str) -> Vec<NodeRecord> {
503        self.nodes_by_label
504            .get(label)
505            .into_iter()
506            .flat_map(|ids| ids.iter())
507            .filter_map(|&id| self.node_at(id).cloned())
508            .collect()
509    }
510
511    fn all_relationships(&self) -> Vec<RelationshipRecord> {
512        self.iter_rel_records().cloned().collect()
513    }
514
515    fn relationships_by_type(&self, rel_type: &str) -> Vec<RelationshipRecord> {
516        self.relationships_by_type
517            .get(rel_type)
518            .into_iter()
519            .flat_map(|ids| ids.iter())
520            .filter_map(|&id| self.rel_at(id).cloned())
521            .collect()
522    }
523
524    fn relationship_ids_of(&self, node_id: NodeId, direction: Direction) -> Vec<RelationshipId> {
525        self.relationship_ids_for_direction(node_id, direction)
526    }
527
528    fn outgoing_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
529        self.outgoing_at(node_id)
530            .into_iter()
531            .flat_map(|ids| ids.iter())
532            .filter_map(|&id| self.rel_at(id).cloned())
533            .collect()
534    }
535
536    fn incoming_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
537        self.incoming_at(node_id)
538            .into_iter()
539            .flat_map(|ids| ids.iter())
540            .filter_map(|&id| self.rel_at(id).cloned())
541            .collect()
542    }
543
544    fn relationships_of(&self, node_id: NodeId, direction: Direction) -> Vec<RelationshipRecord> {
545        let mut out = Vec::new();
546        let _ = self.try_for_each_expand_id(node_id, direction, &[], |rel_id, _| {
547            if let Some(rel) = self.rel_at(rel_id) {
548                out.push(rel.clone());
549            }
550            Ok::<(), ()>(())
551        });
552        out
553    }
554
555    fn degree(&self, node_id: NodeId, direction: Direction) -> usize {
556        match direction {
557            Direction::Right => self.outgoing_at(node_id).map(|s| s.len()).unwrap_or(0),
558            Direction::Left => self.incoming_at(node_id).map(|s| s.len()).unwrap_or(0),
559            Direction::Undirected => {
560                let out_count = self.outgoing_at(node_id).map(<[_]>::len).unwrap_or(0);
561                let incoming_non_self = self
562                    .incoming_at(node_id)
563                    .into_iter()
564                    .flat_map(|ids| ids.iter())
565                    .filter(|&&rel_id| {
566                        self.rel_at(rel_id)
567                            .map(|rel| rel.src != node_id || rel.dst != node_id)
568                            .unwrap_or(false)
569                    })
570                    .count();
571                out_count + incoming_non_self
572            }
573        }
574    }
575
576    fn expand(
577        &self,
578        node_id: NodeId,
579        direction: Direction,
580        types: &[String],
581    ) -> Vec<(RelationshipRecord, NodeRecord)> {
582        if self.node_at(node_id).is_none() {
583            return Vec::new();
584        }
585
586        let mut out = Vec::new();
587        let _ = self.try_for_each_expand_id(node_id, direction, types, |rel_id, other_id| {
588            if let (Some(rel), Some(other)) = (self.rel_at(rel_id), self.node_at(other_id)) {
589                out.push((rel.clone(), other.clone()));
590            }
591            Ok::<(), ()>(())
592        });
593        out
594    }
595
596    fn neighbors(
597        &self,
598        node_id: NodeId,
599        direction: Direction,
600        types: &[String],
601    ) -> Vec<NodeRecord> {
602        let mut out = Vec::new();
603        let _ = self.try_for_each_expand_id(node_id, direction, types, |_, other_id| {
604            if let Some(node) = self.node_at(other_id) {
605                out.push(node.clone());
606            }
607            Ok::<(), ()>(())
608        });
609        out
610    }
611
612    fn all_node_property_keys(&self) -> Vec<String> {
613        let mut keys = BTreeSet::new();
614        for node in self.iter_node_records() {
615            for key in node.properties.keys() {
616                keys.insert(key.to_string());
617            }
618        }
619        keys.into_iter().collect()
620    }
621
622    // ---------- Overrides: traversal (direct adjacency) ----------
623
624    fn all_relationship_property_keys(&self) -> Vec<String> {
625        let mut keys = BTreeSet::new();
626        for rel in self.iter_rel_records() {
627            for key in rel.properties.keys() {
628                keys.insert(key.to_string());
629            }
630        }
631        keys.into_iter().collect()
632    }
633
634    fn label_property_keys(&self, label: &str) -> Vec<String> {
635        let mut keys = BTreeSet::new();
636
637        if let Some(ids) = self.nodes_by_label.get(label) {
638            for &id in ids {
639                if let Some(node) = self.node_at(id) {
640                    for key in node.properties.keys() {
641                        keys.insert(key.to_string());
642                    }
643                }
644            }
645        }
646
647        keys.into_iter().collect()
648    }
649
650    fn rel_type_property_keys(&self, rel_type: &str) -> Vec<String> {
651        let mut keys = BTreeSet::new();
652
653        if let Some(ids) = self.relationships_by_type.get(rel_type) {
654            for &id in ids {
655                if let Some(rel) = self.rel_at(id) {
656                    for key in rel.properties.keys() {
657                        keys.insert(key.to_string());
658                    }
659                }
660            }
661        }
662
663        keys.into_iter().collect()
664    }
665
666    fn find_nodes_by_property(
667        &self,
668        label: Option<&str>,
669        key: &str,
670        value: &PropertyValue,
671    ) -> Vec<NodeRecord>
672    where
673        Self: Sized,
674    {
675        if PropertyIndexKey::from_value(value).is_none() {
676            return self.scan_nodes_by_property(label, key, value);
677        }
678
679        self.ensure_node_property_index(key);
680        let indexes = self.indexes_read();
681
682        match label {
683            Some(label) => {
684                let Some(ids) = indexes.node_properties.scoped_ids_for(label, key, value) else {
685                    return Vec::new();
686                };
687                ids.iter()
688                    .filter_map(|&id| self.node_at(id).cloned())
689                    .collect()
690            }
691            None => indexes
692                .node_properties
693                .ids_for(key, value)
694                .into_iter()
695                .flat_map(|ids| ids.iter())
696                .filter_map(|&id| self.node_at(id).cloned())
697                .collect(),
698        }
699    }
700
701    fn find_node_ids_by_property(
702        &self,
703        label: Option<&str>,
704        key: &str,
705        value: &PropertyValue,
706    ) -> Vec<NodeId>
707    where
708        Self: Sized,
709    {
710        if PropertyIndexKey::from_value(value).is_none() {
711            return self.scan_node_ids_by_property(label, key, value);
712        }
713
714        self.ensure_node_property_index(key);
715        let indexes = self.indexes_read();
716
717        match label {
718            Some(label) => indexes
719                .node_properties
720                .scoped_ids_for(label, key, value)
721                .map(|ids| ids.to_vec())
722                .unwrap_or_default(),
723            None => indexes
724                .node_properties
725                .ids_for(key, value)
726                .map(|ids| ids.to_vec())
727                .unwrap_or_default(),
728        }
729    }
730    // ---------- Overrides: schema introspection ----------
731
732    fn find_relationships_by_property(
733        &self,
734        rel_type: Option<&str>,
735        key: &str,
736        value: &PropertyValue,
737    ) -> Vec<RelationshipRecord>
738    where
739        Self: Sized,
740    {
741        if PropertyIndexKey::from_value(value).is_none() {
742            return self.scan_relationships_by_property(rel_type, key, value);
743        }
744
745        self.ensure_relationship_property_index(key);
746        let indexes = self.indexes_read();
747
748        match rel_type {
749            Some(rel_type) => {
750                let Some(ids) = indexes
751                    .relationship_properties
752                    .scoped_ids_for(rel_type, key, value)
753                else {
754                    return Vec::new();
755                };
756                ids.iter()
757                    .filter_map(|&id| self.rel_at(id).cloned())
758                    .collect()
759            }
760            None => indexes
761                .relationship_properties
762                .ids_for(key, value)
763                .into_iter()
764                .flat_map(|ids| ids.iter())
765                .filter_map(|&id| self.rel_at(id).cloned())
766                .collect(),
767        }
768    }
769
770    fn find_relationship_ids_by_property(
771        &self,
772        rel_type: Option<&str>,
773        key: &str,
774        value: &PropertyValue,
775    ) -> Vec<RelationshipId>
776    where
777        Self: Sized,
778    {
779        if PropertyIndexKey::from_value(value).is_none() {
780            return self.scan_relationship_ids_by_property(rel_type, key, value);
781        }
782
783        self.ensure_relationship_property_index(key);
784        let indexes = self.indexes_read();
785
786        match rel_type {
787            Some(rel_type) => indexes
788                .relationship_properties
789                .scoped_ids_for(rel_type, key, value)
790                .map(|ids| ids.to_vec())
791                .unwrap_or_default(),
792            None => indexes
793                .relationship_properties
794                .ids_for(key, value)
795                .map(|ids| ids.to_vec())
796                .unwrap_or_default(),
797        }
798    }
799
800    fn node_exists_with_label_and_property(
801        &self,
802        label: &str,
803        key: &str,
804        value: &PropertyValue,
805    ) -> bool
806    where
807        Self: Sized,
808    {
809        if PropertyIndexKey::from_value(value).is_none() {
810            return self.any_node_by_property(label, key, value);
811        }
812
813        self.ensure_node_property_index(key);
814        let indexes = self.indexes_read();
815        indexes
816            .node_properties
817            .scoped_ids_for(label, key, value)
818            .map(|ids| !ids.is_empty())
819            .unwrap_or(false)
820    }
821
822    fn relationship_exists_with_type_and_property(
823        &self,
824        rel_type: &str,
825        key: &str,
826        value: &PropertyValue,
827    ) -> bool
828    where
829        Self: Sized,
830    {
831        if PropertyIndexKey::from_value(value).is_none() {
832            return self.any_relationship_by_property(rel_type, key, value);
833        }
834
835        self.ensure_relationship_property_index(key);
836        let indexes = self.indexes_read();
837        indexes
838            .relationship_properties
839            .scoped_ids_for(rel_type, key, value)
840            .map(|ids| !ids.is_empty())
841            .unwrap_or(false)
842    }
843}
844
845impl BorrowedGraphStorage for InMemoryGraph {
846    fn node_ref(&self, id: NodeId) -> Option<&NodeRecord> {
847        self.node_at(id)
848    }
849
850    fn relationship_ref(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
851        self.rel_at(id)
852    }
853
854    fn node_refs(&self) -> Box<dyn Iterator<Item = &NodeRecord> + '_> {
855        Box::new(self.iter_node_records())
856    }
857
858    fn node_refs_by_label(&self, label: &str) -> Box<dyn Iterator<Item = &NodeRecord> + '_> {
859        Box::new(
860            self.nodes_by_label
861                .get(label)
862                .into_iter()
863                .flat_map(|ids| ids.iter())
864                .filter_map(|&id| self.node_at(id)),
865        )
866    }
867
868    fn relationship_refs(&self) -> Box<dyn Iterator<Item = &RelationshipRecord> + '_> {
869        Box::new(self.iter_rel_records())
870    }
871
872    fn relationship_refs_by_type(
873        &self,
874        rel_type: &str,
875    ) -> Box<dyn Iterator<Item = &RelationshipRecord> + '_> {
876        Box::new(
877            self.relationships_by_type
878                .get(rel_type)
879                .into_iter()
880                .flat_map(|ids| ids.iter())
881                .filter_map(|&id| self.rel_at(id)),
882        )
883    }
884}
885
886impl GraphStorageMut for InMemoryGraph {
887    fn try_create_node(
888        &mut self,
889        labels: Vec<String>,
890        properties: Properties,
891    ) -> Option<NodeRecord> {
892        let (id, idx) = self.try_reserve_next_node_slot()?;
893        let labels = Self::normalize_labels(labels);
894
895        let node = NodeRecord {
896            id,
897            labels: labels.clone(),
898            properties,
899        };
900
901        self.put_node_at_slot(idx, node.clone());
902
903        self.on_node_created(&node);
904
905        // ensure_node_slot grew both adjacency Vecs to cover this id when
906        // we put_node above.
907
908        self.emit(|| MutationEvent::CreateNode {
909            id,
910            labels: node.labels.clone(),
911            properties: node.properties.clone(),
912        });
913
914        Some(node)
915    }
916
917    fn create_relationship(
918        &mut self,
919        src: NodeId,
920        dst: NodeId,
921        rel_type: &str,
922        properties: Properties,
923    ) -> Option<RelationshipRecord> {
924        if self.node_at(src).is_none() || self.node_at(dst).is_none() {
925            return None;
926        }
927
928        let trimmed = rel_type.trim();
929        if trimmed.is_empty() {
930            return None;
931        }
932
933        let (id, idx) = self.try_reserve_next_rel_slot()?;
934        let rel = RelationshipRecord {
935            id,
936            src,
937            dst,
938            rel_type: trimmed.to_string(),
939            properties,
940        };
941
942        self.put_rel_at_slot(idx, rel.clone());
943        self.on_relationship_created(&rel);
944
945        self.emit(|| MutationEvent::CreateRelationship {
946            id,
947            src,
948            dst,
949            rel_type: rel.rel_type.clone(),
950            properties: rel.properties.clone(),
951        });
952
953        Some(rel)
954    }
955
956    fn set_node_property(&mut self, node_id: NodeId, key: String, value: PropertyValue) -> bool {
957        if self.node_at(node_id).is_none() {
958            return false;
959        }
960
961        let old = match self.node_at_mut(node_id) {
962            // Reuse the existing key Arc when this is an overwrite — the
963            // common SET-existing-property path then skips the intern
964            // table entirely (and the read-lock + hash lookup it costs).
965            Some(node) => {
966                if let Some(slot) = node.properties.get_mut(key.as_str()) {
967                    Some(std::mem::replace(slot, value.clone()))
968                } else {
969                    let key_arc = crate::intern(&key);
970                    node.properties.insert(key_arc, value.clone())
971                }
972            }
973            None => return false,
974        };
975        self.on_node_property_set(node_id, &key, old.as_ref(), &value);
976
977        self.emit(|| MutationEvent::SetNodeProperty {
978            node_id,
979            key: key.clone(),
980            value: value.clone(),
981        });
982
983        true
984    }
985
986    fn remove_node_property(&mut self, node_id: NodeId, key: &str) -> bool {
987        let removed = match self.node_at_mut(node_id) {
988            Some(node) => node.properties.remove(key),
989            None => return false,
990        };
991        let Some(removed) = removed else {
992            return false;
993        };
994
995        self.on_node_property_removed(node_id, key, &removed);
996
997        self.emit(|| MutationEvent::RemoveNodeProperty {
998            node_id,
999            key: key.to_string(),
1000        });
1001
1002        true
1003    }
1004
1005    fn add_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
1006        let label = label.trim();
1007        if label.is_empty() {
1008            return false;
1009        }
1010
1011        let applied = match self.node_at_mut(node_id) {
1012            Some(node) => {
1013                if node.labels.iter().any(|l| l == label) {
1014                    return false;
1015                }
1016
1017                node.labels.push(label.to_string());
1018                true
1019            }
1020            None => false,
1021        };
1022        if applied {
1023            self.on_node_label_added(node_id, label);
1024            self.emit(|| MutationEvent::AddNodeLabel {
1025                node_id,
1026                label: label.to_string(),
1027            });
1028        }
1029        applied
1030    }
1031
1032    fn remove_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
1033        let applied = match self.node_at_mut(node_id) {
1034            Some(node) => {
1035                let original_len = node.labels.len();
1036                node.labels.retain(|l| l != label);
1037                node.labels.len() != original_len
1038            }
1039            None => false,
1040        };
1041        if applied {
1042            self.on_node_label_removed(node_id, label);
1043            self.emit(|| MutationEvent::RemoveNodeLabel {
1044                node_id,
1045                label: label.to_string(),
1046            });
1047        }
1048        applied
1049    }
1050
1051    fn set_relationship_property(
1052        &mut self,
1053        rel_id: RelationshipId,
1054        key: String,
1055        value: PropertyValue,
1056    ) -> bool {
1057        if self.rel_at(rel_id).is_none() {
1058            return false;
1059        }
1060
1061        let old = match self.rel_at_mut(rel_id) {
1062            Some(rel) => {
1063                if let Some(slot) = rel.properties.get_mut(key.as_str()) {
1064                    Some(std::mem::replace(slot, value.clone()))
1065                } else {
1066                    let key_arc = crate::intern(&key);
1067                    rel.properties.insert(key_arc, value.clone())
1068                }
1069            }
1070            None => return false,
1071        };
1072        self.on_relationship_property_set(rel_id, &key, old.as_ref(), &value);
1073
1074        self.emit(|| MutationEvent::SetRelationshipProperty {
1075            rel_id,
1076            key: key.clone(),
1077            value: value.clone(),
1078        });
1079
1080        true
1081    }
1082
1083    fn remove_relationship_property(&mut self, rel_id: RelationshipId, key: &str) -> bool {
1084        let removed = match self.rel_at_mut(rel_id) {
1085            Some(rel) => rel.properties.remove(key),
1086            None => return false,
1087        };
1088        let Some(removed) = removed else {
1089            return false;
1090        };
1091
1092        self.on_relationship_property_removed(rel_id, key, &removed);
1093
1094        self.emit(|| MutationEvent::RemoveRelationshipProperty {
1095            rel_id,
1096            key: key.to_string(),
1097        });
1098
1099        true
1100    }
1101
1102    fn delete_relationship(&mut self, rel_id: RelationshipId) -> bool {
1103        let applied = match self.take_rel(rel_id) {
1104            Some(rel) => {
1105                self.on_relationship_deleted(&rel);
1106                true
1107            }
1108            None => false,
1109        };
1110        if applied {
1111            self.emit(|| MutationEvent::DeleteRelationship { rel_id });
1112        }
1113        applied
1114    }
1115
1116    fn delete_node(&mut self, node_id: NodeId) -> bool {
1117        if self.node_at(node_id).is_none() {
1118            return false;
1119        }
1120
1121        if self.has_incident_relationships(node_id) {
1122            return false;
1123        }
1124
1125        let node = match self.take_node(node_id) {
1126            Some(node) => node,
1127            None => return false,
1128        };
1129
1130        self.on_node_deleted(&node);
1131
1132        // take_node already cleared the per-node adjacency Vecs.
1133
1134        self.emit(|| MutationEvent::DeleteNode { node_id });
1135
1136        true
1137    }
1138
1139    fn detach_delete_node(&mut self, node_id: NodeId) -> bool {
1140        if self.node_at(node_id).is_none() {
1141            return false;
1142        }
1143
1144        let rel_ids: Vec<_> = self
1145            .incident_relationship_ids(node_id)
1146            .into_iter()
1147            .collect();
1148
1149        // We deliberately fire per-relationship DeleteRelationship events
1150        // here (via `delete_relationship`) and a DetachDeleteNode event at
1151        // the end. A WAL replayer that sees DetachDeleteNode can ignore the
1152        // preceding DeleteRelationship events — or, equivalently, replay
1153        // them and the DetachDeleteNode becomes a no-op on the remaining
1154        // (now-empty) node. The emit-before-delete choice costs one extra
1155        // event per mutation but keeps the replay contract simple:
1156        // "apply every event in order".
1157        for rel_id in rel_ids {
1158            let _ = self.delete_relationship(rel_id);
1159        }
1160
1161        if self.delete_node(node_id) {
1162            self.emit(|| MutationEvent::DetachDeleteNode { node_id });
1163            true
1164        } else {
1165            false
1166        }
1167    }
1168
1169    fn clear(&mut self) {
1170        // Keep the recorder across clear so observers can see the Clear
1171        // event plus whatever follows. Matches WAL semantics where the log
1172        // is the source of truth across a truncation.
1173        let recorder = self.recorder.take();
1174        *self = Self::default();
1175        self.recorder = recorder;
1176        self.emit(|| MutationEvent::Clear);
1177    }
1178
1179    fn create_index(
1180        &mut self,
1181        request: IndexRequest,
1182        if_not_exists: bool,
1183    ) -> Result<CreateIndexOutcome, CreateIndexError> {
1184        // `register_index` only needs `&self` (catalog + property-index
1185        // registries are behind RwLock for interior mutability), but we
1186        // keep the trait signature `&mut self` so downstream callers can
1187        // hold a uniquely-owned mutation lock. That mirrors the pattern
1188        // used for property writes elsewhere in this impl.
1189        self.register_index(request, if_not_exists)
1190    }
1191
1192    fn drop_index(
1193        &mut self,
1194        name: &str,
1195        if_exists: bool,
1196    ) -> Result<DropIndexOutcome, DropIndexError> {
1197        self.drop_named_index(name, if_exists)
1198    }
1199
1200    fn create_constraint(
1201        &mut self,
1202        request: ConstraintRequest,
1203        if_not_exists: bool,
1204    ) -> Result<CreateConstraintOutcome, CreateConstraintError> {
1205        self.register_constraint(request, if_not_exists)
1206    }
1207
1208    fn drop_constraint(
1209        &mut self,
1210        name: &str,
1211        if_exists: bool,
1212    ) -> Result<DropConstraintOutcome, DropConstraintError> {
1213        self.drop_named_constraint(name, if_exists)
1214    }
1215}