Skip to main content

lora_store/memory/
impls.rs

1//! `GraphStorage` / `BorrowedGraphStorage` / `GraphStorageMut` impls
2//! for [`InMemoryGraph`]. The trait surfaces — read, borrow, mutate —
3//! all delegate into the inherent helpers defined in `super`.
4
5use std::collections::BTreeSet;
6
7use lora_ast::Direction;
8
9use crate::{
10    BorrowedGraphStorage, ConstraintDefinition, ConstraintRequest, CreateConstraintError,
11    CreateConstraintOutcome, CreateIndexError, CreateIndexOutcome, DropConstraintError,
12    DropConstraintOutcome, DropIndexError, DropIndexOutcome, GraphStats, GraphStorage,
13    GraphStorageMut, IndexDefinition, IndexRequest, MutationEvent, NodeId, NodeRecord, Properties,
14    PropertyValue, RelationshipId, RelationshipRecord, StoredIndexEntity,
15};
16
17use super::property_index::PropertyIndexKey;
18use super::InMemoryGraph;
19
20impl GraphStorage for InMemoryGraph {
21    // ---------- Required primitives ----------
22
23    fn list_indexes(&self) -> Vec<IndexDefinition> {
24        self.index_catalog_read().list()
25    }
26
27    fn get_index(&self, name: &str) -> Option<IndexDefinition> {
28        self.index_catalog_read().get(name).cloned()
29    }
30
31    fn fulltext_search(&self, name: &str, query: &str) -> Vec<(u64, f64)> {
32        // The index lives on exactly one entity scope; check both.
33        let node_hits = self
34            .fulltext_indexes_read(StoredIndexEntity::Node)
35            .get(name)
36            .map(|idx| idx.query(query));
37        if let Some(hits) = node_hits {
38            if !hits.is_empty() {
39                return hits;
40            }
41        }
42        let rel_hits = self
43            .fulltext_indexes_read(StoredIndexEntity::Relationship)
44            .get(name)
45            .map(|idx| idx.query(query));
46        rel_hits.unwrap_or_default()
47    }
48
49    fn list_constraints(&self) -> Vec<ConstraintDefinition> {
50        self.constraint_catalog_read().list()
51    }
52
53    fn get_constraint(&self, name: &str) -> Option<ConstraintDefinition> {
54        self.constraint_catalog_read().get(name).cloned()
55    }
56
57    fn check_node_create_against_constraints(
58        &self,
59        labels: &[String],
60        properties: &Properties,
61    ) -> Result<(), String> {
62        if !self.has_active_constraints() {
63            return Ok(());
64        }
65        let catalog = self.constraint_catalog_read();
66        crate::memory::constraint_enforce::check_node_create(&catalog, self, labels, properties)
67            .map_err(|e| format!("[{}] {e}", e.gql_status()))
68    }
69
70    fn check_relationship_create_against_constraints(
71        &self,
72        rel_type: &str,
73        properties: &Properties,
74    ) -> Result<(), String> {
75        if !self.has_active_constraints() {
76            return Ok(());
77        }
78        let catalog = self.constraint_catalog_read();
79        crate::memory::constraint_enforce::check_relationship_create(
80            &catalog, self, rel_type, properties,
81        )
82        .map_err(|e| format!("[{}] {e}", e.gql_status()))
83    }
84
85    fn check_node_set_property_against_constraints(
86        &self,
87        node_id: NodeId,
88        key: &str,
89        value: &PropertyValue,
90    ) -> Result<(), String> {
91        if !self.has_active_constraints() {
92            return Ok(());
93        }
94        let catalog = self.constraint_catalog_read();
95        crate::memory::constraint_enforce::check_node_set_property(
96            &catalog, self, node_id, key, value,
97        )
98        .map_err(|e| format!("[{}] {e}", e.gql_status()))
99    }
100
101    fn check_node_remove_property_against_constraints(
102        &self,
103        node_id: NodeId,
104        key: &str,
105    ) -> Result<(), String> {
106        if !self.has_active_constraints() {
107            return Ok(());
108        }
109        let catalog = self.constraint_catalog_read();
110        crate::memory::constraint_enforce::check_node_remove_property(&catalog, self, node_id, key)
111            .map_err(|e| format!("[{}] {e}", e.gql_status()))
112    }
113
114    fn check_node_replace_properties_against_constraints(
115        &self,
116        node_id: NodeId,
117        properties: &Properties,
118    ) -> Result<(), String> {
119        if !self.has_active_constraints() {
120            return Ok(());
121        }
122        let catalog = self.constraint_catalog_read();
123        crate::memory::constraint_enforce::check_node_replace_properties(
124            &catalog, self, node_id, properties,
125        )
126        .map_err(|e| format!("[{}] {e}", e.gql_status()))
127    }
128
129    fn check_relationship_set_property_against_constraints(
130        &self,
131        rel_id: RelationshipId,
132        key: &str,
133        value: &PropertyValue,
134    ) -> Result<(), String> {
135        if !self.has_active_constraints() {
136            return Ok(());
137        }
138        let catalog = self.constraint_catalog_read();
139        crate::memory::constraint_enforce::check_relationship_set_property(
140            &catalog, self, rel_id, key, value,
141        )
142        .map_err(|e| format!("[{}] {e}", e.gql_status()))
143    }
144
145    fn check_relationship_remove_property_against_constraints(
146        &self,
147        rel_id: RelationshipId,
148        key: &str,
149    ) -> Result<(), String> {
150        if !self.has_active_constraints() {
151            return Ok(());
152        }
153        let catalog = self.constraint_catalog_read();
154        crate::memory::constraint_enforce::check_relationship_remove_property(
155            &catalog, self, rel_id, key,
156        )
157        .map_err(|e| format!("[{}] {e}", e.gql_status()))
158    }
159
160    fn check_relationship_replace_properties_against_constraints(
161        &self,
162        rel_id: RelationshipId,
163        properties: &Properties,
164    ) -> Result<(), String> {
165        if !self.has_active_constraints() {
166            return Ok(());
167        }
168        let catalog = self.constraint_catalog_read();
169        crate::memory::constraint_enforce::check_relationship_replace_properties(
170            &catalog, self, rel_id, properties,
171        )
172        .map_err(|e| format!("[{}] {e}", e.gql_status()))
173    }
174
175    fn check_node_add_label_against_constraints(
176        &self,
177        node_id: NodeId,
178        label: &str,
179    ) -> Result<(), String> {
180        if !self.has_active_constraints() {
181            return Ok(());
182        }
183        let catalog = self.constraint_catalog_read();
184        crate::memory::constraint_enforce::check_node_add_label(&catalog, self, node_id, label)
185            .map_err(|e| format!("[{}] {e}", e.gql_status()))
186    }
187
188    fn graph_stats(&self) -> GraphStats {
189        InMemoryGraph::graph_stats(self)
190    }
191
192    fn node_text_candidates(
193        &self,
194        label: &str,
195        property: &str,
196        query: &str,
197    ) -> Option<Vec<NodeId>> {
198        let registry = self.text_indexes_read(crate::StoredIndexEntity::Node);
199        let candidates = registry.candidates(label, property, query)?;
200        Some(candidates.into_iter().collect())
201    }
202
203    fn node_range_candidates(
204        &self,
205        label: &str,
206        property: &str,
207        lo: Option<&PropertyValue>,
208        hi: Option<&PropertyValue>,
209    ) -> Option<Vec<NodeId>> {
210        let registry = self.sorted_indexes_read(crate::StoredIndexEntity::Node);
211        // The sorted index returns a [lo, hi] inclusive candidate set.
212        // The executor refilters each id against the precise predicate
213        // (handling `>` vs `>=`, `<` vs `<=`).
214        let candidates = registry.range_candidates(label, property, lo, hi)?;
215        Some(candidates.into_iter().collect())
216    }
217
218    fn node_point_within_bbox(
219        &self,
220        label: &str,
221        property: &str,
222        ll: (f64, f64),
223        ur: (f64, f64),
224    ) -> Option<Vec<NodeId>> {
225        let registry = self.point_indexes_read(crate::StoredIndexEntity::Node);
226        let candidates = registry.within_bbox(label, property, ll, ur)?;
227        Some(candidates.into_iter().collect())
228    }
229
230    fn node_point_within_distance(
231        &self,
232        label: &str,
233        property: &str,
234        center: (f64, f64),
235        max_distance: f64,
236    ) -> Option<Vec<NodeId>> {
237        let registry = self.point_indexes_read(crate::StoredIndexEntity::Node);
238        let candidates = registry.within_distance(label, property, center, max_distance)?;
239        Some(candidates.into_iter().collect())
240    }
241
242    fn relationship_text_candidates(
243        &self,
244        rel_type: &str,
245        property: &str,
246        query: &str,
247    ) -> Option<Vec<RelationshipId>> {
248        let registry = self.text_indexes_read(crate::StoredIndexEntity::Relationship);
249        let candidates = registry.candidates(rel_type, property, query)?;
250        Some(candidates.into_iter().collect())
251    }
252
253    fn relationship_range_candidates(
254        &self,
255        rel_type: &str,
256        property: &str,
257        lo: Option<&PropertyValue>,
258        hi: Option<&PropertyValue>,
259    ) -> Option<Vec<RelationshipId>> {
260        let registry = self.sorted_indexes_read(crate::StoredIndexEntity::Relationship);
261        let candidates = registry.range_candidates(rel_type, property, lo, hi)?;
262        Some(candidates.into_iter().collect())
263    }
264
265    fn relationship_point_within_bbox(
266        &self,
267        rel_type: &str,
268        property: &str,
269        ll: (f64, f64),
270        ur: (f64, f64),
271    ) -> Option<Vec<RelationshipId>> {
272        let registry = self.point_indexes_read(crate::StoredIndexEntity::Relationship);
273        let candidates = registry.within_bbox(rel_type, property, ll, ur)?;
274        Some(candidates.into_iter().collect())
275    }
276
277    fn relationship_point_within_distance(
278        &self,
279        rel_type: &str,
280        property: &str,
281        center: (f64, f64),
282        max_distance: f64,
283    ) -> Option<Vec<RelationshipId>> {
284        let registry = self.point_indexes_read(crate::StoredIndexEntity::Relationship);
285        let candidates = registry.within_distance(rel_type, property, center, max_distance)?;
286        Some(candidates.into_iter().collect())
287    }
288
289    fn contains_node(&self, id: NodeId) -> bool {
290        self.node_at(id).is_some()
291    }
292
293    fn node(&self, id: NodeId) -> Option<NodeRecord> {
294        self.node_at(id).cloned()
295    }
296
297    fn all_node_ids(&self) -> Vec<NodeId> {
298        self.iter_node_ids().collect()
299    }
300
301    fn node_ids_by_label(&self, label: &str) -> Vec<NodeId> {
302        match self.nodes_by_label.get(label) {
303            Some(ids) => ids.clone(),
304            None => Vec::new(),
305        }
306    }
307
308    fn contains_relationship(&self, id: RelationshipId) -> bool {
309        self.rel_at(id).is_some()
310    }
311
312    fn relationship(&self, id: RelationshipId) -> Option<RelationshipRecord> {
313        self.rel_at(id).cloned()
314    }
315
316    fn all_rel_ids(&self) -> Vec<RelationshipId> {
317        self.iter_rel_ids().collect()
318    }
319
320    fn rel_ids_by_type(&self, rel_type: &str) -> Vec<RelationshipId> {
321        match self.relationships_by_type.get(rel_type) {
322            Some(ids) => ids.clone(),
323            None => Vec::new(),
324        }
325    }
326
327    fn relationship_endpoints(&self, id: RelationshipId) -> Option<(NodeId, NodeId)> {
328        self.rel_at(id).map(|r| (r.src, r.dst))
329    }
330
331    fn expand_ids(
332        &self,
333        node_id: NodeId,
334        direction: Direction,
335        types: &[String],
336    ) -> Vec<(RelationshipId, NodeId)> {
337        if self.node_at(node_id).is_none() {
338            return Vec::new();
339        }
340
341        // Walk the adjacency Vec(s) directly into a single output Vec,
342        // skipping the previous intermediate `Vec<RelationshipId>`
343        // allocation that `relationship_ids_for_direction` produced.
344        // For type-filtered traversal we read `rel.rel_type` once per
345        // edge against the (typically tiny) `types` slice.
346        let mut out: Vec<(RelationshipId, NodeId)> = Vec::new();
347
348        let single_type = match types {
349            [single] => Some(single.as_str()),
350            _ => None,
351        };
352        let has_type_filter = !types.is_empty();
353
354        let push_from = |adj: &[RelationshipId],
355                         skip_self_loops: bool,
356                         out: &mut Vec<(RelationshipId, NodeId)>| {
357            for &rel_id in adj {
358                let Some(rel) = self.rel_at(rel_id) else {
359                    continue;
360                };
361                if skip_self_loops && rel.src == node_id && rel.dst == node_id {
362                    continue;
363                }
364                if let Some(single) = single_type {
365                    if rel.rel_type != single {
366                        continue;
367                    }
368                } else if has_type_filter && !types.iter().any(|t| t == &rel.rel_type) {
369                    continue;
370                }
371                let Some(other_id) = Self::other_endpoint(rel, node_id) else {
372                    continue;
373                };
374                out.push((rel_id, other_id));
375            }
376        };
377
378        match direction {
379            Direction::Right => {
380                if let Some(adj) = self.outgoing_at(node_id) {
381                    out.reserve(adj.len());
382                    push_from(adj, false, &mut out);
383                }
384            }
385            Direction::Left => {
386                if let Some(adj) = self.incoming_at(node_id) {
387                    out.reserve(adj.len());
388                    push_from(adj, false, &mut out);
389                }
390            }
391            Direction::Undirected => {
392                let out_len = self.outgoing_at(node_id).map(<[_]>::len).unwrap_or(0);
393                let in_len = self.incoming_at(node_id).map(<[_]>::len).unwrap_or(0);
394                out.reserve(out_len + in_len);
395                if let Some(adj) = self.outgoing_at(node_id) {
396                    push_from(adj, false, &mut out);
397                }
398                if let Some(adj) = self.incoming_at(node_id) {
399                    push_from(adj, true, &mut out);
400                }
401            }
402        }
403
404        out
405    }
406
407    fn try_for_each_expand_id<F, E>(
408        &self,
409        node_id: NodeId,
410        direction: Direction,
411        types: &[String],
412        visit: F,
413    ) -> Result<(), E>
414    where
415        F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
416        Self: Sized,
417    {
418        self.try_for_each_adjacent_id(node_id, direction, types, visit)
419    }
420
421    fn all_labels(&self) -> Vec<String> {
422        self.nodes_by_label.keys().cloned().collect()
423    }
424
425    fn all_relationship_types(&self) -> Vec<String> {
426        self.relationships_by_type.keys().cloned().collect()
427    }
428
429    // ---------- Optimization hooks: zero-clone borrow access ----------
430
431    fn with_node<F, R>(&self, id: NodeId, f: F) -> Option<R>
432    where
433        F: FnOnce(&NodeRecord) -> R,
434        Self: Sized,
435    {
436        self.node_at(id).map(f)
437    }
438
439    fn with_relationship<F, R>(&self, id: RelationshipId, f: F) -> Option<R>
440    where
441        F: FnOnce(&RelationshipRecord) -> R,
442        Self: Sized,
443    {
444        self.rel_at(id).map(f)
445    }
446
447    // ---------- Overrides: counts + existence ----------
448
449    fn has_node(&self, id: NodeId) -> bool {
450        self.node_at(id).is_some()
451    }
452
453    fn has_relationship(&self, id: RelationshipId) -> bool {
454        self.rel_at(id).is_some()
455    }
456
457    fn node_count(&self) -> usize {
458        self.live_node_count
459    }
460
461    fn relationship_count(&self) -> usize {
462        self.live_rel_count
463    }
464
465    // ---------- Overrides: record-returning scans (direct iteration) ----------
466
467    fn all_nodes(&self) -> Vec<NodeRecord> {
468        self.iter_node_records().cloned().collect()
469    }
470
471    fn nodes_by_label(&self, label: &str) -> Vec<NodeRecord> {
472        self.nodes_by_label
473            .get(label)
474            .into_iter()
475            .flat_map(|ids| ids.iter())
476            .filter_map(|&id| self.node_at(id).cloned())
477            .collect()
478    }
479
480    fn all_relationships(&self) -> Vec<RelationshipRecord> {
481        self.iter_rel_records().cloned().collect()
482    }
483
484    fn relationships_by_type(&self, rel_type: &str) -> Vec<RelationshipRecord> {
485        self.relationships_by_type
486            .get(rel_type)
487            .into_iter()
488            .flat_map(|ids| ids.iter())
489            .filter_map(|&id| self.rel_at(id).cloned())
490            .collect()
491    }
492
493    fn relationship_ids_of(&self, node_id: NodeId, direction: Direction) -> Vec<RelationshipId> {
494        self.relationship_ids_for_direction(node_id, direction)
495    }
496
497    fn outgoing_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
498        self.outgoing_at(node_id)
499            .into_iter()
500            .flat_map(|ids| ids.iter())
501            .filter_map(|&id| self.rel_at(id).cloned())
502            .collect()
503    }
504
505    fn incoming_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
506        self.incoming_at(node_id)
507            .into_iter()
508            .flat_map(|ids| ids.iter())
509            .filter_map(|&id| self.rel_at(id).cloned())
510            .collect()
511    }
512
513    fn relationships_of(&self, node_id: NodeId, direction: Direction) -> Vec<RelationshipRecord> {
514        let mut out = Vec::new();
515        let _ = self.try_for_each_expand_id(node_id, direction, &[], |rel_id, _| {
516            if let Some(rel) = self.rel_at(rel_id) {
517                out.push(rel.clone());
518            }
519            Ok::<(), ()>(())
520        });
521        out
522    }
523
524    fn degree(&self, node_id: NodeId, direction: Direction) -> usize {
525        match direction {
526            Direction::Right => self.outgoing_at(node_id).map(|s| s.len()).unwrap_or(0),
527            Direction::Left => self.incoming_at(node_id).map(|s| s.len()).unwrap_or(0),
528            Direction::Undirected => {
529                let out_count = self.outgoing_at(node_id).map(<[_]>::len).unwrap_or(0);
530                let incoming_non_self = self
531                    .incoming_at(node_id)
532                    .into_iter()
533                    .flat_map(|ids| ids.iter())
534                    .filter(|&&rel_id| {
535                        self.rel_at(rel_id)
536                            .map(|rel| rel.src != node_id || rel.dst != node_id)
537                            .unwrap_or(false)
538                    })
539                    .count();
540                out_count + incoming_non_self
541            }
542        }
543    }
544
545    fn expand(
546        &self,
547        node_id: NodeId,
548        direction: Direction,
549        types: &[String],
550    ) -> Vec<(RelationshipRecord, NodeRecord)> {
551        if self.node_at(node_id).is_none() {
552            return Vec::new();
553        }
554
555        let mut out = Vec::new();
556        let _ = self.try_for_each_expand_id(node_id, direction, types, |rel_id, other_id| {
557            if let (Some(rel), Some(other)) = (self.rel_at(rel_id), self.node_at(other_id)) {
558                out.push((rel.clone(), other.clone()));
559            }
560            Ok::<(), ()>(())
561        });
562        out
563    }
564
565    fn neighbors(
566        &self,
567        node_id: NodeId,
568        direction: Direction,
569        types: &[String],
570    ) -> Vec<NodeRecord> {
571        let mut out = Vec::new();
572        let _ = self.try_for_each_expand_id(node_id, direction, types, |_, other_id| {
573            if let Some(node) = self.node_at(other_id) {
574                out.push(node.clone());
575            }
576            Ok::<(), ()>(())
577        });
578        out
579    }
580
581    fn all_node_property_keys(&self) -> Vec<String> {
582        let mut keys = BTreeSet::new();
583        for node in self.iter_node_records() {
584            for key in node.properties.keys() {
585                keys.insert(key.clone());
586            }
587        }
588        keys.into_iter().collect()
589    }
590
    // ---------- Overrides: schema introspection (property keys) ----------
