Skip to main content

meshdb_storage/
rocksdb_engine.rs

1//! RocksDB-backed implementation of [`StorageEngine`].
2//!
3//! Holds the on-disk state for a single local peer: nodes, edges,
4//! adjacency lists, label/type/property indexes, and the index metadata
5//! needed to rehydrate at `open` time. Column families carve the backend
6//! into purpose-keyed namespaces so each access pattern walks a tight
7//! prefix.
8//!
9//! External callers should hold this as `Arc<dyn StorageEngine>` and
10//! route through the trait — the concrete type is only visible at the
11//! construction site (`RocksDbStorageEngine::open`) and at the Raft
12//! snapshot restore path, which is backend-bound by design.
13
14use crate::{
15    engine::{
16        ConstraintScope, EdgePointIndexSpec, EdgePropertyIndexSpec, GraphMutation, PointIndexSpec,
17        PropertyConstraintKind, PropertyConstraintSpec, PropertyIndexSpec, PropertyType,
18        StorageEngine,
19    },
20    error::{Error, Result},
21    keys::{
22        adj_key, cell_from_point_index_key, constraint_meta_decode, constraint_meta_encode,
23        decode_point_index_value, edge_from_adj_key, edge_id_from_property_index_key,
24        edge_property_index_composite_key, edge_property_index_composite_value_prefix,
25        encode_index_tuple, encode_index_value, id_from_str_index_key, label_index_key,
26        label_index_prefix, node_from_adj_value, node_id_from_point_index_key,
27        node_id_from_property_index_key, parse_property_index_entry_props, point_cell,
28        point_cell_range, point_index_key, point_index_label_prop_prefix, point_index_srid_prefix,
29        point_index_value, property_index_composite_key, property_index_composite_value_prefix,
30        property_index_label_prefix, type_index_key, type_index_prefix, ID_LEN,
31    },
32};
33use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
34use rocksdb::{
35    checkpoint::Checkpoint, ColumnFamilyDescriptor, Direction, IteratorMode, Options, WriteBatch,
36    DB,
37};
38use std::path::Path;
39use std::sync::RwLock;
40
// Column-family names. `open_with_options` builds one descriptor per
// entry of `ALL_CFS`, so a name declared here is only reachable through
// `RocksDbStorageEngine::cf` if it is also listed in `ALL_CFS`.

/// Node records: node id bytes -> JSON-encoded node.
const CF_NODES: &str = "nodes";
/// Edge records: edge id bytes -> JSON-encoded edge.
const CF_EDGES: &str = "edges";
/// Outgoing adjacency: `adj_key(source, edge)` -> target node id bytes.
const CF_ADJ_OUT: &str = "adj_out";
/// Incoming adjacency: `adj_key(target, edge)` -> source node id bytes.
const CF_ADJ_IN: &str = "adj_in";
/// Label membership: `label_index_key(label, node)` -> empty value.
const CF_LABEL_INDEX: &str = "label_index";
/// Edge-type membership: `type_index_key(edge_type, edge)` -> empty value.
const CF_TYPE_INDEX: &str = "type_index";
/// Node property-index entries: composite key -> empty value.
const CF_PROPERTY_INDEX: &str = "property_index";
/// Registered node property indexes, keyed by `index_meta_key`.
const CF_INDEX_META: &str = "index_meta";
/// Edge property-index entries: composite key -> empty value.
const CF_EDGE_PROPERTY_INDEX: &str = "edge_property_index";
/// Registered edge property indexes, keyed by `edge_index_meta_key`.
const CF_EDGE_INDEX_META: &str = "edge_index_meta";
/// Registered property constraints, reloaded into memory at open time.
const CF_CONSTRAINT_META: &str = "constraint_meta";
/// Node point-index entries: `point_index_key(..)` -> `point_index_value(x, y, z)`.
const CF_POINT_INDEX: &str = "point_index";
/// Registered node point indexes, keyed by `point_index_meta_key`.
const CF_POINT_INDEX_META: &str = "point_index_meta";
/// Edge point-index entries; same key/value shape as the node form.
const CF_EDGE_POINT_INDEX: &str = "edge_point_index";
/// Registered edge point indexes, keyed by `edge_point_index_meta_key`.
const CF_EDGE_POINT_INDEX_META: &str = "edge_point_index_meta";
/// Persisted `apoc.trigger.*` registry. The key is the trigger name
/// as UTF-8 bytes; the value is the JSON-encoded trigger spec
/// (cypher / selector / config / installed_at). The registry is
/// local-only in this release: it is not replicated across cluster
/// peers, so every node keeps its own set of triggers.
const CF_TRIGGER_META: &str = "trigger_meta";
/// Persisted multi-raft pending-transaction staging. The key is the
/// partition-namespaced txid (4-byte LE partition followed by the
/// txid bytes); the value is the serde-encoded `Vec<GraphMutation>`
/// to apply on `CommitTx`. This lets the partition applier rebuild
/// `pending_txs` after a restart, because openraft does not replay
/// entries past `last_applied`.
const CF_PENDING_TX_META: &str = "pending_tx_meta";

/// Every column family the engine opens, in declaration order.
const ALL_CFS: &[&str] = &[
    CF_NODES,
    CF_EDGES,
    CF_ADJ_OUT,
    CF_ADJ_IN,
    CF_LABEL_INDEX,
    CF_TYPE_INDEX,
    CF_PROPERTY_INDEX,
    CF_INDEX_META,
    CF_EDGE_PROPERTY_INDEX,
    CF_EDGE_INDEX_META,
    CF_CONSTRAINT_META,
    CF_POINT_INDEX,
    CF_POINT_INDEX_META,
    CF_EDGE_POINT_INDEX,
    CF_EDGE_POINT_INDEX_META,
    CF_TRIGGER_META,
    CF_PENDING_TX_META,
];

/// Zero-length value stored for index entries whose key carries all
/// the information.
const EMPTY: &[u8] = b"";
91
92/// Encode a [`PropertyIndexSpec`] as a stable `CF_INDEX_META` key:
93/// `<label>\0<prop1>\0<prop2>\0...\0<propN>`. No component can contain
94/// a NUL (identifier-only grammar), so splitting on NUL decodes
95/// unambiguously. Length-1 specs encode byte-identically to the
96/// pre-composite-refactor format (`<label>\0<prop>`), so existing
97/// on-disk data survives the upgrade.
98fn index_meta_key(spec: &PropertyIndexSpec) -> Vec<u8> {
99    let cap = spec.label.len()
100        + spec.properties.iter().map(|p| p.len()).sum::<usize>()
101        + spec.properties.len();
102    let mut k = Vec::with_capacity(cap);
103    k.extend_from_slice(spec.label.as_bytes());
104    for p in &spec.properties {
105        k.push(0);
106        k.extend_from_slice(p.as_bytes());
107    }
108    k
109}
110
111fn index_meta_key_decode(key: &[u8]) -> Result<PropertyIndexSpec> {
112    let mut parts = key.split(|b| *b == 0);
113    let label_bytes = parts.next().ok_or(Error::CorruptBytes {
114        cf: CF_INDEX_META,
115        expected: 1,
116        actual: 0,
117    })?;
118    let label = std::str::from_utf8(label_bytes)
119        .map_err(|_| Error::CorruptBytes {
120            cf: CF_INDEX_META,
121            expected: label_bytes.len(),
122            actual: label_bytes.len(),
123        })?
124        .to_string();
125    let mut properties: Vec<String> = Vec::new();
126    for part in parts {
127        let s = std::str::from_utf8(part)
128            .map_err(|_| Error::CorruptBytes {
129                cf: CF_INDEX_META,
130                expected: part.len(),
131                actual: part.len(),
132            })?
133            .to_string();
134        properties.push(s);
135    }
136    if properties.is_empty() {
137        return Err(Error::CorruptBytes {
138            cf: CF_INDEX_META,
139            expected: 1,
140            actual: 0,
141        });
142    }
143    Ok(PropertyIndexSpec { label, properties })
144}
145
146/// Encode an [`EdgePropertyIndexSpec`] as a stable `CF_EDGE_INDEX_META`
147/// key: `<edge_type>\0<prop1>\0<prop2>\0...\0<propN>`. Same NUL-split
148/// shape as the node form, same backward-compat guarantee for
149/// length-1 specs.
150fn edge_index_meta_key(spec: &EdgePropertyIndexSpec) -> Vec<u8> {
151    let cap = spec.edge_type.len()
152        + spec.properties.iter().map(|p| p.len()).sum::<usize>()
153        + spec.properties.len();
154    let mut k = Vec::with_capacity(cap);
155    k.extend_from_slice(spec.edge_type.as_bytes());
156    for p in &spec.properties {
157        k.push(0);
158        k.extend_from_slice(p.as_bytes());
159    }
160    k
161}
162
163fn edge_index_meta_key_decode(key: &[u8]) -> Result<EdgePropertyIndexSpec> {
164    let mut parts = key.split(|b| *b == 0);
165    let type_bytes = parts.next().ok_or(Error::CorruptBytes {
166        cf: CF_EDGE_INDEX_META,
167        expected: 1,
168        actual: 0,
169    })?;
170    let edge_type = std::str::from_utf8(type_bytes)
171        .map_err(|_| Error::CorruptBytes {
172            cf: CF_EDGE_INDEX_META,
173            expected: type_bytes.len(),
174            actual: type_bytes.len(),
175        })?
176        .to_string();
177    let mut properties: Vec<String> = Vec::new();
178    for part in parts {
179        let s = std::str::from_utf8(part)
180            .map_err(|_| Error::CorruptBytes {
181                cf: CF_EDGE_INDEX_META,
182                expected: part.len(),
183                actual: part.len(),
184            })?
185            .to_string();
186        properties.push(s);
187    }
188    if properties.is_empty() {
189        return Err(Error::CorruptBytes {
190            cf: CF_EDGE_INDEX_META,
191            expected: 1,
192            actual: 0,
193        });
194    }
195    Ok(EdgePropertyIndexSpec {
196        edge_type,
197        properties,
198    })
199}
200
201/// Encode a [`PointIndexSpec`] as a stable `CF_POINT_INDEX_META`
202/// key: `<label>\0<property>`. Single-property on purpose — the
203/// spec only carries one property. NUL-split, same shape as the
204/// non-spatial index meta format so tooling that inspects CFs by
205/// hand stays predictable.
206fn point_index_meta_key(spec: &PointIndexSpec) -> Vec<u8> {
207    let mut k = Vec::with_capacity(spec.label.len() + spec.property.len() + 1);
208    k.extend_from_slice(spec.label.as_bytes());
209    k.push(0);
210    k.extend_from_slice(spec.property.as_bytes());
211    k
212}
213
214fn point_index_meta_key_decode(key: &[u8]) -> Result<PointIndexSpec> {
215    let mut parts = key.splitn(2, |b| *b == 0);
216    let label_bytes = parts.next().ok_or(Error::CorruptBytes {
217        cf: CF_POINT_INDEX_META,
218        expected: 1,
219        actual: 0,
220    })?;
221    let property_bytes = parts.next().ok_or(Error::CorruptBytes {
222        cf: CF_POINT_INDEX_META,
223        expected: label_bytes.len() + 2,
224        actual: key.len(),
225    })?;
226    let label = std::str::from_utf8(label_bytes)
227        .map_err(|_| Error::CorruptBytes {
228            cf: CF_POINT_INDEX_META,
229            expected: label_bytes.len(),
230            actual: label_bytes.len(),
231        })?
232        .to_string();
233    let property = std::str::from_utf8(property_bytes)
234        .map_err(|_| Error::CorruptBytes {
235            cf: CF_POINT_INDEX_META,
236            expected: property_bytes.len(),
237            actual: property_bytes.len(),
238        })?
239        .to_string();
240    if property.is_empty() {
241        return Err(Error::CorruptBytes {
242            cf: CF_POINT_INDEX_META,
243            expected: 1,
244            actual: 0,
245        });
246    }
247    Ok(PointIndexSpec { label, property })
248}
249
250/// Relationship-scope analogue of [`point_index_meta_key`]. Same
251/// `<edge_type>\0<property>` shape.
252fn edge_point_index_meta_key(spec: &EdgePointIndexSpec) -> Vec<u8> {
253    let mut k = Vec::with_capacity(spec.edge_type.len() + spec.property.len() + 1);
254    k.extend_from_slice(spec.edge_type.as_bytes());
255    k.push(0);
256    k.extend_from_slice(spec.property.as_bytes());
257    k
258}
259
260fn edge_point_index_meta_key_decode(key: &[u8]) -> Result<EdgePointIndexSpec> {
261    let mut parts = key.splitn(2, |b| *b == 0);
262    let type_bytes = parts.next().ok_or(Error::CorruptBytes {
263        cf: CF_EDGE_POINT_INDEX_META,
264        expected: 1,
265        actual: 0,
266    })?;
267    let property_bytes = parts.next().ok_or(Error::CorruptBytes {
268        cf: CF_EDGE_POINT_INDEX_META,
269        expected: type_bytes.len() + 2,
270        actual: key.len(),
271    })?;
272    let edge_type = std::str::from_utf8(type_bytes)
273        .map_err(|_| Error::CorruptBytes {
274            cf: CF_EDGE_POINT_INDEX_META,
275            expected: type_bytes.len(),
276            actual: type_bytes.len(),
277        })?
278        .to_string();
279    let property = std::str::from_utf8(property_bytes)
280        .map_err(|_| Error::CorruptBytes {
281            cf: CF_EDGE_POINT_INDEX_META,
282            expected: property_bytes.len(),
283            actual: property_bytes.len(),
284        })?
285        .to_string();
286    if property.is_empty() {
287        return Err(Error::CorruptBytes {
288            cf: CF_EDGE_POINT_INDEX_META,
289            expected: 1,
290            actual: 0,
291        });
292    }
293    Ok(EdgePointIndexSpec {
294        edge_type,
295        property,
296    })
297}
298
299pub struct RocksDbStorageEngine {
300    db: DB,
301    /// In-memory view of the registered property indexes, populated
302    /// from [`CF_INDEX_META`] at open time and kept in sync by
303    /// [`RocksDbStorageEngine::create_property_index`] and
304    /// [`RocksDbStorageEngine::drop_property_index`].
305    ///
306    /// The write-path consults this on every `append_put_node` /
307    /// `append_detach_delete_node` call to decide which index entries
308    /// to emit, so it absolutely must not hit the DB on the hot path.
309    /// `RwLock` rather than `Mutex` because the common access pattern
310    /// is many concurrent reads with only DDL-time writes.
311    indexes: RwLock<Vec<PropertyIndexSpec>>,
312    /// Relationship-scope analogue of `indexes`. Populated from
313    /// [`CF_EDGE_INDEX_META`] at open time and kept in sync by
314    /// [`RocksDbStorageEngine::create_edge_property_index`] /
315    /// [`RocksDbStorageEngine::drop_edge_property_index`]. Consulted on
316    /// every edge write path (`append_put_edge`, `append_delete_edge`,
317    /// `append_detach_delete_node`) and by UNIQUE edge-constraint
318    /// enforcement, so the same "must not hit the DB on the hot path"
319    /// contract applies.
320    edge_indexes: RwLock<Vec<EdgePropertyIndexSpec>>,
321    /// In-memory view of the registered property constraints,
322    /// populated from [`CF_CONSTRAINT_META`] at open time and kept in
323    /// sync by [`RocksDbStorageEngine::create_property_constraint`] and
324    /// [`RocksDbStorageEngine::drop_property_constraint`].
325    ///
326    /// Every `put_node` consults this to validate UNIQUE / NOT NULL
327    /// invariants, so — as with `indexes` — it must not hit the DB on
328    /// the hot path. Enforcement reads the catalog under a read lock
329    /// and runs `nodes_by_property` against the backing index only
330    /// when a UNIQUE check actually needs it.
331    constraints: RwLock<Vec<PropertyConstraintSpec>>,
332    /// In-memory view of the registered point indexes. Loaded from
333    /// [`CF_POINT_INDEX_META`] at open and kept in sync by
334    /// [`RocksDbStorageEngine::create_point_index`] /
335    /// [`RocksDbStorageEngine::drop_point_index`]. `append_put_node`
336    /// and `append_detach_delete_node` walk this list to maintain
337    /// entries, so — as with the other in-memory index catalogs — it
338    /// must not hit the DB on the hot path.
339    point_indexes: RwLock<Vec<PointIndexSpec>>,
340    /// Relationship-scope analogue of `point_indexes`. Consulted by
341    /// every edge write (`append_put_edge`, `append_delete_edge`,
342    /// `append_detach_delete_node`) to maintain
343    /// [`CF_EDGE_POINT_INDEX`] entries.
344    edge_point_indexes: RwLock<Vec<EdgePointIndexSpec>>,
345}
346
/// RocksDB knobs an operator can tune.
///
/// The defaults reproduce what [`RocksDbStorageEngine::open`] has
/// always applied (a modest `max_open_files` and capped LOG
/// retention). Production deployments override them through the
/// `[storage]` config section.
#[derive(Debug, Clone)]
pub struct StorageOptions {
    /// Upper bound on simultaneously open SST files. RocksDB's own
    /// default of -1 keeps every SST in the table cache forever, which
    /// exceeds typical FD soft limits in CI / test setups. 64 covers
    /// the hot set of the foundational test suites; raise it for
    /// read-heavy production workloads.
    pub max_open_files: i32,
    /// How many historical INFO log files to retain on disk. RocksDB
    /// defaults to 1000, which leaves a long tail of `LOG.old.*` files
    /// and compounds disk pressure when many databases run side by
    /// side.
    pub keep_log_file_num: usize,
    /// Memtable (write buffer) size per column family. Larger values
    /// mean fewer L0 files but more memory per CF. RocksDB defaults to
    /// 64MB; going lower suits small / many-tenant deployments, going
    /// higher suits write-heavy single-tenant ones. `None` leaves the
    /// upstream default in place.
    pub write_buffer_size_bytes: Option<usize>,
    /// Number of memtables allowed before writes stall. Larger values
    /// absorb write bursts at the cost of memory. RocksDB defaults to
    /// 2; 4-8 is common for write-heavy loads. `None` leaves the
    /// upstream default in place.
    pub max_write_buffer_number: Option<i32>,
    /// Bloom-filter bits per key for the block-based table format.
    /// More bits lower the false-positive rate on point reads at the
    /// cost of memory. RocksDB builds no bloom filter unless asked to;
    /// 10 bits gives roughly a 1% false-positive rate and is the usual
    /// recommendation. `None` leaves the upstream default (no bloom
    /// filter) in place.
    pub bloom_filter_bits_per_key: Option<i32>,
}

