1use crate::{
15 engine::{
16 ConstraintScope, EdgePointIndexSpec, EdgePropertyIndexSpec, GraphMutation, PointIndexSpec,
17 PropertyConstraintKind, PropertyConstraintSpec, PropertyIndexSpec, PropertyType,
18 StorageEngine,
19 },
20 error::{Error, Result},
21 keys::{
22 adj_key, cell_from_point_index_key, constraint_meta_decode, constraint_meta_encode,
23 decode_point_index_value, edge_from_adj_key, edge_id_from_property_index_key,
24 edge_property_index_composite_key, edge_property_index_composite_value_prefix,
25 encode_index_tuple, encode_index_value, id_from_str_index_key, label_index_key,
26 label_index_prefix, node_from_adj_value, node_id_from_point_index_key,
27 node_id_from_property_index_key, parse_property_index_entry_props, point_cell,
28 point_cell_range, point_index_key, point_index_label_prop_prefix, point_index_srid_prefix,
29 point_index_value, property_index_composite_key, property_index_composite_value_prefix,
30 property_index_label_prefix, type_index_key, type_index_prefix, ID_LEN,
31 },
32};
33use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
34use rocksdb::{
35 checkpoint::Checkpoint, ColumnFamilyDescriptor, Direction, IteratorMode, Options, WriteBatch,
36 DB,
37};
38use std::path::Path;
39use std::sync::RwLock;
40
// Names of the RocksDB column families used by this engine. Every name is
// also listed in `ALL_CFS`, which is the set `RocksDbStorageEngine::open`
// hands to RocksDB.

/// Serialized node records, keyed by the node id bytes.
const CF_NODES: &str = "nodes";
/// Serialized edge records, keyed by the edge id bytes.
const CF_EDGES: &str = "edges";
/// Outgoing adjacency: `adj_key(source, edge)` maps to the target node id.
const CF_ADJ_OUT: &str = "adj_out";
/// Incoming adjacency: `adj_key(target, edge)` maps to the source node id.
const CF_ADJ_IN: &str = "adj_in";
/// Label membership entries (`label_index_key`), stored with an empty value.
const CF_LABEL_INDEX: &str = "label_index";
/// Edge-type membership entries (`type_index_key`), stored with an empty value.
const CF_TYPE_INDEX: &str = "type_index";
/// Node property index entries (`property_index_composite_key`), empty value.
const CF_PROPERTY_INDEX: &str = "property_index";
/// Registry of node property indexes; keys are built by `index_meta_key`.
const CF_INDEX_META: &str = "index_meta";
/// Edge property index entries (`edge_property_index_composite_key`), empty value.
const CF_EDGE_PROPERTY_INDEX: &str = "edge_property_index";
/// Registry of edge property indexes (see `edge_index_meta_key`).
const CF_EDGE_INDEX_META: &str = "edge_index_meta";
/// Constraint definitions, keyed by constraint name.
const CF_CONSTRAINT_META: &str = "constraint_meta";
/// Node point index entries (`point_index_key` -> `point_index_value`).
const CF_POINT_INDEX: &str = "point_index";
/// Registry of node point indexes (see `point_index_meta_key`).
const CF_POINT_INDEX_META: &str = "point_index_meta";
/// Edge point index entries (`point_index_key` -> `point_index_value`).
const CF_EDGE_POINT_INDEX: &str = "edge_point_index";
/// Registry of edge point indexes (see `edge_point_index_meta_key`).
const CF_EDGE_POINT_INDEX_META: &str = "edge_point_index_meta";
/// Trigger metadata. NOTE(review): not referenced by the code visible here.
const CF_TRIGGER_META: &str = "trigger_meta";

/// Every column family the engine opens, and the set `clear_all` wipes.
const ALL_CFS: &[&str] = &[
    CF_NODES,
    CF_EDGES,
    CF_ADJ_OUT,
    CF_ADJ_IN,
    CF_LABEL_INDEX,
    CF_TYPE_INDEX,
    CF_PROPERTY_INDEX,
    CF_INDEX_META,
    CF_EDGE_PROPERTY_INDEX,
    CF_EDGE_INDEX_META,
    CF_CONSTRAINT_META,
    CF_POINT_INDEX,
    CF_POINT_INDEX_META,
    CF_EDGE_POINT_INDEX,
    CF_EDGE_POINT_INDEX_META,
    CF_TRIGGER_META,
];

