Skip to main content

meshdb_storage/
rocksdb_engine.rs

1//! RocksDB-backed implementation of [`StorageEngine`].
2//!
3//! Holds the on-disk state for a single local peer: nodes, edges,
4//! adjacency lists, label/type/property indexes, and the index metadata
5//! needed to rehydrate at `open` time. Column families carve the backend
6//! into purpose-keyed namespaces so each access pattern walks a tight
7//! prefix.
8//!
9//! External callers should hold this as `Arc<dyn StorageEngine>` and
10//! route through the trait — the concrete type is only visible at the
11//! construction site (`RocksDbStorageEngine::open`) and at the Raft
12//! snapshot restore path, which is backend-bound by design.
13
14use crate::{
15    engine::{
16        ConstraintScope, EdgePointIndexSpec, EdgePropertyIndexSpec, GraphMutation, PointIndexSpec,
17        PropertyConstraintKind, PropertyConstraintSpec, PropertyIndexSpec, PropertyType,
18        StorageEngine,
19    },
20    error::{Error, Result},
21    keys::{
22        adj_key, cell_from_point_index_key, constraint_meta_decode, constraint_meta_encode,
23        decode_point_index_value, edge_from_adj_key, edge_id_from_property_index_key,
24        edge_property_index_composite_key, edge_property_index_composite_value_prefix,
25        encode_index_tuple, encode_index_value, id_from_str_index_key, label_index_key,
26        label_index_prefix, node_from_adj_value, node_id_from_point_index_key,
27        node_id_from_property_index_key, parse_property_index_entry_props, point_cell,
28        point_cell_range, point_index_key, point_index_label_prop_prefix, point_index_srid_prefix,
29        point_index_value, property_index_composite_key, property_index_composite_value_prefix,
30        property_index_label_prefix, type_index_key, type_index_prefix, ID_LEN,
31    },
32};
33use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
34use rocksdb::{
35    checkpoint::Checkpoint, ColumnFamilyDescriptor, Direction, IteratorMode, Options, WriteBatch,
36    DB,
37};
38use std::path::Path;
39use std::sync::RwLock;
40
/// Node records: keyed by the node id bytes, valued by the
/// JSON-serialized `Node`.
const CF_NODES: &str = "nodes";
/// Edge records: keyed by the edge id bytes, valued by the
/// JSON-serialized `Edge`.
const CF_EDGES: &str = "edges";
/// Outgoing adjacency: keyed by `adj_key(source, edge_id)`, valued by
/// the target node id bytes.
const CF_ADJ_OUT: &str = "adj_out";
/// Incoming adjacency: keyed by `adj_key(target, edge_id)`, valued by
/// the source node id bytes.
const CF_ADJ_IN: &str = "adj_in";
/// Label membership index: keyed by `label_index_key(label, node_id)`,
/// written with an empty value.
const CF_LABEL_INDEX: &str = "label_index";
/// Edge-type membership index: keyed by
/// `type_index_key(edge_type, edge_id)`, written with an empty value.
const CF_TYPE_INDEX: &str = "type_index";
/// Node property index entries: keyed by
/// `property_index_composite_key(..)`, written with an empty value.
const CF_PROPERTY_INDEX: &str = "property_index";
/// Registered node property indexes; keys are built by
/// `index_meta_key`.
const CF_INDEX_META: &str = "index_meta";
/// Edge property index entries: keyed by
/// `edge_property_index_composite_key(..)`, written with an empty
/// value.
const CF_EDGE_PROPERTY_INDEX: &str = "edge_property_index";
/// Registered edge property indexes; keys are built by
/// `edge_index_meta_key`.
const CF_EDGE_INDEX_META: &str = "edge_index_meta";
/// Registered property constraints, loaded into memory at open time.
const CF_CONSTRAINT_META: &str = "constraint_meta";
/// Node point index entries: keyed by `point_index_key(..)` (label,
/// property, SRID, cell, node id), valued by
/// `point_index_value(x, y, z)`.
const CF_POINT_INDEX: &str = "point_index";
/// Registered node point indexes; keys are built by
/// `point_index_meta_key`.
const CF_POINT_INDEX_META: &str = "point_index_meta";
/// Edge point index entries: same key/value shape as
/// `CF_POINT_INDEX`, but keyed by edge type and edge id.
const CF_EDGE_POINT_INDEX: &str = "edge_point_index";
/// Registered edge point indexes; keys are built by
/// `edge_point_index_meta_key`.
const CF_EDGE_POINT_INDEX_META: &str = "edge_point_index_meta";
/// Persisted `apoc.trigger.*` registry. Keyed by trigger name
/// (UTF-8 bytes), valued by the JSON-encoded trigger spec
/// (cypher / selector / config / installed_at). Local-only in
/// this release — the trigger registry doesn't replicate
/// across cluster peers, so each node holds its own set.
const CF_TRIGGER_META: &str = "trigger_meta";

/// Every column family the engine uses. `RocksDbStorageEngine::open`
/// builds one descriptor per entry, so a new column family must be
/// added here to be opened (and created when missing).
const ALL_CFS: &[&str] = &[
    CF_NODES,
    CF_EDGES,
    CF_ADJ_OUT,
    CF_ADJ_IN,
    CF_LABEL_INDEX,
    CF_TYPE_INDEX,
    CF_PROPERTY_INDEX,
    CF_INDEX_META,
    CF_EDGE_PROPERTY_INDEX,
    CF_EDGE_INDEX_META,
    CF_CONSTRAINT_META,
    CF_POINT_INDEX,
    CF_POINT_INDEX_META,
    CF_EDGE_POINT_INDEX,
    CF_EDGE_POINT_INDEX_META,
    CF_TRIGGER_META,
];