592
593    fn all_relationship_property_keys(&self) -> Vec<String> {
594        let mut keys = BTreeSet::new();
595        for rel in self.iter_rel_records() {
596            for key in rel.properties.keys() {
597                keys.insert(key.clone());
598            }
599        }
600        keys.into_iter().collect()
601    }
602
603    fn label_property_keys(&self, label: &str) -> Vec<String> {
604        let mut keys = BTreeSet::new();
605
606        if let Some(ids) = self.nodes_by_label.get(label) {
607            for &id in ids {
608                if let Some(node) = self.node_at(id) {
609                    for key in node.properties.keys() {
610                        keys.insert(key.clone());
611                    }
612                }
613            }
614        }
615
616        keys.into_iter().collect()
617    }
618
619    fn rel_type_property_keys(&self, rel_type: &str) -> Vec<String> {
620        let mut keys = BTreeSet::new();
621
622        if let Some(ids) = self.relationships_by_type.get(rel_type) {
623            for &id in ids {
624                if let Some(rel) = self.rel_at(id) {
625                    for key in rel.properties.keys() {
626                        keys.insert(key.clone());
627                    }
628                }
629            }
630        }
631
632        keys.into_iter().collect()
633    }
634
635    fn find_nodes_by_property(
636        &self,
637        label: Option<&str>,
638        key: &str,
639        value: &PropertyValue,
640    ) -> Vec<NodeRecord>
641    where
642        Self: Sized,
643    {
644        if PropertyIndexKey::from_value(value).is_none() {
645            return self.scan_nodes_by_property(label, key, value);
646        }
647
648        self.ensure_node_property_index(key);
649        let indexes = self.indexes_read();
650
651        match label {
652            Some(label) => {
653                let Some(ids) = indexes.node_properties.scoped_ids_for(label, key, value) else {
654                    return Vec::new();
655                };
656                ids.iter()
657                    .filter_map(|&id| self.node_at(id).cloned())
658                    .collect()
659            }
660            None => indexes
661                .node_properties
662                .ids_for(key, value)
663                .into_iter()
664                .flat_map(|ids| ids.iter())
665                .filter_map(|&id| self.node_at(id).cloned())
666                .collect(),
667        }
668    }
669
670    fn find_node_ids_by_property(
671        &self,
672        label: Option<&str>,
673        key: &str,
674        value: &PropertyValue,
675    ) -> Vec<NodeId>
676    where
677        Self: Sized,
678    {
679        if PropertyIndexKey::from_value(value).is_none() {
680            return self.scan_node_ids_by_property(label, key, value);
681        }
682
683        self.ensure_node_property_index(key);
684        let indexes = self.indexes_read();
685
686        match label {
687            Some(label) => indexes
688                .node_properties
689                .scoped_ids_for(label, key, value)
690                .map(|ids| ids.to_vec())
691                .unwrap_or_default(),
692            None => indexes
693                .node_properties
694                .ids_for(key, value)
695                .map(|ids| ids.to_vec())
696                .unwrap_or_default(),
697        }
698    }
    // ---------- Overrides: relationship property lookups (index-backed) ----------