/// Empty value used for index entries whose key carries all the information.
const EMPTY: &[u8] = &[];
83
84fn index_meta_key(spec: &PropertyIndexSpec) -> Vec<u8> {
91 let cap = spec.label.len()
92 + spec.properties.iter().map(|p| p.len()).sum::<usize>()
93 + spec.properties.len();
94 let mut k = Vec::with_capacity(cap);
95 k.extend_from_slice(spec.label.as_bytes());
96 for p in &spec.properties {
97 k.push(0);
98 k.extend_from_slice(p.as_bytes());
99 }
100 k
101}
102
103fn index_meta_key_decode(key: &[u8]) -> Result<PropertyIndexSpec> {
104 let mut parts = key.split(|b| *b == 0);
105 let label_bytes = parts.next().ok_or(Error::CorruptBytes {
106 cf: CF_INDEX_META,
107 expected: 1,
108 actual: 0,
109 })?;
110 let label = std::str::from_utf8(label_bytes)
111 .map_err(|_| Error::CorruptBytes {
112 cf: CF_INDEX_META,
113 expected: label_bytes.len(),
114 actual: label_bytes.len(),
115 })?
116 .to_string();
117 let mut properties: Vec<String> = Vec::new();
118 for part in parts {
119 let s = std::str::from_utf8(part)
120 .map_err(|_| Error::CorruptBytes {
121 cf: CF_INDEX_META,
122 expected: part.len(),
123 actual: part.len(),
124 })?
125 .to_string();
126 properties.push(s);
127 }
128 if properties.is_empty() {
129 return Err(Error::CorruptBytes {
130 cf: CF_INDEX_META,
131 expected: 1,
132 actual: 0,
133 });
134 }
135 Ok(PropertyIndexSpec { label, properties })
136}
137
138fn edge_index_meta_key(spec: &EdgePropertyIndexSpec) -> Vec<u8> {
143 let cap = spec.edge_type.len()
144 + spec.properties.iter().map(|p| p.len()).sum::<usize>()
145 + spec.properties.len();
146 let mut k = Vec::with_capacity(cap);
147 k.extend_from_slice(spec.edge_type.as_bytes());
148 for p in &spec.properties {
149 k.push(0);
150 k.extend_from_slice(p.as_bytes());
151 }
152 k
153}
154
155fn edge_index_meta_key_decode(key: &[u8]) -> Result<EdgePropertyIndexSpec> {
156 let mut parts = key.split(|b| *b == 0);
157 let type_bytes = parts.next().ok_or(Error::CorruptBytes {
158 cf: CF_EDGE_INDEX_META,
159 expected: 1,
160 actual: 0,
161 })?;
162 let edge_type = std::str::from_utf8(type_bytes)
163 .map_err(|_| Error::CorruptBytes {
164 cf: CF_EDGE_INDEX_META,
165 expected: type_bytes.len(),
166 actual: type_bytes.len(),
167 })?
168 .to_string();
169 let mut properties: Vec<String> = Vec::new();
170 for part in parts {
171 let s = std::str::from_utf8(part)
172 .map_err(|_| Error::CorruptBytes {
173 cf: CF_EDGE_INDEX_META,
174 expected: part.len(),
175 actual: part.len(),
176 })?
177 .to_string();
178 properties.push(s);
179 }
180 if properties.is_empty() {
181 return Err(Error::CorruptBytes {
182 cf: CF_EDGE_INDEX_META,
183 expected: 1,
184 actual: 0,
185 });
186 }
187 Ok(EdgePropertyIndexSpec {
188 edge_type,
189 properties,
190 })
191}
192
193fn point_index_meta_key(spec: &PointIndexSpec) -> Vec<u8> {
199 let mut k = Vec::with_capacity(spec.label.len() + spec.property.len() + 1);
200 k.extend_from_slice(spec.label.as_bytes());
201 k.push(0);
202 k.extend_from_slice(spec.property.as_bytes());
203 k
204}
205
206fn point_index_meta_key_decode(key: &[u8]) -> Result<PointIndexSpec> {
207 let mut parts = key.splitn(2, |b| *b == 0);
208 let label_bytes = parts.next().ok_or(Error::CorruptBytes {
209 cf: CF_POINT_INDEX_META,
210 expected: 1,
211 actual: 0,
212 })?;
213 let property_bytes = parts.next().ok_or(Error::CorruptBytes {
214 cf: CF_POINT_INDEX_META,
215 expected: label_bytes.len() + 2,
216 actual: key.len(),
217 })?;
218 let label = std::str::from_utf8(label_bytes)
219 .map_err(|_| Error::CorruptBytes {
220 cf: CF_POINT_INDEX_META,
221 expected: label_bytes.len(),
222 actual: label_bytes.len(),
223 })?
224 .to_string();
225 let property = std::str::from_utf8(property_bytes)
226 .map_err(|_| Error::CorruptBytes {
227 cf: CF_POINT_INDEX_META,
228 expected: property_bytes.len(),
229 actual: property_bytes.len(),
230 })?
231 .to_string();
232 if property.is_empty() {
233 return Err(Error::CorruptBytes {
234 cf: CF_POINT_INDEX_META,
235 expected: 1,
236 actual: 0,
237 });
238 }
239 Ok(PointIndexSpec { label, property })
240}
241
242fn edge_point_index_meta_key(spec: &EdgePointIndexSpec) -> Vec<u8> {
245 let mut k = Vec::with_capacity(spec.edge_type.len() + spec.property.len() + 1);
246 k.extend_from_slice(spec.edge_type.as_bytes());
247 k.push(0);
248 k.extend_from_slice(spec.property.as_bytes());
249 k
250}
251
252fn edge_point_index_meta_key_decode(key: &[u8]) -> Result<EdgePointIndexSpec> {
253 let mut parts = key.splitn(2, |b| *b == 0);
254 let type_bytes = parts.next().ok_or(Error::CorruptBytes {
255 cf: CF_EDGE_POINT_INDEX_META,
256 expected: 1,
257 actual: 0,
258 })?;
259 let property_bytes = parts.next().ok_or(Error::CorruptBytes {
260 cf: CF_EDGE_POINT_INDEX_META,
261 expected: type_bytes.len() + 2,
262 actual: key.len(),
263 })?;
264 let edge_type = std::str::from_utf8(type_bytes)
265 .map_err(|_| Error::CorruptBytes {
266 cf: CF_EDGE_POINT_INDEX_META,
267 expected: type_bytes.len(),
268 actual: type_bytes.len(),
269 })?
270 .to_string();
271 let property = std::str::from_utf8(property_bytes)
272 .map_err(|_| Error::CorruptBytes {
273 cf: CF_EDGE_POINT_INDEX_META,
274 expected: property_bytes.len(),
275 actual: property_bytes.len(),
276 })?
277 .to_string();
278 if property.is_empty() {
279 return Err(Error::CorruptBytes {
280 cf: CF_EDGE_POINT_INDEX_META,
281 expected: 1,
282 actual: 0,
283 });
284 }
285 Ok(EdgePointIndexSpec {
286 edge_type,
287 property,
288 })
289}
290
291pub struct RocksDbStorageEngine {
292 db: DB,
293 indexes: RwLock<Vec<PropertyIndexSpec>>,
304 edge_indexes: RwLock<Vec<EdgePropertyIndexSpec>>,
313 constraints: RwLock<Vec<PropertyConstraintSpec>>,
324 point_indexes: RwLock<Vec<PointIndexSpec>>,
332 edge_point_indexes: RwLock<Vec<EdgePointIndexSpec>>,
337}
338
339impl RocksDbStorageEngine {
340 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
341 let mut db_opts = Options::default();
342 db_opts.create_if_missing(true);
343 db_opts.create_missing_column_families(true);
344 db_opts.set_max_open_files(64);
354 db_opts.set_keep_log_file_num(4);
359
360 let cfs: Vec<ColumnFamilyDescriptor> = ALL_CFS
361 .iter()
362 .map(|name| ColumnFamilyDescriptor::new(*name, Options::default()))
363 .collect();
364
365 let db = DB::open_cf_descriptors(&db_opts, path, cfs)?;
366 let indexes = load_index_meta(&db)?;
367 let edge_indexes = load_edge_index_meta(&db)?;
368 let constraints = load_constraint_meta(&db)?;
369 let point_indexes = load_point_index_meta(&db)?;
370 let edge_point_indexes = load_edge_point_index_meta(&db)?;
371 Ok(Self {
372 db,
373 indexes: RwLock::new(indexes),
374 edge_indexes: RwLock::new(edge_indexes),
375 constraints: RwLock::new(constraints),
376 point_indexes: RwLock::new(point_indexes),
377 edge_point_indexes: RwLock::new(edge_point_indexes),
378 })
379 }
380
381 fn cf(&self, name: &'static str) -> Result<&rocksdb::ColumnFamily> {
382 self.db
383 .cf_handle(name)
384 .ok_or(Error::MissingColumnFamily(name))
385 }
386
387 pub fn put_node(&self, node: &Node) -> Result<()> {
388 let mut batch = WriteBatch::default();
389 self.append_put_node(&mut batch, node)?;
390 self.db.write(batch)?;
391 Ok(())
392 }
393
394 fn append_put_node(&self, batch: &mut WriteBatch, node: &Node) -> Result<()> {
395 let nodes_cf = self.cf(CF_NODES)?;
396 let label_cf = self.cf(CF_LABEL_INDEX)?;
397 let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
398
399 let existing: Option<Node> = match self.db.get_cf(nodes_cf, node.id.as_bytes())? {
400 Some(bytes) => Some(serde_json::from_slice(&bytes)?),
401 None => None,
402 };
403 let existing_labels: &[String] = existing.as_ref().map(|n| &n.labels[..]).unwrap_or(&[]);
404
405 self.enforce_constraints(node, existing.as_ref())?;
413
414 let bytes = serde_json::to_vec(node)?;
415 batch.put_cf(nodes_cf, node.id.as_bytes(), bytes);
416
417 for old in existing_labels {
418 if !node.labels.contains(old) {
419 batch.delete_cf(label_cf, label_index_key(old, node.id));
420 }
421 }
422 for new in &node.labels {
423 if !existing_labels.contains(new) {
424 batch.put_cf(label_cf, label_index_key(new, node.id), EMPTY);
425 }
426 }
427
428 let indexes = self.indexes.read().expect("indexes lock poisoned");
434 for spec in indexes.iter() {
435 let was_indexed = existing_labels.iter().any(|l| l == &spec.label);
436 let now_indexed = node.labels.iter().any(|l| l == &spec.label);
437 let old_encoded = if was_indexed {
438 existing
439 .as_ref()
440 .and_then(|n| encode_index_tuple(&n.properties, &spec.properties))
441 } else {
442 None
443 };
444 let new_encoded = if now_indexed {
445 encode_index_tuple(&node.properties, &spec.properties)
446 } else {
447 None
448 };
449 if old_encoded == new_encoded {
450 continue;
451 }
452 if let Some(values) = &old_encoded {
453 batch.delete_cf(
454 prop_cf,
455 property_index_composite_key(&spec.label, &spec.properties, values, node.id),
456 );
457 }
458 if let Some(values) = &new_encoded {
459 batch.put_cf(
460 prop_cf,
461 property_index_composite_key(&spec.label, &spec.properties, values, node.id),
462 EMPTY,
463 );
464 }
465 }
466
467 let point_indexes = self
472 .point_indexes
473 .read()
474 .expect("point_indexes lock poisoned");
475 if !point_indexes.is_empty() {
476 let point_cf = self.cf(CF_POINT_INDEX)?;
477 for spec in point_indexes.iter() {
478 let was_indexed = existing_labels.iter().any(|l| l == &spec.label);
479 let now_indexed = node.labels.iter().any(|l| l == &spec.label);
480 let old_point = if was_indexed {
481 existing
482 .as_ref()
483 .and_then(|n| extract_indexable_point(&n.properties, &spec.property))
484 } else {
485 None
486 };
487 let new_point = if now_indexed {
488 extract_indexable_point(&node.properties, &spec.property)
489 } else {
490 None
491 };
492 if old_point == new_point {
493 continue;
494 }
495 if let Some(p) = &old_point {
496 let cell = point_cell(p.srid, p.x, p.y);
497 batch.delete_cf(
498 point_cf,
499 point_index_key(
500 &spec.label,
501 &spec.property,
502 p.srid,
503 cell,
504 node.id.as_bytes(),
505 ),
506 );
507 }
508 if let Some(p) = &new_point {
509 let cell = point_cell(p.srid, p.x, p.y);
510 batch.put_cf(
511 point_cf,
512 point_index_key(
513 &spec.label,
514 &spec.property,
515 p.srid,
516 cell,
517 node.id.as_bytes(),
518 ),
519 point_index_value(p.x, p.y, p.z),
520 );
521 }
522 }
523 }
524
525 Ok(())
526 }
527
528 pub fn get_node(&self, id: NodeId) -> Result<Option<Node>> {
529 let cf = self.cf(CF_NODES)?;
530 match self.db.get_cf(cf, id.as_bytes())? {
531 Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
532 None => Ok(None),
533 }
534 }
535
536 pub fn put_edge(&self, edge: &Edge) -> Result<()> {
537 let mut batch = WriteBatch::default();
538 self.append_put_edge(&mut batch, edge)?;
539 self.db.write(batch)?;
540 Ok(())
541 }
542
543 fn append_put_edge(&self, batch: &mut WriteBatch, edge: &Edge) -> Result<()> {
544 let edges_cf = self.cf(CF_EDGES)?;
545 let out_cf = self.cf(CF_ADJ_OUT)?;
546 let in_cf = self.cf(CF_ADJ_IN)?;
547 let type_cf = self.cf(CF_TYPE_INDEX)?;
548 let edge_prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
549
550 let existing_edge = self.get_edge(edge.id)?;
555 self.enforce_edge_constraints(edge, existing_edge.as_ref())?;
556
557 let bytes = serde_json::to_vec(edge)?;
558 batch.put_cf(edges_cf, edge.id.as_bytes(), bytes);
559 batch.put_cf(
560 out_cf,
561 adj_key(edge.source, edge.id),
562 edge.target.as_bytes(),
563 );
564 batch.put_cf(in_cf, adj_key(edge.target, edge.id), edge.source.as_bytes());
565 batch.put_cf(type_cf, type_index_key(&edge.edge_type, edge.id), EMPTY);
566
567 let edge_indexes = self
575 .edge_indexes
576 .read()
577 .expect("edge_indexes lock poisoned");
578 for spec in edge_indexes.iter() {
579 if spec.edge_type != edge.edge_type {
580 continue;
581 }
582 let old_encoded = existing_edge
583 .as_ref()
584 .and_then(|e| encode_index_tuple(&e.properties, &spec.properties));
585 let new_encoded = encode_index_tuple(&edge.properties, &spec.properties);
586 if old_encoded == new_encoded {
587 continue;
588 }
589 if let Some(values) = &old_encoded {
590 batch.delete_cf(
591 edge_prop_cf,
592 edge_property_index_composite_key(
593 &spec.edge_type,
594 &spec.properties,
595 values,
596 edge.id,
597 ),
598 );
599 }
600 if let Some(values) = &new_encoded {
601 batch.put_cf(
602 edge_prop_cf,
603 edge_property_index_composite_key(
604 &spec.edge_type,
605 &spec.properties,
606 values,
607 edge.id,
608 ),
609 EMPTY,
610 );
611 }
612 }
613
614 let edge_point_indexes = self
619 .edge_point_indexes
620 .read()
621 .expect("edge_point_indexes lock poisoned");
622 if !edge_point_indexes.is_empty() {
623 let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
624 for spec in edge_point_indexes.iter() {
625 if spec.edge_type != edge.edge_type {
626 continue;
627 }
628 let old_point = existing_edge
629 .as_ref()
630 .and_then(|e| extract_indexable_point(&e.properties, &spec.property));
631 let new_point = extract_indexable_point(&edge.properties, &spec.property);
632 if old_point == new_point {
633 continue;
634 }
635 if let Some(p) = &old_point {
636 let cell = point_cell(p.srid, p.x, p.y);
637 batch.delete_cf(
638 point_cf,
639 point_index_key(
640 &spec.edge_type,
641 &spec.property,
642 p.srid,
643 cell,
644 edge.id.as_bytes(),
645 ),
646 );
647 }
648 if let Some(p) = &new_point {
649 let cell = point_cell(p.srid, p.x, p.y);
650 batch.put_cf(
651 point_cf,
652 point_index_key(
653 &spec.edge_type,
654 &spec.property,
655 p.srid,
656 cell,
657 edge.id.as_bytes(),
658 ),
659 point_index_value(p.x, p.y, p.z),
660 );
661 }
662 }
663 }
664 Ok(())
665 }
666
667 pub fn get_edge(&self, id: EdgeId) -> Result<Option<Edge>> {
668 let cf = self.cf(CF_EDGES)?;
669 match self.db.get_cf(cf, id.as_bytes())? {
670 Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
671 None => Ok(None),
672 }
673 }
674
675 pub fn delete_edge(&self, id: EdgeId) -> Result<()> {
676 let edge = self.get_edge(id)?.ok_or(Error::EdgeNotFound(id))?;
677 let mut batch = WriteBatch::default();
678 self.append_delete_edge(&mut batch, id, &edge)?;
679 self.db.write(batch)?;
680 Ok(())
681 }
682
683 fn append_delete_edge(&self, batch: &mut WriteBatch, id: EdgeId, edge: &Edge) -> Result<()> {
684 let edges_cf = self.cf(CF_EDGES)?;
685 let out_cf = self.cf(CF_ADJ_OUT)?;
686 let in_cf = self.cf(CF_ADJ_IN)?;
687 let type_cf = self.cf(CF_TYPE_INDEX)?;
688 let edge_prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
689
690 batch.delete_cf(edges_cf, id.as_bytes());
691 batch.delete_cf(out_cf, adj_key(edge.source, id));
692 batch.delete_cf(in_cf, adj_key(edge.target, id));
693 batch.delete_cf(type_cf, type_index_key(&edge.edge_type, id));
694
695 let edge_indexes = self
700 .edge_indexes
701 .read()
702 .expect("edge_indexes lock poisoned");
703 for spec in edge_indexes.iter() {
704 if spec.edge_type != edge.edge_type {
705 continue;
706 }
707 if let Some(values) = encode_index_tuple(&edge.properties, &spec.properties) {
708 batch.delete_cf(
709 edge_prop_cf,
710 edge_property_index_composite_key(
711 &spec.edge_type,
712 &spec.properties,
713 &values,
714 id,
715 ),
716 );
717 }
718 }
719
720 let edge_point_indexes = self
723 .edge_point_indexes
724 .read()
725 .expect("edge_point_indexes lock poisoned");
726 if !edge_point_indexes.is_empty() {
727 let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
728 for spec in edge_point_indexes.iter() {
729 if spec.edge_type != edge.edge_type {
730 continue;
731 }
732 if let Some(p) = extract_indexable_point(&edge.properties, &spec.property) {
733 let cell = point_cell(p.srid, p.x, p.y);
734 batch.delete_cf(
735 point_cf,
736 point_index_key(
737 &spec.edge_type,
738 &spec.property,
739 p.srid,
740 cell,
741 id.as_bytes(),
742 ),
743 );
744 }
745 }
746 }
747 Ok(())
748 }
749
750 pub fn detach_delete_node(&self, id: NodeId) -> Result<()> {
751 let mut batch = WriteBatch::default();
752 self.append_detach_delete_node(&mut batch, id)?;
753 self.db.write(batch)?;
754 Ok(())
755 }
756
757 fn append_detach_delete_node(&self, batch: &mut WriteBatch, id: NodeId) -> Result<()> {
758 let node = self.get_node(id)?;
759 let outgoing = self.outgoing(id)?;
760 let incoming = self.incoming(id)?;
761
762 let nodes_cf = self.cf(CF_NODES)?;
763 let edges_cf = self.cf(CF_EDGES)?;
764 let out_cf = self.cf(CF_ADJ_OUT)?;
765 let in_cf = self.cf(CF_ADJ_IN)?;
766 let label_cf = self.cf(CF_LABEL_INDEX)?;
767 let type_cf = self.cf(CF_TYPE_INDEX)?;
768 let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
769 let edge_prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
770
771 if let Some(n) = &node {
772 for label in &n.labels {
773 batch.delete_cf(label_cf, label_index_key(label, id));
774 }
775 let indexes = self.indexes.read().expect("indexes lock poisoned");
780 for spec in indexes.iter() {
781 if !n.labels.iter().any(|l| l == &spec.label) {
782 continue;
783 }
784 if let Some(values) = encode_index_tuple(&n.properties, &spec.properties) {
785 batch.delete_cf(
786 prop_cf,
787 property_index_composite_key(&spec.label, &spec.properties, &values, id),
788 );
789 }
790 }
791 let point_indexes = self
794 .point_indexes
795 .read()
796 .expect("point_indexes lock poisoned");
797 if !point_indexes.is_empty() {
798 let point_cf = self.cf(CF_POINT_INDEX)?;
799 for spec in point_indexes.iter() {
800 if !n.labels.iter().any(|l| l == &spec.label) {
801 continue;
802 }
803 if let Some(p) = extract_indexable_point(&n.properties, &spec.property) {
804 let cell = point_cell(p.srid, p.x, p.y);
805 batch.delete_cf(
806 point_cf,
807 point_index_key(
808 &spec.label,
809 &spec.property,
810 p.srid,
811 cell,
812 id.as_bytes(),
813 ),
814 );
815 }
816 }
817 }
818 }
819
820 let edge_indexes_snapshot = self
824 .edge_indexes
825 .read()
826 .expect("edge_indexes lock poisoned")
827 .clone();
828 let edge_point_indexes_snapshot = self
829 .edge_point_indexes
830 .read()
831 .expect("edge_point_indexes lock poisoned")
832 .clone();
833 let edge_point_cf_opt = if edge_point_indexes_snapshot.is_empty() {
834 None
835 } else {
836 Some(self.cf(CF_EDGE_POINT_INDEX)?)
837 };
838
839 for (edge_id, target) in &outgoing {
840 if let Some(e) = self.get_edge(*edge_id)? {
841 batch.delete_cf(type_cf, type_index_key(&e.edge_type, *edge_id));
842 for spec in &edge_indexes_snapshot {
843 if spec.edge_type != e.edge_type {
844 continue;
845 }
846 if let Some(values) = encode_index_tuple(&e.properties, &spec.properties) {
847 batch.delete_cf(
848 edge_prop_cf,
849 edge_property_index_composite_key(
850 &spec.edge_type,
851 &spec.properties,
852 &values,
853 *edge_id,
854 ),
855 );
856 }
857 }
858 if let Some(point_cf) = edge_point_cf_opt {
859 for spec in &edge_point_indexes_snapshot {
860 if spec.edge_type != e.edge_type {
861 continue;
862 }
863 if let Some(p) = extract_indexable_point(&e.properties, &spec.property) {
864 let cell = point_cell(p.srid, p.x, p.y);
865 batch.delete_cf(
866 point_cf,
867 point_index_key(
868 &spec.edge_type,
869 &spec.property,
870 p.srid,
871 cell,
872 edge_id.as_bytes(),
873 ),
874 );
875 }
876 }
877 }
878 }
879 batch.delete_cf(edges_cf, edge_id.as_bytes());
880 batch.delete_cf(out_cf, adj_key(id, *edge_id));
881 batch.delete_cf(in_cf, adj_key(*target, *edge_id));
882 }
883 for (edge_id, source) in &incoming {
884 if let Some(e) = self.get_edge(*edge_id)? {
885 batch.delete_cf(type_cf, type_index_key(&e.edge_type, *edge_id));
886 for spec in &edge_indexes_snapshot {
887 if spec.edge_type != e.edge_type {
888 continue;
889 }
890 if let Some(values) = encode_index_tuple(&e.properties, &spec.properties) {
891 batch.delete_cf(
892 edge_prop_cf,
893 edge_property_index_composite_key(
894 &spec.edge_type,
895 &spec.properties,
896 &values,
897 *edge_id,
898 ),
899 );
900 }
901 }
902 if let Some(point_cf) = edge_point_cf_opt {
903 for spec in &edge_point_indexes_snapshot {
904 if spec.edge_type != e.edge_type {
905 continue;
906 }
907 if let Some(p) = extract_indexable_point(&e.properties, &spec.property) {
908 let cell = point_cell(p.srid, p.x, p.y);
909 batch.delete_cf(
910 point_cf,
911 point_index_key(
912 &spec.edge_type,
913 &spec.property,
914 p.srid,
915 cell,
916 edge_id.as_bytes(),
917 ),
918 );
919 }
920 }
921 }
922 }
923 batch.delete_cf(edges_cf, edge_id.as_bytes());
924 batch.delete_cf(out_cf, adj_key(*source, *edge_id));
925 batch.delete_cf(in_cf, adj_key(id, *edge_id));
926 }
927 batch.delete_cf(nodes_cf, id.as_bytes());
928 Ok(())
929 }
930
931 pub fn apply_batch(&self, mutations: &[GraphMutation]) -> Result<()> {
942 let mut batch = WriteBatch::default();
943 for m in mutations {
944 match m {
945 GraphMutation::PutNode(n) => self.append_put_node(&mut batch, n)?,
946 GraphMutation::PutEdge(e) => self.append_put_edge(&mut batch, e)?,
947 GraphMutation::DeleteEdge(id) => {
948 if let Some(edge) = self.get_edge(*id)? {
949 self.append_delete_edge(&mut batch, *id, &edge)?;
950 }
951 }
952 GraphMutation::DetachDeleteNode(id) => {
953 self.append_detach_delete_node(&mut batch, *id)?;
954 }
955 }
956 }
957 self.db.write(batch)?;
958 Ok(())
959 }
960
961 pub fn all_nodes(&self) -> Result<Vec<Node>> {
964 let cf = self.cf(CF_NODES)?;
965 let mut nodes = Vec::new();
966 for item in self.db.iterator_cf(cf, IteratorMode::Start) {
967 let (_, value) = item?;
968 nodes.push(serde_json::from_slice(&value)?);
969 }
970 Ok(nodes)
971 }
972
973 pub fn all_edges(&self) -> Result<Vec<Edge>> {
975 let cf = self.cf(CF_EDGES)?;
976 let mut edges = Vec::new();
977 for item in self.db.iterator_cf(cf, IteratorMode::Start) {
978 let (_, value) = item?;
979 edges.push(serde_json::from_slice(&value)?);
980 }
981 Ok(edges)
982 }
983
984 pub fn create_checkpoint(&self, path: impl AsRef<Path>) -> Result<()> {
996 let checkpoint = Checkpoint::new(&self.db)?;
997 checkpoint.create_checkpoint(path)?;
998 Ok(())
999 }
1000
1001 pub fn clear_all(&self) -> Result<()> {
1005 let mut batch = WriteBatch::default();
1006 for cf_name in ALL_CFS {
1007 let cf = self.cf(cf_name)?;
1008 for item in self.db.iterator_cf(cf, IteratorMode::Start) {
1009 let (key, _) = item?;
1010 batch.delete_cf(cf, key);
1011 }
1012 }
1013 self.db.write(batch)?;
1014 Ok(())
1015 }
1016
1017 pub fn all_node_ids(&self) -> Result<Vec<NodeId>> {
1018 let cf = self.cf(CF_NODES)?;
1019 let mut results = Vec::new();
1020 for item in self.db.iterator_cf(cf, IteratorMode::Start) {
1021 let (key, _) = item?;
1022 if key.len() != ID_LEN {
1023 return Err(Error::CorruptBytes {
1024 cf: CF_NODES,
1025 expected: ID_LEN,
1026 actual: key.len(),
1027 });
1028 }
1029 let mut bytes = [0u8; ID_LEN];
1030 bytes.copy_from_slice(&key);
1031 results.push(NodeId::from_bytes(bytes));
1032 }
1033 Ok(results)
1034 }
1035
1036 pub fn outgoing(&self, source: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
1037 self.scan_adj(CF_ADJ_OUT, source)
1038 }
1039
1040 pub fn incoming(&self, target: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
1041 self.scan_adj(CF_ADJ_IN, target)
1042 }
1043
1044 pub fn list_property_indexes(&self) -> Vec<PropertyIndexSpec> {
1047 self.indexes.read().expect("indexes lock poisoned").clone()
1048 }
1049
1050 pub fn create_property_index(&self, label: &str, property: &str) -> Result<()> {
1061 self.create_property_index_composite(label, &[property.to_string()])
1062 }
1063
1064 pub fn create_property_index_composite(
1075 &self,
1076 label: &str,
1077 properties: &[String],
1078 ) -> Result<()> {
1079 assert!(
1080 !properties.is_empty(),
1081 "create_property_index_composite requires at least one property"
1082 );
1083 let spec = PropertyIndexSpec {
1084 label: label.to_string(),
1085 properties: properties.to_vec(),
1086 };
1087 let mut guard = self.indexes.write().expect("indexes lock poisoned");
1088 if guard.contains(&spec) {
1089 return Ok(());
1090 }
1091
1092 let meta_cf = self.cf(CF_INDEX_META)?;
1093 let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
1094 let mut batch = WriteBatch::default();
1095 batch.put_cf(meta_cf, index_meta_key(&spec), EMPTY);
1096
1097 for node_id in self.nodes_by_label(label)? {
1098 let node = match self.get_node(node_id)? {
1099 Some(n) => n,
1100 None => continue,
1101 };
1102 if let Some(values) = encode_index_tuple(&node.properties, &spec.properties) {
1103 batch.put_cf(
1104 prop_cf,
1105 property_index_composite_key(label, properties, &values, node_id),
1106 EMPTY,
1107 );
1108 }
1109 }
1110
1111 self.db.write(batch)?;
1112 guard.push(spec);
1113 Ok(())
1114 }
1115
1116 pub fn create_property_constraint(
1141 &self,
1142 name: Option<&str>,
1143 scope: &ConstraintScope,
1144 properties: &[String],
1145 kind: PropertyConstraintKind,
1146 if_not_exists: bool,
1147 ) -> Result<PropertyConstraintSpec> {
1148 if properties.is_empty() {
1155 return Err(Error::ConstraintArity {
1156 kind: kind.as_string(),
1157 details: "at least one property is required".into(),
1158 });
1159 }
1160 if !kind.allows_multi_property() && properties.len() != 1 {
1161 return Err(Error::ConstraintArity {
1162 kind: kind.as_string(),
1163 details: format!("expects exactly one property, got {}", properties.len()),
1164 });
1165 }
1166 if matches!(kind, PropertyConstraintKind::NodeKey)
1167 && matches!(scope, ConstraintScope::Relationship(_))
1168 {
1169 return Err(Error::ConstraintArity {
1170 kind: kind.as_string(),
1171 details: "NODE KEY cannot be applied to a relationship scope".into(),
1172 });
1173 }
1174 let resolved_name = match name {
1175 Some(n) => n.to_string(),
1176 None => default_constraint_name(scope, properties, kind),
1177 };
1178 let spec = PropertyConstraintSpec {
1179 name: resolved_name.clone(),
1180 scope: scope.clone(),
1181 properties: properties.to_vec(),
1182 kind,
1183 };
1184
1185 {
1189 let guard = self.constraints.read().expect("constraints lock poisoned");
1190 if let Some(existing) = guard.iter().find(|s| s.name == resolved_name) {
1191 if existing == &spec {
1192 return Ok(existing.clone());
1193 }
1194 if if_not_exists {
1195 return Ok(existing.clone());
1196 }
1197 return Err(Error::ConstraintNameConflict {
1198 name: resolved_name,
1199 });
1200 }
1201 }
1202
1203 match (kind, scope) {
1213 (PropertyConstraintKind::Unique, ConstraintScope::Node(label)) => {
1214 self.create_property_index(label, &properties[0])?;
1215 }
1216 (PropertyConstraintKind::Unique, ConstraintScope::Relationship(edge_type)) => {
1217 self.create_edge_property_index(edge_type, &properties[0])?;
1218 }
1219 (PropertyConstraintKind::NodeKey, ConstraintScope::Node(label)) => {
1220 self.create_property_index_composite(label, properties)?;
1221 }
1222 _ => {}
1223 }
1224
1225 validate_existing_data(self, &spec)?;
1230
1231 let meta_cf = self.cf(CF_CONSTRAINT_META)?;
1232 let mut batch = WriteBatch::default();
1233 batch.put_cf(
1234 meta_cf,
1235 resolved_name.as_bytes(),
1236 constraint_meta_encode(&spec),
1237 );
1238 self.db.write(batch)?;
1239
1240 let mut guard = self.constraints.write().expect("constraints lock poisoned");
1241 if let Some(existing) = guard.iter().find(|s| s.name == resolved_name) {
1246 return Ok(existing.clone());
1247 }
1248 guard.push(spec.clone());
1249 Ok(spec)
1250 }
1251
1252 pub fn drop_property_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1257 let mut guard = self.constraints.write().expect("constraints lock poisoned");
1258 let idx = guard.iter().position(|s| s.name == name);
1259 match idx {
1260 None if if_exists => Ok(()),
1261 None => Err(Error::ConstraintNotFound {
1262 name: name.to_string(),
1263 }),
1264 Some(i) => {
1265 let meta_cf = self.cf(CF_CONSTRAINT_META)?;
1266 let mut batch = WriteBatch::default();
1267 batch.delete_cf(meta_cf, name.as_bytes());
1268 self.db.write(batch)?;
1269 guard.remove(i);
1270 Ok(())
1271 }
1272 }
1273 }
1274
1275 pub fn list_property_constraints(&self) -> Vec<PropertyConstraintSpec> {
1278 self.constraints
1279 .read()
1280 .expect("constraints lock poisoned")
1281 .clone()
1282 }
1283
1284 fn enforce_constraints(&self, node: &Node, existing: Option<&Node>) -> Result<()> {
1292 let constraints = self.constraints.read().expect("constraints lock poisoned");
1293 if constraints.is_empty() {
1294 return Ok(());
1295 }
1296 let now_has_label = |label: &str| node.labels.iter().any(|l| l == label);
1297 for spec in constraints.iter() {
1298 let label = match &spec.scope {
1299 ConstraintScope::Node(l) => l,
1300 ConstraintScope::Relationship(_) => continue,
1304 };
1305 if !now_has_label(label) {
1306 continue;
1307 }
1308 let primary = spec.primary_property();
1311 match spec.kind {
1312 PropertyConstraintKind::NotNull => {
1313 let present = node
1314 .properties
1315 .get(primary)
1316 .is_some_and(|v| !matches!(v, Property::Null));
1317 if !present {
1318 return Err(Error::ConstraintViolation {
1319 name: spec.name.clone(),
1320 kind: spec.kind.as_string(),
1321 label: label.clone(),
1322 property: primary.to_string(),
1323 details: format!(
1324 "node {} is missing required property `{}`",
1325 node.id, primary
1326 ),
1327 });
1328 }
1329 }
1330 PropertyConstraintKind::Unique => {
1331 let Some(value) = node.properties.get(primary) else {
1332 continue;
1336 };
1337 if matches!(value, Property::Null) {
1338 continue;
1339 }
1340 if encode_index_value(value).is_none() {
1341 continue;
1342 }
1343 let holders = self.nodes_by_property(label, primary, value)?;
1344 for other in holders {
1345 if other == node.id {
1346 continue;
1347 }
1348 let was_self = existing
1349 .and_then(|n| n.properties.get(primary))
1350 .is_some_and(|v| v == value);
1351 let _ = was_self;
1352 return Err(Error::ConstraintViolation {
1353 name: spec.name.clone(),
1354 kind: spec.kind.as_string(),
1355 label: label.clone(),
1356 property: primary.to_string(),
1357 details: format!("value already held by node {}", other),
1358 });
1359 }
1360 }
1361 PropertyConstraintKind::PropertyType(target) => {
1362 let Some(value) = node.properties.get(primary) else {
1363 continue;
1364 };
1365 if matches!(value, Property::Null) {
1366 continue;
1367 }
1368 if !property_matches_type(value, target) {
1369 return Err(Error::ConstraintViolation {
1370 name: spec.name.clone(),
1371 kind: spec.kind.as_string(),
1372 label: label.clone(),
1373 property: primary.to_string(),
1374 details: format!(
1375 "node {} has value of type {} (expected {})",
1376 node.id,
1377 value.type_name(),
1378 target.as_str()
1379 ),
1380 });
1381 }
1382 }
1383 PropertyConstraintKind::NodeKey => {
1384 let mut my_tuple: Vec<Property> = Vec::with_capacity(spec.properties.len());
1390 for prop in &spec.properties {
1391 match node.properties.get(prop) {
1392 Some(v) if !matches!(v, Property::Null) => my_tuple.push(v.clone()),
1393 _ => {
1394 return Err(Error::ConstraintViolation {
1395 name: spec.name.clone(),
1396 kind: spec.kind.as_string(),
1397 label: label.clone(),
1398 property: prop.clone(),
1399 details: format!(
1400 "node {} is missing required property `{}`",
1401 node.id, prop
1402 ),
1403 });
1404 }
1405 }
1406 }
1407 let hits = self.nodes_by_properties(label, &spec.properties, &my_tuple)?;
1411 if let Some(other_id) = hits.iter().find(|&&id| id != node.id) {
1412 return Err(Error::ConstraintViolation {
1413 name: spec.name.clone(),
1414 kind: spec.kind.as_string(),
1415 label: label.clone(),
1416 property: spec.properties.join(","),
1417 details: format!("tuple already held by node {}", other_id),
1418 });
1419 }
1420 }
1421 }
1422 }
1423 Ok(())
1424 }
1425
1426 fn enforce_edge_constraints(&self, edge: &Edge, existing: Option<&Edge>) -> Result<()> {
1433 let constraints = self.constraints.read().expect("constraints lock poisoned");
1434 if constraints.is_empty() {
1435 return Ok(());
1436 }
1437 for spec in constraints.iter() {
1438 let edge_type = match &spec.scope {
1439 ConstraintScope::Relationship(t) => t,
1440 ConstraintScope::Node(_) => continue,
1442 };
1443 if edge_type != &edge.edge_type {
1444 continue;
1445 }
1446 let primary = spec.primary_property();
1447 match spec.kind {
1448 PropertyConstraintKind::NotNull => {
1449 let present = edge
1450 .properties
1451 .get(primary)
1452 .is_some_and(|v| !matches!(v, Property::Null));
1453 if !present {
1454 return Err(Error::ConstraintViolation {
1455 name: spec.name.clone(),
1456 kind: spec.kind.as_string(),
1457 label: edge_type.clone(),
1458 property: primary.to_string(),
1459 details: format!(
1460 "edge {} is missing required property `{}`",
1461 edge.id, primary
1462 ),
1463 });
1464 }
1465 }
1466 PropertyConstraintKind::Unique => {
1467 let Some(value) = edge.properties.get(primary) else {
1468 continue;
1473 };
1474 if matches!(value, Property::Null) {
1475 continue;
1476 }
1477 if encode_index_value(value).is_none() {
1478 continue;
1484 }
1485 let holders = self.edges_by_property(edge_type, primary, value)?;
1490 for other_id in holders {
1491 if other_id == edge.id {
1492 continue;
1493 }
1494 let was_self = existing
1495 .and_then(|e| e.properties.get(primary))
1496 .is_some_and(|v| v == value);
1497 let _ = was_self;
1498 return Err(Error::ConstraintViolation {
1499 name: spec.name.clone(),
1500 kind: spec.kind.as_string(),
1501 label: edge_type.clone(),
1502 property: primary.to_string(),
1503 details: format!("value already held by edge {}", other_id),
1504 });
1505 }
1506 }
1507 PropertyConstraintKind::PropertyType(target) => {
1508 let Some(value) = edge.properties.get(primary) else {
1509 continue;
1510 };
1511 if matches!(value, Property::Null) {
1512 continue;
1513 }
1514 if !property_matches_type(value, target) {
1515 return Err(Error::ConstraintViolation {
1516 name: spec.name.clone(),
1517 kind: spec.kind.as_string(),
1518 label: edge_type.clone(),
1519 property: primary.to_string(),
1520 details: format!(
1521 "edge {} has value of type {} (expected {})",
1522 edge.id,
1523 value.type_name(),
1524 target.as_str()
1525 ),
1526 });
1527 }
1528 }
1529 PropertyConstraintKind::NodeKey => {
1530 unreachable!(
1533 "NODE KEY on relationship scope should have been rejected at create time"
1534 );
1535 }
1536 }
1537 }
1538 Ok(())
1539 }
1540
1541 pub fn drop_property_index(&self, label: &str, property: &str) -> Result<()> {
1545 self.drop_property_index_composite(label, &[property.to_string()])
1546 }
1547
1548 pub fn drop_property_index_composite(&self, label: &str, properties: &[String]) -> Result<()> {
1549 assert!(
1550 !properties.is_empty(),
1551 "drop_property_index_composite requires at least one property"
1552 );
1553 let spec = PropertyIndexSpec {
1554 label: label.to_string(),
1555 properties: properties.to_vec(),
1556 };
1557 let mut guard = self.indexes.write().expect("indexes lock poisoned");
1558 if !guard.contains(&spec) {
1559 return Ok(());
1560 }
1561
1562 if let Some(constraint) = find_constraint_backing_node_index(
1569 &self.constraints.read().expect("constraints lock poisoned"),
1570 label,
1571 properties,
1572 ) {
1573 return Err(Error::IndexInUse { constraint });
1574 }
1575
1576 let meta_cf = self.cf(CF_INDEX_META)?;
1577 let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
1578 let mut batch = WriteBatch::default();
1579 batch.delete_cf(meta_cf, index_meta_key(&spec));
1580
1581 let label_prefix = property_index_label_prefix(label);
1588 let iter = self.db.iterator_cf(
1589 prop_cf,
1590 IteratorMode::From(&label_prefix, Direction::Forward),
1591 );
1592 for item in iter {
1593 let (key, _) = item?;
1594 if !key.starts_with(&label_prefix) {
1595 break;
1596 }
1597 let Some(parsed) = parse_property_index_entry_props(&key) else {
1598 continue;
1599 };
1600 if parsed.len() == properties.len()
1601 && parsed.iter().zip(properties.iter()).all(|(a, b)| a == b)
1602 {
1603 batch.delete_cf(prop_cf, key);
1604 }
1605 }
1606
1607 self.db.write(batch)?;
1608 guard.retain(|s| s != &spec);
1609 Ok(())
1610 }
1611
1612 pub fn list_edge_property_indexes(&self) -> Vec<EdgePropertyIndexSpec> {
1616 self.edge_indexes
1617 .read()
1618 .expect("edge_indexes lock poisoned")
1619 .clone()
1620 }
1621
1622 pub fn create_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
1632 self.create_edge_property_index_composite(edge_type, &[property.to_string()])
1633 }
1634
1635 pub fn create_edge_property_index_composite(
1636 &self,
1637 edge_type: &str,
1638 properties: &[String],
1639 ) -> Result<()> {
1640 assert!(
1641 !properties.is_empty(),
1642 "create_edge_property_index_composite requires at least one property"
1643 );
1644 let spec = EdgePropertyIndexSpec {
1645 edge_type: edge_type.to_string(),
1646 properties: properties.to_vec(),
1647 };
1648 let mut guard = self
1649 .edge_indexes
1650 .write()
1651 .expect("edge_indexes lock poisoned");
1652 if guard.contains(&spec) {
1653 return Ok(());
1654 }
1655
1656 let meta_cf = self.cf(CF_EDGE_INDEX_META)?;
1657 let prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
1658 let mut batch = WriteBatch::default();
1659 batch.put_cf(meta_cf, edge_index_meta_key(&spec), EMPTY);
1660
1661 for edge_id in self.edges_by_type(edge_type)? {
1662 let edge = match self.get_edge(edge_id)? {
1663 Some(e) => e,
1664 None => continue,
1665 };
1666 if let Some(values) = encode_index_tuple(&edge.properties, &spec.properties) {
1667 batch.put_cf(
1668 prop_cf,
1669 edge_property_index_composite_key(edge_type, properties, &values, edge_id),
1670 EMPTY,
1671 );
1672 }
1673 }
1674
1675 self.db.write(batch)?;
1676 guard.push(spec);
1677 Ok(())
1678 }
1679
1680 pub fn drop_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
1681 self.drop_edge_property_index_composite(edge_type, &[property.to_string()])
1682 }
1683
1684 pub fn drop_edge_property_index_composite(
1685 &self,
1686 edge_type: &str,
1687 properties: &[String],
1688 ) -> Result<()> {
1689 assert!(
1690 !properties.is_empty(),
1691 "drop_edge_property_index_composite requires at least one property"
1692 );
1693 let spec = EdgePropertyIndexSpec {
1694 edge_type: edge_type.to_string(),
1695 properties: properties.to_vec(),
1696 };
1697 let mut guard = self
1698 .edge_indexes
1699 .write()
1700 .expect("edge_indexes lock poisoned");
1701 if !guard.contains(&spec) {
1702 return Ok(());
1703 }
1704
1705 if let Some(constraint) = find_constraint_backing_edge_index(
1711 &self.constraints.read().expect("constraints lock poisoned"),
1712 edge_type,
1713 properties,
1714 ) {
1715 return Err(Error::IndexInUse { constraint });
1716 }
1717
1718 let meta_cf = self.cf(CF_EDGE_INDEX_META)?;
1719 let prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
1720 let mut batch = WriteBatch::default();
1721 batch.delete_cf(meta_cf, edge_index_meta_key(&spec));
1722
1723 let label_prefix = property_index_label_prefix(edge_type);
1726 let iter = self.db.iterator_cf(
1727 prop_cf,
1728 IteratorMode::From(&label_prefix, Direction::Forward),
1729 );
1730 for item in iter {
1731 let (key, _) = item?;
1732 if !key.starts_with(&label_prefix) {
1733 break;
1734 }
1735 let Some(parsed) = parse_property_index_entry_props(&key) else {
1736 continue;
1737 };
1738 if parsed.len() == properties.len()
1739 && parsed.iter().zip(properties.iter()).all(|(a, b)| a == b)
1740 {
1741 batch.delete_cf(prop_cf, key);
1742 }
1743 }
1744
1745 self.db.write(batch)?;
1746 guard.retain(|s| s != &spec);
1747 Ok(())
1748 }
1749
1750 pub fn list_point_indexes(&self) -> Vec<PointIndexSpec> {
1766 self.point_indexes
1767 .read()
1768 .expect("point_indexes lock poisoned")
1769 .clone()
1770 }
1771
1772 pub fn create_point_index(&self, label: &str, property: &str) -> Result<()> {
1779 let spec = PointIndexSpec {
1780 label: label.to_string(),
1781 property: property.to_string(),
1782 };
1783 let mut guard = self
1784 .point_indexes
1785 .write()
1786 .expect("point_indexes lock poisoned");
1787 if guard.contains(&spec) {
1788 return Ok(());
1789 }
1790
1791 let meta_cf = self.cf(CF_POINT_INDEX_META)?;
1792 let point_cf = self.cf(CF_POINT_INDEX)?;
1793 let mut batch = WriteBatch::default();
1794 batch.put_cf(meta_cf, point_index_meta_key(&spec), EMPTY);
1795
1796 for node_id in self.nodes_by_label(label)? {
1797 let node = match self.get_node(node_id)? {
1798 Some(n) => n,
1799 None => continue,
1800 };
1801 if let Some(Property::Point(p)) = node.properties.get(property) {
1802 let cell = point_cell(p.srid, p.x, p.y);
1803 batch.put_cf(
1804 point_cf,
1805 point_index_key(label, property, p.srid, cell, node_id.as_bytes()),
1806 point_index_value(p.x, p.y, p.z),
1807 );
1808 }
1809 }
1810
1811 self.db.write(batch)?;
1812 guard.push(spec);
1813 Ok(())
1814 }
1815
1816 pub fn drop_point_index(&self, label: &str, property: &str) -> Result<()> {
1821 let spec = PointIndexSpec {
1822 label: label.to_string(),
1823 property: property.to_string(),
1824 };
1825 let mut guard = self
1826 .point_indexes
1827 .write()
1828 .expect("point_indexes lock poisoned");
1829 if !guard.contains(&spec) {
1830 return Ok(());
1831 }
1832
1833 let meta_cf = self.cf(CF_POINT_INDEX_META)?;
1834 let point_cf = self.cf(CF_POINT_INDEX)?;
1835 let mut batch = WriteBatch::default();
1836 batch.delete_cf(meta_cf, point_index_meta_key(&spec));
1837
1838 let prefix = point_index_label_prop_prefix(label, property);
1839 let iter = self
1840 .db
1841 .iterator_cf(point_cf, IteratorMode::From(&prefix, Direction::Forward));
1842 for item in iter {
1843 let (key, _) = item?;
1844 if !key.starts_with(&prefix) {
1845 break;
1846 }
1847 batch.delete_cf(point_cf, key);
1848 }
1849
1850 self.db.write(batch)?;
1851 guard.retain(|s| s != &spec);
1852 Ok(())
1853 }
1854
1855 pub fn nodes_in_bbox(
1867 &self,
1868 label: &str,
1869 property: &str,
1870 srid: i32,
1871 xlo: f64,
1872 ylo: f64,
1873 xhi: f64,
1874 yhi: f64,
1875 ) -> Result<Vec<NodeId>> {
1876 let spec = PointIndexSpec {
1877 label: label.to_string(),
1878 property: property.to_string(),
1879 };
1880 {
1881 let guard = self
1882 .point_indexes
1883 .read()
1884 .expect("point_indexes lock poisoned");
1885 if !guard.contains(&spec) {
1886 return Ok(Vec::new());
1887 }
1888 }
1889
1890 let (lo_x, hi_x) = if xlo <= xhi { (xlo, xhi) } else { (xhi, xlo) };
1891 let (lo_y, hi_y) = if ylo <= yhi { (ylo, yhi) } else { (yhi, ylo) };
1892 let (min_cell, max_cell) = point_cell_range(srid, lo_x, lo_y, hi_x, hi_y);
1893
1894 let prefix = point_index_srid_prefix(label, property, srid);
1895 let header_len = prefix.len();
1896 let mut seek = prefix.clone();
1897 seek.extend_from_slice(&min_cell.to_be_bytes());
1898
1899 let cf = self.cf(CF_POINT_INDEX)?;
1900 let iter = self
1901 .db
1902 .iterator_cf(cf, IteratorMode::From(&seek, Direction::Forward));
1903 let mut results = Vec::new();
1904 for item in iter {
1905 let (key, value) = item?;
1906 if !key.starts_with(&prefix) {
1907 break;
1908 }
1909 let cell = cell_from_point_index_key(CF_POINT_INDEX, &key, header_len)?;
1910 if cell > max_cell {
1911 break;
1912 }
1913 let (x, y, _z) = decode_point_index_value(CF_POINT_INDEX, &value)?;
1914 if x < lo_x || x > hi_x || y < lo_y || y > hi_y {
1915 continue;
1916 }
1917 let id_bytes = node_id_from_point_index_key(CF_POINT_INDEX, &key)?;
1918 results.push(NodeId::from_bytes(id_bytes));
1919 }
1920 results.sort();
1921 Ok(results)
1922 }
1923
1924 pub fn list_edge_point_indexes(&self) -> Vec<EdgePointIndexSpec> {
1935 self.edge_point_indexes
1936 .read()
1937 .expect("edge_point_indexes lock poisoned")
1938 .clone()
1939 }
1940
1941 pub fn create_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
1946 let spec = EdgePointIndexSpec {
1947 edge_type: edge_type.to_string(),
1948 property: property.to_string(),
1949 };
1950 let mut guard = self
1951 .edge_point_indexes
1952 .write()
1953 .expect("edge_point_indexes lock poisoned");
1954 if guard.contains(&spec) {
1955 return Ok(());
1956 }
1957
1958 let meta_cf = self.cf(CF_EDGE_POINT_INDEX_META)?;
1959 let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
1960 let mut batch = WriteBatch::default();
1961 batch.put_cf(meta_cf, edge_point_index_meta_key(&spec), EMPTY);
1962
1963 for edge_id in self.edges_by_type(edge_type)? {
1964 let edge = match self.get_edge(edge_id)? {
1965 Some(e) => e,
1966 None => continue,
1967 };
1968 if let Some(Property::Point(p)) = edge.properties.get(property) {
1969 let cell = point_cell(p.srid, p.x, p.y);
1970 batch.put_cf(
1971 point_cf,
1972 point_index_key(edge_type, property, p.srid, cell, edge_id.as_bytes()),
1978 point_index_value(p.x, p.y, p.z),
1979 );
1980 }
1981 }
1982
1983 self.db.write(batch)?;
1984 guard.push(spec);
1985 Ok(())
1986 }
1987
1988 pub fn drop_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
1990 let spec = EdgePointIndexSpec {
1991 edge_type: edge_type.to_string(),
1992 property: property.to_string(),
1993 };
1994 let mut guard = self
1995 .edge_point_indexes
1996 .write()
1997 .expect("edge_point_indexes lock poisoned");
1998 if !guard.contains(&spec) {
1999 return Ok(());
2000 }
2001
2002 let meta_cf = self.cf(CF_EDGE_POINT_INDEX_META)?;
2003 let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
2004 let mut batch = WriteBatch::default();
2005 batch.delete_cf(meta_cf, edge_point_index_meta_key(&spec));
2006
2007 let prefix = point_index_label_prop_prefix(edge_type, property);
2008 let iter = self
2009 .db
2010 .iterator_cf(point_cf, IteratorMode::From(&prefix, Direction::Forward));
2011 for item in iter {
2012 let (key, _) = item?;
2013 if !key.starts_with(&prefix) {
2014 break;
2015 }
2016 batch.delete_cf(point_cf, key);
2017 }
2018
2019 self.db.write(batch)?;
2020 guard.retain(|s| s != &spec);
2021 Ok(())
2022 }
2023
2024 pub fn edges_in_bbox(
2029 &self,
2030 edge_type: &str,
2031 property: &str,
2032 srid: i32,
2033 xlo: f64,
2034 ylo: f64,
2035 xhi: f64,
2036 yhi: f64,
2037 ) -> Result<Vec<EdgeId>> {
2038 let spec = EdgePointIndexSpec {
2039 edge_type: edge_type.to_string(),
2040 property: property.to_string(),
2041 };
2042 {
2043 let guard = self
2044 .edge_point_indexes
2045 .read()
2046 .expect("edge_point_indexes lock poisoned");
2047 if !guard.contains(&spec) {
2048 return Ok(Vec::new());
2049 }
2050 }
2051
2052 let (lo_x, hi_x) = if xlo <= xhi { (xlo, xhi) } else { (xhi, xlo) };
2053 let (lo_y, hi_y) = if ylo <= yhi { (ylo, yhi) } else { (yhi, ylo) };
2054 let (min_cell, max_cell) = point_cell_range(srid, lo_x, lo_y, hi_x, hi_y);
2055
2056 let prefix = point_index_srid_prefix(edge_type, property, srid);
2057 let header_len = prefix.len();
2058 let mut seek = prefix.clone();
2059 seek.extend_from_slice(&min_cell.to_be_bytes());
2060
2061 let cf = self.cf(CF_EDGE_POINT_INDEX)?;
2062 let iter = self
2063 .db
2064 .iterator_cf(cf, IteratorMode::From(&seek, Direction::Forward));
2065 let mut results = Vec::new();
2066 for item in iter {
2067 let (key, value) = item?;
2068 if !key.starts_with(&prefix) {
2069 break;
2070 }
2071 let cell = cell_from_point_index_key(CF_EDGE_POINT_INDEX, &key, header_len)?;
2072 if cell > max_cell {
2073 break;
2074 }
2075 let (x, y, _z) = decode_point_index_value(CF_EDGE_POINT_INDEX, &value)?;
2076 if x < lo_x || x > hi_x || y < lo_y || y > hi_y {
2077 continue;
2078 }
2079 let id_bytes = node_id_from_point_index_key(CF_EDGE_POINT_INDEX, &key)?;
2080 results.push(EdgeId::from_bytes(id_bytes));
2081 }
2082 results.sort();
2083 Ok(results)
2084 }
2085
2086 pub fn edges_by_property(
2093 &self,
2094 edge_type: &str,
2095 property: &str,
2096 value: &Property,
2097 ) -> Result<Vec<EdgeId>> {
2098 self.edges_by_properties(
2099 edge_type,
2100 std::slice::from_ref(&property.to_string()),
2101 std::slice::from_ref(value),
2102 )
2103 }
2104
2105 pub fn edges_by_properties(
2109 &self,
2110 edge_type: &str,
2111 properties: &[String],
2112 values: &[Property],
2113 ) -> Result<Vec<EdgeId>> {
2114 assert_eq!(
2115 properties.len(),
2116 values.len(),
2117 "composite edge seek: properties and values must have equal length"
2118 );
2119 let mut encoded: Vec<Vec<u8>> = Vec::with_capacity(values.len());
2120 for (p, v) in properties.iter().zip(values.iter()) {
2121 let bytes = encode_index_value(v).ok_or_else(|| Error::UnindexableValue {
2122 property: p.clone(),
2123 kind: v.type_name(),
2124 })?;
2125 encoded.push(bytes);
2126 }
2127 let cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
2128 let prefix = edge_property_index_composite_value_prefix(edge_type, properties, &encoded);
2129 let mut results = Vec::new();
2130 let iter = self
2131 .db
2132 .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2133 for item in iter {
2134 let (key, _) = item?;
2135 if !key.starts_with(&prefix) {
2136 break;
2137 }
2138 let bytes = edge_id_from_property_index_key(&key, prefix.len())?;
2139 results.push(EdgeId::from_bytes(bytes));
2140 }
2141 Ok(results)
2142 }
2143
2144 pub fn nodes_by_property(
2154 &self,
2155 label: &str,
2156 property: &str,
2157 value: &Property,
2158 ) -> Result<Vec<NodeId>> {
2159 self.nodes_by_properties(
2160 label,
2161 std::slice::from_ref(&property.to_string()),
2162 std::slice::from_ref(value),
2163 )
2164 }
2165
2166 pub fn nodes_by_properties(
2174 &self,
2175 label: &str,
2176 properties: &[String],
2177 values: &[Property],
2178 ) -> Result<Vec<NodeId>> {
2179 assert_eq!(
2180 properties.len(),
2181 values.len(),
2182 "composite seek: properties and values must have equal length"
2183 );
2184 let mut encoded: Vec<Vec<u8>> = Vec::with_capacity(values.len());
2185 for (p, v) in properties.iter().zip(values.iter()) {
2186 let bytes = encode_index_value(v).ok_or_else(|| Error::UnindexableValue {
2187 property: p.clone(),
2188 kind: v.type_name(),
2189 })?;
2190 encoded.push(bytes);
2191 }
2192 let cf = self.cf(CF_PROPERTY_INDEX)?;
2193 let prefix = property_index_composite_value_prefix(label, properties, &encoded);
2194 let mut results = Vec::new();
2195 let iter = self
2196 .db
2197 .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2198 for item in iter {
2199 let (key, _) = item?;
2200 if !key.starts_with(&prefix) {
2201 break;
2202 }
2203 let bytes = node_id_from_property_index_key(&key, prefix.len())?;
2204 results.push(NodeId::from_bytes(bytes));
2205 }
2206 Ok(results)
2207 }
2208
2209 pub fn nodes_by_label(&self, label: &str) -> Result<Vec<NodeId>> {
2210 let cf = self.cf(CF_LABEL_INDEX)?;
2211 let prefix = label_index_prefix(label);
2212 let mut results = Vec::new();
2213 let iter = self
2214 .db
2215 .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2216 for item in iter {
2217 let (key, _) = item?;
2218 if !key.starts_with(&prefix) {
2219 break;
2220 }
2221 let bytes = id_from_str_index_key(CF_LABEL_INDEX, &key, label.len())?;
2222 results.push(NodeId::from_bytes(bytes));
2223 }
2224 Ok(results)
2225 }
2226
2227 pub fn edges_by_type(&self, edge_type: &str) -> Result<Vec<EdgeId>> {
2228 let cf = self.cf(CF_TYPE_INDEX)?;
2229 let prefix = type_index_prefix(edge_type);
2230 let mut results = Vec::new();
2231 let iter = self
2232 .db
2233 .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2234 for item in iter {
2235 let (key, _) = item?;
2236 if !key.starts_with(&prefix) {
2237 break;
2238 }
2239 let bytes = id_from_str_index_key(CF_TYPE_INDEX, &key, edge_type.len())?;
2240 results.push(EdgeId::from_bytes(bytes));
2241 }
2242 Ok(results)
2243 }
2244
2245 fn scan_adj(&self, cf_name: &'static str, node: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
2246 let cf = self.cf(cf_name)?;
2247 let prefix: &[u8] = node.as_bytes();
2248 let mut results = Vec::new();
2249 let iter = self
2250 .db
2251 .iterator_cf(cf, IteratorMode::From(prefix, Direction::Forward));
2252 for item in iter {
2253 let (key, value) = item?;
2254 if !key.starts_with(prefix) {
2255 break;
2256 }
2257 let edge_id = edge_from_adj_key(cf_name, &key)?;
2258 let other = node_from_adj_value(cf_name, &value)?;
2259 results.push((edge_id, other));
2260 }
2261 Ok(results)
2262 }
2263}
2264
2265fn extract_indexable_point(
2273 properties: &std::collections::HashMap<String, Property>,
2274 key: &str,
2275) -> Option<meshdb_core::Point> {
2276 match properties.get(key)? {
2277 Property::Point(p) => Some(*p),
2278 _ => None,
2279 }
2280}
2281
2282fn default_constraint_name(
2289 scope: &ConstraintScope,
2290 properties: &[String],
2291 kind: PropertyConstraintKind,
2292) -> String {
2293 let joined = properties.join("_");
2294 format!(
2295 "constraint_{scope_tag}_{target}_{joined}_{kind_tag}",
2296 scope_tag = scope.name_tag(),
2297 target = scope.target(),
2298 kind_tag = kind.name_tag(),
2299 )
2300}
2301
2302fn find_constraint_backing_node_index(
2309 constraints: &[PropertyConstraintSpec],
2310 label: &str,
2311 properties: &[String],
2312) -> Option<String> {
2313 constraints
2314 .iter()
2315 .filter(|c| matches!(&c.scope, ConstraintScope::Node(l) if l == label))
2316 .filter(|c| {
2317 matches!(
2318 c.kind,
2319 PropertyConstraintKind::Unique | PropertyConstraintKind::NodeKey
2320 )
2321 })
2322 .find(|c| c.properties.as_slice() == properties)
2323 .map(|c| c.name.clone())
2324}
2325
2326fn find_constraint_backing_edge_index(
2330 constraints: &[PropertyConstraintSpec],
2331 edge_type: &str,
2332 properties: &[String],
2333) -> Option<String> {
2334 constraints
2335 .iter()
2336 .filter(|c| matches!(&c.scope, ConstraintScope::Relationship(t) if t == edge_type))
2337 .filter(|c| matches!(c.kind, PropertyConstraintKind::Unique))
2338 .find(|c| c.properties.as_slice() == properties)
2339 .map(|c| c.name.clone())
2340}
2341
2342fn validate_existing_data(
2348 engine: &RocksDbStorageEngine,
2349 spec: &PropertyConstraintSpec,
2350) -> Result<()> {
2351 match &spec.scope {
2352 ConstraintScope::Node(label) => validate_existing_nodes(engine, spec, label),
2353 ConstraintScope::Relationship(edge_type) => {
2354 validate_existing_edges(engine, spec, edge_type)
2355 }
2356 }
2357}
2358
2359fn validate_existing_nodes(
2360 engine: &RocksDbStorageEngine,
2361 spec: &PropertyConstraintSpec,
2362 label: &str,
2363) -> Result<()> {
2364 let label_members = engine.nodes_by_label(label)?;
2365 let primary = spec.primary_property();
2366 match spec.kind {
2367 PropertyConstraintKind::NotNull => {
2368 for id in label_members {
2369 let node = match engine.get_node(id)? {
2370 Some(n) => n,
2371 None => continue,
2372 };
2373 let present = node
2374 .properties
2375 .get(primary)
2376 .is_some_and(|v| !matches!(v, Property::Null));
2377 if !present {
2378 return Err(Error::ConstraintViolation {
2379 name: spec.name.clone(),
2380 kind: spec.kind.as_string(),
2381 label: label.to_string(),
2382 property: primary.to_string(),
2383 details: format!("node {id} is missing required property"),
2384 });
2385 }
2386 }
2387 }
2388 PropertyConstraintKind::Unique => {
2389 use std::collections::HashMap;
2390 let mut seen: HashMap<Vec<u8>, meshdb_core::NodeId> = HashMap::new();
2391 for id in label_members {
2392 let node = match engine.get_node(id)? {
2393 Some(n) => n,
2394 None => continue,
2395 };
2396 let Some(value) = node.properties.get(primary) else {
2397 continue;
2398 };
2399 let Some(encoded) = encode_index_value(value) else {
2400 continue;
2401 };
2402 if let Some(&first) = seen.get(&encoded) {
2403 return Err(Error::ConstraintViolation {
2404 name: spec.name.clone(),
2405 kind: spec.kind.as_string(),
2406 label: label.to_string(),
2407 property: primary.to_string(),
2408 details: format!("duplicate value held by nodes {first} and {id}"),
2409 });
2410 }
2411 seen.insert(encoded, id);
2412 }
2413 }
2414 PropertyConstraintKind::PropertyType(target) => {
2415 for id in label_members {
2416 let node = match engine.get_node(id)? {
2417 Some(n) => n,
2418 None => continue,
2419 };
2420 let Some(value) = node.properties.get(primary) else {
2421 continue;
2422 };
2423 if matches!(value, Property::Null) {
2424 continue;
2425 }
2426 if !property_matches_type(value, target) {
2427 return Err(Error::ConstraintViolation {
2428 name: spec.name.clone(),
2429 kind: spec.kind.as_string(),
2430 label: label.to_string(),
2431 property: primary.to_string(),
2432 details: format!(
2433 "node {id} has value of type {} (expected {})",
2434 value.type_name(),
2435 target.as_str()
2436 ),
2437 });
2438 }
2439 }
2440 }
2441 PropertyConstraintKind::NodeKey => {
2442 use std::collections::HashMap;
2443 let mut seen: HashMap<Vec<Vec<u8>>, meshdb_core::NodeId> = HashMap::new();
2444 for id in label_members {
2445 let node = match engine.get_node(id)? {
2446 Some(n) => n,
2447 None => continue,
2448 };
2449 let mut tuple: Vec<Vec<u8>> = Vec::with_capacity(spec.properties.len());
2450 let mut missing: Option<&str> = None;
2451 for prop in &spec.properties {
2452 match node.properties.get(prop) {
2453 Some(v) if !matches!(v, Property::Null) => {
2454 if let Some(encoded) = encode_index_value(v) {
2455 tuple.push(encoded);
2456 } else {
2457 missing = Some(prop);
2458 break;
2459 }
2460 }
2461 _ => {
2462 missing = Some(prop.as_str());
2463 break;
2464 }
2465 }
2466 }
2467 if let Some(prop) = missing {
2468 return Err(Error::ConstraintViolation {
2469 name: spec.name.clone(),
2470 kind: spec.kind.as_string(),
2471 label: label.to_string(),
2472 property: prop.to_string(),
2473 details: format!("node {id} is missing required property `{prop}`"),
2474 });
2475 }
2476 if let Some(&first) = seen.get(&tuple) {
2477 return Err(Error::ConstraintViolation {
2478 name: spec.name.clone(),
2479 kind: spec.kind.as_string(),
2480 label: label.to_string(),
2481 property: spec.properties.join(","),
2482 details: format!("duplicate tuple held by nodes {first} and {id}"),
2483 });
2484 }
2485 seen.insert(tuple, id);
2486 }
2487 }
2488 }
2489 Ok(())
2490}
2491
2492fn validate_existing_edges(
2497 engine: &RocksDbStorageEngine,
2498 spec: &PropertyConstraintSpec,
2499 edge_type: &str,
2500) -> Result<()> {
2501 let edge_ids = engine.edges_by_type(edge_type)?;
2502 let primary = spec.primary_property();
2503 match spec.kind {
2504 PropertyConstraintKind::NotNull => {
2505 for id in edge_ids {
2506 let edge = match engine.get_edge(id)? {
2507 Some(e) => e,
2508 None => continue,
2509 };
2510 let present = edge
2511 .properties
2512 .get(primary)
2513 .is_some_and(|v| !matches!(v, Property::Null));
2514 if !present {
2515 return Err(Error::ConstraintViolation {
2516 name: spec.name.clone(),
2517 kind: spec.kind.as_string(),
2518 label: edge_type.to_string(),
2519 property: primary.to_string(),
2520 details: format!("edge {id} is missing required property"),
2521 });
2522 }
2523 }
2524 }
2525 PropertyConstraintKind::Unique => {
2526 use std::collections::HashMap;
2527 let mut seen: HashMap<Vec<u8>, meshdb_core::EdgeId> = HashMap::new();
2528 for id in edge_ids {
2529 let edge = match engine.get_edge(id)? {
2530 Some(e) => e,
2531 None => continue,
2532 };
2533 let Some(value) = edge.properties.get(primary) else {
2534 continue;
2535 };
2536 let Some(encoded) = encode_index_value(value) else {
2537 continue;
2538 };
2539 if let Some(&first) = seen.get(&encoded) {
2540 return Err(Error::ConstraintViolation {
2541 name: spec.name.clone(),
2542 kind: spec.kind.as_string(),
2543 label: edge_type.to_string(),
2544 property: primary.to_string(),
2545 details: format!("duplicate value held by edges {first} and {id}"),
2546 });
2547 }
2548 seen.insert(encoded, id);
2549 }
2550 }
2551 PropertyConstraintKind::PropertyType(target) => {
2552 for id in edge_ids {
2553 let edge = match engine.get_edge(id)? {
2554 Some(e) => e,
2555 None => continue,
2556 };
2557 let Some(value) = edge.properties.get(primary) else {
2558 continue;
2559 };
2560 if matches!(value, Property::Null) {
2561 continue;
2562 }
2563 if !property_matches_type(value, target) {
2564 return Err(Error::ConstraintViolation {
2565 name: spec.name.clone(),
2566 kind: spec.kind.as_string(),
2567 label: edge_type.to_string(),
2568 property: primary.to_string(),
2569 details: format!(
2570 "edge {id} has value of type {} (expected {})",
2571 value.type_name(),
2572 target.as_str()
2573 ),
2574 });
2575 }
2576 }
2577 }
2578 PropertyConstraintKind::NodeKey => {
2579 unreachable!("NODE KEY on relationship scope rejected at create time")
2580 }
2581 }
2582 Ok(())
2583}
2584
2585fn property_matches_type(value: &Property, target: PropertyType) -> bool {
2591 matches!(
2592 (target, value),
2593 (PropertyType::String, Property::String(_))
2594 | (PropertyType::Integer, Property::Int64(_))
2595 | (PropertyType::Float, Property::Float64(_))
2596 | (PropertyType::Boolean, Property::Bool(_))
2597 )
2598}
2599
2600fn load_constraint_meta(db: &DB) -> Result<Vec<PropertyConstraintSpec>> {
2605 let cf = db
2606 .cf_handle(CF_CONSTRAINT_META)
2607 .ok_or(Error::MissingColumnFamily(CF_CONSTRAINT_META))?;
2608 let mut specs = Vec::new();
2609 for item in db.iterator_cf(cf, IteratorMode::Start) {
2610 let (key, value) = item?;
2611 let name = std::str::from_utf8(&key)
2612 .map_err(|_| Error::CorruptBytes {
2613 cf: CF_CONSTRAINT_META,
2614 expected: key.len(),
2615 actual: key.len(),
2616 })?
2617 .to_string();
2618 specs.push(constraint_meta_decode(CF_CONSTRAINT_META, name, &value)?);
2619 }
2620 Ok(specs)
2621}
2622
2623fn load_index_meta(db: &DB) -> Result<Vec<PropertyIndexSpec>> {
2627 let cf = db
2628 .cf_handle(CF_INDEX_META)
2629 .ok_or(Error::MissingColumnFamily(CF_INDEX_META))?;
2630 let mut specs = Vec::new();
2631 for item in db.iterator_cf(cf, IteratorMode::Start) {
2632 let (key, _) = item?;
2633 specs.push(index_meta_key_decode(&key)?);
2634 }
2635 Ok(specs)
2636}
2637
2638fn load_edge_index_meta(db: &DB) -> Result<Vec<EdgePropertyIndexSpec>> {
2642 let cf = db
2643 .cf_handle(CF_EDGE_INDEX_META)
2644 .ok_or(Error::MissingColumnFamily(CF_EDGE_INDEX_META))?;
2645 let mut specs = Vec::new();
2646 for item in db.iterator_cf(cf, IteratorMode::Start) {
2647 let (key, _) = item?;
2648 specs.push(edge_index_meta_key_decode(&key)?);
2649 }
2650 Ok(specs)
2651}
2652
2653fn load_point_index_meta(db: &DB) -> Result<Vec<PointIndexSpec>> {
2656 let cf = db
2657 .cf_handle(CF_POINT_INDEX_META)
2658 .ok_or(Error::MissingColumnFamily(CF_POINT_INDEX_META))?;
2659 let mut specs = Vec::new();
2660 for item in db.iterator_cf(cf, IteratorMode::Start) {
2661 let (key, _) = item?;
2662 specs.push(point_index_meta_key_decode(&key)?);
2663 }
2664 Ok(specs)
2665}
2666
2667fn load_edge_point_index_meta(db: &DB) -> Result<Vec<EdgePointIndexSpec>> {
2669 let cf = db
2670 .cf_handle(CF_EDGE_POINT_INDEX_META)
2671 .ok_or(Error::MissingColumnFamily(CF_EDGE_POINT_INDEX_META))?;
2672 let mut specs = Vec::new();
2673 for item in db.iterator_cf(cf, IteratorMode::Start) {
2674 let (key, _) = item?;
2675 specs.push(edge_point_index_meta_key_decode(&key)?);
2676 }
2677 Ok(specs)
2678}
2679
2680impl StorageEngine for RocksDbStorageEngine {
2681 fn put_node(&self, node: &Node) -> Result<()> {
2682 RocksDbStorageEngine::put_node(self, node)
2683 }
2684
2685 fn get_node(&self, id: NodeId) -> Result<Option<Node>> {
2686 RocksDbStorageEngine::get_node(self, id)
2687 }
2688
2689 fn detach_delete_node(&self, id: NodeId) -> Result<()> {
2690 RocksDbStorageEngine::detach_delete_node(self, id)
2691 }
2692
2693 fn put_edge(&self, edge: &Edge) -> Result<()> {
2694 RocksDbStorageEngine::put_edge(self, edge)
2695 }
2696
2697 fn get_edge(&self, id: EdgeId) -> Result<Option<Edge>> {
2698 RocksDbStorageEngine::get_edge(self, id)
2699 }
2700
2701 fn delete_edge(&self, id: EdgeId) -> Result<()> {
2702 RocksDbStorageEngine::delete_edge(self, id)
2703 }
2704
2705 fn apply_batch(&self, mutations: &[GraphMutation]) -> Result<()> {
2706 RocksDbStorageEngine::apply_batch(self, mutations)
2707 }
2708
2709 fn all_nodes(&self) -> Result<Vec<Node>> {
2710 RocksDbStorageEngine::all_nodes(self)
2711 }
2712
2713 fn all_edges(&self) -> Result<Vec<Edge>> {
2714 RocksDbStorageEngine::all_edges(self)
2715 }
2716
2717 fn all_node_ids(&self) -> Result<Vec<NodeId>> {
2718 RocksDbStorageEngine::all_node_ids(self)
2719 }
2720
2721 fn outgoing(&self, source: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
2722 RocksDbStorageEngine::outgoing(self, source)
2723 }
2724
2725 fn incoming(&self, target: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
2726 RocksDbStorageEngine::incoming(self, target)
2727 }
2728
2729 fn nodes_by_label(&self, label: &str) -> Result<Vec<NodeId>> {
2730 RocksDbStorageEngine::nodes_by_label(self, label)
2731 }
2732
2733 fn edges_by_type(&self, edge_type: &str) -> Result<Vec<EdgeId>> {
2734 RocksDbStorageEngine::edges_by_type(self, edge_type)
2735 }
2736
2737 fn nodes_by_property(
2738 &self,
2739 label: &str,
2740 property: &str,
2741 value: &Property,
2742 ) -> Result<Vec<NodeId>> {
2743 RocksDbStorageEngine::nodes_by_property(self, label, property, value)
2744 }
2745
2746 fn nodes_by_properties(
2747 &self,
2748 label: &str,
2749 properties: &[String],
2750 values: &[Property],
2751 ) -> Result<Vec<NodeId>> {
2752 RocksDbStorageEngine::nodes_by_properties(self, label, properties, values)
2753 }
2754
2755 fn edges_by_property(
2756 &self,
2757 edge_type: &str,
2758 property: &str,
2759 value: &Property,
2760 ) -> Result<Vec<EdgeId>> {
2761 RocksDbStorageEngine::edges_by_property(self, edge_type, property, value)
2762 }
2763
2764 fn create_property_index(&self, label: &str, property: &str) -> Result<()> {
2765 RocksDbStorageEngine::create_property_index(self, label, property)
2766 }
2767
2768 fn drop_property_index(&self, label: &str, property: &str) -> Result<()> {
2769 RocksDbStorageEngine::drop_property_index(self, label, property)
2770 }
2771
2772 fn create_property_index_composite(&self, label: &str, properties: &[String]) -> Result<()> {
2773 RocksDbStorageEngine::create_property_index_composite(self, label, properties)
2774 }
2775
2776 fn drop_property_index_composite(&self, label: &str, properties: &[String]) -> Result<()> {
2777 RocksDbStorageEngine::drop_property_index_composite(self, label, properties)
2778 }
2779
2780 fn list_property_indexes(&self) -> Vec<PropertyIndexSpec> {
2781 RocksDbStorageEngine::list_property_indexes(self)
2782 }
2783
2784 fn create_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
2785 RocksDbStorageEngine::create_edge_property_index(self, edge_type, property)
2786 }
2787
2788 fn drop_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
2789 RocksDbStorageEngine::drop_edge_property_index(self, edge_type, property)
2790 }
2791
2792 fn create_edge_property_index_composite(
2793 &self,
2794 edge_type: &str,
2795 properties: &[String],
2796 ) -> Result<()> {
2797 RocksDbStorageEngine::create_edge_property_index_composite(self, edge_type, properties)
2798 }
2799
2800 fn drop_edge_property_index_composite(
2801 &self,
2802 edge_type: &str,
2803 properties: &[String],
2804 ) -> Result<()> {
2805 RocksDbStorageEngine::drop_edge_property_index_composite(self, edge_type, properties)
2806 }
2807
2808 fn list_edge_property_indexes(&self) -> Vec<EdgePropertyIndexSpec> {
2809 RocksDbStorageEngine::list_edge_property_indexes(self)
2810 }
2811
2812 fn create_point_index(&self, label: &str, property: &str) -> Result<()> {
2813 RocksDbStorageEngine::create_point_index(self, label, property)
2814 }
2815
2816 fn drop_point_index(&self, label: &str, property: &str) -> Result<()> {
2817 RocksDbStorageEngine::drop_point_index(self, label, property)
2818 }
2819
2820 fn list_point_indexes(&self) -> Vec<PointIndexSpec> {
2821 RocksDbStorageEngine::list_point_indexes(self)
2822 }
2823
2824 fn nodes_in_bbox(
2825 &self,
2826 label: &str,
2827 property: &str,
2828 srid: i32,
2829 xlo: f64,
2830 ylo: f64,
2831 xhi: f64,
2832 yhi: f64,
2833 ) -> Result<Vec<NodeId>> {
2834 RocksDbStorageEngine::nodes_in_bbox(self, label, property, srid, xlo, ylo, xhi, yhi)
2835 }
2836
2837 fn create_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
2838 RocksDbStorageEngine::create_edge_point_index(self, edge_type, property)
2839 }
2840
2841 fn drop_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
2842 RocksDbStorageEngine::drop_edge_point_index(self, edge_type, property)
2843 }
2844
2845 fn list_edge_point_indexes(&self) -> Vec<EdgePointIndexSpec> {
2846 RocksDbStorageEngine::list_edge_point_indexes(self)
2847 }
2848
2849 fn edges_in_bbox(
2850 &self,
2851 edge_type: &str,
2852 property: &str,
2853 srid: i32,
2854 xlo: f64,
2855 ylo: f64,
2856 xhi: f64,
2857 yhi: f64,
2858 ) -> Result<Vec<EdgeId>> {
2859 RocksDbStorageEngine::edges_in_bbox(self, edge_type, property, srid, xlo, ylo, xhi, yhi)
2860 }
2861
2862 fn create_property_constraint(
2863 &self,
2864 name: Option<&str>,
2865 scope: &ConstraintScope,
2866 properties: &[String],
2867 kind: PropertyConstraintKind,
2868 if_not_exists: bool,
2869 ) -> Result<PropertyConstraintSpec> {
2870 RocksDbStorageEngine::create_property_constraint(
2871 self,
2872 name,
2873 scope,
2874 properties,
2875 kind,
2876 if_not_exists,
2877 )
2878 }
2879
2880 fn drop_property_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
2881 RocksDbStorageEngine::drop_property_constraint(self, name, if_exists)
2882 }
2883
2884 fn list_property_constraints(&self) -> Vec<PropertyConstraintSpec> {
2885 RocksDbStorageEngine::list_property_constraints(self)
2886 }
2887
2888 fn put_trigger(&self, name: &str, value: &[u8]) -> Result<()> {
2889 let cf = self.cf(CF_TRIGGER_META)?;
2890 self.db
2891 .put_cf(cf, name.as_bytes(), value)
2892 .map_err(Error::from)
2893 }
2894
2895 fn delete_trigger(&self, name: &str) -> Result<()> {
2896 let cf = self.cf(CF_TRIGGER_META)?;
2897 self.db.delete_cf(cf, name.as_bytes()).map_err(Error::from)
2898 }
2899
2900 fn list_triggers(&self) -> Result<Vec<(String, Vec<u8>)>> {
2901 let cf = self.cf(CF_TRIGGER_META)?;
2902 let mut out: Vec<(String, Vec<u8>)> = Vec::new();
2903 let iter = self.db.iterator_cf(cf, IteratorMode::Start);
2904 for entry in iter {
2905 let (k, v) = entry.map_err(Error::from)?;
2906 let name = String::from_utf8_lossy(&k).into_owned();
2910 out.push((name, v.to_vec()));
2911 }
2912 Ok(out)
2913 }
2914
2915 fn create_checkpoint(&self, path: &Path) -> Result<()> {
2916 RocksDbStorageEngine::create_checkpoint(self, path)
2917 }
2918
2919 fn clear_all(&self) -> Result<()> {
2920 RocksDbStorageEngine::clear_all(self)
2921 }
2922}