impl Default for StorageOptions {
    /// Historical defaults: 64 open files, 4 retained log files, and
    /// every optional knob left at RocksDB's own default.
    fn default() -> Self {
        StorageOptions {
            max_open_files: 64,
            keep_log_file_num: 4,
            write_buffer_size_bytes: None,
            max_write_buffer_number: None,
            bloom_filter_bits_per_key: None,
        }
    }
}
395
396impl RocksDbStorageEngine {
397    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
398        Self::open_with_options(path, StorageOptions::default())
399    }
400
401    /// Open with operator-tunable rocksdb knobs. Applied to both
402    /// the database-level options (FD cap, LOG retention) and
403    /// every column family's table options (bloom filter, write
404    /// buffer). Defaults match [`RocksDbStorageEngine::open`].
405    pub fn open_with_options(path: impl AsRef<Path>, opts: StorageOptions) -> Result<Self> {
406        let mut db_opts = Options::default();
407        db_opts.create_if_missing(true);
408        db_opts.create_missing_column_families(true);
409        db_opts.set_max_open_files(opts.max_open_files);
410        db_opts.set_keep_log_file_num(opts.keep_log_file_num);
411
412        // Per-CF options. Same options applied to every column
413        // family — none of them have wildly different access
414        // patterns, and the cost of tuning each independently
415        // exceeds the win for v1.
416        let mut cf_opts = Options::default();
417        if let Some(sz) = opts.write_buffer_size_bytes {
418            cf_opts.set_write_buffer_size(sz);
419        }
420        if let Some(n) = opts.max_write_buffer_number {
421            cf_opts.set_max_write_buffer_number(n);
422        }
423        if let Some(bits) = opts.bloom_filter_bits_per_key {
424            let mut block_opts = rocksdb::BlockBasedOptions::default();
425            block_opts.set_bloom_filter(bits as f64, false);
426            cf_opts.set_block_based_table_factory(&block_opts);
427        }
428
429        let cfs: Vec<ColumnFamilyDescriptor> = ALL_CFS
430            .iter()
431            .map(|name| ColumnFamilyDescriptor::new(*name, cf_opts.clone()))
432            .collect();
433
434        let db = DB::open_cf_descriptors(&db_opts, path, cfs)?;
435        let indexes = load_index_meta(&db)?;
436        let edge_indexes = load_edge_index_meta(&db)?;
437        let constraints = load_constraint_meta(&db)?;
438        let point_indexes = load_point_index_meta(&db)?;
439        let edge_point_indexes = load_edge_point_index_meta(&db)?;
440        Ok(Self {
441            db,
442            indexes: RwLock::new(indexes),
443            edge_indexes: RwLock::new(edge_indexes),
444            constraints: RwLock::new(constraints),
445            point_indexes: RwLock::new(point_indexes),
446            edge_point_indexes: RwLock::new(edge_point_indexes),
447        })
448    }
449
450    fn cf(&self, name: &'static str) -> Result<&rocksdb::ColumnFamily> {
451        self.db
452            .cf_handle(name)
453            .ok_or(Error::MissingColumnFamily(name))
454    }
455
456    pub fn put_node(&self, node: &Node) -> Result<()> {
457        let mut batch = WriteBatch::default();
458        self.append_put_node(&mut batch, node)?;
459        self.db.write(batch)?;
460        Ok(())
461    }
462
463    fn append_put_node(&self, batch: &mut WriteBatch, node: &Node) -> Result<()> {
464        let nodes_cf = self.cf(CF_NODES)?;
465        let label_cf = self.cf(CF_LABEL_INDEX)?;
466        let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
467
468        let existing: Option<Node> = match self.db.get_cf(nodes_cf, node.id.as_bytes())? {
469            Some(bytes) => Some(serde_json::from_slice(&bytes)?),
470            None => None,
471        };
472        let existing_labels: &[String] = existing.as_ref().map(|n| &n.labels[..]).unwrap_or(&[]);
473
474        // Constraint enforcement: walk every registered constraint and
475        // confirm the post-write state satisfies it. UNIQUE queries
476        // the backing property index (guaranteed to exist — see
477        // `create_property_constraint`) for rival holders of the same
478        // value. NOT NULL checks the incoming property map directly.
479        // Runs before we touch the batch, so a violation short-circuits
480        // without leaving partial writes behind.
481        self.enforce_constraints(node, existing.as_ref())?;
482
483        let bytes = serde_json::to_vec(node)?;
484        batch.put_cf(nodes_cf, node.id.as_bytes(), bytes);
485
486        for old in existing_labels {
487            if !node.labels.contains(old) {
488                batch.delete_cf(label_cf, label_index_key(old, node.id));
489            }
490        }
491        for new in &node.labels {
492            if !existing_labels.contains(new) {
493                batch.put_cf(label_cf, label_index_key(new, node.id), EMPTY);
494            }
495        }
496
497        // Property-index maintenance: for each registered (label, prop)
498        // index, compute the delta between the previous entry (if any)
499        // and the new one, and emit the minimal set of put/delete
500        // operations. Skipping an update when the entry is unchanged
501        // keeps steady-state writes free of index churn.
502        let indexes = self.indexes.read().expect("indexes lock poisoned");
503        for spec in indexes.iter() {
504            let was_indexed = existing_labels.iter().any(|l| l == &spec.label);
505            let now_indexed = node.labels.iter().any(|l| l == &spec.label);
506            let old_encoded = if was_indexed {
507                existing
508                    .as_ref()
509                    .and_then(|n| encode_index_tuple(&n.properties, &spec.properties))
510            } else {
511                None
512            };
513            let new_encoded = if now_indexed {
514                encode_index_tuple(&node.properties, &spec.properties)
515            } else {
516                None
517            };
518            if old_encoded == new_encoded {
519                continue;
520            }
521            if let Some(values) = &old_encoded {
522                batch.delete_cf(
523                    prop_cf,
524                    property_index_composite_key(&spec.label, &spec.properties, values, node.id),
525                );
526            }
527            if let Some(values) = &new_encoded {
528                batch.put_cf(
529                    prop_cf,
530                    property_index_composite_key(&spec.label, &spec.properties, values, node.id),
531                    EMPTY,
532                );
533            }
534        }
535
536        // Point-index maintenance. Same delta shape as the
537        // property-index loop: compare old vs new indexed point and
538        // emit the minimum entry set. Cheap-bail when neither the
539        // label membership nor the point value has changed.
540        let point_indexes = self
541            .point_indexes
542            .read()
543            .expect("point_indexes lock poisoned");
544        if !point_indexes.is_empty() {
545            let point_cf = self.cf(CF_POINT_INDEX)?;
546            for spec in point_indexes.iter() {
547                let was_indexed = existing_labels.iter().any(|l| l == &spec.label);
548                let now_indexed = node.labels.iter().any(|l| l == &spec.label);
549                let old_point = if was_indexed {
550                    existing
551                        .as_ref()
552                        .and_then(|n| extract_indexable_point(&n.properties, &spec.property))
553                } else {
554                    None
555                };
556                let new_point = if now_indexed {
557                    extract_indexable_point(&node.properties, &spec.property)
558                } else {
559                    None
560                };
561                if old_point == new_point {
562                    continue;
563                }
564                if let Some(p) = &old_point {
565                    let cell = point_cell(p.srid, p.x, p.y);
566                    batch.delete_cf(
567                        point_cf,
568                        point_index_key(
569                            &spec.label,
570                            &spec.property,
571                            p.srid,
572                            cell,
573                            node.id.as_bytes(),
574                        ),
575                    );
576                }
577                if let Some(p) = &new_point {
578                    let cell = point_cell(p.srid, p.x, p.y);
579                    batch.put_cf(
580                        point_cf,
581                        point_index_key(
582                            &spec.label,
583                            &spec.property,
584                            p.srid,
585                            cell,
586                            node.id.as_bytes(),
587                        ),
588                        point_index_value(p.x, p.y, p.z),
589                    );
590                }
591            }
592        }
593
594        Ok(())
595    }
596
597    pub fn get_node(&self, id: NodeId) -> Result<Option<Node>> {
598        let cf = self.cf(CF_NODES)?;
599        match self.db.get_cf(cf, id.as_bytes())? {
600            Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
601            None => Ok(None),
602        }
603    }
604
605    pub fn put_edge(&self, edge: &Edge) -> Result<()> {
606        let mut batch = WriteBatch::default();
607        self.append_put_edge(&mut batch, edge)?;
608        self.db.write(batch)?;
609        Ok(())
610    }
611
612    fn append_put_edge(&self, batch: &mut WriteBatch, edge: &Edge) -> Result<()> {
613        let edges_cf = self.cf(CF_EDGES)?;
614        let out_cf = self.cf(CF_ADJ_OUT)?;
615        let in_cf = self.cf(CF_ADJ_IN)?;
616        let type_cf = self.cf(CF_TYPE_INDEX)?;
617        let edge_prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
618
619        // Relationship-scope constraint enforcement. Same short-
620        // circuit pattern as `append_put_node` — if a constraint
621        // would be violated, abort before touching the batch so the
622        // write never lands.
623        let existing_edge = self.get_edge(edge.id)?;
624        self.enforce_edge_constraints(edge, existing_edge.as_ref())?;
625
626        let bytes = serde_json::to_vec(edge)?;
627        batch.put_cf(edges_cf, edge.id.as_bytes(), bytes);
628        batch.put_cf(
629            out_cf,
630            adj_key(edge.source, edge.id),
631            edge.target.as_bytes(),
632        );
633        batch.put_cf(in_cf, adj_key(edge.target, edge.id), edge.source.as_bytes());
634        batch.put_cf(type_cf, type_index_key(&edge.edge_type, edge.id), EMPTY);
635
636        // Edge-property-index maintenance: for each registered
637        // (edge_type, prop) index, delta the previous entry against
638        // the new one and emit the minimal set of put/delete
639        // operations. Mirrors the node path in `append_put_node`.
640        // An existing edge whose type doesn't match the registered
641        // index contributes no work — edges never change their type,
642        // so a type-mismatched entry can't exist to clean up.
643        let edge_indexes = self
644            .edge_indexes
645            .read()
646            .expect("edge_indexes lock poisoned");
647        for spec in edge_indexes.iter() {
648            if spec.edge_type != edge.edge_type {
649                continue;
650            }
651            let old_encoded = existing_edge
652                .as_ref()
653                .and_then(|e| encode_index_tuple(&e.properties, &spec.properties));
654            let new_encoded = encode_index_tuple(&edge.properties, &spec.properties);
655            if old_encoded == new_encoded {
656                continue;
657            }
658            if let Some(values) = &old_encoded {
659                batch.delete_cf(
660                    edge_prop_cf,
661                    edge_property_index_composite_key(
662                        &spec.edge_type,
663                        &spec.properties,
664                        values,
665                        edge.id,
666                    ),
667                );
668            }
669            if let Some(values) = &new_encoded {
670                batch.put_cf(
671                    edge_prop_cf,
672                    edge_property_index_composite_key(
673                        &spec.edge_type,
674                        &spec.properties,
675                        values,
676                        edge.id,
677                    ),
678                    EMPTY,
679                );
680            }
681        }
682
683        // Edge-point-index maintenance — relationship-scope mirror
684        // of the node-point-index loop in `append_put_node`. Same
685        // delta shape: diff the indexed point across the previous
686        // and new edge state and emit the minimum put/delete set.
687        let edge_point_indexes = self
688            .edge_point_indexes
689            .read()
690            .expect("edge_point_indexes lock poisoned");
691        if !edge_point_indexes.is_empty() {
692            let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
693            for spec in edge_point_indexes.iter() {
694                if spec.edge_type != edge.edge_type {
695                    continue;
696                }
697                let old_point = existing_edge
698                    .as_ref()
699                    .and_then(|e| extract_indexable_point(&e.properties, &spec.property));
700                let new_point = extract_indexable_point(&edge.properties, &spec.property);
701                if old_point == new_point {
702                    continue;
703                }
704                if let Some(p) = &old_point {
705                    let cell = point_cell(p.srid, p.x, p.y);
706                    batch.delete_cf(
707                        point_cf,
708                        point_index_key(
709                            &spec.edge_type,
710                            &spec.property,
711                            p.srid,
712                            cell,
713                            edge.id.as_bytes(),
714                        ),
715                    );
716                }
717                if let Some(p) = &new_point {
718                    let cell = point_cell(p.srid, p.x, p.y);
719                    batch.put_cf(
720                        point_cf,
721                        point_index_key(
722                            &spec.edge_type,
723                            &spec.property,
724                            p.srid,
725                            cell,
726                            edge.id.as_bytes(),
727                        ),
728                        point_index_value(p.x, p.y, p.z),
729                    );
730                }
731            }
732        }
733        Ok(())
734    }
735
736    pub fn get_edge(&self, id: EdgeId) -> Result<Option<Edge>> {
737        let cf = self.cf(CF_EDGES)?;
738        match self.db.get_cf(cf, id.as_bytes())? {
739            Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
740            None => Ok(None),
741        }
742    }
743
744    pub fn delete_edge(&self, id: EdgeId) -> Result<()> {
745        let edge = self.get_edge(id)?.ok_or(Error::EdgeNotFound(id))?;
746        let mut batch = WriteBatch::default();
747        self.append_delete_edge(&mut batch, id, &edge)?;
748        self.db.write(batch)?;
749        Ok(())
750    }
751
752    fn append_delete_edge(&self, batch: &mut WriteBatch, id: EdgeId, edge: &Edge) -> Result<()> {
753        let edges_cf = self.cf(CF_EDGES)?;
754        let out_cf = self.cf(CF_ADJ_OUT)?;
755        let in_cf = self.cf(CF_ADJ_IN)?;
756        let type_cf = self.cf(CF_TYPE_INDEX)?;
757        let edge_prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
758
759        batch.delete_cf(edges_cf, id.as_bytes());
760        batch.delete_cf(out_cf, adj_key(edge.source, id));
761        batch.delete_cf(in_cf, adj_key(edge.target, id));
762        batch.delete_cf(type_cf, type_index_key(&edge.edge_type, id));
763
764        // Remove any edge-property-index entries this edge owned.
765        // Mirrors the put path: for each registered index whose type
766        // matches, the encoded value (if any) pinned an index key we
767        // now need to delete.
768        let edge_indexes = self
769            .edge_indexes
770            .read()
771            .expect("edge_indexes lock poisoned");
772        for spec in edge_indexes.iter() {
773            if spec.edge_type != edge.edge_type {
774                continue;
775            }
776            if let Some(values) = encode_index_tuple(&edge.properties, &spec.properties) {
777                batch.delete_cf(
778                    edge_prop_cf,
779                    edge_property_index_composite_key(
780                        &spec.edge_type,
781                        &spec.properties,
782                        &values,
783                        id,
784                    ),
785                );
786            }
787        }
788
789        // Edge-point-index sweep for the deleted edge. Mirrors the
790        // node-point-index sweep in `append_detach_delete_node`.
791        let edge_point_indexes = self
792            .edge_point_indexes
793            .read()
794            .expect("edge_point_indexes lock poisoned");
795        if !edge_point_indexes.is_empty() {
796            let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
797            for spec in edge_point_indexes.iter() {
798                if spec.edge_type != edge.edge_type {
799                    continue;
800                }
801                if let Some(p) = extract_indexable_point(&edge.properties, &spec.property) {
802                    let cell = point_cell(p.srid, p.x, p.y);
803                    batch.delete_cf(
804                        point_cf,
805                        point_index_key(
806                            &spec.edge_type,
807                            &spec.property,
808                            p.srid,
809                            cell,
810                            id.as_bytes(),
811                        ),
812                    );
813                }
814            }
815        }
816        Ok(())
817    }
818
819    pub fn detach_delete_node(&self, id: NodeId) -> Result<()> {
820        let mut batch = WriteBatch::default();
821        self.append_detach_delete_node(&mut batch, id)?;
822        self.db.write(batch)?;
823        Ok(())
824    }
825
826    fn append_detach_delete_node(&self, batch: &mut WriteBatch, id: NodeId) -> Result<()> {
827        let node = self.get_node(id)?;
828        let outgoing = self.outgoing(id)?;
829        let incoming = self.incoming(id)?;
830
831        let nodes_cf = self.cf(CF_NODES)?;
832        let edges_cf = self.cf(CF_EDGES)?;
833        let out_cf = self.cf(CF_ADJ_OUT)?;
834        let in_cf = self.cf(CF_ADJ_IN)?;
835        let label_cf = self.cf(CF_LABEL_INDEX)?;
836        let type_cf = self.cf(CF_TYPE_INDEX)?;
837        let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
838        let edge_prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
839
840        if let Some(n) = &node {
841            for label in &n.labels {
842                batch.delete_cf(label_cf, label_index_key(label, id));
843            }
844            // Remove every property-index entry this node was holding
845            // open. Mirrors the put path: for each registered index
846            // whose label matches, if the node carries an indexable
847            // value for the property, delete the corresponding entry.
848            let indexes = self.indexes.read().expect("indexes lock poisoned");
849            for spec in indexes.iter() {
850                if !n.labels.iter().any(|l| l == &spec.label) {
851                    continue;
852                }
853                if let Some(values) = encode_index_tuple(&n.properties, &spec.properties) {
854                    batch.delete_cf(
855                        prop_cf,
856                        property_index_composite_key(&spec.label, &spec.properties, &values, id),
857                    );
858                }
859            }
860            // Point-index entries held by this node, mirror of the
861            // property-index sweep above.
862            let point_indexes = self
863                .point_indexes
864                .read()
865                .expect("point_indexes lock poisoned");
866            if !point_indexes.is_empty() {
867                let point_cf = self.cf(CF_POINT_INDEX)?;
868                for spec in point_indexes.iter() {
869                    if !n.labels.iter().any(|l| l == &spec.label) {
870                        continue;
871                    }
872                    if let Some(p) = extract_indexable_point(&n.properties, &spec.property) {
873                        let cell = point_cell(p.srid, p.x, p.y);
874                        batch.delete_cf(
875                            point_cf,
876                            point_index_key(
877                                &spec.label,
878                                &spec.property,
879                                p.srid,
880                                cell,
881                                id.as_bytes(),
882                            ),
883                        );
884                    }
885                }
886            }
887        }
888
889        // Edges detached by this delete also need their
890        // edge-property-index entries swept. Read the catalog once so
891        // the inner loop doesn't reacquire the lock per edge.
892        let edge_indexes_snapshot = self
893            .edge_indexes
894            .read()
895            .expect("edge_indexes lock poisoned")
896            .clone();
897        let edge_point_indexes_snapshot = self
898            .edge_point_indexes
899            .read()
900            .expect("edge_point_indexes lock poisoned")
901            .clone();
902        let edge_point_cf_opt = if edge_point_indexes_snapshot.is_empty() {
903            None
904        } else {
905            Some(self.cf(CF_EDGE_POINT_INDEX)?)
906        };
907
908        for (edge_id, target) in &outgoing {
909            if let Some(e) = self.get_edge(*edge_id)? {
910                batch.delete_cf(type_cf, type_index_key(&e.edge_type, *edge_id));
911                for spec in &edge_indexes_snapshot {
912                    if spec.edge_type != e.edge_type {
913                        continue;
914                    }
915                    if let Some(values) = encode_index_tuple(&e.properties, &spec.properties) {
916                        batch.delete_cf(
917                            edge_prop_cf,
918                            edge_property_index_composite_key(
919                                &spec.edge_type,
920                                &spec.properties,
921                                &values,
922                                *edge_id,
923                            ),
924                        );
925                    }
926                }
927                if let Some(point_cf) = edge_point_cf_opt {
928                    for spec in &edge_point_indexes_snapshot {
929                        if spec.edge_type != e.edge_type {
930                            continue;
931                        }
932                        if let Some(p) = extract_indexable_point(&e.properties, &spec.property) {
933                            let cell = point_cell(p.srid, p.x, p.y);
934                            batch.delete_cf(
935                                point_cf,
936                                point_index_key(
937                                    &spec.edge_type,
938                                    &spec.property,
939                                    p.srid,
940                                    cell,
941                                    edge_id.as_bytes(),
942                                ),
943                            );
944                        }
945                    }
946                }
947            }
948            batch.delete_cf(edges_cf, edge_id.as_bytes());
949            batch.delete_cf(out_cf, adj_key(id, *edge_id));
950            batch.delete_cf(in_cf, adj_key(*target, *edge_id));
951        }
952        for (edge_id, source) in &incoming {
953            if let Some(e) = self.get_edge(*edge_id)? {
954                batch.delete_cf(type_cf, type_index_key(&e.edge_type, *edge_id));
955                for spec in &edge_indexes_snapshot {
956                    if spec.edge_type != e.edge_type {
957                        continue;
958                    }
959                    if let Some(values) = encode_index_tuple(&e.properties, &spec.properties) {
960                        batch.delete_cf(
961                            edge_prop_cf,
962                            edge_property_index_composite_key(
963                                &spec.edge_type,
964                                &spec.properties,
965                                &values,
966                                *edge_id,
967                            ),
968                        );
969                    }
970                }
971                if let Some(point_cf) = edge_point_cf_opt {
972                    for spec in &edge_point_indexes_snapshot {
973                        if spec.edge_type != e.edge_type {
974                            continue;
975                        }
976                        if let Some(p) = extract_indexable_point(&e.properties, &spec.property) {
977                            let cell = point_cell(p.srid, p.x, p.y);
978                            batch.delete_cf(
979                                point_cf,
980                                point_index_key(
981                                    &spec.edge_type,
982                                    &spec.property,
983                                    p.srid,
984                                    cell,
985                                    edge_id.as_bytes(),
986                                ),
987                            );
988                        }
989                    }
990                }
991            }
992            batch.delete_cf(edges_cf, edge_id.as_bytes());
993            batch.delete_cf(out_cf, adj_key(*source, *edge_id));
994            batch.delete_cf(in_cf, adj_key(id, *edge_id));
995        }
996        batch.delete_cf(nodes_cf, id.as_bytes());
997        Ok(())
998    }
999
1000    /// Apply a sequence of mutations as one atomic rocksdb write. Either
1001    /// every mutation lands or none does — no replica can observe a
1002    /// partial result, even across a process crash.
1003    ///
1004    /// Reads performed during batch construction (existing labels for
1005    /// PutNode, edge metadata for DeleteEdge / DetachDeleteNode) hit the
1006    /// live store, not the in-flight batch. Read-your-writes across
1007    /// mutations in the same batch is *not* supported. The executor never
1008    /// generates such patterns: it produces fresh ids per row and
1009    /// dedupes mutated entities before flushing.
1010    pub fn apply_batch(&self, mutations: &[GraphMutation]) -> Result<()> {
1011        let mut batch = WriteBatch::default();
1012        for m in mutations {
1013            match m {
1014                GraphMutation::PutNode(n) => self.append_put_node(&mut batch, n)?,
1015                GraphMutation::PutEdge(e) => self.append_put_edge(&mut batch, e)?,
1016                GraphMutation::DeleteEdge(id) => {
1017                    if let Some(edge) = self.get_edge(*id)? {
1018                        self.append_delete_edge(&mut batch, *id, &edge)?;
1019                    }
1020                }
1021                GraphMutation::DetachDeleteNode(id) => {
1022                    self.append_detach_delete_node(&mut batch, *id)?;
1023                }
1024            }
1025        }
1026        self.db.write(batch)?;
1027        Ok(())
1028    }
1029
1030    /// Walk every node in the store. Used for snapshot construction; not
1031    /// suitable as a query primitive since it materializes the full set.
1032    pub fn all_nodes(&self) -> Result<Vec<Node>> {
1033        let cf = self.cf(CF_NODES)?;
1034        let mut nodes = Vec::new();
1035        for item in self.db.iterator_cf(cf, IteratorMode::Start) {
1036            let (_, value) = item?;
1037            nodes.push(serde_json::from_slice(&value)?);
1038        }
1039        Ok(nodes)
1040    }
1041
1042    /// Walk every edge in the store. Used for snapshot construction.
1043    pub fn all_edges(&self) -> Result<Vec<Edge>> {
1044        let cf = self.cf(CF_EDGES)?;
1045        let mut edges = Vec::new();
1046        for item in self.db.iterator_cf(cf, IteratorMode::Start) {
1047            let (_, value) = item?;
1048            edges.push(serde_json::from_slice(&value)?);
1049        }
1050        Ok(edges)
1051    }
1052
1053    /// Produce a consistent point-in-time checkpoint of the store at
1054    /// `path`. Wraps rocksdb's [`Checkpoint`] API — on the same
1055    /// filesystem the checkpoint is effectively free (hard links over
1056    /// the underlying SST files); cross-filesystem it falls back to a
1057    /// full copy. `path` must NOT already exist; rocksdb creates it.
1058    ///
1059    /// Used by the Raft state machine's streaming snapshot path: a
1060    /// snapshot is built by checkpointing into a temp dir, packing
1061    /// the checkpoint's files into a length-prefixed archive, and
1062    /// shipping the bytes. Skips the "Vec<Node> in memory" materialization
1063    /// path that the previous JSON-over-everything snapshot used.
1064    pub fn create_checkpoint(&self, path: impl AsRef<Path>) -> Result<()> {
1065        let checkpoint = Checkpoint::new(&self.db)?;
1066        checkpoint.create_checkpoint(path)?;
1067        Ok(())
1068    }
1069
1070    /// Drop every key from every column family. Used by snapshot install
1071    /// to wipe local state before applying the leader's snapshot. Cheap
1072    /// for empty / small stores; for large stores this is O(n).
1073    pub fn clear_all(&self) -> Result<()> {
1074        let mut batch = WriteBatch::default();
1075        for cf_name in ALL_CFS {
1076            let cf = self.cf(cf_name)?;
1077            for item in self.db.iterator_cf(cf, IteratorMode::Start) {
1078                let (key, _) = item?;
1079                batch.delete_cf(cf, key);
1080            }
1081        }
1082        self.db.write(batch)?;
1083        Ok(())
1084    }
1085
1086    pub fn all_node_ids(&self) -> Result<Vec<NodeId>> {
1087        let cf = self.cf(CF_NODES)?;
1088        let mut results = Vec::new();
1089        for item in self.db.iterator_cf(cf, IteratorMode::Start) {
1090            let (key, _) = item?;
1091            if key.len() != ID_LEN {
1092                return Err(Error::CorruptBytes {
1093                    cf: CF_NODES,
1094                    expected: ID_LEN,
1095                    actual: key.len(),
1096                });
1097            }
1098            let mut bytes = [0u8; ID_LEN];
1099            bytes.copy_from_slice(&key);
1100            results.push(NodeId::from_bytes(bytes));
1101        }
1102        Ok(results)
1103    }
1104
1105    pub fn outgoing(&self, source: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
1106        self.scan_adj(CF_ADJ_OUT, source)
1107    }
1108
1109    pub fn incoming(&self, target: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
1110        self.scan_adj(CF_ADJ_IN, target)
1111    }
1112
1113    /// Snapshot the currently-registered property indexes. Cheap clone
1114    /// — specs are tiny (a label + a property key).
1115    pub fn list_property_indexes(&self) -> Vec<PropertyIndexSpec> {
1116        self.indexes.read().expect("indexes lock poisoned").clone()
1117    }
1118
1119    /// Declare a new `(label, property)` single-property equality
1120    /// index and backfill it by scanning every node that currently
1121    /// carries `label`. Idempotent: re-creating an already-registered
1122    /// index is a no-op, matching how Neo4j's `CREATE INDEX IF NOT
1123    /// EXISTS` behaves.
1124    ///
1125    /// Backfill is done in the same [`WriteBatch`] as the meta insert,
1126    /// so a crash mid-backfill leaves the store with either zero
1127    /// entries (batch not yet written) or a fully-populated index
1128    /// (batch committed). No partial-build state is ever visible.
1129    pub fn create_property_index(&self, label: &str, property: &str) -> Result<()> {
1130        self.create_property_index_composite(label, &[property.to_string()])
1131    }
1132
1133    /// Composite form of [`create_property_index`]. Declares a
1134    /// `(label, properties)` tuple index and backfills it by scanning
1135    /// every node currently carrying `label`. A node that's missing
1136    /// any of the properties, or carries an unindexable value for
1137    /// any of them, contributes no entry — composite indexes are
1138    /// all-or-nothing on the tuple.
1139    ///
1140    /// Single-property calls delegate here with a length-1 slice;
1141    /// the on-disk key format is byte-identical to the pre-composite
1142    /// layout in that case.
1143    pub fn create_property_index_composite(
1144        &self,
1145        label: &str,
1146        properties: &[String],
1147    ) -> Result<()> {
1148        assert!(
1149            !properties.is_empty(),
1150            "create_property_index_composite requires at least one property"
1151        );
1152        let spec = PropertyIndexSpec {
1153            label: label.to_string(),
1154            properties: properties.to_vec(),
1155        };
1156        let mut guard = self.indexes.write().expect("indexes lock poisoned");
1157        if guard.contains(&spec) {
1158            return Ok(());
1159        }
1160
1161        let meta_cf = self.cf(CF_INDEX_META)?;
1162        let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
1163        let mut batch = WriteBatch::default();
1164        batch.put_cf(meta_cf, index_meta_key(&spec), EMPTY);
1165
1166        for node_id in self.nodes_by_label(label)? {
1167            let node = match self.get_node(node_id)? {
1168                Some(n) => n,
1169                None => continue,
1170            };
1171            if let Some(values) = encode_index_tuple(&node.properties, &spec.properties) {
1172                batch.put_cf(
1173                    prop_cf,
1174                    property_index_composite_key(label, properties, &values, node_id),
1175                    EMPTY,
1176                );
1177            }
1178        }
1179
1180        self.db.write(batch)?;
1181        guard.push(spec);
1182        Ok(())
1183    }
1184
1185    /// Declare a new property constraint. Behavior summary:
1186    ///
1187    /// * The name is the user-facing identifier. When `name` is
1188    ///   `None`, a deterministic default (`constraint_<label>_<prop>_<kind>`)
1189    ///   is generated so `DROP CONSTRAINT` can still target it.
1190    /// * If the chosen name is already registered with the same
1191    ///   `(label, property, kind)`, this is a no-op (the existing
1192    ///   spec is returned). If the name collides on a *different*
1193    ///   spec, returns `ConstraintNameConflict` unless
1194    ///   `if_not_exists` is set, in which case the existing spec is
1195    ///   preserved and returned unchanged.
1196    /// * UNIQUE constraints implicitly require a backing property
1197    ///   index. `create_property_constraint` creates one on
1198    ///   `(label, property)` if it isn't already registered —
1199    ///   matching Neo4j's "a unique constraint also provides an
1200    ///   index" contract.
1201    /// * Before committing, the method scans existing nodes to
1202    ///   verify the constraint is satisfied. Any violation aborts
1203    ///   the creation with `ConstraintViolation`; the registry is
1204    ///   left unchanged.
1205    ///
1206    /// The meta write and the implicit-index write are batched into a
1207    /// single [`WriteBatch`] so the on-disk state stays consistent
1208    /// under crash.
1209    pub fn create_property_constraint(
1210        &self,
1211        name: Option<&str>,
1212        scope: &ConstraintScope,
1213        properties: &[String],
1214        kind: PropertyConstraintKind,
1215        if_not_exists: bool,
1216    ) -> Result<PropertyConstraintSpec> {
1217        // Arity validation: single-property kinds get exactly one
1218        // property; NodeKey accepts any positive count. Empty lists
1219        // are always invalid. NodeKey on relationship scope isn't
1220        // allowed — the "NODE" in the name is specifically about
1221        // node semantics; relationship equivalents would need their
1222        // own variant.
1223        if properties.is_empty() {
1224            return Err(Error::ConstraintArity {
1225                kind: kind.as_string(),
1226                details: "at least one property is required".into(),
1227            });
1228        }
1229        if !kind.allows_multi_property() && properties.len() != 1 {
1230            return Err(Error::ConstraintArity {
1231                kind: kind.as_string(),
1232                details: format!("expects exactly one property, got {}", properties.len()),
1233            });
1234        }
1235        if matches!(kind, PropertyConstraintKind::NodeKey)
1236            && matches!(scope, ConstraintScope::Relationship(_))
1237        {
1238            return Err(Error::ConstraintArity {
1239                kind: kind.as_string(),
1240                details: "NODE KEY cannot be applied to a relationship scope".into(),
1241            });
1242        }
1243        let resolved_name = match name {
1244            Some(n) => n.to_string(),
1245            None => default_constraint_name(scope, properties, kind),
1246        };
1247        let spec = PropertyConstraintSpec {
1248            name: resolved_name.clone(),
1249            scope: scope.clone(),
1250            properties: properties.to_vec(),
1251            kind,
1252        };
1253
1254        // Idempotency / conflict check against the existing registry
1255        // under a read lock first — avoids taking the write lock when
1256        // the answer is "nothing to do".
1257        {
1258            let guard = self.constraints.read().expect("constraints lock poisoned");
1259            if let Some(existing) = guard.iter().find(|s| s.name == resolved_name) {
1260                if existing == &spec {
1261                    return Ok(existing.clone());
1262                }
1263                if if_not_exists {
1264                    return Ok(existing.clone());
1265                }
1266                return Err(Error::ConstraintNameConflict {
1267                    name: resolved_name,
1268                });
1269            }
1270        }
1271
1272        // UNIQUE needs a backing single-property equality index on
1273        // both scopes so enforcement stays O(log N) per insert. The
1274        // create calls are idempotent, mirroring Neo4j's "a unique
1275        // constraint also provides an index" contract. The backing
1276        // index is not torn down on `DROP CONSTRAINT`; users who
1277        // want it gone issue a separate `DROP INDEX`. NODE KEY gets
1278        // the same auto-provisioning against a composite tuple
1279        // index so its uniqueness check is a single seek instead of
1280        // an O(M * K) label walk.
1281        match (kind, scope) {
1282            (PropertyConstraintKind::Unique, ConstraintScope::Node(label)) => {
1283                self.create_property_index(label, &properties[0])?;
1284            }
1285            (PropertyConstraintKind::Unique, ConstraintScope::Relationship(edge_type)) => {
1286                self.create_edge_property_index(edge_type, &properties[0])?;
1287            }
1288            (PropertyConstraintKind::NodeKey, ConstraintScope::Node(label)) => {
1289                self.create_property_index_composite(label, properties)?;
1290            }
1291            _ => {}
1292        }
1293
1294        // Validate existing data against the new constraint before we
1295        // commit the meta entry. A single pre-existing violation makes
1296        // the whole declaration fail — matching how Neo4j rejects
1297        // constraint creation on data that doesn't satisfy it.
1298        validate_existing_data(self, &spec)?;
1299
1300        let meta_cf = self.cf(CF_CONSTRAINT_META)?;
1301        let mut batch = WriteBatch::default();
1302        batch.put_cf(
1303            meta_cf,
1304            resolved_name.as_bytes(),
1305            constraint_meta_encode(&spec),
1306        );
1307        self.db.write(batch)?;
1308
1309        let mut guard = self.constraints.write().expect("constraints lock poisoned");
1310        // Re-check under the write lock to avoid a TOCTOU where two
1311        // concurrent CREATEs race through the read-lock idempotency
1312        // check. The second writer sees the first's insertion and
1313        // returns the existing spec.
1314        if let Some(existing) = guard.iter().find(|s| s.name == resolved_name) {
1315            return Ok(existing.clone());
1316        }
1317        guard.push(spec.clone());
1318        Ok(spec)
1319    }
1320
1321    /// Tear down a constraint by name. When `if_exists` is true,
1322    /// dropping a non-existent constraint is a no-op. Never drops the
1323    /// backing index for UNIQUE — users who want it gone issue a
1324    /// separate `DROP INDEX`.
1325    pub fn drop_property_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1326        let mut guard = self.constraints.write().expect("constraints lock poisoned");
1327        let idx = guard.iter().position(|s| s.name == name);
1328        match idx {
1329            None if if_exists => Ok(()),
1330            None => Err(Error::ConstraintNotFound {
1331                name: name.to_string(),
1332            }),
1333            Some(i) => {
1334                let meta_cf = self.cf(CF_CONSTRAINT_META)?;
1335                let mut batch = WriteBatch::default();
1336                batch.delete_cf(meta_cf, name.as_bytes());
1337                self.db.write(batch)?;
1338                guard.remove(i);
1339                Ok(())
1340            }
1341        }
1342    }
1343
1344    /// Snapshot the currently-registered constraints. Cheap clone —
1345    /// every spec is a handful of small strings and an enum.
1346    pub fn list_property_constraints(&self) -> Vec<PropertyConstraintSpec> {
1347        self.constraints
1348            .read()
1349            .expect("constraints lock poisoned")
1350            .clone()
1351    }
1352
1353    /// Enforce registered constraints against the post-write state of
1354    /// `node`. Invoked by the `put_node` path before the batch is
1355    /// constructed. `existing` is the previously-stored snapshot of
1356    /// the same node (if any) so the check can distinguish "this node
1357    /// is updating its own value" from "some other node owns this
1358    /// value". Relationship-scope constraints are ignored here — the
1359    /// edge write path has its own entry point.
1360    fn enforce_constraints(&self, node: &Node, existing: Option<&Node>) -> Result<()> {
1361        let constraints = self.constraints.read().expect("constraints lock poisoned");
1362        if constraints.is_empty() {
1363            return Ok(());
1364        }
1365        let now_has_label = |label: &str| node.labels.iter().any(|l| l == label);
1366        for spec in constraints.iter() {
1367            let label = match &spec.scope {
1368                ConstraintScope::Node(l) => l,
1369                // Relationship-scope constraints apply to edges, not
1370                // nodes. Skip here; `enforce_edge_constraints` runs
1371                // on the edge write path.
1372                ConstraintScope::Relationship(_) => continue,
1373            };
1374            if !now_has_label(label) {
1375                continue;
1376            }
1377            // Single-property kinds store the target in
1378            // `properties[0]`; NodeKey walks the whole list.
1379            let primary = spec.primary_property();
1380            match spec.kind {
1381                PropertyConstraintKind::NotNull => {
1382                    let present = node
1383                        .properties
1384                        .get(primary)
1385                        .is_some_and(|v| !matches!(v, Property::Null));
1386                    if !present {
1387                        return Err(Error::ConstraintViolation {
1388                            name: spec.name.clone(),
1389                            kind: spec.kind.as_string(),
1390                            label: label.clone(),
1391                            property: primary.to_string(),
1392                            details: format!(
1393                                "node {} is missing required property `{}`",
1394                                node.id, primary
1395                            ),
1396                        });
1397                    }
1398                }
1399                PropertyConstraintKind::Unique => {
1400                    let Some(value) = node.properties.get(primary) else {
1401                        // No value → no uniqueness collision possible.
1402                        // Matches Neo4j: UNIQUE alone allows NULL;
1403                        // combine with NOT NULL for strict presence.
1404                        continue;
1405                    };
1406                    if matches!(value, Property::Null) {
1407                        continue;
1408                    }
1409                    if encode_index_value(value).is_none() {
1410                        continue;
1411                    }
1412                    let holders = self.nodes_by_property(label, primary, value)?;
1413                    for other in holders {
1414                        if other == node.id {
1415                            continue;
1416                        }
1417                        let was_self = existing
1418                            .and_then(|n| n.properties.get(primary))
1419                            .is_some_and(|v| v == value);
1420                        let _ = was_self;
1421                        return Err(Error::ConstraintViolation {
1422                            name: spec.name.clone(),
1423                            kind: spec.kind.as_string(),
1424                            label: label.clone(),
1425                            property: primary.to_string(),
1426                            details: format!("value already held by node {}", other),
1427                        });
1428                    }
1429                }
1430                PropertyConstraintKind::PropertyType(target) => {
1431                    let Some(value) = node.properties.get(primary) else {
1432                        continue;
1433                    };
1434                    if matches!(value, Property::Null) {
1435                        continue;
1436                    }
1437                    if !property_matches_type(value, target) {
1438                        return Err(Error::ConstraintViolation {
1439                            name: spec.name.clone(),
1440                            kind: spec.kind.as_string(),
1441                            label: label.clone(),
1442                            property: primary.to_string(),
1443                            details: format!(
1444                                "node {} has value of type {} (expected {})",
1445                                node.id,
1446                                value.type_name(),
1447                                target.as_str()
1448                            ),
1449                        });
1450                    }
1451                }
1452                PropertyConstraintKind::NodeKey => {
1453                    // Presence check first: every property must be
1454                    // present and non-null. Fails fast before the
1455                    // seek so a missing-property error takes
1456                    // precedence over a (potentially) conflicting
1457                    // existing tuple.
1458                    let mut my_tuple: Vec<Property> = Vec::with_capacity(spec.properties.len());
1459                    for prop in &spec.properties {
1460                        match node.properties.get(prop) {
1461                            Some(v) if !matches!(v, Property::Null) => my_tuple.push(v.clone()),
1462                            _ => {
1463                                return Err(Error::ConstraintViolation {
1464                                    name: spec.name.clone(),
1465                                    kind: spec.kind.as_string(),
1466                                    label: label.clone(),
1467                                    property: prop.clone(),
1468                                    details: format!(
1469                                        "node {} is missing required property `{}`",
1470                                        node.id, prop
1471                                    ),
1472                                });
1473                            }
1474                        }
1475                    }
1476                    // Uniqueness via the auto-provisioned composite
1477                    // index (see `create_property_constraint`). O(log N)
1478                    // per insert instead of the old O(M × K) label walk.
1479                    let hits = self.nodes_by_properties(label, &spec.properties, &my_tuple)?;
1480                    if let Some(other_id) = hits.iter().find(|&&id| id != node.id) {
1481                        return Err(Error::ConstraintViolation {
1482                            name: spec.name.clone(),
1483                            kind: spec.kind.as_string(),
1484                            label: label.clone(),
1485                            property: spec.properties.join(","),
1486                            details: format!("tuple already held by node {}", other_id),
1487                        });
1488                    }
1489                }
1490            }
1491        }
1492        Ok(())
1493    }
1494
1495    /// Enforce registered relationship-scope constraints against the
1496    /// post-write state of `edge`. Invoked by the `put_edge` path
1497    /// before the batch is constructed. Walks every constraint whose
1498    /// scope matches the edge's type and runs the kind-specific
1499    /// check. `existing` is the prior on-disk snapshot of the same
1500    /// edge (if any) so the uniqueness check can allow self-updates.
1501    fn enforce_edge_constraints(&self, edge: &Edge, existing: Option<&Edge>) -> Result<()> {
1502        let constraints = self.constraints.read().expect("constraints lock poisoned");
1503        if constraints.is_empty() {
1504            return Ok(());
1505        }
1506        for spec in constraints.iter() {
1507            let edge_type = match &spec.scope {
1508                ConstraintScope::Relationship(t) => t,
1509                // Node-scope constraints don't concern edges.
1510                ConstraintScope::Node(_) => continue,
1511            };
1512            if edge_type != &edge.edge_type {
1513                continue;
1514            }
1515            let primary = spec.primary_property();
1516            match spec.kind {
1517                PropertyConstraintKind::NotNull => {
1518                    let present = edge
1519                        .properties
1520                        .get(primary)
1521                        .is_some_and(|v| !matches!(v, Property::Null));
1522                    if !present {
1523                        return Err(Error::ConstraintViolation {
1524                            name: spec.name.clone(),
1525                            kind: spec.kind.as_string(),
1526                            label: edge_type.clone(),
1527                            property: primary.to_string(),
1528                            details: format!(
1529                                "edge {} is missing required property `{}`",
1530                                edge.id, primary
1531                            ),
1532                        });
1533                    }
1534                }
1535                PropertyConstraintKind::Unique => {
1536                    let Some(value) = edge.properties.get(primary) else {
1537                        // No value → no uniqueness collision possible.
1538                        // Matches the node path: UNIQUE alone allows
1539                        // NULL / missing; combine with NOT NULL for
1540                        // strict presence.
1541                        continue;
1542                    };
1543                    if matches!(value, Property::Null) {
1544                        continue;
1545                    }
1546                    if encode_index_value(value).is_none() {
1547                        // Value type doesn't support equality in the
1548                        // index (Float, List, Map, etc.). Skip — the
1549                        // node path makes the same call for
1550                        // consistency, and the index can't help us
1551                        // here anyway.
1552                        continue;
1553                    }
1554                    // Probe the backing edge-property index (created
1555                    // by `create_property_constraint` above) for
1556                    // rival holders. O(log N) + O(matching edges)
1557                    // vs. the previous O(E_type) per-insert scan.
1558                    let holders = self.edges_by_property(edge_type, primary, value)?;
1559                    for other_id in holders {
1560                        if other_id == edge.id {
1561                            continue;
1562                        }
1563                        let was_self = existing
1564                            .and_then(|e| e.properties.get(primary))
1565                            .is_some_and(|v| v == value);
1566                        let _ = was_self;
1567                        return Err(Error::ConstraintViolation {
1568                            name: spec.name.clone(),
1569                            kind: spec.kind.as_string(),
1570                            label: edge_type.clone(),
1571                            property: primary.to_string(),
1572                            details: format!("value already held by edge {}", other_id),
1573                        });
1574                    }
1575                }
1576                PropertyConstraintKind::PropertyType(target) => {
1577                    let Some(value) = edge.properties.get(primary) else {
1578                        continue;
1579                    };
1580                    if matches!(value, Property::Null) {
1581                        continue;
1582                    }
1583                    if !property_matches_type(value, target) {
1584                        return Err(Error::ConstraintViolation {
1585                            name: spec.name.clone(),
1586                            kind: spec.kind.as_string(),
1587                            label: edge_type.clone(),
1588                            property: primary.to_string(),
1589                            details: format!(
1590                                "edge {} has value of type {} (expected {})",
1591                                edge.id,
1592                                value.type_name(),
1593                                target.as_str()
1594                            ),
1595                        });
1596                    }
1597                }
1598                PropertyConstraintKind::NodeKey => {
1599                    // Disallowed at create time — see
1600                    // `create_property_constraint`.
1601                    unreachable!(
1602                        "NODE KEY on relationship scope should have been rejected at create time"
1603                    );
1604                }
1605            }
1606        }
1607        Ok(())
1608    }
1609
1610    /// Tear down a property index: removes the meta entry and every
1611    /// entry under the `(label, prop)` prefix. Idempotent: dropping a
1612    /// non-existent index is a no-op. Atomic via one [`WriteBatch`].
1613    pub fn drop_property_index(&self, label: &str, property: &str) -> Result<()> {
1614        self.drop_property_index_composite(label, &[property.to_string()])
1615    }
1616
1617    pub fn drop_property_index_composite(&self, label: &str, properties: &[String]) -> Result<()> {
1618        assert!(
1619            !properties.is_empty(),
1620            "drop_property_index_composite requires at least one property"
1621        );
1622        let spec = PropertyIndexSpec {
1623            label: label.to_string(),
1624            properties: properties.to_vec(),
1625        };
1626        let mut guard = self.indexes.write().expect("indexes lock poisoned");
1627        if !guard.contains(&spec) {
1628            return Ok(());
1629        }
1630
1631        // Reject the drop if a UNIQUE or NODE KEY constraint on the
1632        // same `(label, properties)` is registered — enforcement for
1633        // those kinds runs a seek through this exact index, so
1634        // silently removing it would collapse the uniqueness check
1635        // to "always passes" without producing any user-visible
1636        // signal. `DROP CONSTRAINT` is the right prerequisite.
1637        if let Some(constraint) = find_constraint_backing_node_index(
1638            &self.constraints.read().expect("constraints lock poisoned"),
1639            label,
1640            properties,
1641        ) {
1642            return Err(Error::IndexInUse { constraint });
1643        }
1644
1645        let meta_cf = self.cf(CF_INDEX_META)?;
1646        let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
1647        let mut batch = WriteBatch::default();
1648        batch.delete_cf(meta_cf, index_meta_key(&spec));
1649
1650        // Interleaved key layout means no clean `[label][props...]`
1651        // byte-prefix scopes a composite index's entries on its own —
1652        // an index `(a, b)` would share a `[label][a]` prefix with
1653        // single-property index `(a)`. Iterate by the label-only
1654        // prefix and parse each entry's property sequence, deleting
1655        // those that exactly match this spec's list.
1656        let label_prefix = property_index_label_prefix(label);
1657        let iter = self.db.iterator_cf(
1658            prop_cf,
1659            IteratorMode::From(&label_prefix, Direction::Forward),
1660        );
1661        for item in iter {
1662            let (key, _) = item?;
1663            if !key.starts_with(&label_prefix) {
1664                break;
1665            }
1666            let Some(parsed) = parse_property_index_entry_props(&key) else {
1667                continue;
1668            };
1669            if parsed.len() == properties.len()
1670                && parsed.iter().zip(properties.iter()).all(|(a, b)| a == b)
1671            {
1672                batch.delete_cf(prop_cf, key);
1673            }
1674        }
1675
1676        self.db.write(batch)?;
1677        guard.retain(|s| s != &spec);
1678        Ok(())
1679    }
1680
1681    /// Snapshot the currently-registered edge property indexes.
1682    /// Mirror of [`RocksDbStorageEngine::list_property_indexes`] for
1683    /// relationship scope. Cheap clone — specs are tiny.
1684    pub fn list_edge_property_indexes(&self) -> Vec<EdgePropertyIndexSpec> {
1685        self.edge_indexes
1686            .read()
1687            .expect("edge_indexes lock poisoned")
1688            .clone()
1689    }
1690
1691    /// Declare a new `(edge_type, property)` edge property index and
1692    /// backfill it by scanning every edge currently carrying
1693    /// `edge_type`. Idempotent: re-creating an already-registered
1694    /// index is a no-op. Mirror of
1695    /// [`RocksDbStorageEngine::create_property_index`] for
1696    /// relationship scope; the same "same WriteBatch" atomicity
1697    /// guarantee applies — a crash mid-backfill leaves the store with
1698    /// either zero entries or a fully-populated index, never a
1699    /// partially-built one.
1700    pub fn create_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
1701        self.create_edge_property_index_composite(edge_type, &[property.to_string()])
1702    }
1703
1704    pub fn create_edge_property_index_composite(
1705        &self,
1706        edge_type: &str,
1707        properties: &[String],
1708    ) -> Result<()> {
1709        assert!(
1710            !properties.is_empty(),
1711            "create_edge_property_index_composite requires at least one property"
1712        );
1713        let spec = EdgePropertyIndexSpec {
1714            edge_type: edge_type.to_string(),
1715            properties: properties.to_vec(),
1716        };
1717        let mut guard = self
1718            .edge_indexes
1719            .write()
1720            .expect("edge_indexes lock poisoned");
1721        if guard.contains(&spec) {
1722            return Ok(());
1723        }
1724
1725        let meta_cf = self.cf(CF_EDGE_INDEX_META)?;
1726        let prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
1727        let mut batch = WriteBatch::default();
1728        batch.put_cf(meta_cf, edge_index_meta_key(&spec), EMPTY);
1729
1730        for edge_id in self.edges_by_type(edge_type)? {
1731            let edge = match self.get_edge(edge_id)? {
1732                Some(e) => e,
1733                None => continue,
1734            };
1735            if let Some(values) = encode_index_tuple(&edge.properties, &spec.properties) {
1736                batch.put_cf(
1737                    prop_cf,
1738                    edge_property_index_composite_key(edge_type, properties, &values, edge_id),
1739                    EMPTY,
1740                );
1741            }
1742        }
1743
1744        self.db.write(batch)?;
1745        guard.push(spec);
1746        Ok(())
1747    }
1748
1749    pub fn drop_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
1750        self.drop_edge_property_index_composite(edge_type, &[property.to_string()])
1751    }
1752
1753    pub fn drop_edge_property_index_composite(
1754        &self,
1755        edge_type: &str,
1756        properties: &[String],
1757    ) -> Result<()> {
1758        assert!(
1759            !properties.is_empty(),
1760            "drop_edge_property_index_composite requires at least one property"
1761        );
1762        let spec = EdgePropertyIndexSpec {
1763            edge_type: edge_type.to_string(),
1764            properties: properties.to_vec(),
1765        };
1766        let mut guard = self
1767            .edge_indexes
1768            .write()
1769            .expect("edge_indexes lock poisoned");
1770        if !guard.contains(&spec) {
1771            return Ok(());
1772        }
1773
1774        // Same rationale as the node-side drop guard: a UNIQUE
1775        // constraint on this relationship type + property seeks
1776        // through this exact index during enforcement, so silently
1777        // removing it would defeat the check. Require
1778        // `DROP CONSTRAINT` first.
1779        if let Some(constraint) = find_constraint_backing_edge_index(
1780            &self.constraints.read().expect("constraints lock poisoned"),
1781            edge_type,
1782            properties,
1783        ) {
1784            return Err(Error::IndexInUse { constraint });
1785        }
1786
1787        let meta_cf = self.cf(CF_EDGE_INDEX_META)?;
1788        let prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
1789        let mut batch = WriteBatch::default();
1790        batch.delete_cf(meta_cf, edge_index_meta_key(&spec));
1791
1792        // See the node-side drop for why DROP parses entries rather
1793        // than scoping with a property-name byte prefix.
1794        let label_prefix = property_index_label_prefix(edge_type);
1795        let iter = self.db.iterator_cf(
1796            prop_cf,
1797            IteratorMode::From(&label_prefix, Direction::Forward),
1798        );
1799        for item in iter {
1800            let (key, _) = item?;
1801            if !key.starts_with(&label_prefix) {
1802                break;
1803            }
1804            let Some(parsed) = parse_property_index_entry_props(&key) else {
1805                continue;
1806            };
1807            if parsed.len() == properties.len()
1808                && parsed.iter().zip(properties.iter()).all(|(a, b)| a == b)
1809            {
1810                batch.delete_cf(prop_cf, key);
1811            }
1812        }
1813
1814        self.db.write(batch)?;
1815        guard.retain(|s| s != &spec);
1816        Ok(())
1817    }
1818
1819    // ---------------------------------------------------------------
1820    // Point / spatial index (node scope).
1821    //
1822    // Z-order (Morton) cell encoding over a fixed per-SRID domain.
1823    // Each indexed `Property::Point` produces one row keyed by
1824    // `<label>\0<prop>\0<srid:4><cell:8><node_id:16>` with the point's
1825    // coordinates stored in the value. Cross-SRID queries are
1826    // impossible by construction — the SRID is in the key prefix, so
1827    // a bbox scan is naturally scoped to one CRS.
1828    //
1829    // Single-property only for now. Extending to composite spatial
1830    // or to relationship scope is additive — separate CFs and
1831    // separate spec shapes.
1832    // ---------------------------------------------------------------
1833
1834    pub fn list_point_indexes(&self) -> Vec<PointIndexSpec> {
1835        self.point_indexes
1836            .read()
1837            .expect("point_indexes lock poisoned")
1838            .clone()
1839    }
1840
1841    /// Declare a new point index on `(label, property)` and backfill
1842    /// it by scanning every node that currently carries the label.
1843    /// Idempotent: re-creating a registered index is a no-op. Same
1844    /// "single WriteBatch" atomicity guarantee as the property-index
1845    /// create path — a crash mid-backfill leaves the store with
1846    /// either zero entries or a fully-populated index.
1847    pub fn create_point_index(&self, label: &str, property: &str) -> Result<()> {
1848        let spec = PointIndexSpec {
1849            label: label.to_string(),
1850            property: property.to_string(),
1851        };
1852        let mut guard = self
1853            .point_indexes
1854            .write()
1855            .expect("point_indexes lock poisoned");
1856        if guard.contains(&spec) {
1857            return Ok(());
1858        }
1859
1860        let meta_cf = self.cf(CF_POINT_INDEX_META)?;
1861        let point_cf = self.cf(CF_POINT_INDEX)?;
1862        let mut batch = WriteBatch::default();
1863        batch.put_cf(meta_cf, point_index_meta_key(&spec), EMPTY);
1864
1865        for node_id in self.nodes_by_label(label)? {
1866            let node = match self.get_node(node_id)? {
1867                Some(n) => n,
1868                None => continue,
1869            };
1870            if let Some(Property::Point(p)) = node.properties.get(property) {
1871                let cell = point_cell(p.srid, p.x, p.y);
1872                batch.put_cf(
1873                    point_cf,
1874                    point_index_key(label, property, p.srid, cell, node_id.as_bytes()),
1875                    point_index_value(p.x, p.y, p.z),
1876                );
1877            }
1878        }
1879
1880        self.db.write(batch)?;
1881        guard.push(spec);
1882        Ok(())
1883    }
1884
1885    /// Tear down a point index. Removes the meta entry and every
1886    /// entry under the `<label>\0<prop>\0` prefix — that sweep
1887    /// covers all SRID variants at once, since SRID is to the right
1888    /// of the label+property header. Idempotent.
1889    pub fn drop_point_index(&self, label: &str, property: &str) -> Result<()> {
1890        let spec = PointIndexSpec {
1891            label: label.to_string(),
1892            property: property.to_string(),
1893        };
1894        let mut guard = self
1895            .point_indexes
1896            .write()
1897            .expect("point_indexes lock poisoned");
1898        if !guard.contains(&spec) {
1899            return Ok(());
1900        }
1901
1902        let meta_cf = self.cf(CF_POINT_INDEX_META)?;
1903        let point_cf = self.cf(CF_POINT_INDEX)?;
1904        let mut batch = WriteBatch::default();
1905        batch.delete_cf(meta_cf, point_index_meta_key(&spec));
1906
1907        let prefix = point_index_label_prop_prefix(label, property);
1908        let iter = self
1909            .db
1910            .iterator_cf(point_cf, IteratorMode::From(&prefix, Direction::Forward));
1911        for item in iter {
1912            let (key, _) = item?;
1913            if !key.starts_with(&prefix) {
1914                break;
1915            }
1916            batch.delete_cf(point_cf, key);
1917        }
1918
1919        self.db.write(batch)?;
1920        guard.retain(|s| s != &spec);
1921        Ok(())
1922    }
1923
1924    /// Range query: node ids whose indexed point falls inside the
1925    /// axis-aligned bounding box `[(xlo, ylo), (xhi, yhi)]` under
1926    /// `srid`. The scan walks the Morton-code cell range that
1927    /// covers the bbox and applies a precise per-row filter against
1928    /// the coordinates stored in the value — the cell range is a
1929    /// superset, so the filter is what guarantees correctness.
1930    ///
1931    /// A missing index returns empty rather than erroring, matching
1932    /// the property-seek convention (lets the planner race a DROP
1933    /// without a fatal). Results are sorted by `NodeId` for
1934    /// determinism across runs.
1935    pub fn nodes_in_bbox(
1936        &self,
1937        label: &str,
1938        property: &str,
1939        srid: i32,
1940        xlo: f64,
1941        ylo: f64,
1942        xhi: f64,
1943        yhi: f64,
1944    ) -> Result<Vec<NodeId>> {
1945        let spec = PointIndexSpec {
1946            label: label.to_string(),
1947            property: property.to_string(),
1948        };
1949        {
1950            let guard = self
1951                .point_indexes
1952                .read()
1953                .expect("point_indexes lock poisoned");
1954            if !guard.contains(&spec) {
1955                return Ok(Vec::new());
1956            }
1957        }
1958
1959        let (lo_x, hi_x) = if xlo <= xhi { (xlo, xhi) } else { (xhi, xlo) };
1960        let (lo_y, hi_y) = if ylo <= yhi { (ylo, yhi) } else { (yhi, ylo) };
1961        let (min_cell, max_cell) = point_cell_range(srid, lo_x, lo_y, hi_x, hi_y);
1962
1963        let prefix = point_index_srid_prefix(label, property, srid);
1964        let header_len = prefix.len();
1965        let mut seek = prefix.clone();
1966        seek.extend_from_slice(&min_cell.to_be_bytes());
1967
1968        let cf = self.cf(CF_POINT_INDEX)?;
1969        let iter = self
1970            .db
1971            .iterator_cf(cf, IteratorMode::From(&seek, Direction::Forward));
1972        let mut results = Vec::new();
1973        for item in iter {
1974            let (key, value) = item?;
1975            if !key.starts_with(&prefix) {
1976                break;
1977            }
1978            let cell = cell_from_point_index_key(CF_POINT_INDEX, &key, header_len)?;
1979            if cell > max_cell {
1980                break;
1981            }
1982            let (x, y, _z) = decode_point_index_value(CF_POINT_INDEX, &value)?;
1983            if x < lo_x || x > hi_x || y < lo_y || y > hi_y {
1984                continue;
1985            }
1986            let id_bytes = node_id_from_point_index_key(CF_POINT_INDEX, &key)?;
1987            results.push(NodeId::from_bytes(id_bytes));
1988        }
1989        results.sort();
1990        Ok(results)
1991    }
1992
1993    // ---------------------------------------------------------------
1994    // Point / spatial index (relationship scope).
1995    //
1996    // Mirror of the node-scope block above. Lives in its own CF so
1997    // node and edge spatial entries can't alias and the two can be
1998    // dropped independently — matches how non-spatial property
1999    // indexes are split between `CF_PROPERTY_INDEX` and
2000    // `CF_EDGE_PROPERTY_INDEX`.
2001    // ---------------------------------------------------------------
2002
2003    pub fn list_edge_point_indexes(&self) -> Vec<EdgePointIndexSpec> {
2004        self.edge_point_indexes
2005            .read()
2006            .expect("edge_point_indexes lock poisoned")
2007            .clone()
2008    }
2009
2010    /// Declare a new edge point index and backfill it by scanning
2011    /// every edge currently carrying `edge_type`. Idempotent.
2012    /// Single-property / relationship-scope analogue of
2013    /// [`RocksDbStorageEngine::create_point_index`].
2014    pub fn create_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
2015        let spec = EdgePointIndexSpec {
2016            edge_type: edge_type.to_string(),
2017            property: property.to_string(),
2018        };
2019        let mut guard = self
2020            .edge_point_indexes
2021            .write()
2022            .expect("edge_point_indexes lock poisoned");
2023        if guard.contains(&spec) {
2024            return Ok(());
2025        }
2026
2027        let meta_cf = self.cf(CF_EDGE_POINT_INDEX_META)?;
2028        let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
2029        let mut batch = WriteBatch::default();
2030        batch.put_cf(meta_cf, edge_point_index_meta_key(&spec), EMPTY);
2031
2032        for edge_id in self.edges_by_type(edge_type)? {
2033            let edge = match self.get_edge(edge_id)? {
2034                Some(e) => e,
2035                None => continue,
2036            };
2037            if let Some(Property::Point(p)) = edge.properties.get(property) {
2038                let cell = point_cell(p.srid, p.x, p.y);
2039                batch.put_cf(
2040                    point_cf,
2041                    // Reuse the node-scope key builder: the key layout
2042                    // `<label>\0<prop>\0<srid:4><cell:8><id:16>` doesn't
2043                    // care whether `id` is a node or edge id — both are
2044                    // 16-byte UUIDs. The separate CF is what scopes
2045                    // relationship entries away from node entries.
2046                    point_index_key(edge_type, property, p.srid, cell, edge_id.as_bytes()),
2047                    point_index_value(p.x, p.y, p.z),
2048                );
2049            }
2050        }
2051
2052        self.db.write(batch)?;
2053        guard.push(spec);
2054        Ok(())
2055    }
2056
2057    /// Tear down an edge point index. Idempotent.
2058    pub fn drop_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
2059        let spec = EdgePointIndexSpec {
2060            edge_type: edge_type.to_string(),
2061            property: property.to_string(),
2062        };
2063        let mut guard = self
2064            .edge_point_indexes
2065            .write()
2066            .expect("edge_point_indexes lock poisoned");
2067        if !guard.contains(&spec) {
2068            return Ok(());
2069        }
2070
2071        let meta_cf = self.cf(CF_EDGE_POINT_INDEX_META)?;
2072        let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
2073        let mut batch = WriteBatch::default();
2074        batch.delete_cf(meta_cf, edge_point_index_meta_key(&spec));
2075
2076        let prefix = point_index_label_prop_prefix(edge_type, property);
2077        let iter = self
2078            .db
2079            .iterator_cf(point_cf, IteratorMode::From(&prefix, Direction::Forward));
2080        for item in iter {
2081            let (key, _) = item?;
2082            if !key.starts_with(&prefix) {
2083                break;
2084            }
2085            batch.delete_cf(point_cf, key);
2086        }
2087
2088        self.db.write(batch)?;
2089        guard.retain(|s| s != &spec);
2090        Ok(())
2091    }
2092
2093    /// Relationship-scope analogue of
2094    /// [`RocksDbStorageEngine::nodes_in_bbox`]. Same Z-order cell
2095    /// range + precise bbox filter via stored coords, just over
2096    /// `CF_EDGE_POINT_INDEX`.
2097    pub fn edges_in_bbox(
2098        &self,
2099        edge_type: &str,
2100        property: &str,
2101        srid: i32,
2102        xlo: f64,
2103        ylo: f64,
2104        xhi: f64,
2105        yhi: f64,
2106    ) -> Result<Vec<EdgeId>> {
2107        let spec = EdgePointIndexSpec {
2108            edge_type: edge_type.to_string(),
2109            property: property.to_string(),
2110        };
2111        {
2112            let guard = self
2113                .edge_point_indexes
2114                .read()
2115                .expect("edge_point_indexes lock poisoned");
2116            if !guard.contains(&spec) {
2117                return Ok(Vec::new());
2118            }
2119        }
2120
2121        let (lo_x, hi_x) = if xlo <= xhi { (xlo, xhi) } else { (xhi, xlo) };
2122        let (lo_y, hi_y) = if ylo <= yhi { (ylo, yhi) } else { (yhi, ylo) };
2123        let (min_cell, max_cell) = point_cell_range(srid, lo_x, lo_y, hi_x, hi_y);
2124
2125        let prefix = point_index_srid_prefix(edge_type, property, srid);
2126        let header_len = prefix.len();
2127        let mut seek = prefix.clone();
2128        seek.extend_from_slice(&min_cell.to_be_bytes());
2129
2130        let cf = self.cf(CF_EDGE_POINT_INDEX)?;
2131        let iter = self
2132            .db
2133            .iterator_cf(cf, IteratorMode::From(&seek, Direction::Forward));
2134        let mut results = Vec::new();
2135        for item in iter {
2136            let (key, value) = item?;
2137            if !key.starts_with(&prefix) {
2138                break;
2139            }
2140            let cell = cell_from_point_index_key(CF_EDGE_POINT_INDEX, &key, header_len)?;
2141            if cell > max_cell {
2142                break;
2143            }
2144            let (x, y, _z) = decode_point_index_value(CF_EDGE_POINT_INDEX, &value)?;
2145            if x < lo_x || x > hi_x || y < lo_y || y > hi_y {
2146                continue;
2147            }
2148            let id_bytes = node_id_from_point_index_key(CF_EDGE_POINT_INDEX, &key)?;
2149            results.push(EdgeId::from_bytes(id_bytes));
2150        }
2151        results.sort();
2152        Ok(results)
2153    }
2154
2155    /// Look up edge ids for a `(edge_type, property, value)` equality
2156    /// via the edge-property-index CF. Mirror of
2157    /// [`RocksDbStorageEngine::nodes_by_property`] for relationship
2158    /// scope; unindexable value types surface as
2159    /// [`Error::UnindexableValue`] so callers can fall back to a
2160    /// type-wide scan rather than silently returning empty.
2161    pub fn edges_by_property(
2162        &self,
2163        edge_type: &str,
2164        property: &str,
2165        value: &Property,
2166    ) -> Result<Vec<EdgeId>> {
2167        self.edges_by_properties(
2168            edge_type,
2169            std::slice::from_ref(&property.to_string()),
2170            std::slice::from_ref(value),
2171        )
2172    }
2173
2174    /// Composite form of [`edges_by_property`]. Same contract as
2175    /// [`RocksDbStorageEngine::nodes_by_properties`] on the
2176    /// relationship side.
2177    pub fn edges_by_properties(
2178        &self,
2179        edge_type: &str,
2180        properties: &[String],
2181        values: &[Property],
2182    ) -> Result<Vec<EdgeId>> {
2183        assert_eq!(
2184            properties.len(),
2185            values.len(),
2186            "composite edge seek: properties and values must have equal length"
2187        );
2188        let mut encoded: Vec<Vec<u8>> = Vec::with_capacity(values.len());
2189        for (p, v) in properties.iter().zip(values.iter()) {
2190            let bytes = encode_index_value(v).ok_or_else(|| Error::UnindexableValue {
2191                property: p.clone(),
2192                kind: v.type_name(),
2193            })?;
2194            encoded.push(bytes);
2195        }
2196        let cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
2197        let prefix = edge_property_index_composite_value_prefix(edge_type, properties, &encoded);
2198        let mut results = Vec::new();
2199        let iter = self
2200            .db
2201            .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2202        for item in iter {
2203            let (key, _) = item?;
2204            if !key.starts_with(&prefix) {
2205                break;
2206            }
2207            let bytes = edge_id_from_property_index_key(&key, prefix.len())?;
2208            results.push(EdgeId::from_bytes(bytes));
2209        }
2210        Ok(results)
2211    }
2212
2213    /// Look up node ids for a `(label, property, value)` equality via
2214    /// the property index CF. The caller is responsible for checking
2215    /// that the index exists first (e.g. the planner only emits
2216    /// `IndexSeek` when it has verified via
2217    /// [`RocksDbStorageEngine::list_property_indexes`]).
2218    ///
2219    /// Unindexable value types (Float64, List, Map, Null) return
2220    /// [`Error::UnindexableValue`] — callers should surface this to
2221    /// the user rather than silently returning an empty result.
2222    pub fn nodes_by_property(
2223        &self,
2224        label: &str,
2225        property: &str,
2226        value: &Property,
2227    ) -> Result<Vec<NodeId>> {
2228        self.nodes_by_properties(
2229            label,
2230            std::slice::from_ref(&property.to_string()),
2231            std::slice::from_ref(value),
2232        )
2233    }
2234
2235    /// Composite form of [`nodes_by_property`]. Seeks every node
2236    /// whose `(label, property_i = value_i)` tuple matches in order.
2237    /// The two slices must have equal length. Unindexable value
2238    /// types surface as [`Error::UnindexableValue`] the same way
2239    /// the single-property form does — composite indexes are
2240    /// all-or-nothing and an unindexable slot can never match a
2241    /// stored tuple.
2242    pub fn nodes_by_properties(
2243        &self,
2244        label: &str,
2245        properties: &[String],
2246        values: &[Property],
2247    ) -> Result<Vec<NodeId>> {
2248        assert_eq!(
2249            properties.len(),
2250            values.len(),
2251            "composite seek: properties and values must have equal length"
2252        );
2253        let mut encoded: Vec<Vec<u8>> = Vec::with_capacity(values.len());
2254        for (p, v) in properties.iter().zip(values.iter()) {
2255            let bytes = encode_index_value(v).ok_or_else(|| Error::UnindexableValue {
2256                property: p.clone(),
2257                kind: v.type_name(),
2258            })?;
2259            encoded.push(bytes);
2260        }
2261        let cf = self.cf(CF_PROPERTY_INDEX)?;
2262        let prefix = property_index_composite_value_prefix(label, properties, &encoded);
2263        let mut results = Vec::new();
2264        let iter = self
2265            .db
2266            .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2267        for item in iter {
2268            let (key, _) = item?;
2269            if !key.starts_with(&prefix) {
2270                break;
2271            }
2272            let bytes = node_id_from_property_index_key(&key, prefix.len())?;
2273            results.push(NodeId::from_bytes(bytes));
2274        }
2275        Ok(results)
2276    }
2277
2278    pub fn nodes_by_label(&self, label: &str) -> Result<Vec<NodeId>> {
2279        let cf = self.cf(CF_LABEL_INDEX)?;
2280        let prefix = label_index_prefix(label);
2281        let mut results = Vec::new();
2282        let iter = self
2283            .db
2284            .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2285        for item in iter {
2286            let (key, _) = item?;
2287            if !key.starts_with(&prefix) {
2288                break;
2289            }
2290            let bytes = id_from_str_index_key(CF_LABEL_INDEX, &key, label.len())?;
2291            results.push(NodeId::from_bytes(bytes));
2292        }
2293        Ok(results)
2294    }
2295
2296    pub fn edges_by_type(&self, edge_type: &str) -> Result<Vec<EdgeId>> {
2297        let cf = self.cf(CF_TYPE_INDEX)?;
2298        let prefix = type_index_prefix(edge_type);
2299        let mut results = Vec::new();
2300        let iter = self
2301            .db
2302            .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2303        for item in iter {
2304            let (key, _) = item?;
2305            if !key.starts_with(&prefix) {
2306                break;
2307            }
2308            let bytes = id_from_str_index_key(CF_TYPE_INDEX, &key, edge_type.len())?;
2309            results.push(EdgeId::from_bytes(bytes));
2310        }
2311        Ok(results)
2312    }
2313
2314    fn scan_adj(&self, cf_name: &'static str, node: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
2315        let cf = self.cf(cf_name)?;
2316        let prefix: &[u8] = node.as_bytes();
2317        let mut results = Vec::new();
2318        let iter = self
2319            .db
2320            .iterator_cf(cf, IteratorMode::From(prefix, Direction::Forward));
2321        for item in iter {
2322            let (key, value) = item?;
2323            if !key.starts_with(prefix) {
2324                break;
2325            }
2326            let edge_id = edge_from_adj_key(cf_name, &key)?;
2327            let other = node_from_adj_value(cf_name, &value)?;
2328            results.push((edge_id, other));
2329        }
2330        Ok(results)
2331    }
2332}
2333
2334/// Return the [`meshdb_core::Point`] at `properties[key]` when the
2335/// property is present and of point type. Used by point-index
2336/// maintenance to decide whether a node's indexed property changed
2337/// across a put. Non-point or missing values yield `None`, which the
2338/// index maintenance treats as "no entry" — the same way
2339/// `encode_index_tuple` treats missing / unindexable values for
2340/// property indexes.
2341fn extract_indexable_point(
2342    properties: &std::collections::HashMap<String, Property>,
2343    key: &str,
2344) -> Option<meshdb_core::Point> {
2345    match properties.get(key)? {
2346        Property::Point(p) => Some(*p),
2347        _ => None,
2348    }
2349}
2350
2351/// Deterministic auto-name for a constraint the user didn't name.
2352/// Shape: `constraint_<label>_<property_list>_<kind>` where the
2353/// property list joins the property names with `_` and `<kind>` is
2354/// `unique`, `not_null`, `type_<t>`, or `node_key`. Stable across
2355/// restarts — users can target the auto-name with `DROP CONSTRAINT`
2356/// without having to inspect `SHOW CONSTRAINTS` first.
2357fn default_constraint_name(
2358    scope: &ConstraintScope,
2359    properties: &[String],
2360    kind: PropertyConstraintKind,
2361) -> String {
2362    let joined = properties.join("_");
2363    format!(
2364        "constraint_{scope_tag}_{target}_{joined}_{kind_tag}",
2365        scope_tag = scope.name_tag(),
2366        target = scope.target(),
2367        kind_tag = kind.name_tag(),
2368    )
2369}
2370
2371/// Return the name of a UNIQUE or NODE KEY constraint whose backing
2372/// index is exactly `(label, properties)`, or `None` if no such
2373/// constraint is registered. The enforcement paths for both kinds seek
2374/// through that index — dropping it silently defeats the check — so
2375/// `drop_property_index_composite` consults this helper before
2376/// removing any entries.
2377fn find_constraint_backing_node_index(
2378    constraints: &[PropertyConstraintSpec],
2379    label: &str,
2380    properties: &[String],
2381) -> Option<String> {
2382    constraints
2383        .iter()
2384        .filter(|c| matches!(&c.scope, ConstraintScope::Node(l) if l == label))
2385        .filter(|c| {
2386            matches!(
2387                c.kind,
2388                PropertyConstraintKind::Unique | PropertyConstraintKind::NodeKey
2389            )
2390        })
2391        .find(|c| c.properties.as_slice() == properties)
2392        .map(|c| c.name.clone())
2393}
2394
2395/// Relationship-scope mirror of
2396/// [`find_constraint_backing_node_index`]. NODE KEY is node-only, so
2397/// only UNIQUE can back an edge index.
2398fn find_constraint_backing_edge_index(
2399    constraints: &[PropertyConstraintSpec],
2400    edge_type: &str,
2401    properties: &[String],
2402) -> Option<String> {
2403    constraints
2404        .iter()
2405        .filter(|c| matches!(&c.scope, ConstraintScope::Relationship(t) if t == edge_type))
2406        .filter(|c| matches!(c.kind, PropertyConstraintKind::Unique))
2407        .find(|c| c.properties.as_slice() == properties)
2408        .map(|c| c.name.clone())
2409}
2410
2411/// Scan existing nodes and verify that `spec` holds. Called on
2412/// constraint creation so a user declaring a constraint against
2413/// already-invalid data gets a clear error instead of a partial
2414/// rollout. Walks only the nodes carrying the constrained label — for
2415/// label-sparse graphs this avoids the full-node scan.
2416fn validate_existing_data(
2417    engine: &RocksDbStorageEngine,
2418    spec: &PropertyConstraintSpec,
2419) -> Result<()> {
2420    match &spec.scope {
2421        ConstraintScope::Node(label) => validate_existing_nodes(engine, spec, label),
2422        ConstraintScope::Relationship(edge_type) => {
2423            validate_existing_edges(engine, spec, edge_type)
2424        }
2425    }
2426}
2427
2428fn validate_existing_nodes(
2429    engine: &RocksDbStorageEngine,
2430    spec: &PropertyConstraintSpec,
2431    label: &str,
2432) -> Result<()> {
2433    let label_members = engine.nodes_by_label(label)?;
2434    let primary = spec.primary_property();
2435    match spec.kind {
2436        PropertyConstraintKind::NotNull => {
2437            for id in label_members {
2438                let node = match engine.get_node(id)? {
2439                    Some(n) => n,
2440                    None => continue,
2441                };
2442                let present = node
2443                    .properties
2444                    .get(primary)
2445                    .is_some_and(|v| !matches!(v, Property::Null));
2446                if !present {
2447                    return Err(Error::ConstraintViolation {
2448                        name: spec.name.clone(),
2449                        kind: spec.kind.as_string(),
2450                        label: label.to_string(),
2451                        property: primary.to_string(),
2452                        details: format!("node {id} is missing required property"),
2453                    });
2454                }
2455            }
2456        }
2457        PropertyConstraintKind::Unique => {
2458            use std::collections::HashMap;
2459            let mut seen: HashMap<Vec<u8>, meshdb_core::NodeId> = HashMap::new();
2460            for id in label_members {
2461                let node = match engine.get_node(id)? {
2462                    Some(n) => n,
2463                    None => continue,
2464                };
2465                let Some(value) = node.properties.get(primary) else {
2466                    continue;
2467                };
2468                let Some(encoded) = encode_index_value(value) else {
2469                    continue;
2470                };
2471                if let Some(&first) = seen.get(&encoded) {
2472                    return Err(Error::ConstraintViolation {
2473                        name: spec.name.clone(),
2474                        kind: spec.kind.as_string(),
2475                        label: label.to_string(),
2476                        property: primary.to_string(),
2477                        details: format!("duplicate value held by nodes {first} and {id}"),
2478                    });
2479                }
2480                seen.insert(encoded, id);
2481            }
2482        }
2483        PropertyConstraintKind::PropertyType(target) => {
2484            for id in label_members {
2485                let node = match engine.get_node(id)? {
2486                    Some(n) => n,
2487                    None => continue,
2488                };
2489                let Some(value) = node.properties.get(primary) else {
2490                    continue;
2491                };
2492                if matches!(value, Property::Null) {
2493                    continue;
2494                }
2495                if !property_matches_type(value, target) {
2496                    return Err(Error::ConstraintViolation {
2497                        name: spec.name.clone(),
2498                        kind: spec.kind.as_string(),
2499                        label: label.to_string(),
2500                        property: primary.to_string(),
2501                        details: format!(
2502                            "node {id} has value of type {} (expected {})",
2503                            value.type_name(),
2504                            target.as_str()
2505                        ),
2506                    });
2507                }
2508            }
2509        }
2510        PropertyConstraintKind::NodeKey => {
2511            use std::collections::HashMap;
2512            let mut seen: HashMap<Vec<Vec<u8>>, meshdb_core::NodeId> = HashMap::new();
2513            for id in label_members {
2514                let node = match engine.get_node(id)? {
2515                    Some(n) => n,
2516                    None => continue,
2517                };
2518                let mut tuple: Vec<Vec<u8>> = Vec::with_capacity(spec.properties.len());
2519                let mut missing: Option<&str> = None;
2520                for prop in &spec.properties {
2521                    match node.properties.get(prop) {
2522                        Some(v) if !matches!(v, Property::Null) => {
2523                            if let Some(encoded) = encode_index_value(v) {
2524                                tuple.push(encoded);
2525                            } else {
2526                                missing = Some(prop);
2527                                break;
2528                            }
2529                        }
2530                        _ => {
2531                            missing = Some(prop.as_str());
2532                            break;
2533                        }
2534                    }
2535                }
2536                if let Some(prop) = missing {
2537                    return Err(Error::ConstraintViolation {
2538                        name: spec.name.clone(),
2539                        kind: spec.kind.as_string(),
2540                        label: label.to_string(),
2541                        property: prop.to_string(),
2542                        details: format!("node {id} is missing required property `{prop}`"),
2543                    });
2544                }
2545                if let Some(&first) = seen.get(&tuple) {
2546                    return Err(Error::ConstraintViolation {
2547                        name: spec.name.clone(),
2548                        kind: spec.kind.as_string(),
2549                        label: label.to_string(),
2550                        property: spec.properties.join(","),
2551                        details: format!("duplicate tuple held by nodes {first} and {id}"),
2552                    });
2553                }
2554                seen.insert(tuple, id);
2555            }
2556        }
2557    }
2558    Ok(())
2559}
2560
2561/// Relationship-scope analogue to `validate_existing_nodes`. Walks
2562/// `edges_by_type(edge_type)` because we don't have an edge property
2563/// index; complexity is O(E_type) per newly-declared constraint. `NodeKey` is rejected at create time
2564/// for relationship scope, so this path doesn't handle it.
2565fn validate_existing_edges(
2566    engine: &RocksDbStorageEngine,
2567    spec: &PropertyConstraintSpec,
2568    edge_type: &str,
2569) -> Result<()> {
2570    let edge_ids = engine.edges_by_type(edge_type)?;
2571    let primary = spec.primary_property();
2572    match spec.kind {
2573        PropertyConstraintKind::NotNull => {
2574            for id in edge_ids {
2575                let edge = match engine.get_edge(id)? {
2576                    Some(e) => e,
2577                    None => continue,
2578                };
2579                let present = edge
2580                    .properties
2581                    .get(primary)
2582                    .is_some_and(|v| !matches!(v, Property::Null));
2583                if !present {
2584                    return Err(Error::ConstraintViolation {
2585                        name: spec.name.clone(),
2586                        kind: spec.kind.as_string(),
2587                        label: edge_type.to_string(),
2588                        property: primary.to_string(),
2589                        details: format!("edge {id} is missing required property"),
2590                    });
2591                }
2592            }
2593        }
2594        PropertyConstraintKind::Unique => {
2595            use std::collections::HashMap;
2596            let mut seen: HashMap<Vec<u8>, meshdb_core::EdgeId> = HashMap::new();
2597            for id in edge_ids {
2598                let edge = match engine.get_edge(id)? {
2599                    Some(e) => e,
2600                    None => continue,
2601                };
2602                let Some(value) = edge.properties.get(primary) else {
2603                    continue;
2604                };
2605                let Some(encoded) = encode_index_value(value) else {
2606                    continue;
2607                };
2608                if let Some(&first) = seen.get(&encoded) {
2609                    return Err(Error::ConstraintViolation {
2610                        name: spec.name.clone(),
2611                        kind: spec.kind.as_string(),
2612                        label: edge_type.to_string(),
2613                        property: primary.to_string(),
2614                        details: format!("duplicate value held by edges {first} and {id}"),
2615                    });
2616                }
2617                seen.insert(encoded, id);
2618            }
2619        }
2620        PropertyConstraintKind::PropertyType(target) => {
2621            for id in edge_ids {
2622                let edge = match engine.get_edge(id)? {
2623                    Some(e) => e,
2624                    None => continue,
2625                };
2626                let Some(value) = edge.properties.get(primary) else {
2627                    continue;
2628                };
2629                if matches!(value, Property::Null) {
2630                    continue;
2631                }
2632                if !property_matches_type(value, target) {
2633                    return Err(Error::ConstraintViolation {
2634                        name: spec.name.clone(),
2635                        kind: spec.kind.as_string(),
2636                        label: edge_type.to_string(),
2637                        property: primary.to_string(),
2638                        details: format!(
2639                            "edge {id} has value of type {} (expected {})",
2640                            value.type_name(),
2641                            target.as_str()
2642                        ),
2643                    });
2644                }
2645            }
2646        }
2647        PropertyConstraintKind::NodeKey => {
2648            unreachable!("NODE KEY on relationship scope rejected at create time")
2649        }
2650    }
2651    Ok(())
2652}
2653
2654/// Strict `Property` / `PropertyType` match. No numeric coercion — an
2655/// `Int64` does NOT satisfy `FLOAT`, matching Neo4j's `IS :: FLOAT`
2656/// semantics. Returns `false` for `Null`; callers should pre-filter
2657/// null values (`IS :: T` allows nulls by convention — pair with
2658/// `IS NOT NULL` for strict presence).
2659fn property_matches_type(value: &Property, target: PropertyType) -> bool {
2660    matches!(
2661        (target, value),
2662        (PropertyType::String, Property::String(_))
2663            | (PropertyType::Integer, Property::Int64(_))
2664            | (PropertyType::Float, Property::Float64(_))
2665            | (PropertyType::Boolean, Property::Bool(_))
2666    )
2667}
2668
2669/// Rehydrate the constraint registry from [`CF_CONSTRAINT_META`] at
2670/// [`RocksDbStorageEngine::open`] time. Walks the CF sequentially —
2671/// constraint counts are tiny (single digits in practice) so this
2672/// stays cheap regardless of graph size.
2673fn load_constraint_meta(db: &DB) -> Result<Vec<PropertyConstraintSpec>> {
2674    let cf = db
2675        .cf_handle(CF_CONSTRAINT_META)
2676        .ok_or(Error::MissingColumnFamily(CF_CONSTRAINT_META))?;
2677    let mut specs = Vec::new();
2678    for item in db.iterator_cf(cf, IteratorMode::Start) {
2679        let (key, value) = item?;
2680        let name = std::str::from_utf8(&key)
2681            .map_err(|_| Error::CorruptBytes {
2682                cf: CF_CONSTRAINT_META,
2683                expected: key.len(),
2684                actual: key.len(),
2685            })?
2686            .to_string();
2687        specs.push(constraint_meta_decode(CF_CONSTRAINT_META, name, &value)?);
2688    }
2689    Ok(specs)
2690}
2691
2692/// Rehydrate the property-index registry from [`CF_INDEX_META`] at
2693/// [`RocksDbStorageEngine::open`] time. Runs once per process, so
2694/// walking the CF sequentially is fine even for large index counts.
2695fn load_index_meta(db: &DB) -> Result<Vec<PropertyIndexSpec>> {
2696    let cf = db
2697        .cf_handle(CF_INDEX_META)
2698        .ok_or(Error::MissingColumnFamily(CF_INDEX_META))?;
2699    let mut specs = Vec::new();
2700    for item in db.iterator_cf(cf, IteratorMode::Start) {
2701        let (key, _) = item?;
2702        specs.push(index_meta_key_decode(&key)?);
2703    }
2704    Ok(specs)
2705}
2706
2707/// Relationship-scope analogue of [`load_index_meta`]. Rehydrates the
2708/// edge-property-index registry from [`CF_EDGE_INDEX_META`] at open
2709/// time.
2710fn load_edge_index_meta(db: &DB) -> Result<Vec<EdgePropertyIndexSpec>> {
2711    let cf = db
2712        .cf_handle(CF_EDGE_INDEX_META)
2713        .ok_or(Error::MissingColumnFamily(CF_EDGE_INDEX_META))?;
2714    let mut specs = Vec::new();
2715    for item in db.iterator_cf(cf, IteratorMode::Start) {
2716        let (key, _) = item?;
2717        specs.push(edge_index_meta_key_decode(&key)?);
2718    }
2719    Ok(specs)
2720}
2721
2722/// Rehydrate the point-index registry from [`CF_POINT_INDEX_META`]
2723/// at open. Same once-per-process walk as the other meta loaders.
2724fn load_point_index_meta(db: &DB) -> Result<Vec<PointIndexSpec>> {
2725    let cf = db
2726        .cf_handle(CF_POINT_INDEX_META)
2727        .ok_or(Error::MissingColumnFamily(CF_POINT_INDEX_META))?;
2728    let mut specs = Vec::new();
2729    for item in db.iterator_cf(cf, IteratorMode::Start) {
2730        let (key, _) = item?;
2731        specs.push(point_index_meta_key_decode(&key)?);
2732    }
2733    Ok(specs)
2734}
2735
2736/// Relationship-scope analogue of [`load_point_index_meta`].
2737fn load_edge_point_index_meta(db: &DB) -> Result<Vec<EdgePointIndexSpec>> {
2738    let cf = db
2739        .cf_handle(CF_EDGE_POINT_INDEX_META)
2740        .ok_or(Error::MissingColumnFamily(CF_EDGE_POINT_INDEX_META))?;
2741    let mut specs = Vec::new();
2742    for item in db.iterator_cf(cf, IteratorMode::Start) {
2743        let (key, _) = item?;
2744        specs.push(edge_point_index_meta_key_decode(&key)?);
2745    }
2746    Ok(specs)
2747}
2748
2749impl StorageEngine for RocksDbStorageEngine {
2750    fn put_node(&self, node: &Node) -> Result<()> {
2751        RocksDbStorageEngine::put_node(self, node)
2752    }
2753
2754    fn get_node(&self, id: NodeId) -> Result<Option<Node>> {
2755        RocksDbStorageEngine::get_node(self, id)
2756    }
2757
2758    fn detach_delete_node(&self, id: NodeId) -> Result<()> {
2759        RocksDbStorageEngine::detach_delete_node(self, id)
2760    }
2761
2762    fn put_edge(&self, edge: &Edge) -> Result<()> {
2763        RocksDbStorageEngine::put_edge(self, edge)
2764    }
2765
2766    fn get_edge(&self, id: EdgeId) -> Result<Option<Edge>> {
2767        RocksDbStorageEngine::get_edge(self, id)
2768    }
2769
2770    fn delete_edge(&self, id: EdgeId) -> Result<()> {
2771        RocksDbStorageEngine::delete_edge(self, id)
2772    }
2773
2774    fn apply_batch(&self, mutations: &[GraphMutation]) -> Result<()> {
2775        RocksDbStorageEngine::apply_batch(self, mutations)
2776    }
2777
2778    fn all_nodes(&self) -> Result<Vec<Node>> {
2779        RocksDbStorageEngine::all_nodes(self)
2780    }
2781
2782    fn all_edges(&self) -> Result<Vec<Edge>> {
2783        RocksDbStorageEngine::all_edges(self)
2784    }
2785
2786    fn all_node_ids(&self) -> Result<Vec<NodeId>> {
2787        RocksDbStorageEngine::all_node_ids(self)
2788    }
2789
2790    fn outgoing(&self, source: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
2791        RocksDbStorageEngine::outgoing(self, source)
2792    }
2793
2794    fn incoming(&self, target: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
2795        RocksDbStorageEngine::incoming(self, target)
2796    }
2797
2798    fn nodes_by_label(&self, label: &str) -> Result<Vec<NodeId>> {
2799        RocksDbStorageEngine::nodes_by_label(self, label)
2800    }
2801
2802    fn edges_by_type(&self, edge_type: &str) -> Result<Vec<EdgeId>> {
2803        RocksDbStorageEngine::edges_by_type(self, edge_type)
2804    }
2805
2806    fn nodes_by_property(
2807        &self,
2808        label: &str,
2809        property: &str,
2810        value: &Property,
2811    ) -> Result<Vec<NodeId>> {
2812        RocksDbStorageEngine::nodes_by_property(self, label, property, value)
2813    }
2814
2815    fn nodes_by_properties(
2816        &self,
2817        label: &str,
2818        properties: &[String],
2819        values: &[Property],
2820    ) -> Result<Vec<NodeId>> {
2821        RocksDbStorageEngine::nodes_by_properties(self, label, properties, values)
2822    }
2823
2824    fn edges_by_property(
2825        &self,
2826        edge_type: &str,
2827        property: &str,
2828        value: &Property,
2829    ) -> Result<Vec<EdgeId>> {
2830        RocksDbStorageEngine::edges_by_property(self, edge_type, property, value)
2831    }
2832
2833    fn create_property_index(&self, label: &str, property: &str) -> Result<()> {
2834        RocksDbStorageEngine::create_property_index(self, label, property)
2835    }
2836
2837    fn drop_property_index(&self, label: &str, property: &str) -> Result<()> {
2838        RocksDbStorageEngine::drop_property_index(self, label, property)
2839    }
2840
2841    fn create_property_index_composite(&self, label: &str, properties: &[String]) -> Result<()> {
2842        RocksDbStorageEngine::create_property_index_composite(self, label, properties)
2843    }
2844
2845    fn drop_property_index_composite(&self, label: &str, properties: &[String]) -> Result<()> {
2846        RocksDbStorageEngine::drop_property_index_composite(self, label, properties)
2847    }
2848
2849    fn list_property_indexes(&self) -> Vec<PropertyIndexSpec> {
2850        RocksDbStorageEngine::list_property_indexes(self)
2851    }
2852
2853    fn create_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
2854        RocksDbStorageEngine::create_edge_property_index(self, edge_type, property)
2855    }
2856
2857    fn drop_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
2858        RocksDbStorageEngine::drop_edge_property_index(self, edge_type, property)
2859    }
2860
2861    fn create_edge_property_index_composite(
2862        &self,
2863        edge_type: &str,
2864        properties: &[String],
2865    ) -> Result<()> {
2866        RocksDbStorageEngine::create_edge_property_index_composite(self, edge_type, properties)
2867    }
2868
2869    fn drop_edge_property_index_composite(
2870        &self,
2871        edge_type: &str,
2872        properties: &[String],
2873    ) -> Result<()> {
2874        RocksDbStorageEngine::drop_edge_property_index_composite(self, edge_type, properties)
2875    }
2876
2877    fn list_edge_property_indexes(&self) -> Vec<EdgePropertyIndexSpec> {
2878        RocksDbStorageEngine::list_edge_property_indexes(self)
2879    }
2880
2881    fn create_point_index(&self, label: &str, property: &str) -> Result<()> {
2882        RocksDbStorageEngine::create_point_index(self, label, property)
2883    }
2884
2885    fn drop_point_index(&self, label: &str, property: &str) -> Result<()> {
2886        RocksDbStorageEngine::drop_point_index(self, label, property)
2887    }
2888
2889    fn list_point_indexes(&self) -> Vec<PointIndexSpec> {
2890        RocksDbStorageEngine::list_point_indexes(self)
2891    }
2892
2893    fn nodes_in_bbox(
2894        &self,
2895        label: &str,
2896        property: &str,
2897        srid: i32,
2898        xlo: f64,
2899        ylo: f64,
2900        xhi: f64,
2901        yhi: f64,
2902    ) -> Result<Vec<NodeId>> {
2903        RocksDbStorageEngine::nodes_in_bbox(self, label, property, srid, xlo, ylo, xhi, yhi)
2904    }
2905
2906    fn create_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
2907        RocksDbStorageEngine::create_edge_point_index(self, edge_type, property)
2908    }
2909
2910    fn drop_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
2911        RocksDbStorageEngine::drop_edge_point_index(self, edge_type, property)
2912    }
2913
2914    fn list_edge_point_indexes(&self) -> Vec<EdgePointIndexSpec> {
2915        RocksDbStorageEngine::list_edge_point_indexes(self)
2916    }
2917
2918    fn edges_in_bbox(
2919        &self,
2920        edge_type: &str,
2921        property: &str,
2922        srid: i32,
2923        xlo: f64,
2924        ylo: f64,
2925        xhi: f64,
2926        yhi: f64,
2927    ) -> Result<Vec<EdgeId>> {
2928        RocksDbStorageEngine::edges_in_bbox(self, edge_type, property, srid, xlo, ylo, xhi, yhi)
2929    }
2930
2931    fn create_property_constraint(
2932        &self,
2933        name: Option<&str>,
2934        scope: &ConstraintScope,
2935        properties: &[String],
2936        kind: PropertyConstraintKind,
2937        if_not_exists: bool,
2938    ) -> Result<PropertyConstraintSpec> {
2939        RocksDbStorageEngine::create_property_constraint(
2940            self,
2941            name,
2942            scope,
2943            properties,
2944            kind,
2945            if_not_exists,
2946        )
2947    }
2948
2949    fn drop_property_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
2950        RocksDbStorageEngine::drop_property_constraint(self, name, if_exists)
2951    }
2952
2953    fn list_property_constraints(&self) -> Vec<PropertyConstraintSpec> {
2954        RocksDbStorageEngine::list_property_constraints(self)
2955    }
2956
2957    fn put_trigger(&self, name: &str, value: &[u8]) -> Result<()> {
2958        let cf = self.cf(CF_TRIGGER_META)?;
2959        self.db
2960            .put_cf(cf, name.as_bytes(), value)
2961            .map_err(Error::from)
2962    }
2963
2964    fn delete_trigger(&self, name: &str) -> Result<()> {
2965        let cf = self.cf(CF_TRIGGER_META)?;
2966        self.db.delete_cf(cf, name.as_bytes()).map_err(Error::from)
2967    }
2968
2969    fn list_triggers(&self) -> Result<Vec<(String, Vec<u8>)>> {
2970        let cf = self.cf(CF_TRIGGER_META)?;
2971        let mut out: Vec<(String, Vec<u8>)> = Vec::new();
2972        let iter = self.db.iterator_cf(cf, IteratorMode::Start);
2973        for entry in iter {
2974            let (k, v) = entry.map_err(Error::from)?;
2975            // Names are stored as raw UTF-8 bytes — anything else
2976            // would be a corruption since `put_trigger` only
2977            // accepts `&str`. Lossy decode is defensive.
2978            let name = String::from_utf8_lossy(&k).into_owned();
2979            out.push((name, v.to_vec()));
2980        }
2981        Ok(out)
2982    }
2983
2984    fn put_pending_tx(&self, key: &[u8], value: &[u8]) -> Result<()> {
2985        let cf = self.cf(CF_PENDING_TX_META)?;
2986        self.db.put_cf(cf, key, value).map_err(Error::from)
2987    }
2988
2989    fn delete_pending_tx(&self, key: &[u8]) -> Result<()> {
2990        let cf = self.cf(CF_PENDING_TX_META)?;
2991        self.db.delete_cf(cf, key).map_err(Error::from)
2992    }
2993
2994    fn list_pending_txs(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
2995        let cf = self.cf(CF_PENDING_TX_META)?;
2996        let mut out: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
2997        let iter = self.db.iterator_cf(cf, IteratorMode::Start);
2998        for entry in iter {
2999            let (k, v) = entry.map_err(Error::from)?;
3000            out.push((k.to_vec(), v.to_vec()));
3001        }
3002        Ok(out)
3003    }
3004
3005    fn create_checkpoint(&self, path: &Path) -> Result<()> {
3006        RocksDbStorageEngine::create_checkpoint(self, path)
3007    }
3008
3009    fn clear_all(&self) -> Result<()> {
3010        RocksDbStorageEngine::clear_all(self)
3011    }
3012}