/// Empty value used for index entries whose key alone carries all of
/// the information (label, type and property indexes).
const EMPTY: &[u8] = &[];
83
84/// Encode a [`PropertyIndexSpec`] as a stable `CF_INDEX_META` key:
85/// `<label>\0<prop1>\0<prop2>\0...\0<propN>`. No component can contain
86/// a NUL (identifier-only grammar), so splitting on NUL decodes
87/// unambiguously. Length-1 specs encode byte-identically to the
88/// pre-composite-refactor format (`<label>\0<prop>`), so existing
89/// on-disk data survives the upgrade.
90fn index_meta_key(spec: &PropertyIndexSpec) -> Vec<u8> {
91    let cap = spec.label.len()
92        + spec.properties.iter().map(|p| p.len()).sum::<usize>()
93        + spec.properties.len();
94    let mut k = Vec::with_capacity(cap);
95    k.extend_from_slice(spec.label.as_bytes());
96    for p in &spec.properties {
97        k.push(0);
98        k.extend_from_slice(p.as_bytes());
99    }
100    k
101}
102
103fn index_meta_key_decode(key: &[u8]) -> Result<PropertyIndexSpec> {
104    let mut parts = key.split(|b| *b == 0);
105    let label_bytes = parts.next().ok_or(Error::CorruptBytes {
106        cf: CF_INDEX_META,
107        expected: 1,
108        actual: 0,
109    })?;
110    let label = std::str::from_utf8(label_bytes)
111        .map_err(|_| Error::CorruptBytes {
112            cf: CF_INDEX_META,
113            expected: label_bytes.len(),
114            actual: label_bytes.len(),
115        })?
116        .to_string();
117    let mut properties: Vec<String> = Vec::new();
118    for part in parts {
119        let s = std::str::from_utf8(part)
120            .map_err(|_| Error::CorruptBytes {
121                cf: CF_INDEX_META,
122                expected: part.len(),
123                actual: part.len(),
124            })?
125            .to_string();
126        properties.push(s);
127    }
128    if properties.is_empty() {
129        return Err(Error::CorruptBytes {
130            cf: CF_INDEX_META,
131            expected: 1,
132            actual: 0,
133        });
134    }
135    Ok(PropertyIndexSpec { label, properties })
136}
137
138/// Encode an [`EdgePropertyIndexSpec`] as a stable `CF_EDGE_INDEX_META`
139/// key: `<edge_type>\0<prop1>\0<prop2>\0...\0<propN>`. Same NUL-split
140/// shape as the node form, same backward-compat guarantee for
141/// length-1 specs.
142fn edge_index_meta_key(spec: &EdgePropertyIndexSpec) -> Vec<u8> {
143    let cap = spec.edge_type.len()
144        + spec.properties.iter().map(|p| p.len()).sum::<usize>()
145        + spec.properties.len();
146    let mut k = Vec::with_capacity(cap);
147    k.extend_from_slice(spec.edge_type.as_bytes());
148    for p in &spec.properties {
149        k.push(0);
150        k.extend_from_slice(p.as_bytes());
151    }
152    k
153}
154
155fn edge_index_meta_key_decode(key: &[u8]) -> Result<EdgePropertyIndexSpec> {
156    let mut parts = key.split(|b| *b == 0);
157    let type_bytes = parts.next().ok_or(Error::CorruptBytes {
158        cf: CF_EDGE_INDEX_META,
159        expected: 1,
160        actual: 0,
161    })?;
162    let edge_type = std::str::from_utf8(type_bytes)
163        .map_err(|_| Error::CorruptBytes {
164            cf: CF_EDGE_INDEX_META,
165            expected: type_bytes.len(),
166            actual: type_bytes.len(),
167        })?
168        .to_string();
169    let mut properties: Vec<String> = Vec::new();
170    for part in parts {
171        let s = std::str::from_utf8(part)
172            .map_err(|_| Error::CorruptBytes {
173                cf: CF_EDGE_INDEX_META,
174                expected: part.len(),
175                actual: part.len(),
176            })?
177            .to_string();
178        properties.push(s);
179    }
180    if properties.is_empty() {
181        return Err(Error::CorruptBytes {
182            cf: CF_EDGE_INDEX_META,
183            expected: 1,
184            actual: 0,
185        });
186    }
187    Ok(EdgePropertyIndexSpec {
188        edge_type,
189        properties,
190    })
191}
192
193/// Encode a [`PointIndexSpec`] as a stable `CF_POINT_INDEX_META`
194/// key: `<label>\0<property>`. Single-property on purpose — the
195/// spec only carries one property. NUL-split, same shape as the
196/// non-spatial index meta format so tooling that inspects CFs by
197/// hand stays predictable.
198fn point_index_meta_key(spec: &PointIndexSpec) -> Vec<u8> {
199    let mut k = Vec::with_capacity(spec.label.len() + spec.property.len() + 1);
200    k.extend_from_slice(spec.label.as_bytes());
201    k.push(0);
202    k.extend_from_slice(spec.property.as_bytes());
203    k
204}
205
206fn point_index_meta_key_decode(key: &[u8]) -> Result<PointIndexSpec> {
207    let mut parts = key.splitn(2, |b| *b == 0);
208    let label_bytes = parts.next().ok_or(Error::CorruptBytes {
209        cf: CF_POINT_INDEX_META,
210        expected: 1,
211        actual: 0,
212    })?;
213    let property_bytes = parts.next().ok_or(Error::CorruptBytes {
214        cf: CF_POINT_INDEX_META,
215        expected: label_bytes.len() + 2,
216        actual: key.len(),
217    })?;
218    let label = std::str::from_utf8(label_bytes)
219        .map_err(|_| Error::CorruptBytes {
220            cf: CF_POINT_INDEX_META,
221            expected: label_bytes.len(),
222            actual: label_bytes.len(),
223        })?
224        .to_string();
225    let property = std::str::from_utf8(property_bytes)
226        .map_err(|_| Error::CorruptBytes {
227            cf: CF_POINT_INDEX_META,
228            expected: property_bytes.len(),
229            actual: property_bytes.len(),
230        })?
231        .to_string();
232    if property.is_empty() {
233        return Err(Error::CorruptBytes {
234            cf: CF_POINT_INDEX_META,
235            expected: 1,
236            actual: 0,
237        });
238    }
239    Ok(PointIndexSpec { label, property })
240}
241
242/// Relationship-scope analogue of [`point_index_meta_key`]. Same
243/// `<edge_type>\0<property>` shape.
244fn edge_point_index_meta_key(spec: &EdgePointIndexSpec) -> Vec<u8> {
245    let mut k = Vec::with_capacity(spec.edge_type.len() + spec.property.len() + 1);
246    k.extend_from_slice(spec.edge_type.as_bytes());
247    k.push(0);
248    k.extend_from_slice(spec.property.as_bytes());
249    k
250}
251
252fn edge_point_index_meta_key_decode(key: &[u8]) -> Result<EdgePointIndexSpec> {
253    let mut parts = key.splitn(2, |b| *b == 0);
254    let type_bytes = parts.next().ok_or(Error::CorruptBytes {
255        cf: CF_EDGE_POINT_INDEX_META,
256        expected: 1,
257        actual: 0,
258    })?;
259    let property_bytes = parts.next().ok_or(Error::CorruptBytes {
260        cf: CF_EDGE_POINT_INDEX_META,
261        expected: type_bytes.len() + 2,
262        actual: key.len(),
263    })?;
264    let edge_type = std::str::from_utf8(type_bytes)
265        .map_err(|_| Error::CorruptBytes {
266            cf: CF_EDGE_POINT_INDEX_META,
267            expected: type_bytes.len(),
268            actual: type_bytes.len(),
269        })?
270        .to_string();
271    let property = std::str::from_utf8(property_bytes)
272        .map_err(|_| Error::CorruptBytes {
273            cf: CF_EDGE_POINT_INDEX_META,
274            expected: property_bytes.len(),
275            actual: property_bytes.len(),
276        })?
277        .to_string();
278    if property.is_empty() {
279        return Err(Error::CorruptBytes {
280            cf: CF_EDGE_POINT_INDEX_META,
281            expected: 1,
282            actual: 0,
283        });
284    }
285    Ok(EdgePointIndexSpec {
286        edge_type,
287        property,
288    })
289}
290
/// RocksDB-backed storage engine state: the database handle plus the
/// in-memory index and constraint catalogs that the write paths
/// consult. Each catalog is loaded from its metadata column family in
/// [`RocksDbStorageEngine::open`] and guarded by its own `RwLock`.
pub struct RocksDbStorageEngine {
    /// Underlying RocksDB handle, opened with every column family
    /// listed in `ALL_CFS`.
    db: DB,
    /// In-memory view of the registered property indexes, populated
    /// from [`CF_INDEX_META`] at open time and kept in sync by
    /// [`RocksDbStorageEngine::create_property_index`] and
    /// [`RocksDbStorageEngine::drop_property_index`].
    ///
    /// The write-path consults this on every `append_put_node` /
    /// `append_detach_delete_node` call to decide which index entries
    /// to emit, so it absolutely must not hit the DB on the hot path.
    /// `RwLock` rather than `Mutex` because the common access pattern
    /// is many concurrent reads with only DDL-time writes.
    indexes: RwLock<Vec<PropertyIndexSpec>>,
    /// Relationship-scope analogue of `indexes`. Populated from
    /// [`CF_EDGE_INDEX_META`] at open time and kept in sync by
    /// [`RocksDbStorageEngine::create_edge_property_index`] /
    /// [`RocksDbStorageEngine::drop_edge_property_index`]. Consulted on
    /// every edge write path (`append_put_edge`, `append_delete_edge`,
    /// `append_detach_delete_node`) and by UNIQUE edge-constraint
    /// enforcement, so the same "must not hit the DB on the hot path"
    /// contract applies.
    edge_indexes: RwLock<Vec<EdgePropertyIndexSpec>>,
    /// In-memory view of the registered property constraints,
    /// populated from [`CF_CONSTRAINT_META`] at open time and kept in
    /// sync by [`RocksDbStorageEngine::create_property_constraint`] and
    /// [`RocksDbStorageEngine::drop_property_constraint`].
    ///
    /// Every `put_node` consults this to validate UNIQUE / NOT NULL
    /// invariants, so — as with `indexes` — it must not hit the DB on
    /// the hot path. Enforcement reads the catalog under a read lock
    /// and runs `nodes_by_property` against the backing index only
    /// when a UNIQUE check actually needs it.
    constraints: RwLock<Vec<PropertyConstraintSpec>>,
    /// In-memory view of the registered point indexes. Loaded from
    /// [`CF_POINT_INDEX_META`] at open and kept in sync by
    /// [`RocksDbStorageEngine::create_point_index`] /
    /// [`RocksDbStorageEngine::drop_point_index`]. `append_put_node`
    /// and `append_detach_delete_node` walk this list to maintain
    /// entries, so — as with the other in-memory index catalogs — it
    /// must not hit the DB on the hot path.
    point_indexes: RwLock<Vec<PointIndexSpec>>,
    /// Relationship-scope analogue of `point_indexes`. Consulted by
    /// every edge write (`append_put_edge`, `append_delete_edge`,
    /// `append_detach_delete_node`) to maintain
    /// [`CF_EDGE_POINT_INDEX`] entries.
    edge_point_indexes: RwLock<Vec<EdgePointIndexSpec>>,
}
338
339impl RocksDbStorageEngine {
340    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
341        let mut db_opts = Options::default();
342        db_opts.create_if_missing(true);
343        db_opts.create_missing_column_families(true);
344        // RocksDB's default `max_open_files = -1` keeps every SST
345        // file cached in the table cache forever. With 15 column
346        // families per database and the cluster integration tests
347        // spawning multiple two-peer clusters in parallel, that
348        // unbounded cache blows past typical FD soft limits
349        // (`ulimit -n` of 1024 on stock Linux). 64 is plenty for
350        // the SST hot set; cold files get reopened on access.
351        // Production read-heavy workloads can override via tuning
352        // config later.
353        db_opts.set_max_open_files(64);
354        // Cap retained INFO log files at a small number — the
355        // default of 1000 produces a long tail of LOG.old.* files
356        // that, while not all open as FDs, do compound disk
357        // pressure when many test databases run in parallel.
358        db_opts.set_keep_log_file_num(4);
359
360        let cfs: Vec<ColumnFamilyDescriptor> = ALL_CFS
361            .iter()
362            .map(|name| ColumnFamilyDescriptor::new(*name, Options::default()))
363            .collect();
364
365        let db = DB::open_cf_descriptors(&db_opts, path, cfs)?;
366        let indexes = load_index_meta(&db)?;
367        let edge_indexes = load_edge_index_meta(&db)?;
368        let constraints = load_constraint_meta(&db)?;
369        let point_indexes = load_point_index_meta(&db)?;
370        let edge_point_indexes = load_edge_point_index_meta(&db)?;
371        Ok(Self {
372            db,
373            indexes: RwLock::new(indexes),
374            edge_indexes: RwLock::new(edge_indexes),
375            constraints: RwLock::new(constraints),
376            point_indexes: RwLock::new(point_indexes),
377            edge_point_indexes: RwLock::new(edge_point_indexes),
378        })
379    }
380
381    fn cf(&self, name: &'static str) -> Result<&rocksdb::ColumnFamily> {
382        self.db
383            .cf_handle(name)
384            .ok_or(Error::MissingColumnFamily(name))
385    }
386
387    pub fn put_node(&self, node: &Node) -> Result<()> {
388        let mut batch = WriteBatch::default();
389        self.append_put_node(&mut batch, node)?;
390        self.db.write(batch)?;
391        Ok(())
392    }
393
394    fn append_put_node(&self, batch: &mut WriteBatch, node: &Node) -> Result<()> {
395        let nodes_cf = self.cf(CF_NODES)?;
396        let label_cf = self.cf(CF_LABEL_INDEX)?;
397        let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
398
399        let existing: Option<Node> = match self.db.get_cf(nodes_cf, node.id.as_bytes())? {
400            Some(bytes) => Some(serde_json::from_slice(&bytes)?),
401            None => None,
402        };
403        let existing_labels: &[String] = existing.as_ref().map(|n| &n.labels[..]).unwrap_or(&[]);
404
405        // Constraint enforcement: walk every registered constraint and
406        // confirm the post-write state satisfies it. UNIQUE queries
407        // the backing property index (guaranteed to exist — see
408        // `create_property_constraint`) for rival holders of the same
409        // value. NOT NULL checks the incoming property map directly.
410        // Runs before we touch the batch, so a violation short-circuits
411        // without leaving partial writes behind.
412        self.enforce_constraints(node, existing.as_ref())?;
413
414        let bytes = serde_json::to_vec(node)?;
415        batch.put_cf(nodes_cf, node.id.as_bytes(), bytes);
416
417        for old in existing_labels {
418            if !node.labels.contains(old) {
419                batch.delete_cf(label_cf, label_index_key(old, node.id));
420            }
421        }
422        for new in &node.labels {
423            if !existing_labels.contains(new) {
424                batch.put_cf(label_cf, label_index_key(new, node.id), EMPTY);
425            }
426        }
427
428        // Property-index maintenance: for each registered (label, prop)
429        // index, compute the delta between the previous entry (if any)
430        // and the new one, and emit the minimal set of put/delete
431        // operations. Skipping an update when the entry is unchanged
432        // keeps steady-state writes free of index churn.
433        let indexes = self.indexes.read().expect("indexes lock poisoned");
434        for spec in indexes.iter() {
435            let was_indexed = existing_labels.iter().any(|l| l == &spec.label);
436            let now_indexed = node.labels.iter().any(|l| l == &spec.label);
437            let old_encoded = if was_indexed {
438                existing
439                    .as_ref()
440                    .and_then(|n| encode_index_tuple(&n.properties, &spec.properties))
441            } else {
442                None
443            };
444            let new_encoded = if now_indexed {
445                encode_index_tuple(&node.properties, &spec.properties)
446            } else {
447                None
448            };
449            if old_encoded == new_encoded {
450                continue;
451            }
452            if let Some(values) = &old_encoded {
453                batch.delete_cf(
454                    prop_cf,
455                    property_index_composite_key(&spec.label, &spec.properties, values, node.id),
456                );
457            }
458            if let Some(values) = &new_encoded {
459                batch.put_cf(
460                    prop_cf,
461                    property_index_composite_key(&spec.label, &spec.properties, values, node.id),
462                    EMPTY,
463                );
464            }
465        }
466
467        // Point-index maintenance. Same delta shape as the
468        // property-index loop: compare old vs new indexed point and
469        // emit the minimum entry set. Cheap-bail when neither the
470        // label membership nor the point value has changed.
471        let point_indexes = self
472            .point_indexes
473            .read()
474            .expect("point_indexes lock poisoned");
475        if !point_indexes.is_empty() {
476            let point_cf = self.cf(CF_POINT_INDEX)?;
477            for spec in point_indexes.iter() {
478                let was_indexed = existing_labels.iter().any(|l| l == &spec.label);
479                let now_indexed = node.labels.iter().any(|l| l == &spec.label);
480                let old_point = if was_indexed {
481                    existing
482                        .as_ref()
483                        .and_then(|n| extract_indexable_point(&n.properties, &spec.property))
484                } else {
485                    None
486                };
487                let new_point = if now_indexed {
488                    extract_indexable_point(&node.properties, &spec.property)
489                } else {
490                    None
491                };
492                if old_point == new_point {
493                    continue;
494                }
495                if let Some(p) = &old_point {
496                    let cell = point_cell(p.srid, p.x, p.y);
497                    batch.delete_cf(
498                        point_cf,
499                        point_index_key(
500                            &spec.label,
501                            &spec.property,
502                            p.srid,
503                            cell,
504                            node.id.as_bytes(),
505                        ),
506                    );
507                }
508                if let Some(p) = &new_point {
509                    let cell = point_cell(p.srid, p.x, p.y);
510                    batch.put_cf(
511                        point_cf,
512                        point_index_key(
513                            &spec.label,
514                            &spec.property,
515                            p.srid,
516                            cell,
517                            node.id.as_bytes(),
518                        ),
519                        point_index_value(p.x, p.y, p.z),
520                    );
521                }
522            }
523        }
524
525        Ok(())
526    }
527
528    pub fn get_node(&self, id: NodeId) -> Result<Option<Node>> {
529        let cf = self.cf(CF_NODES)?;
530        match self.db.get_cf(cf, id.as_bytes())? {
531            Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
532            None => Ok(None),
533        }
534    }
535
536    pub fn put_edge(&self, edge: &Edge) -> Result<()> {
537        let mut batch = WriteBatch::default();
538        self.append_put_edge(&mut batch, edge)?;
539        self.db.write(batch)?;
540        Ok(())
541    }
542
543    fn append_put_edge(&self, batch: &mut WriteBatch, edge: &Edge) -> Result<()> {
544        let edges_cf = self.cf(CF_EDGES)?;
545        let out_cf = self.cf(CF_ADJ_OUT)?;
546        let in_cf = self.cf(CF_ADJ_IN)?;
547        let type_cf = self.cf(CF_TYPE_INDEX)?;
548        let edge_prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
549
550        // Relationship-scope constraint enforcement. Same short-
551        // circuit pattern as `append_put_node` — if a constraint
552        // would be violated, abort before touching the batch so the
553        // write never lands.
554        let existing_edge = self.get_edge(edge.id)?;
555        self.enforce_edge_constraints(edge, existing_edge.as_ref())?;
556
557        let bytes = serde_json::to_vec(edge)?;
558        batch.put_cf(edges_cf, edge.id.as_bytes(), bytes);
559        batch.put_cf(
560            out_cf,
561            adj_key(edge.source, edge.id),
562            edge.target.as_bytes(),
563        );
564        batch.put_cf(in_cf, adj_key(edge.target, edge.id), edge.source.as_bytes());
565        batch.put_cf(type_cf, type_index_key(&edge.edge_type, edge.id), EMPTY);
566
567        // Edge-property-index maintenance: for each registered
568        // (edge_type, prop) index, delta the previous entry against
569        // the new one and emit the minimal set of put/delete
570        // operations. Mirrors the node path in `append_put_node`.
571        // An existing edge whose type doesn't match the registered
572        // index contributes no work — edges never change their type,
573        // so a type-mismatched entry can't exist to clean up.
574        let edge_indexes = self
575            .edge_indexes
576            .read()
577            .expect("edge_indexes lock poisoned");
578        for spec in edge_indexes.iter() {
579            if spec.edge_type != edge.edge_type {
580                continue;
581            }
582            let old_encoded = existing_edge
583                .as_ref()
584                .and_then(|e| encode_index_tuple(&e.properties, &spec.properties));
585            let new_encoded = encode_index_tuple(&edge.properties, &spec.properties);
586            if old_encoded == new_encoded {
587                continue;
588            }
589            if let Some(values) = &old_encoded {
590                batch.delete_cf(
591                    edge_prop_cf,
592                    edge_property_index_composite_key(
593                        &spec.edge_type,
594                        &spec.properties,
595                        values,
596                        edge.id,
597                    ),
598                );
599            }
600            if let Some(values) = &new_encoded {
601                batch.put_cf(
602                    edge_prop_cf,
603                    edge_property_index_composite_key(
604                        &spec.edge_type,
605                        &spec.properties,
606                        values,
607                        edge.id,
608                    ),
609                    EMPTY,
610                );
611            }
612        }
613
614        // Edge-point-index maintenance — relationship-scope mirror
615        // of the node-point-index loop in `append_put_node`. Same
616        // delta shape: diff the indexed point across the previous
617        // and new edge state and emit the minimum put/delete set.
618        let edge_point_indexes = self
619            .edge_point_indexes
620            .read()
621            .expect("edge_point_indexes lock poisoned");
622        if !edge_point_indexes.is_empty() {
623            let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
624            for spec in edge_point_indexes.iter() {
625                if spec.edge_type != edge.edge_type {
626                    continue;
627                }
628                let old_point = existing_edge
629                    .as_ref()
630                    .and_then(|e| extract_indexable_point(&e.properties, &spec.property));
631                let new_point = extract_indexable_point(&edge.properties, &spec.property);
632                if old_point == new_point {
633                    continue;
634                }
635                if let Some(p) = &old_point {
636                    let cell = point_cell(p.srid, p.x, p.y);
637                    batch.delete_cf(
638                        point_cf,
639                        point_index_key(
640                            &spec.edge_type,
641                            &spec.property,
642                            p.srid,
643                            cell,
644                            edge.id.as_bytes(),
645                        ),
646                    );
647                }
648                if let Some(p) = &new_point {
649                    let cell = point_cell(p.srid, p.x, p.y);
650                    batch.put_cf(
651                        point_cf,
652                        point_index_key(
653                            &spec.edge_type,
654                            &spec.property,
655                            p.srid,
656                            cell,
657                            edge.id.as_bytes(),
658                        ),
659                        point_index_value(p.x, p.y, p.z),
660                    );
661                }
662            }
663        }
664        Ok(())
665    }
666
667    pub fn get_edge(&self, id: EdgeId) -> Result<Option<Edge>> {
668        let cf = self.cf(CF_EDGES)?;
669        match self.db.get_cf(cf, id.as_bytes())? {
670            Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
671            None => Ok(None),
672        }
673    }
674
675    pub fn delete_edge(&self, id: EdgeId) -> Result<()> {
676        let edge = self.get_edge(id)?.ok_or(Error::EdgeNotFound(id))?;
677        let mut batch = WriteBatch::default();
678        self.append_delete_edge(&mut batch, id, &edge)?;
679        self.db.write(batch)?;
680        Ok(())
681    }
682
683    fn append_delete_edge(&self, batch: &mut WriteBatch, id: EdgeId, edge: &Edge) -> Result<()> {
684        let edges_cf = self.cf(CF_EDGES)?;
685        let out_cf = self.cf(CF_ADJ_OUT)?;
686        let in_cf = self.cf(CF_ADJ_IN)?;
687        let type_cf = self.cf(CF_TYPE_INDEX)?;
688        let edge_prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
689
690        batch.delete_cf(edges_cf, id.as_bytes());
691        batch.delete_cf(out_cf, adj_key(edge.source, id));
692        batch.delete_cf(in_cf, adj_key(edge.target, id));
693        batch.delete_cf(type_cf, type_index_key(&edge.edge_type, id));
694
695        // Remove any edge-property-index entries this edge owned.
696        // Mirrors the put path: for each registered index whose type
697        // matches, the encoded value (if any) pinned an index key we
698        // now need to delete.
699        let edge_indexes = self
700            .edge_indexes
701            .read()
702            .expect("edge_indexes lock poisoned");
703        for spec in edge_indexes.iter() {
704            if spec.edge_type != edge.edge_type {
705                continue;
706            }
707            if let Some(values) = encode_index_tuple(&edge.properties, &spec.properties) {
708                batch.delete_cf(
709                    edge_prop_cf,
710                    edge_property_index_composite_key(
711                        &spec.edge_type,
712                        &spec.properties,
713                        &values,
714                        id,
715                    ),
716                );
717            }
718        }
719
720        // Edge-point-index sweep for the deleted edge. Mirrors the
721        // node-point-index sweep in `append_detach_delete_node`.
722        let edge_point_indexes = self
723            .edge_point_indexes
724            .read()
725            .expect("edge_point_indexes lock poisoned");
726        if !edge_point_indexes.is_empty() {
727            let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
728            for spec in edge_point_indexes.iter() {
729                if spec.edge_type != edge.edge_type {
730                    continue;
731                }
732                if let Some(p) = extract_indexable_point(&edge.properties, &spec.property) {
733                    let cell = point_cell(p.srid, p.x, p.y);
734                    batch.delete_cf(
735                        point_cf,
736                        point_index_key(
737                            &spec.edge_type,
738                            &spec.property,
739                            p.srid,
740                            cell,
741                            id.as_bytes(),
742                        ),
743                    );
744                }
745            }
746        }
747        Ok(())
748    }
749
750    pub fn detach_delete_node(&self, id: NodeId) -> Result<()> {
751        let mut batch = WriteBatch::default();
752        self.append_detach_delete_node(&mut batch, id)?;
753        self.db.write(batch)?;
754        Ok(())
755    }
756
757    fn append_detach_delete_node(&self, batch: &mut WriteBatch, id: NodeId) -> Result<()> {
758        let node = self.get_node(id)?;
759        let outgoing = self.outgoing(id)?;
760        let incoming = self.incoming(id)?;
761
762        let nodes_cf = self.cf(CF_NODES)?;
763        let edges_cf = self.cf(CF_EDGES)?;
764        let out_cf = self.cf(CF_ADJ_OUT)?;
765        let in_cf = self.cf(CF_ADJ_IN)?;
766        let label_cf = self.cf(CF_LABEL_INDEX)?;
767        let type_cf = self.cf(CF_TYPE_INDEX)?;
768        let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
769        let edge_prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
770
771        if let Some(n) = &node {
772            for label in &n.labels {
773                batch.delete_cf(label_cf, label_index_key(label, id));
774            }
775            // Remove every property-index entry this node was holding
776            // open. Mirrors the put path: for each registered index
777            // whose label matches, if the node carries an indexable
778            // value for the property, delete the corresponding entry.
779            let indexes = self.indexes.read().expect("indexes lock poisoned");
780            for spec in indexes.iter() {
781                if !n.labels.iter().any(|l| l == &spec.label) {
782                    continue;
783                }
784                if let Some(values) = encode_index_tuple(&n.properties, &spec.properties) {
785                    batch.delete_cf(
786                        prop_cf,
787                        property_index_composite_key(&spec.label, &spec.properties, &values, id),
788                    );
789                }
790            }
791            // Point-index entries held by this node, mirror of the
792            // property-index sweep above.
793            let point_indexes = self
794                .point_indexes
795                .read()
796                .expect("point_indexes lock poisoned");
797            if !point_indexes.is_empty() {
798                let point_cf = self.cf(CF_POINT_INDEX)?;
799                for spec in point_indexes.iter() {
800                    if !n.labels.iter().any(|l| l == &spec.label) {
801                        continue;
802                    }
803                    if let Some(p) = extract_indexable_point(&n.properties, &spec.property) {
804                        let cell = point_cell(p.srid, p.x, p.y);
805                        batch.delete_cf(
806                            point_cf,
807                            point_index_key(
808                                &spec.label,
809                                &spec.property,
810                                p.srid,
811                                cell,
812                                id.as_bytes(),
813                            ),
814                        );
815                    }
816                }
817            }
818        }
819
820        // Edges detached by this delete also need their
821        // edge-property-index entries swept. Read the catalog once so
822        // the inner loop doesn't reacquire the lock per edge.
823        let edge_indexes_snapshot = self
824            .edge_indexes
825            .read()
826            .expect("edge_indexes lock poisoned")
827            .clone();
828        let edge_point_indexes_snapshot = self
829            .edge_point_indexes
830            .read()
831            .expect("edge_point_indexes lock poisoned")
832            .clone();
833        let edge_point_cf_opt = if edge_point_indexes_snapshot.is_empty() {
834            None
835        } else {
836            Some(self.cf(CF_EDGE_POINT_INDEX)?)
837        };
838
839        for (edge_id, target) in &outgoing {
840            if let Some(e) = self.get_edge(*edge_id)? {
841                batch.delete_cf(type_cf, type_index_key(&e.edge_type, *edge_id));
842                for spec in &edge_indexes_snapshot {
843                    if spec.edge_type != e.edge_type {
844                        continue;
845                    }
846                    if let Some(values) = encode_index_tuple(&e.properties, &spec.properties) {
847                        batch.delete_cf(
848                            edge_prop_cf,
849                            edge_property_index_composite_key(
850                                &spec.edge_type,
851                                &spec.properties,
852                                &values,
853                                *edge_id,
854                            ),
855                        );
856                    }
857                }
858                if let Some(point_cf) = edge_point_cf_opt {
859                    for spec in &edge_point_indexes_snapshot {
860                        if spec.edge_type != e.edge_type {
861                            continue;
862                        }
863                        if let Some(p) = extract_indexable_point(&e.properties, &spec.property) {
864                            let cell = point_cell(p.srid, p.x, p.y);
865                            batch.delete_cf(
866                                point_cf,
867                                point_index_key(
868                                    &spec.edge_type,
869                                    &spec.property,
870                                    p.srid,
871                                    cell,
872                                    edge_id.as_bytes(),
873                                ),
874                            );
875                        }
876                    }
877                }
878            }
879            batch.delete_cf(edges_cf, edge_id.as_bytes());
880            batch.delete_cf(out_cf, adj_key(id, *edge_id));
881            batch.delete_cf(in_cf, adj_key(*target, *edge_id));
882        }
883        for (edge_id, source) in &incoming {
884            if let Some(e) = self.get_edge(*edge_id)? {
885                batch.delete_cf(type_cf, type_index_key(&e.edge_type, *edge_id));
886                for spec in &edge_indexes_snapshot {
887                    if spec.edge_type != e.edge_type {
888                        continue;
889                    }
890                    if let Some(values) = encode_index_tuple(&e.properties, &spec.properties) {
891                        batch.delete_cf(
892                            edge_prop_cf,
893                            edge_property_index_composite_key(
894                                &spec.edge_type,
895                                &spec.properties,
896                                &values,
897                                *edge_id,
898                            ),
899                        );
900                    }
901                }
902                if let Some(point_cf) = edge_point_cf_opt {
903                    for spec in &edge_point_indexes_snapshot {
904                        if spec.edge_type != e.edge_type {
905                            continue;
906                        }
907                        if let Some(p) = extract_indexable_point(&e.properties, &spec.property) {
908                            let cell = point_cell(p.srid, p.x, p.y);
909                            batch.delete_cf(
910                                point_cf,
911                                point_index_key(
912                                    &spec.edge_type,
913                                    &spec.property,
914                                    p.srid,
915                                    cell,
916                                    edge_id.as_bytes(),
917                                ),
918                            );
919                        }
920                    }
921                }
922            }
923            batch.delete_cf(edges_cf, edge_id.as_bytes());
924            batch.delete_cf(out_cf, adj_key(*source, *edge_id));
925            batch.delete_cf(in_cf, adj_key(id, *edge_id));
926        }
927        batch.delete_cf(nodes_cf, id.as_bytes());
928        Ok(())
929    }
930
931    /// Apply a sequence of mutations as one atomic rocksdb write. Either
932    /// every mutation lands or none does — no replica can observe a
933    /// partial result, even across a process crash.
934    ///
935    /// Reads performed during batch construction (existing labels for
936    /// PutNode, edge metadata for DeleteEdge / DetachDeleteNode) hit the
937    /// live store, not the in-flight batch. Read-your-writes across
938    /// mutations in the same batch is *not* supported. The executor never
939    /// generates such patterns: it produces fresh ids per row and
940    /// dedupes mutated entities before flushing.
941    pub fn apply_batch(&self, mutations: &[GraphMutation]) -> Result<()> {
942        let mut batch = WriteBatch::default();
943        for m in mutations {
944            match m {
945                GraphMutation::PutNode(n) => self.append_put_node(&mut batch, n)?,
946                GraphMutation::PutEdge(e) => self.append_put_edge(&mut batch, e)?,
947                GraphMutation::DeleteEdge(id) => {
948                    if let Some(edge) = self.get_edge(*id)? {
949                        self.append_delete_edge(&mut batch, *id, &edge)?;
950                    }
951                }
952                GraphMutation::DetachDeleteNode(id) => {
953                    self.append_detach_delete_node(&mut batch, *id)?;
954                }
955            }
956        }
957        self.db.write(batch)?;
958        Ok(())
959    }
960
961    /// Walk every node in the store. Used for snapshot construction; not
962    /// suitable as a query primitive since it materializes the full set.
963    pub fn all_nodes(&self) -> Result<Vec<Node>> {
964        let cf = self.cf(CF_NODES)?;
965        let mut nodes = Vec::new();
966        for item in self.db.iterator_cf(cf, IteratorMode::Start) {
967            let (_, value) = item?;
968            nodes.push(serde_json::from_slice(&value)?);
969        }
970        Ok(nodes)
971    }
972
973    /// Walk every edge in the store. Used for snapshot construction.
974    pub fn all_edges(&self) -> Result<Vec<Edge>> {
975        let cf = self.cf(CF_EDGES)?;
976        let mut edges = Vec::new();
977        for item in self.db.iterator_cf(cf, IteratorMode::Start) {
978            let (_, value) = item?;
979            edges.push(serde_json::from_slice(&value)?);
980        }
981        Ok(edges)
982    }
983
984    /// Produce a consistent point-in-time checkpoint of the store at
985    /// `path`. Wraps rocksdb's [`Checkpoint`] API — on the same
986    /// filesystem the checkpoint is effectively free (hard links over
987    /// the underlying SST files); cross-filesystem it falls back to a
988    /// full copy. `path` must NOT already exist; rocksdb creates it.
989    ///
990    /// Used by the Raft state machine's streaming snapshot path: a
991    /// snapshot is built by checkpointing into a temp dir, packing
992    /// the checkpoint's files into a length-prefixed archive, and
993    /// shipping the bytes. Skips the "Vec<Node> in memory" materialization
994    /// path that the previous JSON-over-everything snapshot used.
995    pub fn create_checkpoint(&self, path: impl AsRef<Path>) -> Result<()> {
996        let checkpoint = Checkpoint::new(&self.db)?;
997        checkpoint.create_checkpoint(path)?;
998        Ok(())
999    }
1000
1001    /// Drop every key from every column family. Used by snapshot install
1002    /// to wipe local state before applying the leader's snapshot. Cheap
1003    /// for empty / small stores; for large stores this is O(n).
1004    pub fn clear_all(&self) -> Result<()> {
1005        let mut batch = WriteBatch::default();
1006        for cf_name in ALL_CFS {
1007            let cf = self.cf(cf_name)?;
1008            for item in self.db.iterator_cf(cf, IteratorMode::Start) {
1009                let (key, _) = item?;
1010                batch.delete_cf(cf, key);
1011            }
1012        }
1013        self.db.write(batch)?;
1014        Ok(())
1015    }
1016
1017    pub fn all_node_ids(&self) -> Result<Vec<NodeId>> {
1018        let cf = self.cf(CF_NODES)?;
1019        let mut results = Vec::new();
1020        for item in self.db.iterator_cf(cf, IteratorMode::Start) {
1021            let (key, _) = item?;
1022            if key.len() != ID_LEN {
1023                return Err(Error::CorruptBytes {
1024                    cf: CF_NODES,
1025                    expected: ID_LEN,
1026                    actual: key.len(),
1027                });
1028            }
1029            let mut bytes = [0u8; ID_LEN];
1030            bytes.copy_from_slice(&key);
1031            results.push(NodeId::from_bytes(bytes));
1032        }
1033        Ok(results)
1034    }
1035
1036    pub fn outgoing(&self, source: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
1037        self.scan_adj(CF_ADJ_OUT, source)
1038    }
1039
1040    pub fn incoming(&self, target: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
1041        self.scan_adj(CF_ADJ_IN, target)
1042    }
1043
1044    /// Snapshot the currently-registered property indexes. Cheap clone
1045    /// — specs are tiny (a label + a property key).
1046    pub fn list_property_indexes(&self) -> Vec<PropertyIndexSpec> {
1047        self.indexes.read().expect("indexes lock poisoned").clone()
1048    }
1049
1050    /// Declare a new `(label, property)` single-property equality
1051    /// index and backfill it by scanning every node that currently
1052    /// carries `label`. Idempotent: re-creating an already-registered
1053    /// index is a no-op, matching how Neo4j's `CREATE INDEX IF NOT
1054    /// EXISTS` behaves.
1055    ///
1056    /// Backfill is done in the same [`WriteBatch`] as the meta insert,
1057    /// so a crash mid-backfill leaves the store with either zero
1058    /// entries (batch not yet written) or a fully-populated index
1059    /// (batch committed). No partial-build state is ever visible.
1060    pub fn create_property_index(&self, label: &str, property: &str) -> Result<()> {
1061        self.create_property_index_composite(label, &[property.to_string()])
1062    }
1063
1064    /// Composite form of [`create_property_index`]. Declares a
1065    /// `(label, properties)` tuple index and backfills it by scanning
1066    /// every node currently carrying `label`. A node that's missing
1067    /// any of the properties, or carries an unindexable value for
1068    /// any of them, contributes no entry — composite indexes are
1069    /// all-or-nothing on the tuple.
1070    ///
1071    /// Single-property calls delegate here with a length-1 slice;
1072    /// the on-disk key format is byte-identical to the pre-composite
1073    /// layout in that case.
1074    pub fn create_property_index_composite(
1075        &self,
1076        label: &str,
1077        properties: &[String],
1078    ) -> Result<()> {
1079        assert!(
1080            !properties.is_empty(),
1081            "create_property_index_composite requires at least one property"
1082        );
1083        let spec = PropertyIndexSpec {
1084            label: label.to_string(),
1085            properties: properties.to_vec(),
1086        };
1087        let mut guard = self.indexes.write().expect("indexes lock poisoned");
1088        if guard.contains(&spec) {
1089            return Ok(());
1090        }
1091
1092        let meta_cf = self.cf(CF_INDEX_META)?;
1093        let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
1094        let mut batch = WriteBatch::default();
1095        batch.put_cf(meta_cf, index_meta_key(&spec), EMPTY);
1096
1097        for node_id in self.nodes_by_label(label)? {
1098            let node = match self.get_node(node_id)? {
1099                Some(n) => n,
1100                None => continue,
1101            };
1102            if let Some(values) = encode_index_tuple(&node.properties, &spec.properties) {
1103                batch.put_cf(
1104                    prop_cf,
1105                    property_index_composite_key(label, properties, &values, node_id),
1106                    EMPTY,
1107                );
1108            }
1109        }
1110
1111        self.db.write(batch)?;
1112        guard.push(spec);
1113        Ok(())
1114    }
1115
1116    /// Declare a new property constraint. Behavior summary:
1117    ///
1118    /// * The name is the user-facing identifier. When `name` is
1119    ///   `None`, a deterministic default (`constraint_<label>_<prop>_<kind>`)
1120    ///   is generated so `DROP CONSTRAINT` can still target it.
1121    /// * If the chosen name is already registered with the same
1122    ///   `(label, property, kind)`, this is a no-op (the existing
1123    ///   spec is returned). If the name collides on a *different*
1124    ///   spec, returns `ConstraintNameConflict` unless
1125    ///   `if_not_exists` is set, in which case the existing spec is
1126    ///   preserved and returned unchanged.
1127    /// * UNIQUE constraints implicitly require a backing property
1128    ///   index. `create_property_constraint` creates one on
1129    ///   `(label, property)` if it isn't already registered —
1130    ///   matching Neo4j's "a unique constraint also provides an
1131    ///   index" contract.
1132    /// * Before committing, the method scans existing nodes to
1133    ///   verify the constraint is satisfied. Any violation aborts
1134    ///   the creation with `ConstraintViolation`; the registry is
1135    ///   left unchanged.
1136    ///
1137    /// The meta write and the implicit-index write are batched into a
1138    /// single [`WriteBatch`] so the on-disk state stays consistent
1139    /// under crash.
1140    pub fn create_property_constraint(
1141        &self,
1142        name: Option<&str>,
1143        scope: &ConstraintScope,
1144        properties: &[String],
1145        kind: PropertyConstraintKind,
1146        if_not_exists: bool,
1147    ) -> Result<PropertyConstraintSpec> {
1148        // Arity validation: single-property kinds get exactly one
1149        // property; NodeKey accepts any positive count. Empty lists
1150        // are always invalid. NodeKey on relationship scope isn't
1151        // allowed — the "NODE" in the name is specifically about
1152        // node semantics; relationship equivalents would need their
1153        // own variant.
1154        if properties.is_empty() {
1155            return Err(Error::ConstraintArity {
1156                kind: kind.as_string(),
1157                details: "at least one property is required".into(),
1158            });
1159        }
1160        if !kind.allows_multi_property() && properties.len() != 1 {
1161            return Err(Error::ConstraintArity {
1162                kind: kind.as_string(),
1163                details: format!("expects exactly one property, got {}", properties.len()),
1164            });
1165        }
1166        if matches!(kind, PropertyConstraintKind::NodeKey)
1167            && matches!(scope, ConstraintScope::Relationship(_))
1168        {
1169            return Err(Error::ConstraintArity {
1170                kind: kind.as_string(),
1171                details: "NODE KEY cannot be applied to a relationship scope".into(),
1172            });
1173        }
1174        let resolved_name = match name {
1175            Some(n) => n.to_string(),
1176            None => default_constraint_name(scope, properties, kind),
1177        };
1178        let spec = PropertyConstraintSpec {
1179            name: resolved_name.clone(),
1180            scope: scope.clone(),
1181            properties: properties.to_vec(),
1182            kind,
1183        };
1184
1185        // Idempotency / conflict check against the existing registry
1186        // under a read lock first — avoids taking the write lock when
1187        // the answer is "nothing to do".
1188        {
1189            let guard = self.constraints.read().expect("constraints lock poisoned");
1190            if let Some(existing) = guard.iter().find(|s| s.name == resolved_name) {
1191                if existing == &spec {
1192                    return Ok(existing.clone());
1193                }
1194                if if_not_exists {
1195                    return Ok(existing.clone());
1196                }
1197                return Err(Error::ConstraintNameConflict {
1198                    name: resolved_name,
1199                });
1200            }
1201        }
1202
1203        // UNIQUE needs a backing single-property equality index on
1204        // both scopes so enforcement stays O(log N) per insert. The
1205        // create calls are idempotent, mirroring Neo4j's "a unique
1206        // constraint also provides an index" contract. The backing
1207        // index is not torn down on `DROP CONSTRAINT`; users who
1208        // want it gone issue a separate `DROP INDEX`. NODE KEY gets
1209        // the same auto-provisioning against a composite tuple
1210        // index so its uniqueness check is a single seek instead of
1211        // an O(M * K) label walk.
1212        match (kind, scope) {
1213            (PropertyConstraintKind::Unique, ConstraintScope::Node(label)) => {
1214                self.create_property_index(label, &properties[0])?;
1215            }
1216            (PropertyConstraintKind::Unique, ConstraintScope::Relationship(edge_type)) => {
1217                self.create_edge_property_index(edge_type, &properties[0])?;
1218            }
1219            (PropertyConstraintKind::NodeKey, ConstraintScope::Node(label)) => {
1220                self.create_property_index_composite(label, properties)?;
1221            }
1222            _ => {}
1223        }
1224
1225        // Validate existing data against the new constraint before we
1226        // commit the meta entry. A single pre-existing violation makes
1227        // the whole declaration fail — matching how Neo4j rejects
1228        // constraint creation on data that doesn't satisfy it.
1229        validate_existing_data(self, &spec)?;
1230
1231        let meta_cf = self.cf(CF_CONSTRAINT_META)?;
1232        let mut batch = WriteBatch::default();
1233        batch.put_cf(
1234            meta_cf,
1235            resolved_name.as_bytes(),
1236            constraint_meta_encode(&spec),
1237        );
1238        self.db.write(batch)?;
1239
1240        let mut guard = self.constraints.write().expect("constraints lock poisoned");
1241        // Re-check under the write lock to avoid a TOCTOU where two
1242        // concurrent CREATEs race through the read-lock idempotency
1243        // check. The second writer sees the first's insertion and
1244        // returns the existing spec.
1245        if let Some(existing) = guard.iter().find(|s| s.name == resolved_name) {
1246            return Ok(existing.clone());
1247        }
1248        guard.push(spec.clone());
1249        Ok(spec)
1250    }
1251
1252    /// Tear down a constraint by name. When `if_exists` is true,
1253    /// dropping a non-existent constraint is a no-op. Never drops the
1254    /// backing index for UNIQUE — users who want it gone issue a
1255    /// separate `DROP INDEX`.
1256    pub fn drop_property_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1257        let mut guard = self.constraints.write().expect("constraints lock poisoned");
1258        let idx = guard.iter().position(|s| s.name == name);
1259        match idx {
1260            None if if_exists => Ok(()),
1261            None => Err(Error::ConstraintNotFound {
1262                name: name.to_string(),
1263            }),
1264            Some(i) => {
1265                let meta_cf = self.cf(CF_CONSTRAINT_META)?;
1266                let mut batch = WriteBatch::default();
1267                batch.delete_cf(meta_cf, name.as_bytes());
1268                self.db.write(batch)?;
1269                guard.remove(i);
1270                Ok(())
1271            }
1272        }
1273    }
1274
1275    /// Snapshot the currently-registered constraints. Cheap clone —
1276    /// every spec is a handful of small strings and an enum.
1277    pub fn list_property_constraints(&self) -> Vec<PropertyConstraintSpec> {
1278        self.constraints
1279            .read()
1280            .expect("constraints lock poisoned")
1281            .clone()
1282    }
1283
1284    /// Enforce registered constraints against the post-write state of
1285    /// `node`. Invoked by the `put_node` path before the batch is
1286    /// constructed. `existing` is the previously-stored snapshot of
1287    /// the same node (if any) so the check can distinguish "this node
1288    /// is updating its own value" from "some other node owns this
1289    /// value". Relationship-scope constraints are ignored here — the
1290    /// edge write path has its own entry point.
1291    fn enforce_constraints(&self, node: &Node, existing: Option<&Node>) -> Result<()> {
1292        let constraints = self.constraints.read().expect("constraints lock poisoned");
1293        if constraints.is_empty() {
1294            return Ok(());
1295        }
1296        let now_has_label = |label: &str| node.labels.iter().any(|l| l == label);
1297        for spec in constraints.iter() {
1298            let label = match &spec.scope {
1299                ConstraintScope::Node(l) => l,
1300                // Relationship-scope constraints apply to edges, not
1301                // nodes. Skip here; `enforce_edge_constraints` runs
1302                // on the edge write path.
1303                ConstraintScope::Relationship(_) => continue,
1304            };
1305            if !now_has_label(label) {
1306                continue;
1307            }
1308            // Single-property kinds store the target in
1309            // `properties[0]`; NodeKey walks the whole list.
1310            let primary = spec.primary_property();
1311            match spec.kind {
1312                PropertyConstraintKind::NotNull => {
1313                    let present = node
1314                        .properties
1315                        .get(primary)
1316                        .is_some_and(|v| !matches!(v, Property::Null));
1317                    if !present {
1318                        return Err(Error::ConstraintViolation {
1319                            name: spec.name.clone(),
1320                            kind: spec.kind.as_string(),
1321                            label: label.clone(),
1322                            property: primary.to_string(),
1323                            details: format!(
1324                                "node {} is missing required property `{}`",
1325                                node.id, primary
1326                            ),
1327                        });
1328                    }
1329                }
1330                PropertyConstraintKind::Unique => {
1331                    let Some(value) = node.properties.get(primary) else {
1332                        // No value → no uniqueness collision possible.
1333                        // Matches Neo4j: UNIQUE alone allows NULL;
1334                        // combine with NOT NULL for strict presence.
1335                        continue;
1336                    };
1337                    if matches!(value, Property::Null) {
1338                        continue;
1339                    }
1340                    if encode_index_value(value).is_none() {
1341                        continue;
1342                    }
1343                    let holders = self.nodes_by_property(label, primary, value)?;
1344                    for other in holders {
1345                        if other == node.id {
1346                            continue;
1347                        }
1348                        let was_self = existing
1349                            .and_then(|n| n.properties.get(primary))
1350                            .is_some_and(|v| v == value);
1351                        let _ = was_self;
1352                        return Err(Error::ConstraintViolation {
1353                            name: spec.name.clone(),
1354                            kind: spec.kind.as_string(),
1355                            label: label.clone(),
1356                            property: primary.to_string(),
1357                            details: format!("value already held by node {}", other),
1358                        });
1359                    }
1360                }
1361                PropertyConstraintKind::PropertyType(target) => {
1362                    let Some(value) = node.properties.get(primary) else {
1363                        continue;
1364                    };
1365                    if matches!(value, Property::Null) {
1366                        continue;
1367                    }
1368                    if !property_matches_type(value, target) {
1369                        return Err(Error::ConstraintViolation {
1370                            name: spec.name.clone(),
1371                            kind: spec.kind.as_string(),
1372                            label: label.clone(),
1373                            property: primary.to_string(),
1374                            details: format!(
1375                                "node {} has value of type {} (expected {})",
1376                                node.id,
1377                                value.type_name(),
1378                                target.as_str()
1379                            ),
1380                        });
1381                    }
1382                }
1383                PropertyConstraintKind::NodeKey => {
1384                    // Presence check first: every property must be
1385                    // present and non-null. Fails fast before the
1386                    // seek so a missing-property error takes
1387                    // precedence over a (potentially) conflicting
1388                    // existing tuple.
1389                    let mut my_tuple: Vec<Property> = Vec::with_capacity(spec.properties.len());
1390                    for prop in &spec.properties {
1391                        match node.properties.get(prop) {
1392                            Some(v) if !matches!(v, Property::Null) => my_tuple.push(v.clone()),
1393                            _ => {
1394                                return Err(Error::ConstraintViolation {
1395                                    name: spec.name.clone(),
1396                                    kind: spec.kind.as_string(),
1397                                    label: label.clone(),
1398                                    property: prop.clone(),
1399                                    details: format!(
1400                                        "node {} is missing required property `{}`",
1401                                        node.id, prop
1402                                    ),
1403                                });
1404                            }
1405                        }
1406                    }
1407                    // Uniqueness via the auto-provisioned composite
1408                    // index (see `create_property_constraint`). O(log N)
1409                    // per insert instead of the old O(M × K) label walk.
1410                    let hits = self.nodes_by_properties(label, &spec.properties, &my_tuple)?;
1411                    if let Some(other_id) = hits.iter().find(|&&id| id != node.id) {
1412                        return Err(Error::ConstraintViolation {
1413                            name: spec.name.clone(),
1414                            kind: spec.kind.as_string(),
1415                            label: label.clone(),
1416                            property: spec.properties.join(","),
1417                            details: format!("tuple already held by node {}", other_id),
1418                        });
1419                    }
1420                }
1421            }
1422        }
1423        Ok(())
1424    }
1425
1426    /// Enforce registered relationship-scope constraints against the
1427    /// post-write state of `edge`. Invoked by the `put_edge` path
1428    /// before the batch is constructed. Walks every constraint whose
1429    /// scope matches the edge's type and runs the kind-specific
1430    /// check. `existing` is the prior on-disk snapshot of the same
1431    /// edge (if any) so the uniqueness check can allow self-updates.
1432    fn enforce_edge_constraints(&self, edge: &Edge, existing: Option<&Edge>) -> Result<()> {
1433        let constraints = self.constraints.read().expect("constraints lock poisoned");
1434        if constraints.is_empty() {
1435            return Ok(());
1436        }
1437        for spec in constraints.iter() {
1438            let edge_type = match &spec.scope {
1439                ConstraintScope::Relationship(t) => t,
1440                // Node-scope constraints don't concern edges.
1441                ConstraintScope::Node(_) => continue,
1442            };
1443            if edge_type != &edge.edge_type {
1444                continue;
1445            }
1446            let primary = spec.primary_property();
1447            match spec.kind {
1448                PropertyConstraintKind::NotNull => {
1449                    let present = edge
1450                        .properties
1451                        .get(primary)
1452                        .is_some_and(|v| !matches!(v, Property::Null));
1453                    if !present {
1454                        return Err(Error::ConstraintViolation {
1455                            name: spec.name.clone(),
1456                            kind: spec.kind.as_string(),
1457                            label: edge_type.clone(),
1458                            property: primary.to_string(),
1459                            details: format!(
1460                                "edge {} is missing required property `{}`",
1461                                edge.id, primary
1462                            ),
1463                        });
1464                    }
1465                }
1466                PropertyConstraintKind::Unique => {
1467                    let Some(value) = edge.properties.get(primary) else {
1468                        // No value → no uniqueness collision possible.
1469                        // Matches the node path: UNIQUE alone allows
1470                        // NULL / missing; combine with NOT NULL for
1471                        // strict presence.
1472                        continue;
1473                    };
1474                    if matches!(value, Property::Null) {
1475                        continue;
1476                    }
1477                    if encode_index_value(value).is_none() {
1478                        // Value type doesn't support equality in the
1479                        // index (Float, List, Map, etc.). Skip — the
1480                        // node path makes the same call for
1481                        // consistency, and the index can't help us
1482                        // here anyway.
1483                        continue;
1484                    }
1485                    // Probe the backing edge-property index (created
1486                    // by `create_property_constraint` above) for
1487                    // rival holders. O(log N) + O(matching edges)
1488                    // vs. the previous O(E_type) per-insert scan.
1489                    let holders = self.edges_by_property(edge_type, primary, value)?;
1490                    for other_id in holders {
1491                        if other_id == edge.id {
1492                            continue;
1493                        }
1494                        let was_self = existing
1495                            .and_then(|e| e.properties.get(primary))
1496                            .is_some_and(|v| v == value);
1497                        let _ = was_self;
1498                        return Err(Error::ConstraintViolation {
1499                            name: spec.name.clone(),
1500                            kind: spec.kind.as_string(),
1501                            label: edge_type.clone(),
1502                            property: primary.to_string(),
1503                            details: format!("value already held by edge {}", other_id),
1504                        });
1505                    }
1506                }
1507                PropertyConstraintKind::PropertyType(target) => {
1508                    let Some(value) = edge.properties.get(primary) else {
1509                        continue;
1510                    };
1511                    if matches!(value, Property::Null) {
1512                        continue;
1513                    }
1514                    if !property_matches_type(value, target) {
1515                        return Err(Error::ConstraintViolation {
1516                            name: spec.name.clone(),
1517                            kind: spec.kind.as_string(),
1518                            label: edge_type.clone(),
1519                            property: primary.to_string(),
1520                            details: format!(
1521                                "edge {} has value of type {} (expected {})",
1522                                edge.id,
1523                                value.type_name(),
1524                                target.as_str()
1525                            ),
1526                        });
1527                    }
1528                }
1529                PropertyConstraintKind::NodeKey => {
1530                    // Disallowed at create time — see
1531                    // `create_property_constraint`.
1532                    unreachable!(
1533                        "NODE KEY on relationship scope should have been rejected at create time"
1534                    );
1535                }
1536            }
1537        }
1538        Ok(())
1539    }
1540
1541    /// Tear down a property index: removes the meta entry and every
1542    /// entry under the `(label, prop)` prefix. Idempotent: dropping a
1543    /// non-existent index is a no-op. Atomic via one [`WriteBatch`].
1544    pub fn drop_property_index(&self, label: &str, property: &str) -> Result<()> {
1545        self.drop_property_index_composite(label, &[property.to_string()])
1546    }
1547
1548    pub fn drop_property_index_composite(&self, label: &str, properties: &[String]) -> Result<()> {
1549        assert!(
1550            !properties.is_empty(),
1551            "drop_property_index_composite requires at least one property"
1552        );
1553        let spec = PropertyIndexSpec {
1554            label: label.to_string(),
1555            properties: properties.to_vec(),
1556        };
1557        let mut guard = self.indexes.write().expect("indexes lock poisoned");
1558        if !guard.contains(&spec) {
1559            return Ok(());
1560        }
1561
1562        // Reject the drop if a UNIQUE or NODE KEY constraint on the
1563        // same `(label, properties)` is registered — enforcement for
1564        // those kinds runs a seek through this exact index, so
1565        // silently removing it would collapse the uniqueness check
1566        // to "always passes" without producing any user-visible
1567        // signal. `DROP CONSTRAINT` is the right prerequisite.
1568        if let Some(constraint) = find_constraint_backing_node_index(
1569            &self.constraints.read().expect("constraints lock poisoned"),
1570            label,
1571            properties,
1572        ) {
1573            return Err(Error::IndexInUse { constraint });
1574        }
1575
1576        let meta_cf = self.cf(CF_INDEX_META)?;
1577        let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
1578        let mut batch = WriteBatch::default();
1579        batch.delete_cf(meta_cf, index_meta_key(&spec));
1580
1581        // Interleaved key layout means no clean `[label][props...]`
1582        // byte-prefix scopes a composite index's entries on its own —
1583        // an index `(a, b)` would share a `[label][a]` prefix with
1584        // single-property index `(a)`. Iterate by the label-only
1585        // prefix and parse each entry's property sequence, deleting
1586        // those that exactly match this spec's list.
1587        let label_prefix = property_index_label_prefix(label);
1588        let iter = self.db.iterator_cf(
1589            prop_cf,
1590            IteratorMode::From(&label_prefix, Direction::Forward),
1591        );
1592        for item in iter {
1593            let (key, _) = item?;
1594            if !key.starts_with(&label_prefix) {
1595                break;
1596            }
1597            let Some(parsed) = parse_property_index_entry_props(&key) else {
1598                continue;
1599            };
1600            if parsed.len() == properties.len()
1601                && parsed.iter().zip(properties.iter()).all(|(a, b)| a == b)
1602            {
1603                batch.delete_cf(prop_cf, key);
1604            }
1605        }
1606
1607        self.db.write(batch)?;
1608        guard.retain(|s| s != &spec);
1609        Ok(())
1610    }
1611
1612    /// Snapshot the currently-registered edge property indexes.
1613    /// Mirror of [`RocksDbStorageEngine::list_property_indexes`] for
1614    /// relationship scope. Cheap clone — specs are tiny.
1615    pub fn list_edge_property_indexes(&self) -> Vec<EdgePropertyIndexSpec> {
1616        self.edge_indexes
1617            .read()
1618            .expect("edge_indexes lock poisoned")
1619            .clone()
1620    }
1621
1622    /// Declare a new `(edge_type, property)` edge property index and
1623    /// backfill it by scanning every edge currently carrying
1624    /// `edge_type`. Idempotent: re-creating an already-registered
1625    /// index is a no-op. Mirror of
1626    /// [`RocksDbStorageEngine::create_property_index`] for
1627    /// relationship scope; the same "same WriteBatch" atomicity
1628    /// guarantee applies — a crash mid-backfill leaves the store with
1629    /// either zero entries or a fully-populated index, never a
1630    /// partially-built one.
1631    pub fn create_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
1632        self.create_edge_property_index_composite(edge_type, &[property.to_string()])
1633    }
1634
1635    pub fn create_edge_property_index_composite(
1636        &self,
1637        edge_type: &str,
1638        properties: &[String],
1639    ) -> Result<()> {
1640        assert!(
1641            !properties.is_empty(),
1642            "create_edge_property_index_composite requires at least one property"
1643        );
1644        let spec = EdgePropertyIndexSpec {
1645            edge_type: edge_type.to_string(),
1646            properties: properties.to_vec(),
1647        };
1648        let mut guard = self
1649            .edge_indexes
1650            .write()
1651            .expect("edge_indexes lock poisoned");
1652        if guard.contains(&spec) {
1653            return Ok(());
1654        }
1655
1656        let meta_cf = self.cf(CF_EDGE_INDEX_META)?;
1657        let prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
1658        let mut batch = WriteBatch::default();
1659        batch.put_cf(meta_cf, edge_index_meta_key(&spec), EMPTY);
1660
1661        for edge_id in self.edges_by_type(edge_type)? {
1662            let edge = match self.get_edge(edge_id)? {
1663                Some(e) => e,
1664                None => continue,
1665            };
1666            if let Some(values) = encode_index_tuple(&edge.properties, &spec.properties) {
1667                batch.put_cf(
1668                    prop_cf,
1669                    edge_property_index_composite_key(edge_type, properties, &values, edge_id),
1670                    EMPTY,
1671                );
1672            }
1673        }
1674
1675        self.db.write(batch)?;
1676        guard.push(spec);
1677        Ok(())
1678    }
1679
1680    pub fn drop_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
1681        self.drop_edge_property_index_composite(edge_type, &[property.to_string()])
1682    }
1683
1684    pub fn drop_edge_property_index_composite(
1685        &self,
1686        edge_type: &str,
1687        properties: &[String],
1688    ) -> Result<()> {
1689        assert!(
1690            !properties.is_empty(),
1691            "drop_edge_property_index_composite requires at least one property"
1692        );
1693        let spec = EdgePropertyIndexSpec {
1694            edge_type: edge_type.to_string(),
1695            properties: properties.to_vec(),
1696        };
1697        let mut guard = self
1698            .edge_indexes
1699            .write()
1700            .expect("edge_indexes lock poisoned");
1701        if !guard.contains(&spec) {
1702            return Ok(());
1703        }
1704
1705        // Same rationale as the node-side drop guard: a UNIQUE
1706        // constraint on this relationship type + property seeks
1707        // through this exact index during enforcement, so silently
1708        // removing it would defeat the check. Require
1709        // `DROP CONSTRAINT` first.
1710        if let Some(constraint) = find_constraint_backing_edge_index(
1711            &self.constraints.read().expect("constraints lock poisoned"),
1712            edge_type,
1713            properties,
1714        ) {
1715            return Err(Error::IndexInUse { constraint });
1716        }
1717
1718        let meta_cf = self.cf(CF_EDGE_INDEX_META)?;
1719        let prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
1720        let mut batch = WriteBatch::default();
1721        batch.delete_cf(meta_cf, edge_index_meta_key(&spec));
1722
1723        // See the node-side drop for why DROP parses entries rather
1724        // than scoping with a property-name byte prefix.
1725        let label_prefix = property_index_label_prefix(edge_type);
1726        let iter = self.db.iterator_cf(
1727            prop_cf,
1728            IteratorMode::From(&label_prefix, Direction::Forward),
1729        );
1730        for item in iter {
1731            let (key, _) = item?;
1732            if !key.starts_with(&label_prefix) {
1733                break;
1734            }
1735            let Some(parsed) = parse_property_index_entry_props(&key) else {
1736                continue;
1737            };
1738            if parsed.len() == properties.len()
1739                && parsed.iter().zip(properties.iter()).all(|(a, b)| a == b)
1740            {
1741                batch.delete_cf(prop_cf, key);
1742            }
1743        }
1744
1745        self.db.write(batch)?;
1746        guard.retain(|s| s != &spec);
1747        Ok(())
1748    }
1749
1750    // ---------------------------------------------------------------
1751    // Point / spatial index (node scope).
1752    //
1753    // Z-order (Morton) cell encoding over a fixed per-SRID domain.
1754    // Each indexed `Property::Point` produces one row keyed by
1755    // `<label>\0<prop>\0<srid:4><cell:8><node_id:16>` with the point's
1756    // coordinates stored in the value. Cross-SRID queries are
1757    // impossible by construction — the SRID is in the key prefix, so
1758    // a bbox scan is naturally scoped to one CRS.
1759    //
1760    // Single-property only for now. Extending to composite spatial
1761    // or to relationship scope is additive — separate CFs and
1762    // separate spec shapes.
1763    // ---------------------------------------------------------------
1764
1765    pub fn list_point_indexes(&self) -> Vec<PointIndexSpec> {
1766        self.point_indexes
1767            .read()
1768            .expect("point_indexes lock poisoned")
1769            .clone()
1770    }
1771
1772    /// Declare a new point index on `(label, property)` and backfill
1773    /// it by scanning every node that currently carries the label.
1774    /// Idempotent: re-creating a registered index is a no-op. Same
1775    /// "single WriteBatch" atomicity guarantee as the property-index
1776    /// create path — a crash mid-backfill leaves the store with
1777    /// either zero entries or a fully-populated index.
1778    pub fn create_point_index(&self, label: &str, property: &str) -> Result<()> {
1779        let spec = PointIndexSpec {
1780            label: label.to_string(),
1781            property: property.to_string(),
1782        };
1783        let mut guard = self
1784            .point_indexes
1785            .write()
1786            .expect("point_indexes lock poisoned");
1787        if guard.contains(&spec) {
1788            return Ok(());
1789        }
1790
1791        let meta_cf = self.cf(CF_POINT_INDEX_META)?;
1792        let point_cf = self.cf(CF_POINT_INDEX)?;
1793        let mut batch = WriteBatch::default();
1794        batch.put_cf(meta_cf, point_index_meta_key(&spec), EMPTY);
1795
1796        for node_id in self.nodes_by_label(label)? {
1797            let node = match self.get_node(node_id)? {
1798                Some(n) => n,
1799                None => continue,
1800            };
1801            if let Some(Property::Point(p)) = node.properties.get(property) {
1802                let cell = point_cell(p.srid, p.x, p.y);
1803                batch.put_cf(
1804                    point_cf,
1805                    point_index_key(label, property, p.srid, cell, node_id.as_bytes()),
1806                    point_index_value(p.x, p.y, p.z),
1807                );
1808            }
1809        }
1810
1811        self.db.write(batch)?;
1812        guard.push(spec);
1813        Ok(())
1814    }
1815
1816    /// Tear down a point index. Removes the meta entry and every
1817    /// entry under the `<label>\0<prop>\0` prefix — that sweep
1818    /// covers all SRID variants at once, since SRID is to the right
1819    /// of the label+property header. Idempotent.
1820    pub fn drop_point_index(&self, label: &str, property: &str) -> Result<()> {
1821        let spec = PointIndexSpec {
1822            label: label.to_string(),
1823            property: property.to_string(),
1824        };
1825        let mut guard = self
1826            .point_indexes
1827            .write()
1828            .expect("point_indexes lock poisoned");
1829        if !guard.contains(&spec) {
1830            return Ok(());
1831        }
1832
1833        let meta_cf = self.cf(CF_POINT_INDEX_META)?;
1834        let point_cf = self.cf(CF_POINT_INDEX)?;
1835        let mut batch = WriteBatch::default();
1836        batch.delete_cf(meta_cf, point_index_meta_key(&spec));
1837
1838        let prefix = point_index_label_prop_prefix(label, property);
1839        let iter = self
1840            .db
1841            .iterator_cf(point_cf, IteratorMode::From(&prefix, Direction::Forward));
1842        for item in iter {
1843            let (key, _) = item?;
1844            if !key.starts_with(&prefix) {
1845                break;
1846            }
1847            batch.delete_cf(point_cf, key);
1848        }
1849
1850        self.db.write(batch)?;
1851        guard.retain(|s| s != &spec);
1852        Ok(())
1853    }
1854
1855    /// Range query: node ids whose indexed point falls inside the
1856    /// axis-aligned bounding box `[(xlo, ylo), (xhi, yhi)]` under
1857    /// `srid`. The scan walks the Morton-code cell range that
1858    /// covers the bbox and applies a precise per-row filter against
1859    /// the coordinates stored in the value — the cell range is a
1860    /// superset, so the filter is what guarantees correctness.
1861    ///
1862    /// A missing index returns empty rather than erroring, matching
1863    /// the property-seek convention (lets the planner race a DROP
1864    /// without a fatal). Results are sorted by `NodeId` for
1865    /// determinism across runs.
1866    pub fn nodes_in_bbox(
1867        &self,
1868        label: &str,
1869        property: &str,
1870        srid: i32,
1871        xlo: f64,
1872        ylo: f64,
1873        xhi: f64,
1874        yhi: f64,
1875    ) -> Result<Vec<NodeId>> {
1876        let spec = PointIndexSpec {
1877            label: label.to_string(),
1878            property: property.to_string(),
1879        };
1880        {
1881            let guard = self
1882                .point_indexes
1883                .read()
1884                .expect("point_indexes lock poisoned");
1885            if !guard.contains(&spec) {
1886                return Ok(Vec::new());
1887            }
1888        }
1889
1890        let (lo_x, hi_x) = if xlo <= xhi { (xlo, xhi) } else { (xhi, xlo) };
1891        let (lo_y, hi_y) = if ylo <= yhi { (ylo, yhi) } else { (yhi, ylo) };
1892        let (min_cell, max_cell) = point_cell_range(srid, lo_x, lo_y, hi_x, hi_y);
1893
1894        let prefix = point_index_srid_prefix(label, property, srid);
1895        let header_len = prefix.len();
1896        let mut seek = prefix.clone();
1897        seek.extend_from_slice(&min_cell.to_be_bytes());
1898
1899        let cf = self.cf(CF_POINT_INDEX)?;
1900        let iter = self
1901            .db
1902            .iterator_cf(cf, IteratorMode::From(&seek, Direction::Forward));
1903        let mut results = Vec::new();
1904        for item in iter {
1905            let (key, value) = item?;
1906            if !key.starts_with(&prefix) {
1907                break;
1908            }
1909            let cell = cell_from_point_index_key(CF_POINT_INDEX, &key, header_len)?;
1910            if cell > max_cell {
1911                break;
1912            }
1913            let (x, y, _z) = decode_point_index_value(CF_POINT_INDEX, &value)?;
1914            if x < lo_x || x > hi_x || y < lo_y || y > hi_y {
1915                continue;
1916            }
1917            let id_bytes = node_id_from_point_index_key(CF_POINT_INDEX, &key)?;
1918            results.push(NodeId::from_bytes(id_bytes));
1919        }
1920        results.sort();
1921        Ok(results)
1922    }
1923
1924    // ---------------------------------------------------------------
1925    // Point / spatial index (relationship scope).
1926    //
1927    // Mirror of the node-scope block above. Lives in its own CF so
1928    // node and edge spatial entries can't alias and the two can be
1929    // dropped independently — matches how non-spatial property
1930    // indexes are split between `CF_PROPERTY_INDEX` and
1931    // `CF_EDGE_PROPERTY_INDEX`.
1932    // ---------------------------------------------------------------
1933
1934    pub fn list_edge_point_indexes(&self) -> Vec<EdgePointIndexSpec> {
1935        self.edge_point_indexes
1936            .read()
1937            .expect("edge_point_indexes lock poisoned")
1938            .clone()
1939    }
1940
1941    /// Declare a new edge point index and backfill it by scanning
1942    /// every edge currently carrying `edge_type`. Idempotent.
1943    /// Single-property / relationship-scope analogue of
1944    /// [`RocksDbStorageEngine::create_point_index`].
1945    pub fn create_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
1946        let spec = EdgePointIndexSpec {
1947            edge_type: edge_type.to_string(),
1948            property: property.to_string(),
1949        };
1950        let mut guard = self
1951            .edge_point_indexes
1952            .write()
1953            .expect("edge_point_indexes lock poisoned");
1954        if guard.contains(&spec) {
1955            return Ok(());
1956        }
1957
1958        let meta_cf = self.cf(CF_EDGE_POINT_INDEX_META)?;
1959        let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
1960        let mut batch = WriteBatch::default();
1961        batch.put_cf(meta_cf, edge_point_index_meta_key(&spec), EMPTY);
1962
1963        for edge_id in self.edges_by_type(edge_type)? {
1964            let edge = match self.get_edge(edge_id)? {
1965                Some(e) => e,
1966                None => continue,
1967            };
1968            if let Some(Property::Point(p)) = edge.properties.get(property) {
1969                let cell = point_cell(p.srid, p.x, p.y);
1970                batch.put_cf(
1971                    point_cf,
1972                    // Reuse the node-scope key builder: the key layout
1973                    // `<label>\0<prop>\0<srid:4><cell:8><id:16>` doesn't
1974                    // care whether `id` is a node or edge id — both are
1975                    // 16-byte UUIDs. The separate CF is what scopes
1976                    // relationship entries away from node entries.
1977                    point_index_key(edge_type, property, p.srid, cell, edge_id.as_bytes()),
1978                    point_index_value(p.x, p.y, p.z),
1979                );
1980            }
1981        }
1982
1983        self.db.write(batch)?;
1984        guard.push(spec);
1985        Ok(())
1986    }
1987
1988    /// Tear down an edge point index. Idempotent.
1989    pub fn drop_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
1990        let spec = EdgePointIndexSpec {
1991            edge_type: edge_type.to_string(),
1992            property: property.to_string(),
1993        };
1994        let mut guard = self
1995            .edge_point_indexes
1996            .write()
1997            .expect("edge_point_indexes lock poisoned");
1998        if !guard.contains(&spec) {
1999            return Ok(());
2000        }
2001
2002        let meta_cf = self.cf(CF_EDGE_POINT_INDEX_META)?;
2003        let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
2004        let mut batch = WriteBatch::default();
2005        batch.delete_cf(meta_cf, edge_point_index_meta_key(&spec));
2006
2007        let prefix = point_index_label_prop_prefix(edge_type, property);
2008        let iter = self
2009            .db
2010            .iterator_cf(point_cf, IteratorMode::From(&prefix, Direction::Forward));
2011        for item in iter {
2012            let (key, _) = item?;
2013            if !key.starts_with(&prefix) {
2014                break;
2015            }
2016            batch.delete_cf(point_cf, key);
2017        }
2018
2019        self.db.write(batch)?;
2020        guard.retain(|s| s != &spec);
2021        Ok(())
2022    }
2023
2024    /// Relationship-scope analogue of
2025    /// [`RocksDbStorageEngine::nodes_in_bbox`]. Same Z-order cell
2026    /// range + precise bbox filter via stored coords, just over
2027    /// `CF_EDGE_POINT_INDEX`.
2028    pub fn edges_in_bbox(
2029        &self,
2030        edge_type: &str,
2031        property: &str,
2032        srid: i32,
2033        xlo: f64,
2034        ylo: f64,
2035        xhi: f64,
2036        yhi: f64,
2037    ) -> Result<Vec<EdgeId>> {
2038        let spec = EdgePointIndexSpec {
2039            edge_type: edge_type.to_string(),
2040            property: property.to_string(),
2041        };
2042        {
2043            let guard = self
2044                .edge_point_indexes
2045                .read()
2046                .expect("edge_point_indexes lock poisoned");
2047            if !guard.contains(&spec) {
2048                return Ok(Vec::new());
2049            }
2050        }
2051
2052        let (lo_x, hi_x) = if xlo <= xhi { (xlo, xhi) } else { (xhi, xlo) };
2053        let (lo_y, hi_y) = if ylo <= yhi { (ylo, yhi) } else { (yhi, ylo) };
2054        let (min_cell, max_cell) = point_cell_range(srid, lo_x, lo_y, hi_x, hi_y);
2055
2056        let prefix = point_index_srid_prefix(edge_type, property, srid);
2057        let header_len = prefix.len();
2058        let mut seek = prefix.clone();
2059        seek.extend_from_slice(&min_cell.to_be_bytes());
2060
2061        let cf = self.cf(CF_EDGE_POINT_INDEX)?;
2062        let iter = self
2063            .db
2064            .iterator_cf(cf, IteratorMode::From(&seek, Direction::Forward));
2065        let mut results = Vec::new();
2066        for item in iter {
2067            let (key, value) = item?;
2068            if !key.starts_with(&prefix) {
2069                break;
2070            }
2071            let cell = cell_from_point_index_key(CF_EDGE_POINT_INDEX, &key, header_len)?;
2072            if cell > max_cell {
2073                break;
2074            }
2075            let (x, y, _z) = decode_point_index_value(CF_EDGE_POINT_INDEX, &value)?;
2076            if x < lo_x || x > hi_x || y < lo_y || y > hi_y {
2077                continue;
2078            }
2079            let id_bytes = node_id_from_point_index_key(CF_EDGE_POINT_INDEX, &key)?;
2080            results.push(EdgeId::from_bytes(id_bytes));
2081        }
2082        results.sort();
2083        Ok(results)
2084    }
2085
2086    /// Look up edge ids for a `(edge_type, property, value)` equality
2087    /// via the edge-property-index CF. Mirror of
2088    /// [`RocksDbStorageEngine::nodes_by_property`] for relationship
2089    /// scope; unindexable value types surface as
2090    /// [`Error::UnindexableValue`] so callers can fall back to a
2091    /// type-wide scan rather than silently returning empty.
2092    pub fn edges_by_property(
2093        &self,
2094        edge_type: &str,
2095        property: &str,
2096        value: &Property,
2097    ) -> Result<Vec<EdgeId>> {
2098        self.edges_by_properties(
2099            edge_type,
2100            std::slice::from_ref(&property.to_string()),
2101            std::slice::from_ref(value),
2102        )
2103    }
2104
2105    /// Composite form of [`edges_by_property`]. Same contract as
2106    /// [`RocksDbStorageEngine::nodes_by_properties`] on the
2107    /// relationship side.
2108    pub fn edges_by_properties(
2109        &self,
2110        edge_type: &str,
2111        properties: &[String],
2112        values: &[Property],
2113    ) -> Result<Vec<EdgeId>> {
2114        assert_eq!(
2115            properties.len(),
2116            values.len(),
2117            "composite edge seek: properties and values must have equal length"
2118        );
2119        let mut encoded: Vec<Vec<u8>> = Vec::with_capacity(values.len());
2120        for (p, v) in properties.iter().zip(values.iter()) {
2121            let bytes = encode_index_value(v).ok_or_else(|| Error::UnindexableValue {
2122                property: p.clone(),
2123                kind: v.type_name(),
2124            })?;
2125            encoded.push(bytes);
2126        }
2127        let cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
2128        let prefix = edge_property_index_composite_value_prefix(edge_type, properties, &encoded);
2129        let mut results = Vec::new();
2130        let iter = self
2131            .db
2132            .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2133        for item in iter {
2134            let (key, _) = item?;
2135            if !key.starts_with(&prefix) {
2136                break;
2137            }
2138            let bytes = edge_id_from_property_index_key(&key, prefix.len())?;
2139            results.push(EdgeId::from_bytes(bytes));
2140        }
2141        Ok(results)
2142    }
2143
2144    /// Look up node ids for a `(label, property, value)` equality via
2145    /// the property index CF. The caller is responsible for checking
2146    /// that the index exists first (e.g. the planner only emits
2147    /// `IndexSeek` when it has verified via
2148    /// [`RocksDbStorageEngine::list_property_indexes`]).
2149    ///
2150    /// Unindexable value types (Float64, List, Map, Null) return
2151    /// [`Error::UnindexableValue`] — callers should surface this to
2152    /// the user rather than silently returning an empty result.
2153    pub fn nodes_by_property(
2154        &self,
2155        label: &str,
2156        property: &str,
2157        value: &Property,
2158    ) -> Result<Vec<NodeId>> {
2159        self.nodes_by_properties(
2160            label,
2161            std::slice::from_ref(&property.to_string()),
2162            std::slice::from_ref(value),
2163        )
2164    }
2165
2166    /// Composite form of [`nodes_by_property`]. Seeks every node
2167    /// whose `(label, property_i = value_i)` tuple matches in order.
2168    /// The two slices must have equal length. Unindexable value
2169    /// types surface as [`Error::UnindexableValue`] the same way
2170    /// the single-property form does — composite indexes are
2171    /// all-or-nothing and an unindexable slot can never match a
2172    /// stored tuple.
2173    pub fn nodes_by_properties(
2174        &self,
2175        label: &str,
2176        properties: &[String],
2177        values: &[Property],
2178    ) -> Result<Vec<NodeId>> {
2179        assert_eq!(
2180            properties.len(),
2181            values.len(),
2182            "composite seek: properties and values must have equal length"
2183        );
2184        let mut encoded: Vec<Vec<u8>> = Vec::with_capacity(values.len());
2185        for (p, v) in properties.iter().zip(values.iter()) {
2186            let bytes = encode_index_value(v).ok_or_else(|| Error::UnindexableValue {
2187                property: p.clone(),
2188                kind: v.type_name(),
2189            })?;
2190            encoded.push(bytes);
2191        }
2192        let cf = self.cf(CF_PROPERTY_INDEX)?;
2193        let prefix = property_index_composite_value_prefix(label, properties, &encoded);
2194        let mut results = Vec::new();
2195        let iter = self
2196            .db
2197            .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2198        for item in iter {
2199            let (key, _) = item?;
2200            if !key.starts_with(&prefix) {
2201                break;
2202            }
2203            let bytes = node_id_from_property_index_key(&key, prefix.len())?;
2204            results.push(NodeId::from_bytes(bytes));
2205        }
2206        Ok(results)
2207    }
2208
2209    pub fn nodes_by_label(&self, label: &str) -> Result<Vec<NodeId>> {
2210        let cf = self.cf(CF_LABEL_INDEX)?;
2211        let prefix = label_index_prefix(label);
2212        let mut results = Vec::new();
2213        let iter = self
2214            .db
2215            .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2216        for item in iter {
2217            let (key, _) = item?;
2218            if !key.starts_with(&prefix) {
2219                break;
2220            }
2221            let bytes = id_from_str_index_key(CF_LABEL_INDEX, &key, label.len())?;
2222            results.push(NodeId::from_bytes(bytes));
2223        }
2224        Ok(results)
2225    }
2226
2227    pub fn edges_by_type(&self, edge_type: &str) -> Result<Vec<EdgeId>> {
2228        let cf = self.cf(CF_TYPE_INDEX)?;
2229        let prefix = type_index_prefix(edge_type);
2230        let mut results = Vec::new();
2231        let iter = self
2232            .db
2233            .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2234        for item in iter {
2235            let (key, _) = item?;
2236            if !key.starts_with(&prefix) {
2237                break;
2238            }
2239            let bytes = id_from_str_index_key(CF_TYPE_INDEX, &key, edge_type.len())?;
2240            results.push(EdgeId::from_bytes(bytes));
2241        }
2242        Ok(results)
2243    }
2244
2245    fn scan_adj(&self, cf_name: &'static str, node: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
2246        let cf = self.cf(cf_name)?;
2247        let prefix: &[u8] = node.as_bytes();
2248        let mut results = Vec::new();
2249        let iter = self
2250            .db
2251            .iterator_cf(cf, IteratorMode::From(prefix, Direction::Forward));
2252        for item in iter {
2253            let (key, value) = item?;
2254            if !key.starts_with(prefix) {
2255                break;
2256            }
2257            let edge_id = edge_from_adj_key(cf_name, &key)?;
2258            let other = node_from_adj_value(cf_name, &value)?;
2259            results.push((edge_id, other));
2260        }
2261        Ok(results)
2262    }
2263}
2264
2265/// Return the [`meshdb_core::Point`] at `properties[key]` when the
2266/// property is present and of point type. Used by point-index
2267/// maintenance to decide whether a node's indexed property changed
2268/// across a put. Non-point or missing values yield `None`, which the
2269/// index maintenance treats as "no entry" — the same way
2270/// `encode_index_tuple` treats missing / unindexable values for
2271/// property indexes.
2272fn extract_indexable_point(
2273    properties: &std::collections::HashMap<String, Property>,
2274    key: &str,
2275) -> Option<meshdb_core::Point> {
2276    match properties.get(key)? {
2277        Property::Point(p) => Some(*p),
2278        _ => None,
2279    }
2280}
2281
2282/// Deterministic auto-name for a constraint the user didn't name.
2283/// Shape: `constraint_<label>_<property_list>_<kind>` where the
2284/// property list joins the property names with `_` and `<kind>` is
2285/// `unique`, `not_null`, `type_<t>`, or `node_key`. Stable across
2286/// restarts — users can target the auto-name with `DROP CONSTRAINT`
2287/// without having to inspect `SHOW CONSTRAINTS` first.
2288fn default_constraint_name(
2289    scope: &ConstraintScope,
2290    properties: &[String],
2291    kind: PropertyConstraintKind,
2292) -> String {
2293    let joined = properties.join("_");
2294    format!(
2295        "constraint_{scope_tag}_{target}_{joined}_{kind_tag}",
2296        scope_tag = scope.name_tag(),
2297        target = scope.target(),
2298        kind_tag = kind.name_tag(),
2299    )
2300}
2301
2302/// Return the name of a UNIQUE or NODE KEY constraint whose backing
2303/// index is exactly `(label, properties)`, or `None` if no such
2304/// constraint is registered. The enforcement paths for both kinds seek
2305/// through that index — dropping it silently defeats the check — so
2306/// `drop_property_index_composite` consults this helper before
2307/// removing any entries.
2308fn find_constraint_backing_node_index(
2309    constraints: &[PropertyConstraintSpec],
2310    label: &str,
2311    properties: &[String],
2312) -> Option<String> {
2313    constraints
2314        .iter()
2315        .filter(|c| matches!(&c.scope, ConstraintScope::Node(l) if l == label))
2316        .filter(|c| {
2317            matches!(
2318                c.kind,
2319                PropertyConstraintKind::Unique | PropertyConstraintKind::NodeKey
2320            )
2321        })
2322        .find(|c| c.properties.as_slice() == properties)
2323        .map(|c| c.name.clone())
2324}
2325
2326/// Relationship-scope mirror of
2327/// [`find_constraint_backing_node_index`]. NODE KEY is node-only, so
2328/// only UNIQUE can back an edge index.
2329fn find_constraint_backing_edge_index(
2330    constraints: &[PropertyConstraintSpec],
2331    edge_type: &str,
2332    properties: &[String],
2333) -> Option<String> {
2334    constraints
2335        .iter()
2336        .filter(|c| matches!(&c.scope, ConstraintScope::Relationship(t) if t == edge_type))
2337        .filter(|c| matches!(c.kind, PropertyConstraintKind::Unique))
2338        .find(|c| c.properties.as_slice() == properties)
2339        .map(|c| c.name.clone())
2340}
2341
2342/// Scan existing nodes and verify that `spec` holds. Called on
2343/// constraint creation so a user declaring a constraint against
2344/// already-invalid data gets a clear error instead of a partial
2345/// rollout. Walks only the nodes carrying the constrained label — for
2346/// label-sparse graphs this avoids the full-node scan.
2347fn validate_existing_data(
2348    engine: &RocksDbStorageEngine,
2349    spec: &PropertyConstraintSpec,
2350) -> Result<()> {
2351    match &spec.scope {
2352        ConstraintScope::Node(label) => validate_existing_nodes(engine, spec, label),
2353        ConstraintScope::Relationship(edge_type) => {
2354            validate_existing_edges(engine, spec, edge_type)
2355        }
2356    }
2357}
2358
2359fn validate_existing_nodes(
2360    engine: &RocksDbStorageEngine,
2361    spec: &PropertyConstraintSpec,
2362    label: &str,
2363) -> Result<()> {
2364    let label_members = engine.nodes_by_label(label)?;
2365    let primary = spec.primary_property();
2366    match spec.kind {
2367        PropertyConstraintKind::NotNull => {
2368            for id in label_members {
2369                let node = match engine.get_node(id)? {
2370                    Some(n) => n,
2371                    None => continue,
2372                };
2373                let present = node
2374                    .properties
2375                    .get(primary)
2376                    .is_some_and(|v| !matches!(v, Property::Null));
2377                if !present {
2378                    return Err(Error::ConstraintViolation {
2379                        name: spec.name.clone(),
2380                        kind: spec.kind.as_string(),
2381                        label: label.to_string(),
2382                        property: primary.to_string(),
2383                        details: format!("node {id} is missing required property"),
2384                    });
2385                }
2386            }
2387        }
2388        PropertyConstraintKind::Unique => {
2389            use std::collections::HashMap;
2390            let mut seen: HashMap<Vec<u8>, meshdb_core::NodeId> = HashMap::new();
2391            for id in label_members {
2392                let node = match engine.get_node(id)? {
2393                    Some(n) => n,
2394                    None => continue,
2395                };
2396                let Some(value) = node.properties.get(primary) else {
2397                    continue;
2398                };
2399                let Some(encoded) = encode_index_value(value) else {
2400                    continue;
2401                };
2402                if let Some(&first) = seen.get(&encoded) {
2403                    return Err(Error::ConstraintViolation {
2404                        name: spec.name.clone(),
2405                        kind: spec.kind.as_string(),
2406                        label: label.to_string(),
2407                        property: primary.to_string(),
2408                        details: format!("duplicate value held by nodes {first} and {id}"),
2409                    });
2410                }
2411                seen.insert(encoded, id);
2412            }
2413        }
2414        PropertyConstraintKind::PropertyType(target) => {
2415            for id in label_members {
2416                let node = match engine.get_node(id)? {
2417                    Some(n) => n,
2418                    None => continue,
2419                };
2420                let Some(value) = node.properties.get(primary) else {
2421                    continue;
2422                };
2423                if matches!(value, Property::Null) {
2424                    continue;
2425                }
2426                if !property_matches_type(value, target) {
2427                    return Err(Error::ConstraintViolation {
2428                        name: spec.name.clone(),
2429                        kind: spec.kind.as_string(),
2430                        label: label.to_string(),
2431                        property: primary.to_string(),
2432                        details: format!(
2433                            "node {id} has value of type {} (expected {})",
2434                            value.type_name(),
2435                            target.as_str()
2436                        ),
2437                    });
2438                }
2439            }
2440        }
2441        PropertyConstraintKind::NodeKey => {
2442            use std::collections::HashMap;
2443            let mut seen: HashMap<Vec<Vec<u8>>, meshdb_core::NodeId> = HashMap::new();
2444            for id in label_members {
2445                let node = match engine.get_node(id)? {
2446                    Some(n) => n,
2447                    None => continue,
2448                };
2449                let mut tuple: Vec<Vec<u8>> = Vec::with_capacity(spec.properties.len());
2450                let mut missing: Option<&str> = None;
2451                for prop in &spec.properties {
2452                    match node.properties.get(prop) {
2453                        Some(v) if !matches!(v, Property::Null) => {
2454                            if let Some(encoded) = encode_index_value(v) {
2455                                tuple.push(encoded);
2456                            } else {
2457                                missing = Some(prop);
2458                                break;
2459                            }
2460                        }
2461                        _ => {
2462                            missing = Some(prop.as_str());
2463                            break;
2464                        }
2465                    }
2466                }
2467                if let Some(prop) = missing {
2468                    return Err(Error::ConstraintViolation {
2469                        name: spec.name.clone(),
2470                        kind: spec.kind.as_string(),
2471                        label: label.to_string(),
2472                        property: prop.to_string(),
2473                        details: format!("node {id} is missing required property `{prop}`"),
2474                    });
2475                }
2476                if let Some(&first) = seen.get(&tuple) {
2477                    return Err(Error::ConstraintViolation {
2478                        name: spec.name.clone(),
2479                        kind: spec.kind.as_string(),
2480                        label: label.to_string(),
2481                        property: spec.properties.join(","),
2482                        details: format!("duplicate tuple held by nodes {first} and {id}"),
2483                    });
2484                }
2485                seen.insert(tuple, id);
2486            }
2487        }
2488    }
2489    Ok(())
2490}
2491
2492/// Relationship-scope analogue to `validate_existing_nodes`. Walks
2493/// `edges_by_type(edge_type)` because we don't have an edge property
2494/// index; complexity is O(E_type) per newly-declared constraint. `NodeKey` is rejected at create time
2495/// for relationship scope, so this path doesn't handle it.
2496fn validate_existing_edges(
2497    engine: &RocksDbStorageEngine,
2498    spec: &PropertyConstraintSpec,
2499    edge_type: &str,
2500) -> Result<()> {
2501    let edge_ids = engine.edges_by_type(edge_type)?;
2502    let primary = spec.primary_property();
2503    match spec.kind {
2504        PropertyConstraintKind::NotNull => {
2505            for id in edge_ids {
2506                let edge = match engine.get_edge(id)? {
2507                    Some(e) => e,
2508                    None => continue,
2509                };
2510                let present = edge
2511                    .properties
2512                    .get(primary)
2513                    .is_some_and(|v| !matches!(v, Property::Null));
2514                if !present {
2515                    return Err(Error::ConstraintViolation {
2516                        name: spec.name.clone(),
2517                        kind: spec.kind.as_string(),
2518                        label: edge_type.to_string(),
2519                        property: primary.to_string(),
2520                        details: format!("edge {id} is missing required property"),
2521                    });
2522                }
2523            }
2524        }
2525        PropertyConstraintKind::Unique => {
2526            use std::collections::HashMap;
2527            let mut seen: HashMap<Vec<u8>, meshdb_core::EdgeId> = HashMap::new();
2528            for id in edge_ids {
2529                let edge = match engine.get_edge(id)? {
2530                    Some(e) => e,
2531                    None => continue,
2532                };
2533                let Some(value) = edge.properties.get(primary) else {
2534                    continue;
2535                };
2536                let Some(encoded) = encode_index_value(value) else {
2537                    continue;
2538                };
2539                if let Some(&first) = seen.get(&encoded) {
2540                    return Err(Error::ConstraintViolation {
2541                        name: spec.name.clone(),
2542                        kind: spec.kind.as_string(),
2543                        label: edge_type.to_string(),
2544                        property: primary.to_string(),
2545                        details: format!("duplicate value held by edges {first} and {id}"),
2546                    });
2547                }
2548                seen.insert(encoded, id);
2549            }
2550        }
2551        PropertyConstraintKind::PropertyType(target) => {
2552            for id in edge_ids {
2553                let edge = match engine.get_edge(id)? {
2554                    Some(e) => e,
2555                    None => continue,
2556                };
2557                let Some(value) = edge.properties.get(primary) else {
2558                    continue;
2559                };
2560                if matches!(value, Property::Null) {
2561                    continue;
2562                }
2563                if !property_matches_type(value, target) {
2564                    return Err(Error::ConstraintViolation {
2565                        name: spec.name.clone(),
2566                        kind: spec.kind.as_string(),
2567                        label: edge_type.to_string(),
2568                        property: primary.to_string(),
2569                        details: format!(
2570                            "edge {id} has value of type {} (expected {})",
2571                            value.type_name(),
2572                            target.as_str()
2573                        ),
2574                    });
2575                }
2576            }
2577        }
2578        PropertyConstraintKind::NodeKey => {
2579            unreachable!("NODE KEY on relationship scope rejected at create time")
2580        }
2581    }
2582    Ok(())
2583}
2584
2585/// Strict `Property` / `PropertyType` match. No numeric coercion — an
2586/// `Int64` does NOT satisfy `FLOAT`, matching Neo4j's `IS :: FLOAT`
2587/// semantics. Returns `false` for `Null`; callers should pre-filter
2588/// null values (`IS :: T` allows nulls by convention — pair with
2589/// `IS NOT NULL` for strict presence).
2590fn property_matches_type(value: &Property, target: PropertyType) -> bool {
2591    matches!(
2592        (target, value),
2593        (PropertyType::String, Property::String(_))
2594            | (PropertyType::Integer, Property::Int64(_))
2595            | (PropertyType::Float, Property::Float64(_))
2596            | (PropertyType::Boolean, Property::Bool(_))
2597    )
2598}
2599
2600/// Rehydrate the constraint registry from [`CF_CONSTRAINT_META`] at
2601/// [`RocksDbStorageEngine::open`] time. Walks the CF sequentially —
2602/// constraint counts are tiny (single digits in practice) so this
2603/// stays cheap regardless of graph size.
2604fn load_constraint_meta(db: &DB) -> Result<Vec<PropertyConstraintSpec>> {
2605    let cf = db
2606        .cf_handle(CF_CONSTRAINT_META)
2607        .ok_or(Error::MissingColumnFamily(CF_CONSTRAINT_META))?;
2608    let mut specs = Vec::new();
2609    for item in db.iterator_cf(cf, IteratorMode::Start) {
2610        let (key, value) = item?;
2611        let name = std::str::from_utf8(&key)
2612            .map_err(|_| Error::CorruptBytes {
2613                cf: CF_CONSTRAINT_META,
2614                expected: key.len(),
2615                actual: key.len(),
2616            })?
2617            .to_string();
2618        specs.push(constraint_meta_decode(CF_CONSTRAINT_META, name, &value)?);
2619    }
2620    Ok(specs)
2621}
2622
2623/// Rehydrate the property-index registry from [`CF_INDEX_META`] at
2624/// [`RocksDbStorageEngine::open`] time. Runs once per process, so
2625/// walking the CF sequentially is fine even for large index counts.
2626fn load_index_meta(db: &DB) -> Result<Vec<PropertyIndexSpec>> {
2627    let cf = db
2628        .cf_handle(CF_INDEX_META)
2629        .ok_or(Error::MissingColumnFamily(CF_INDEX_META))?;
2630    let mut specs = Vec::new();
2631    for item in db.iterator_cf(cf, IteratorMode::Start) {
2632        let (key, _) = item?;
2633        specs.push(index_meta_key_decode(&key)?);
2634    }
2635    Ok(specs)
2636}
2637
2638/// Relationship-scope analogue of [`load_index_meta`]. Rehydrates the
2639/// edge-property-index registry from [`CF_EDGE_INDEX_META`] at open
2640/// time.
2641fn load_edge_index_meta(db: &DB) -> Result<Vec<EdgePropertyIndexSpec>> {
2642    let cf = db
2643        .cf_handle(CF_EDGE_INDEX_META)
2644        .ok_or(Error::MissingColumnFamily(CF_EDGE_INDEX_META))?;
2645    let mut specs = Vec::new();
2646    for item in db.iterator_cf(cf, IteratorMode::Start) {
2647        let (key, _) = item?;
2648        specs.push(edge_index_meta_key_decode(&key)?);
2649    }
2650    Ok(specs)
2651}
2652
2653/// Rehydrate the point-index registry from [`CF_POINT_INDEX_META`]
2654/// at open. Same once-per-process walk as the other meta loaders.
2655fn load_point_index_meta(db: &DB) -> Result<Vec<PointIndexSpec>> {
2656    let cf = db
2657        .cf_handle(CF_POINT_INDEX_META)
2658        .ok_or(Error::MissingColumnFamily(CF_POINT_INDEX_META))?;
2659    let mut specs = Vec::new();
2660    for item in db.iterator_cf(cf, IteratorMode::Start) {
2661        let (key, _) = item?;
2662        specs.push(point_index_meta_key_decode(&key)?);
2663    }
2664    Ok(specs)
2665}
2666
2667/// Relationship-scope analogue of [`load_point_index_meta`].
2668fn load_edge_point_index_meta(db: &DB) -> Result<Vec<EdgePointIndexSpec>> {
2669    let cf = db
2670        .cf_handle(CF_EDGE_POINT_INDEX_META)
2671        .ok_or(Error::MissingColumnFamily(CF_EDGE_POINT_INDEX_META))?;
2672    let mut specs = Vec::new();
2673    for item in db.iterator_cf(cf, IteratorMode::Start) {
2674        let (key, _) = item?;
2675        specs.push(edge_point_index_meta_key_decode(&key)?);
2676    }
2677    Ok(specs)
2678}
2679
2680impl StorageEngine for RocksDbStorageEngine {
2681    fn put_node(&self, node: &Node) -> Result<()> {
2682        RocksDbStorageEngine::put_node(self, node)
2683    }
2684
2685    fn get_node(&self, id: NodeId) -> Result<Option<Node>> {
2686        RocksDbStorageEngine::get_node(self, id)
2687    }
2688
2689    fn detach_delete_node(&self, id: NodeId) -> Result<()> {
2690        RocksDbStorageEngine::detach_delete_node(self, id)
2691    }
2692
2693    fn put_edge(&self, edge: &Edge) -> Result<()> {
2694        RocksDbStorageEngine::put_edge(self, edge)
2695    }
2696
2697    fn get_edge(&self, id: EdgeId) -> Result<Option<Edge>> {
2698        RocksDbStorageEngine::get_edge(self, id)
2699    }
2700
2701    fn delete_edge(&self, id: EdgeId) -> Result<()> {
2702        RocksDbStorageEngine::delete_edge(self, id)
2703    }
2704
2705    fn apply_batch(&self, mutations: &[GraphMutation]) -> Result<()> {
2706        RocksDbStorageEngine::apply_batch(self, mutations)
2707    }
2708
2709    fn all_nodes(&self) -> Result<Vec<Node>> {
2710        RocksDbStorageEngine::all_nodes(self)
2711    }
2712
2713    fn all_edges(&self) -> Result<Vec<Edge>> {
2714        RocksDbStorageEngine::all_edges(self)
2715    }
2716
2717    fn all_node_ids(&self) -> Result<Vec<NodeId>> {
2718        RocksDbStorageEngine::all_node_ids(self)
2719    }
2720
2721    fn outgoing(&self, source: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
2722        RocksDbStorageEngine::outgoing(self, source)
2723    }
2724
2725    fn incoming(&self, target: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
2726        RocksDbStorageEngine::incoming(self, target)
2727    }
2728
2729    fn nodes_by_label(&self, label: &str) -> Result<Vec<NodeId>> {
2730        RocksDbStorageEngine::nodes_by_label(self, label)
2731    }
2732
2733    fn edges_by_type(&self, edge_type: &str) -> Result<Vec<EdgeId>> {
2734        RocksDbStorageEngine::edges_by_type(self, edge_type)
2735    }
2736
2737    fn nodes_by_property(
2738        &self,
2739        label: &str,
2740        property: &str,
2741        value: &Property,
2742    ) -> Result<Vec<NodeId>> {
2743        RocksDbStorageEngine::nodes_by_property(self, label, property, value)
2744    }
2745
2746    fn nodes_by_properties(
2747        &self,
2748        label: &str,
2749        properties: &[String],
2750        values: &[Property],
2751    ) -> Result<Vec<NodeId>> {
2752        RocksDbStorageEngine::nodes_by_properties(self, label, properties, values)
2753    }
2754
2755    fn edges_by_property(
2756        &self,
2757        edge_type: &str,
2758        property: &str,
2759        value: &Property,
2760    ) -> Result<Vec<EdgeId>> {
2761        RocksDbStorageEngine::edges_by_property(self, edge_type, property, value)
2762    }
2763
2764    fn create_property_index(&self, label: &str, property: &str) -> Result<()> {
2765        RocksDbStorageEngine::create_property_index(self, label, property)
2766    }
2767
2768    fn drop_property_index(&self, label: &str, property: &str) -> Result<()> {
2769        RocksDbStorageEngine::drop_property_index(self, label, property)
2770    }
2771
2772    fn create_property_index_composite(&self, label: &str, properties: &[String]) -> Result<()> {
2773        RocksDbStorageEngine::create_property_index_composite(self, label, properties)
2774    }
2775
2776    fn drop_property_index_composite(&self, label: &str, properties: &[String]) -> Result<()> {
2777        RocksDbStorageEngine::drop_property_index_composite(self, label, properties)
2778    }
2779
2780    fn list_property_indexes(&self) -> Vec<PropertyIndexSpec> {
2781        RocksDbStorageEngine::list_property_indexes(self)
2782    }
2783
2784    fn create_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
2785        RocksDbStorageEngine::create_edge_property_index(self, edge_type, property)
2786    }
2787
2788    fn drop_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
2789        RocksDbStorageEngine::drop_edge_property_index(self, edge_type, property)
2790    }
2791
2792    fn create_edge_property_index_composite(
2793        &self,
2794        edge_type: &str,
2795        properties: &[String],
2796    ) -> Result<()> {
2797        RocksDbStorageEngine::create_edge_property_index_composite(self, edge_type, properties)
2798    }
2799
2800    fn drop_edge_property_index_composite(
2801        &self,
2802        edge_type: &str,
2803        properties: &[String],
2804    ) -> Result<()> {
2805        RocksDbStorageEngine::drop_edge_property_index_composite(self, edge_type, properties)
2806    }
2807
2808    fn list_edge_property_indexes(&self) -> Vec<EdgePropertyIndexSpec> {
2809        RocksDbStorageEngine::list_edge_property_indexes(self)
2810    }
2811
2812    fn create_point_index(&self, label: &str, property: &str) -> Result<()> {
2813        RocksDbStorageEngine::create_point_index(self, label, property)
2814    }
2815
2816    fn drop_point_index(&self, label: &str, property: &str) -> Result<()> {
2817        RocksDbStorageEngine::drop_point_index(self, label, property)
2818    }
2819
2820    fn list_point_indexes(&self) -> Vec<PointIndexSpec> {
2821        RocksDbStorageEngine::list_point_indexes(self)
2822    }
2823
2824    fn nodes_in_bbox(
2825        &self,
2826        label: &str,
2827        property: &str,
2828        srid: i32,
2829        xlo: f64,
2830        ylo: f64,
2831        xhi: f64,
2832        yhi: f64,
2833    ) -> Result<Vec<NodeId>> {
2834        RocksDbStorageEngine::nodes_in_bbox(self, label, property, srid, xlo, ylo, xhi, yhi)
2835    }
2836
2837    fn create_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
2838        RocksDbStorageEngine::create_edge_point_index(self, edge_type, property)
2839    }
2840
2841    fn drop_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
2842        RocksDbStorageEngine::drop_edge_point_index(self, edge_type, property)
2843    }
2844
2845    fn list_edge_point_indexes(&self) -> Vec<EdgePointIndexSpec> {
2846        RocksDbStorageEngine::list_edge_point_indexes(self)
2847    }
2848
2849    fn edges_in_bbox(
2850        &self,
2851        edge_type: &str,
2852        property: &str,
2853        srid: i32,
2854        xlo: f64,
2855        ylo: f64,
2856        xhi: f64,
2857        yhi: f64,
2858    ) -> Result<Vec<EdgeId>> {
2859        RocksDbStorageEngine::edges_in_bbox(self, edge_type, property, srid, xlo, ylo, xhi, yhi)
2860    }
2861
2862    fn create_property_constraint(
2863        &self,
2864        name: Option<&str>,
2865        scope: &ConstraintScope,
2866        properties: &[String],
2867        kind: PropertyConstraintKind,
2868        if_not_exists: bool,
2869    ) -> Result<PropertyConstraintSpec> {
2870        RocksDbStorageEngine::create_property_constraint(
2871            self,
2872            name,
2873            scope,
2874            properties,
2875            kind,
2876            if_not_exists,
2877        )
2878    }
2879
2880    fn drop_property_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
2881        RocksDbStorageEngine::drop_property_constraint(self, name, if_exists)
2882    }
2883
2884    fn list_property_constraints(&self) -> Vec<PropertyConstraintSpec> {
2885        RocksDbStorageEngine::list_property_constraints(self)
2886    }
2887
2888    fn put_trigger(&self, name: &str, value: &[u8]) -> Result<()> {
2889        let cf = self.cf(CF_TRIGGER_META)?;
2890        self.db
2891            .put_cf(cf, name.as_bytes(), value)
2892            .map_err(Error::from)
2893    }
2894
2895    fn delete_trigger(&self, name: &str) -> Result<()> {
2896        let cf = self.cf(CF_TRIGGER_META)?;
2897        self.db.delete_cf(cf, name.as_bytes()).map_err(Error::from)
2898    }
2899
2900    fn list_triggers(&self) -> Result<Vec<(String, Vec<u8>)>> {
2901        let cf = self.cf(CF_TRIGGER_META)?;
2902        let mut out: Vec<(String, Vec<u8>)> = Vec::new();
2903        let iter = self.db.iterator_cf(cf, IteratorMode::Start);
2904        for entry in iter {
2905            let (k, v) = entry.map_err(Error::from)?;
2906            // Names are stored as raw UTF-8 bytes — anything else
2907            // would be a corruption since `put_trigger` only
2908            // accepts `&str`. Lossy decode is defensive.
2909            let name = String::from_utf8_lossy(&k).into_owned();
2910            out.push((name, v.to_vec()));
2911        }
2912        Ok(out)
2913    }
2914
2915    fn create_checkpoint(&self, path: &Path) -> Result<()> {
2916        RocksDbStorageEngine::create_checkpoint(self, path)
2917    }
2918
2919    fn clear_all(&self) -> Result<()> {
2920        RocksDbStorageEngine::clear_all(self)
2921    }
2922}