700
701    fn find_relationships_by_property(
702        &self,
703        rel_type: Option<&str>,
704        key: &str,
705        value: &PropertyValue,
706    ) -> Vec<RelationshipRecord>
707    where
708        Self: Sized,
709    {
710        if PropertyIndexKey::from_value(value).is_none() {
711            return self.scan_relationships_by_property(rel_type, key, value);
712        }
713
714        self.ensure_relationship_property_index(key);
715        let indexes = self.indexes_read();
716
717        match rel_type {
718            Some(rel_type) => {
719                let Some(ids) = indexes
720                    .relationship_properties
721                    .scoped_ids_for(rel_type, key, value)
722                else {
723                    return Vec::new();
724                };
725                ids.iter()
726                    .filter_map(|&id| self.rel_at(id).cloned())
727                    .collect()
728            }
729            None => indexes
730                .relationship_properties
731                .ids_for(key, value)
732                .into_iter()
733                .flat_map(|ids| ids.iter())
734                .filter_map(|&id| self.rel_at(id).cloned())
735                .collect(),
736        }
737    }
738
739    fn find_relationship_ids_by_property(
740        &self,
741        rel_type: Option<&str>,
742        key: &str,
743        value: &PropertyValue,
744    ) -> Vec<RelationshipId>
745    where
746        Self: Sized,
747    {
748        if PropertyIndexKey::from_value(value).is_none() {
749            return self.scan_relationship_ids_by_property(rel_type, key, value);
750        }
751
752        self.ensure_relationship_property_index(key);
753        let indexes = self.indexes_read();
754
755        match rel_type {
756            Some(rel_type) => indexes
757                .relationship_properties
758                .scoped_ids_for(rel_type, key, value)
759                .map(|ids| ids.to_vec())
760                .unwrap_or_default(),
761            None => indexes
762                .relationship_properties
763                .ids_for(key, value)
764                .map(|ids| ids.to_vec())
765                .unwrap_or_default(),
766        }
767    }
768
769    fn node_exists_with_label_and_property(
770        &self,
771        label: &str,
772        key: &str,
773        value: &PropertyValue,
774    ) -> bool
775    where
776        Self: Sized,
777    {
778        if PropertyIndexKey::from_value(value).is_none() {
779            return self.any_node_by_property(label, key, value);
780        }
781
782        self.ensure_node_property_index(key);
783        let indexes = self.indexes_read();
784        indexes
785            .node_properties
786            .scoped_ids_for(label, key, value)
787            .map(|ids| !ids.is_empty())
788            .unwrap_or(false)
789    }
790
791    fn relationship_exists_with_type_and_property(
792        &self,
793        rel_type: &str,
794        key: &str,
795        value: &PropertyValue,
796    ) -> bool
797    where
798        Self: Sized,
799    {
800        if PropertyIndexKey::from_value(value).is_none() {
801            return self.any_relationship_by_property(rel_type, key, value);
802        }
803
804        self.ensure_relationship_property_index(key);
805        let indexes = self.indexes_read();
806        indexes
807            .relationship_properties
808            .scoped_ids_for(rel_type, key, value)
809            .map(|ids| !ids.is_empty())
810            .unwrap_or(false)
811    }
812}
813
814impl BorrowedGraphStorage for InMemoryGraph {
815    fn node_ref(&self, id: NodeId) -> Option<&NodeRecord> {
816        self.node_at(id)
817    }
818
819    fn relationship_ref(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
820        self.rel_at(id)
821    }
822
823    fn node_refs(&self) -> Box<dyn Iterator<Item = &NodeRecord> + '_> {
824        Box::new(self.iter_node_records())
825    }
826
827    fn node_refs_by_label(&self, label: &str) -> Box<dyn Iterator<Item = &NodeRecord> + '_> {
828        Box::new(
829            self.nodes_by_label
830                .get(label)
831                .into_iter()
832                .flat_map(|ids| ids.iter())
833                .filter_map(|&id| self.node_at(id)),
834        )
835    }
836
837    fn relationship_refs(&self) -> Box<dyn Iterator<Item = &RelationshipRecord> + '_> {
838        Box::new(self.iter_rel_records())
839    }
840
841    fn relationship_refs_by_type(
842        &self,
843        rel_type: &str,
844    ) -> Box<dyn Iterator<Item = &RelationshipRecord> + '_> {
845        Box::new(
846            self.relationships_by_type
847                .get(rel_type)
848                .into_iter()
849                .flat_map(|ids| ids.iter())
850                .filter_map(|&id| self.rel_at(id)),
851        )
852    }
853}
854
855impl GraphStorageMut for InMemoryGraph {
856    fn create_node(&mut self, labels: Vec<String>, properties: Properties) -> NodeRecord {
857        let (id, idx) = self.reserve_next_node_slot();
858        let labels = Self::normalize_labels(labels);
859
860        let node = NodeRecord {
861            id,
862            labels: labels.clone(),
863            properties,
864        };
865
866        self.put_node_at_slot(idx, node.clone());
867
868        self.on_node_created(&node);
869
870        // ensure_node_slot grew both adjacency Vecs to cover this id when
871        // we put_node above.
872
873        self.emit(|| MutationEvent::CreateNode {
874            id,
875            labels: node.labels.clone(),
876            properties: node.properties.clone(),
877        });
878
879        node
880    }
881
882    fn create_relationship(
883        &mut self,
884        src: NodeId,
885        dst: NodeId,
886        rel_type: &str,
887        properties: Properties,
888    ) -> Option<RelationshipRecord> {
889        if self.node_at(src).is_none() || self.node_at(dst).is_none() {
890            return None;
891        }
892
893        let trimmed = rel_type.trim();
894        if trimmed.is_empty() {
895            return None;
896        }
897
898        let (id, idx) = self.try_reserve_next_rel_slot()?;
899        let rel = RelationshipRecord {
900            id,
901            src,
902            dst,
903            rel_type: trimmed.to_string(),
904            properties,
905        };
906
907        self.put_rel_at_slot(idx, rel.clone());
908        self.on_relationship_created(&rel);
909
910        self.emit(|| MutationEvent::CreateRelationship {
911            id,
912            src,
913            dst,
914            rel_type: rel.rel_type.clone(),
915            properties: rel.properties.clone(),
916        });
917
918        Some(rel)
919    }
920
921    fn set_node_property(&mut self, node_id: NodeId, key: String, value: PropertyValue) -> bool {
922        if self.node_at(node_id).is_none() {
923            return false;
924        }
925
926        let old = match self.node_at_mut(node_id) {
927            Some(node) => node.properties.insert(key.clone(), value.clone()),
928            None => return false,
929        };
930        self.on_node_property_set(node_id, &key, old.as_ref(), &value);
931
932        self.emit(|| MutationEvent::SetNodeProperty {
933            node_id,
934            key: key.clone(),
935            value: value.clone(),
936        });
937
938        true
939    }
940
941    fn remove_node_property(&mut self, node_id: NodeId, key: &str) -> bool {
942        let removed = match self.node_at_mut(node_id) {
943            Some(node) => node.properties.remove(key),
944            None => return false,
945        };
946        let Some(removed) = removed else {
947            return false;
948        };
949
950        self.on_node_property_removed(node_id, key, &removed);
951
952        self.emit(|| MutationEvent::RemoveNodeProperty {
953            node_id,
954            key: key.to_string(),
955        });
956
957        true
958    }
959
960    fn add_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
961        let label = label.trim();
962        if label.is_empty() {
963            return false;
964        }
965
966        let applied = match self.node_at_mut(node_id) {
967            Some(node) => {
968                if node.labels.iter().any(|l| l == label) {
969                    return false;
970                }
971
972                node.labels.push(label.to_string());
973                true
974            }
975            None => false,
976        };
977        if applied {
978            self.on_node_label_added(node_id, label);
979            self.emit(|| MutationEvent::AddNodeLabel {
980                node_id,
981                label: label.to_string(),
982            });
983        }
984        applied
985    }
986
987    fn remove_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
988        let applied = match self.node_at_mut(node_id) {
989            Some(node) => {
990                let original_len = node.labels.len();
991                node.labels.retain(|l| l != label);
992                node.labels.len() != original_len
993            }
994            None => false,
995        };
996        if applied {
997            self.on_node_label_removed(node_id, label);
998            self.emit(|| MutationEvent::RemoveNodeLabel {
999                node_id,
1000                label: label.to_string(),
1001            });
1002        }
1003        applied
1004    }
1005
1006    fn set_relationship_property(
1007        &mut self,
1008        rel_id: RelationshipId,
1009        key: String,
1010        value: PropertyValue,
1011    ) -> bool {
1012        if self.rel_at(rel_id).is_none() {
1013            return false;
1014        }
1015
1016        let old = match self.rel_at_mut(rel_id) {
1017            Some(rel) => rel.properties.insert(key.clone(), value.clone()),
1018            None => return false,
1019        };
1020        self.on_relationship_property_set(rel_id, &key, old.as_ref(), &value);
1021
1022        self.emit(|| MutationEvent::SetRelationshipProperty {
1023            rel_id,
1024            key: key.clone(),
1025            value: value.clone(),
1026        });
1027
1028        true
1029    }
1030
1031    fn remove_relationship_property(&mut self, rel_id: RelationshipId, key: &str) -> bool {
1032        let removed = match self.rel_at_mut(rel_id) {
1033            Some(rel) => rel.properties.remove(key),
1034            None => return false,
1035        };
1036        let Some(removed) = removed else {
1037            return false;
1038        };
1039
1040        self.on_relationship_property_removed(rel_id, key, &removed);
1041
1042        self.emit(|| MutationEvent::RemoveRelationshipProperty {
1043            rel_id,
1044            key: key.to_string(),
1045        });
1046
1047        true
1048    }
1049
1050    fn delete_relationship(&mut self, rel_id: RelationshipId) -> bool {
1051        let applied = match self.take_rel(rel_id) {
1052            Some(rel) => {
1053                self.on_relationship_deleted(&rel);
1054                true
1055            }
1056            None => false,
1057        };
1058        if applied {
1059            self.emit(|| MutationEvent::DeleteRelationship { rel_id });
1060        }
1061        applied
1062    }
1063
1064    fn delete_node(&mut self, node_id: NodeId) -> bool {
1065        if self.node_at(node_id).is_none() {
1066            return false;
1067        }
1068
1069        if self.has_incident_relationships(node_id) {
1070            return false;
1071        }
1072
1073        let node = match self.take_node(node_id) {
1074            Some(node) => node,
1075            None => return false,
1076        };
1077
1078        self.on_node_deleted(&node);
1079
1080        // take_node already cleared the per-node adjacency Vecs.
1081
1082        self.emit(|| MutationEvent::DeleteNode { node_id });
1083
1084        true
1085    }
1086
1087    fn detach_delete_node(&mut self, node_id: NodeId) -> bool {
1088        if self.node_at(node_id).is_none() {
1089            return false;
1090        }
1091
1092        let rel_ids: Vec<_> = self
1093            .incident_relationship_ids(node_id)
1094            .into_iter()
1095            .collect();
1096
1097        // We deliberately fire per-relationship DeleteRelationship events
1098        // here (via `delete_relationship`) and a DetachDeleteNode event at
1099        // the end. A WAL replayer that sees DetachDeleteNode can ignore the
1100        // preceding DeleteRelationship events — or, equivalently, replay
1101        // them and the DetachDeleteNode becomes a no-op on the remaining
1102        // (now-empty) node. The emit-before-delete choice costs one extra
1103        // event per mutation but keeps the replay contract simple:
1104        // "apply every event in order".
1105        for rel_id in rel_ids {
1106            let _ = self.delete_relationship(rel_id);
1107        }
1108
1109        if self.delete_node(node_id) {
1110            self.emit(|| MutationEvent::DetachDeleteNode { node_id });
1111            true
1112        } else {
1113            false
1114        }
1115    }
1116
1117    fn clear(&mut self) {
1118        // Keep the recorder across clear so observers can see the Clear
1119        // event plus whatever follows. Matches WAL semantics where the log
1120        // is the source of truth across a truncation.
1121        let recorder = self.recorder.take();
1122        *self = Self::default();
1123        self.recorder = recorder;
1124        self.emit(|| MutationEvent::Clear);
1125    }
1126
    /// Creates the index described by `request`.
    ///
    /// Both arguments are forwarded unchanged to the inherent
    /// `register_index` helper, and its `Result` is returned as-is; the
    /// meaning of `if_not_exists` and of each outcome/error is decided
    /// there.
    fn create_index(
        &mut self,
        request: IndexRequest,
        if_not_exists: bool,
    ) -> Result<CreateIndexOutcome, CreateIndexError> {
        // `register_index` only needs `&self` (catalog + property-index
        // registries are behind RwLock for interior mutability), but we
        // keep the trait signature `&mut self` so downstream callers can
        // hold a uniquely-owned mutation lock. That mirrors the pattern
        // used for property writes elsewhere in this impl.
        self.register_index(request, if_not_exists)
    }
1139
    /// Drops the index called `name`.
    ///
    /// Both arguments are forwarded unchanged to the inherent
    /// `drop_named_index` helper, and its `Result` is returned as-is; the
    /// meaning of `if_exists` and of each outcome/error is decided there.
    fn drop_index(
        &mut self,
        name: &str,
        if_exists: bool,
    ) -> Result<DropIndexOutcome, DropIndexError> {
        self.drop_named_index(name, if_exists)
    }
1147
    /// Creates the constraint described by `request`.
    ///
    /// Both arguments are forwarded unchanged to the inherent
    /// `register_constraint` helper, and its `Result` is returned as-is; the
    /// meaning of `if_not_exists` and of each outcome/error is decided
    /// there.
    fn create_constraint(
        &mut self,
        request: ConstraintRequest,
        if_not_exists: bool,
    ) -> Result<CreateConstraintOutcome, CreateConstraintError> {
        self.register_constraint(request, if_not_exists)
    }
1155
    /// Drops the constraint called `name`.
    ///
    /// Both arguments are forwarded unchanged to the inherent
    /// `drop_named_constraint` helper, and its `Result` is returned as-is;
    /// the meaning of `if_exists` and of each outcome/error is decided
    /// there.
    fn drop_constraint(
        &mut self,
        name: &str,
        if_exists: bool,
    ) -> Result<DropConstraintOutcome, DropConstraintError> {
        self.drop_named_constraint(name, if_exists)
    }
1163}