1use crate::{
15 engine::{
16 ConstraintScope, EdgePointIndexSpec, EdgePropertyIndexSpec, GraphMutation, PointIndexSpec,
17 PropertyConstraintKind, PropertyConstraintSpec, PropertyIndexSpec, PropertyType,
18 StorageEngine,
19 },
20 error::{Error, Result},
21 keys::{
22 adj_key, cell_from_point_index_key, constraint_meta_decode, constraint_meta_encode,
23 decode_point_index_value, edge_from_adj_key, edge_id_from_property_index_key,
24 edge_property_index_composite_key, edge_property_index_composite_value_prefix,
25 encode_index_tuple, encode_index_value, id_from_str_index_key, label_index_key,
26 label_index_prefix, node_from_adj_value, node_id_from_point_index_key,
27 node_id_from_property_index_key, parse_property_index_entry_props, point_cell,
28 point_cell_range, point_index_key, point_index_label_prop_prefix, point_index_srid_prefix,
29 point_index_value, property_index_composite_key, property_index_composite_value_prefix,
30 property_index_label_prefix, type_index_key, type_index_prefix, ID_LEN,
31 },
32};
33use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
34use rocksdb::{
35 checkpoint::Checkpoint, ColumnFamilyDescriptor, Direction, IteratorMode, Options, WriteBatch,
36 DB,
37};
38use std::path::Path;
39use std::sync::RwLock;
40
// Column family names used by the RocksDB-backed engine. Every name below is
// also listed in `ALL_CFS`.

/// Node records: key is the node id bytes, value is the JSON-serialized node.
const CF_NODES: &str = "nodes";
/// Edge records: key is the edge id bytes, value is the JSON-serialized edge.
const CF_EDGES: &str = "edges";
/// Outgoing adjacency: key is `adj_key(source, edge_id)`, value is the target node id bytes.
const CF_ADJ_OUT: &str = "adj_out";
/// Incoming adjacency: key is `adj_key(target, edge_id)`, value is the source node id bytes.
const CF_ADJ_IN: &str = "adj_in";
/// Label membership: key is `label_index_key(label, node_id)`, value is empty.
const CF_LABEL_INDEX: &str = "label_index";
/// Edge-type membership: key is `type_index_key(edge_type, edge_id)`, value is empty.
const CF_TYPE_INDEX: &str = "type_index";
/// Node property index entries (`property_index_composite_key`), value is empty.
const CF_PROPERTY_INDEX: &str = "property_index";
/// Registered node property indexes: one `index_meta_key` per index, value is empty.
const CF_INDEX_META: &str = "index_meta";
/// Edge property index entries (`edge_property_index_composite_key`), value is empty.
const CF_EDGE_PROPERTY_INDEX: &str = "edge_property_index";
/// Metadata for edge property indexes (keys decoded by `edge_index_meta_key_decode`).
const CF_EDGE_INDEX_META: &str = "edge_index_meta";
/// Constraint registry: key is the constraint name, value is `constraint_meta_encode(spec)`.
const CF_CONSTRAINT_META: &str = "constraint_meta";
/// Node point index entries: key is `point_index_key(..)`, value is `point_index_value(x, y, z)`.
const CF_POINT_INDEX: &str = "point_index";
/// Metadata for node point indexes (keys decoded by `point_index_meta_key_decode`).
const CF_POINT_INDEX_META: &str = "point_index_meta";
/// Edge point index entries; same key/value helpers as the node point index.
const CF_EDGE_POINT_INDEX: &str = "edge_point_index";
/// Metadata for edge point indexes (keys decoded by `edge_point_index_meta_key_decode`).
const CF_EDGE_POINT_INDEX_META: &str = "edge_point_index_meta";
// NOTE(review): the two column families below are not touched by the code
// visible in this part of the file; presumably they hold trigger definitions
// and pending-transaction bookkeeping — confirm against their users.
const CF_TRIGGER_META: &str = "trigger_meta";
const CF_PENDING_TX_META: &str = "pending_tx_meta";

/// Every column family the engine opens. `open_with_options` creates one
/// descriptor per entry and `clear_all` iterates over the same list.
const ALL_CFS: &[&str] = &[
    CF_NODES,
    CF_EDGES,
    CF_ADJ_OUT,
    CF_ADJ_IN,
    CF_LABEL_INDEX,
    CF_TYPE_INDEX,
    CF_PROPERTY_INDEX,
    CF_INDEX_META,
    CF_EDGE_PROPERTY_INDEX,
    CF_EDGE_INDEX_META,
    CF_CONSTRAINT_META,
    CF_POINT_INDEX,
    CF_POINT_INDEX_META,
    CF_EDGE_POINT_INDEX,
    CF_EDGE_POINT_INDEX_META,
    CF_TRIGGER_META,
    CF_PENDING_TX_META,
];

/// Empty value written for index and meta entries whose key carries all the information.
const EMPTY: &[u8] = &[];
91
92fn index_meta_key(spec: &PropertyIndexSpec) -> Vec<u8> {
99 let cap = spec.label.len()
100 + spec.properties.iter().map(|p| p.len()).sum::<usize>()
101 + spec.properties.len();
102 let mut k = Vec::with_capacity(cap);
103 k.extend_from_slice(spec.label.as_bytes());
104 for p in &spec.properties {
105 k.push(0);
106 k.extend_from_slice(p.as_bytes());
107 }
108 k
109}
110
111fn index_meta_key_decode(key: &[u8]) -> Result<PropertyIndexSpec> {
112 let mut parts = key.split(|b| *b == 0);
113 let label_bytes = parts.next().ok_or(Error::CorruptBytes {
114 cf: CF_INDEX_META,
115 expected: 1,
116 actual: 0,
117 })?;
118 let label = std::str::from_utf8(label_bytes)
119 .map_err(|_| Error::CorruptBytes {
120 cf: CF_INDEX_META,
121 expected: label_bytes.len(),
122 actual: label_bytes.len(),
123 })?
124 .to_string();
125 let mut properties: Vec<String> = Vec::new();
126 for part in parts {
127 let s = std::str::from_utf8(part)
128 .map_err(|_| Error::CorruptBytes {
129 cf: CF_INDEX_META,
130 expected: part.len(),
131 actual: part.len(),
132 })?
133 .to_string();
134 properties.push(s);
135 }
136 if properties.is_empty() {
137 return Err(Error::CorruptBytes {
138 cf: CF_INDEX_META,
139 expected: 1,
140 actual: 0,
141 });
142 }
143 Ok(PropertyIndexSpec { label, properties })
144}
145
146fn edge_index_meta_key(spec: &EdgePropertyIndexSpec) -> Vec<u8> {
151 let cap = spec.edge_type.len()
152 + spec.properties.iter().map(|p| p.len()).sum::<usize>()
153 + spec.properties.len();
154 let mut k = Vec::with_capacity(cap);
155 k.extend_from_slice(spec.edge_type.as_bytes());
156 for p in &spec.properties {
157 k.push(0);
158 k.extend_from_slice(p.as_bytes());
159 }
160 k
161}
162
163fn edge_index_meta_key_decode(key: &[u8]) -> Result<EdgePropertyIndexSpec> {
164 let mut parts = key.split(|b| *b == 0);
165 let type_bytes = parts.next().ok_or(Error::CorruptBytes {
166 cf: CF_EDGE_INDEX_META,
167 expected: 1,
168 actual: 0,
169 })?;
170 let edge_type = std::str::from_utf8(type_bytes)
171 .map_err(|_| Error::CorruptBytes {
172 cf: CF_EDGE_INDEX_META,
173 expected: type_bytes.len(),
174 actual: type_bytes.len(),
175 })?
176 .to_string();
177 let mut properties: Vec<String> = Vec::new();
178 for part in parts {
179 let s = std::str::from_utf8(part)
180 .map_err(|_| Error::CorruptBytes {
181 cf: CF_EDGE_INDEX_META,
182 expected: part.len(),
183 actual: part.len(),
184 })?
185 .to_string();
186 properties.push(s);
187 }
188 if properties.is_empty() {
189 return Err(Error::CorruptBytes {
190 cf: CF_EDGE_INDEX_META,
191 expected: 1,
192 actual: 0,
193 });
194 }
195 Ok(EdgePropertyIndexSpec {
196 edge_type,
197 properties,
198 })
199}
200
201fn point_index_meta_key(spec: &PointIndexSpec) -> Vec<u8> {
207 let mut k = Vec::with_capacity(spec.label.len() + spec.property.len() + 1);
208 k.extend_from_slice(spec.label.as_bytes());
209 k.push(0);
210 k.extend_from_slice(spec.property.as_bytes());
211 k
212}
213
214fn point_index_meta_key_decode(key: &[u8]) -> Result<PointIndexSpec> {
215 let mut parts = key.splitn(2, |b| *b == 0);
216 let label_bytes = parts.next().ok_or(Error::CorruptBytes {
217 cf: CF_POINT_INDEX_META,
218 expected: 1,
219 actual: 0,
220 })?;
221 let property_bytes = parts.next().ok_or(Error::CorruptBytes {
222 cf: CF_POINT_INDEX_META,
223 expected: label_bytes.len() + 2,
224 actual: key.len(),
225 })?;
226 let label = std::str::from_utf8(label_bytes)
227 .map_err(|_| Error::CorruptBytes {
228 cf: CF_POINT_INDEX_META,
229 expected: label_bytes.len(),
230 actual: label_bytes.len(),
231 })?
232 .to_string();
233 let property = std::str::from_utf8(property_bytes)
234 .map_err(|_| Error::CorruptBytes {
235 cf: CF_POINT_INDEX_META,
236 expected: property_bytes.len(),
237 actual: property_bytes.len(),
238 })?
239 .to_string();
240 if property.is_empty() {
241 return Err(Error::CorruptBytes {
242 cf: CF_POINT_INDEX_META,
243 expected: 1,
244 actual: 0,
245 });
246 }
247 Ok(PointIndexSpec { label, property })
248}
249
250fn edge_point_index_meta_key(spec: &EdgePointIndexSpec) -> Vec<u8> {
253 let mut k = Vec::with_capacity(spec.edge_type.len() + spec.property.len() + 1);
254 k.extend_from_slice(spec.edge_type.as_bytes());
255 k.push(0);
256 k.extend_from_slice(spec.property.as_bytes());
257 k
258}
259
260fn edge_point_index_meta_key_decode(key: &[u8]) -> Result<EdgePointIndexSpec> {
261 let mut parts = key.splitn(2, |b| *b == 0);
262 let type_bytes = parts.next().ok_or(Error::CorruptBytes {
263 cf: CF_EDGE_POINT_INDEX_META,
264 expected: 1,
265 actual: 0,
266 })?;
267 let property_bytes = parts.next().ok_or(Error::CorruptBytes {
268 cf: CF_EDGE_POINT_INDEX_META,
269 expected: type_bytes.len() + 2,
270 actual: key.len(),
271 })?;
272 let edge_type = std::str::from_utf8(type_bytes)
273 .map_err(|_| Error::CorruptBytes {
274 cf: CF_EDGE_POINT_INDEX_META,
275 expected: type_bytes.len(),
276 actual: type_bytes.len(),
277 })?
278 .to_string();
279 let property = std::str::from_utf8(property_bytes)
280 .map_err(|_| Error::CorruptBytes {
281 cf: CF_EDGE_POINT_INDEX_META,
282 expected: property_bytes.len(),
283 actual: property_bytes.len(),
284 })?
285 .to_string();
286 if property.is_empty() {
287 return Err(Error::CorruptBytes {
288 cf: CF_EDGE_POINT_INDEX_META,
289 expected: 1,
290 actual: 0,
291 });
292 }
293 Ok(EdgePointIndexSpec {
294 edge_type,
295 property,
296 })
297}
298
/// Graph storage engine backed by a RocksDB database.
///
/// Besides the database handle, the engine keeps in-memory copies of the
/// index and constraint registries. `open_with_options` fills them from the
/// corresponding meta column families, and the write paths consult them to
/// keep index entries in sync.
pub struct RocksDbStorageEngine {
    /// Underlying RocksDB handle.
    db: DB,
    /// Registered node property indexes.
    indexes: RwLock<Vec<PropertyIndexSpec>>,
    /// Registered edge property indexes.
    edge_indexes: RwLock<Vec<EdgePropertyIndexSpec>>,
    /// Registered property constraints.
    constraints: RwLock<Vec<PropertyConstraintSpec>>,
    /// Registered node point indexes.
    point_indexes: RwLock<Vec<PointIndexSpec>>,
    /// Registered edge point indexes.
    edge_point_indexes: RwLock<Vec<EdgePointIndexSpec>>,
}
346
/// Tuning knobs applied when opening the RocksDB database.
///
/// The `Option` fields are only forwarded to RocksDB when they are `Some`;
/// `None` leaves the corresponding RocksDB default untouched.
#[derive(Debug, Clone)]
pub struct StorageOptions {
    /// Forwarded to `Options::set_max_open_files`.
    pub max_open_files: i32,
    /// Forwarded to `Options::set_keep_log_file_num`.
    pub keep_log_file_num: usize,
    /// When set, forwarded to `Options::set_write_buffer_size` for every column family.
    pub write_buffer_size_bytes: Option<usize>,
    /// When set, forwarded to `Options::set_max_write_buffer_number` for every column family.
    pub max_write_buffer_number: Option<i32>,
    /// When set, enables a bloom filter with this many bits per key on every column family.
    pub bloom_filter_bits_per_key: Option<i32>,
}

impl Default for StorageOptions {
    /// Defaults: 64 open files, 4 retained log files, no per-column-family overrides.
    fn default() -> Self {
        StorageOptions {
            max_open_files: 64,
            keep_log_file_num: 4,
            write_buffer_size_bytes: None,
            max_write_buffer_number: None,
            bloom_filter_bits_per_key: None,
        }
    }
}
395
396impl RocksDbStorageEngine {
397 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
398 Self::open_with_options(path, StorageOptions::default())
399 }
400
401 pub fn open_with_options(path: impl AsRef<Path>, opts: StorageOptions) -> Result<Self> {
406 let mut db_opts = Options::default();
407 db_opts.create_if_missing(true);
408 db_opts.create_missing_column_families(true);
409 db_opts.set_max_open_files(opts.max_open_files);
410 db_opts.set_keep_log_file_num(opts.keep_log_file_num);
411
412 let mut cf_opts = Options::default();
417 if let Some(sz) = opts.write_buffer_size_bytes {
418 cf_opts.set_write_buffer_size(sz);
419 }
420 if let Some(n) = opts.max_write_buffer_number {
421 cf_opts.set_max_write_buffer_number(n);
422 }
423 if let Some(bits) = opts.bloom_filter_bits_per_key {
424 let mut block_opts = rocksdb::BlockBasedOptions::default();
425 block_opts.set_bloom_filter(bits as f64, false);
426 cf_opts.set_block_based_table_factory(&block_opts);
427 }
428
429 let cfs: Vec<ColumnFamilyDescriptor> = ALL_CFS
430 .iter()
431 .map(|name| ColumnFamilyDescriptor::new(*name, cf_opts.clone()))
432 .collect();
433
434 let db = DB::open_cf_descriptors(&db_opts, path, cfs)?;
435 let indexes = load_index_meta(&db)?;
436 let edge_indexes = load_edge_index_meta(&db)?;
437 let constraints = load_constraint_meta(&db)?;
438 let point_indexes = load_point_index_meta(&db)?;
439 let edge_point_indexes = load_edge_point_index_meta(&db)?;
440 Ok(Self {
441 db,
442 indexes: RwLock::new(indexes),
443 edge_indexes: RwLock::new(edge_indexes),
444 constraints: RwLock::new(constraints),
445 point_indexes: RwLock::new(point_indexes),
446 edge_point_indexes: RwLock::new(edge_point_indexes),
447 })
448 }
449
450 fn cf(&self, name: &'static str) -> Result<&rocksdb::ColumnFamily> {
451 self.db
452 .cf_handle(name)
453 .ok_or(Error::MissingColumnFamily(name))
454 }
455
456 pub fn put_node(&self, node: &Node) -> Result<()> {
457 let mut batch = WriteBatch::default();
458 self.append_put_node(&mut batch, node)?;
459 self.db.write(batch)?;
460 Ok(())
461 }
462
463 fn append_put_node(&self, batch: &mut WriteBatch, node: &Node) -> Result<()> {
464 let nodes_cf = self.cf(CF_NODES)?;
465 let label_cf = self.cf(CF_LABEL_INDEX)?;
466 let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
467
468 let existing: Option<Node> = match self.db.get_cf(nodes_cf, node.id.as_bytes())? {
469 Some(bytes) => Some(serde_json::from_slice(&bytes)?),
470 None => None,
471 };
472 let existing_labels: &[String] = existing.as_ref().map(|n| &n.labels[..]).unwrap_or(&[]);
473
474 self.enforce_constraints(node, existing.as_ref())?;
482
483 let bytes = serde_json::to_vec(node)?;
484 batch.put_cf(nodes_cf, node.id.as_bytes(), bytes);
485
486 for old in existing_labels {
487 if !node.labels.contains(old) {
488 batch.delete_cf(label_cf, label_index_key(old, node.id));
489 }
490 }
491 for new in &node.labels {
492 if !existing_labels.contains(new) {
493 batch.put_cf(label_cf, label_index_key(new, node.id), EMPTY);
494 }
495 }
496
497 let indexes = self.indexes.read().expect("indexes lock poisoned");
503 for spec in indexes.iter() {
504 let was_indexed = existing_labels.iter().any(|l| l == &spec.label);
505 let now_indexed = node.labels.iter().any(|l| l == &spec.label);
506 let old_encoded = if was_indexed {
507 existing
508 .as_ref()
509 .and_then(|n| encode_index_tuple(&n.properties, &spec.properties))
510 } else {
511 None
512 };
513 let new_encoded = if now_indexed {
514 encode_index_tuple(&node.properties, &spec.properties)
515 } else {
516 None
517 };
518 if old_encoded == new_encoded {
519 continue;
520 }
521 if let Some(values) = &old_encoded {
522 batch.delete_cf(
523 prop_cf,
524 property_index_composite_key(&spec.label, &spec.properties, values, node.id),
525 );
526 }
527 if let Some(values) = &new_encoded {
528 batch.put_cf(
529 prop_cf,
530 property_index_composite_key(&spec.label, &spec.properties, values, node.id),
531 EMPTY,
532 );
533 }
534 }
535
536 let point_indexes = self
541 .point_indexes
542 .read()
543 .expect("point_indexes lock poisoned");
544 if !point_indexes.is_empty() {
545 let point_cf = self.cf(CF_POINT_INDEX)?;
546 for spec in point_indexes.iter() {
547 let was_indexed = existing_labels.iter().any(|l| l == &spec.label);
548 let now_indexed = node.labels.iter().any(|l| l == &spec.label);
549 let old_point = if was_indexed {
550 existing
551 .as_ref()
552 .and_then(|n| extract_indexable_point(&n.properties, &spec.property))
553 } else {
554 None
555 };
556 let new_point = if now_indexed {
557 extract_indexable_point(&node.properties, &spec.property)
558 } else {
559 None
560 };
561 if old_point == new_point {
562 continue;
563 }
564 if let Some(p) = &old_point {
565 let cell = point_cell(p.srid, p.x, p.y);
566 batch.delete_cf(
567 point_cf,
568 point_index_key(
569 &spec.label,
570 &spec.property,
571 p.srid,
572 cell,
573 node.id.as_bytes(),
574 ),
575 );
576 }
577 if let Some(p) = &new_point {
578 let cell = point_cell(p.srid, p.x, p.y);
579 batch.put_cf(
580 point_cf,
581 point_index_key(
582 &spec.label,
583 &spec.property,
584 p.srid,
585 cell,
586 node.id.as_bytes(),
587 ),
588 point_index_value(p.x, p.y, p.z),
589 );
590 }
591 }
592 }
593
594 Ok(())
595 }
596
597 pub fn get_node(&self, id: NodeId) -> Result<Option<Node>> {
598 let cf = self.cf(CF_NODES)?;
599 match self.db.get_cf(cf, id.as_bytes())? {
600 Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
601 None => Ok(None),
602 }
603 }
604
605 pub fn put_edge(&self, edge: &Edge) -> Result<()> {
606 let mut batch = WriteBatch::default();
607 self.append_put_edge(&mut batch, edge)?;
608 self.db.write(batch)?;
609 Ok(())
610 }
611
612 fn append_put_edge(&self, batch: &mut WriteBatch, edge: &Edge) -> Result<()> {
613 let edges_cf = self.cf(CF_EDGES)?;
614 let out_cf = self.cf(CF_ADJ_OUT)?;
615 let in_cf = self.cf(CF_ADJ_IN)?;
616 let type_cf = self.cf(CF_TYPE_INDEX)?;
617 let edge_prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
618
619 let existing_edge = self.get_edge(edge.id)?;
624 self.enforce_edge_constraints(edge, existing_edge.as_ref())?;
625
626 let bytes = serde_json::to_vec(edge)?;
627 batch.put_cf(edges_cf, edge.id.as_bytes(), bytes);
628 batch.put_cf(
629 out_cf,
630 adj_key(edge.source, edge.id),
631 edge.target.as_bytes(),
632 );
633 batch.put_cf(in_cf, adj_key(edge.target, edge.id), edge.source.as_bytes());
634 batch.put_cf(type_cf, type_index_key(&edge.edge_type, edge.id), EMPTY);
635
636 let edge_indexes = self
644 .edge_indexes
645 .read()
646 .expect("edge_indexes lock poisoned");
647 for spec in edge_indexes.iter() {
648 if spec.edge_type != edge.edge_type {
649 continue;
650 }
651 let old_encoded = existing_edge
652 .as_ref()
653 .and_then(|e| encode_index_tuple(&e.properties, &spec.properties));
654 let new_encoded = encode_index_tuple(&edge.properties, &spec.properties);
655 if old_encoded == new_encoded {
656 continue;
657 }
658 if let Some(values) = &old_encoded {
659 batch.delete_cf(
660 edge_prop_cf,
661 edge_property_index_composite_key(
662 &spec.edge_type,
663 &spec.properties,
664 values,
665 edge.id,
666 ),
667 );
668 }
669 if let Some(values) = &new_encoded {
670 batch.put_cf(
671 edge_prop_cf,
672 edge_property_index_composite_key(
673 &spec.edge_type,
674 &spec.properties,
675 values,
676 edge.id,
677 ),
678 EMPTY,
679 );
680 }
681 }
682
683 let edge_point_indexes = self
688 .edge_point_indexes
689 .read()
690 .expect("edge_point_indexes lock poisoned");
691 if !edge_point_indexes.is_empty() {
692 let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
693 for spec in edge_point_indexes.iter() {
694 if spec.edge_type != edge.edge_type {
695 continue;
696 }
697 let old_point = existing_edge
698 .as_ref()
699 .and_then(|e| extract_indexable_point(&e.properties, &spec.property));
700 let new_point = extract_indexable_point(&edge.properties, &spec.property);
701 if old_point == new_point {
702 continue;
703 }
704 if let Some(p) = &old_point {
705 let cell = point_cell(p.srid, p.x, p.y);
706 batch.delete_cf(
707 point_cf,
708 point_index_key(
709 &spec.edge_type,
710 &spec.property,
711 p.srid,
712 cell,
713 edge.id.as_bytes(),
714 ),
715 );
716 }
717 if let Some(p) = &new_point {
718 let cell = point_cell(p.srid, p.x, p.y);
719 batch.put_cf(
720 point_cf,
721 point_index_key(
722 &spec.edge_type,
723 &spec.property,
724 p.srid,
725 cell,
726 edge.id.as_bytes(),
727 ),
728 point_index_value(p.x, p.y, p.z),
729 );
730 }
731 }
732 }
733 Ok(())
734 }
735
736 pub fn get_edge(&self, id: EdgeId) -> Result<Option<Edge>> {
737 let cf = self.cf(CF_EDGES)?;
738 match self.db.get_cf(cf, id.as_bytes())? {
739 Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
740 None => Ok(None),
741 }
742 }
743
744 pub fn delete_edge(&self, id: EdgeId) -> Result<()> {
745 let edge = self.get_edge(id)?.ok_or(Error::EdgeNotFound(id))?;
746 let mut batch = WriteBatch::default();
747 self.append_delete_edge(&mut batch, id, &edge)?;
748 self.db.write(batch)?;
749 Ok(())
750 }
751
752 fn append_delete_edge(&self, batch: &mut WriteBatch, id: EdgeId, edge: &Edge) -> Result<()> {
753 let edges_cf = self.cf(CF_EDGES)?;
754 let out_cf = self.cf(CF_ADJ_OUT)?;
755 let in_cf = self.cf(CF_ADJ_IN)?;
756 let type_cf = self.cf(CF_TYPE_INDEX)?;
757 let edge_prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
758
759 batch.delete_cf(edges_cf, id.as_bytes());
760 batch.delete_cf(out_cf, adj_key(edge.source, id));
761 batch.delete_cf(in_cf, adj_key(edge.target, id));
762 batch.delete_cf(type_cf, type_index_key(&edge.edge_type, id));
763
764 let edge_indexes = self
769 .edge_indexes
770 .read()
771 .expect("edge_indexes lock poisoned");
772 for spec in edge_indexes.iter() {
773 if spec.edge_type != edge.edge_type {
774 continue;
775 }
776 if let Some(values) = encode_index_tuple(&edge.properties, &spec.properties) {
777 batch.delete_cf(
778 edge_prop_cf,
779 edge_property_index_composite_key(
780 &spec.edge_type,
781 &spec.properties,
782 &values,
783 id,
784 ),
785 );
786 }
787 }
788
789 let edge_point_indexes = self
792 .edge_point_indexes
793 .read()
794 .expect("edge_point_indexes lock poisoned");
795 if !edge_point_indexes.is_empty() {
796 let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
797 for spec in edge_point_indexes.iter() {
798 if spec.edge_type != edge.edge_type {
799 continue;
800 }
801 if let Some(p) = extract_indexable_point(&edge.properties, &spec.property) {
802 let cell = point_cell(p.srid, p.x, p.y);
803 batch.delete_cf(
804 point_cf,
805 point_index_key(
806 &spec.edge_type,
807 &spec.property,
808 p.srid,
809 cell,
810 id.as_bytes(),
811 ),
812 );
813 }
814 }
815 }
816 Ok(())
817 }
818
819 pub fn detach_delete_node(&self, id: NodeId) -> Result<()> {
820 let mut batch = WriteBatch::default();
821 self.append_detach_delete_node(&mut batch, id)?;
822 self.db.write(batch)?;
823 Ok(())
824 }
825
826 fn append_detach_delete_node(&self, batch: &mut WriteBatch, id: NodeId) -> Result<()> {
827 let node = self.get_node(id)?;
828 let outgoing = self.outgoing(id)?;
829 let incoming = self.incoming(id)?;
830
831 let nodes_cf = self.cf(CF_NODES)?;
832 let edges_cf = self.cf(CF_EDGES)?;
833 let out_cf = self.cf(CF_ADJ_OUT)?;
834 let in_cf = self.cf(CF_ADJ_IN)?;
835 let label_cf = self.cf(CF_LABEL_INDEX)?;
836 let type_cf = self.cf(CF_TYPE_INDEX)?;
837 let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
838 let edge_prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
839
840 if let Some(n) = &node {
841 for label in &n.labels {
842 batch.delete_cf(label_cf, label_index_key(label, id));
843 }
844 let indexes = self.indexes.read().expect("indexes lock poisoned");
849 for spec in indexes.iter() {
850 if !n.labels.iter().any(|l| l == &spec.label) {
851 continue;
852 }
853 if let Some(values) = encode_index_tuple(&n.properties, &spec.properties) {
854 batch.delete_cf(
855 prop_cf,
856 property_index_composite_key(&spec.label, &spec.properties, &values, id),
857 );
858 }
859 }
860 let point_indexes = self
863 .point_indexes
864 .read()
865 .expect("point_indexes lock poisoned");
866 if !point_indexes.is_empty() {
867 let point_cf = self.cf(CF_POINT_INDEX)?;
868 for spec in point_indexes.iter() {
869 if !n.labels.iter().any(|l| l == &spec.label) {
870 continue;
871 }
872 if let Some(p) = extract_indexable_point(&n.properties, &spec.property) {
873 let cell = point_cell(p.srid, p.x, p.y);
874 batch.delete_cf(
875 point_cf,
876 point_index_key(
877 &spec.label,
878 &spec.property,
879 p.srid,
880 cell,
881 id.as_bytes(),
882 ),
883 );
884 }
885 }
886 }
887 }
888
889 let edge_indexes_snapshot = self
893 .edge_indexes
894 .read()
895 .expect("edge_indexes lock poisoned")
896 .clone();
897 let edge_point_indexes_snapshot = self
898 .edge_point_indexes
899 .read()
900 .expect("edge_point_indexes lock poisoned")
901 .clone();
902 let edge_point_cf_opt = if edge_point_indexes_snapshot.is_empty() {
903 None
904 } else {
905 Some(self.cf(CF_EDGE_POINT_INDEX)?)
906 };
907
908 for (edge_id, target) in &outgoing {
909 if let Some(e) = self.get_edge(*edge_id)? {
910 batch.delete_cf(type_cf, type_index_key(&e.edge_type, *edge_id));
911 for spec in &edge_indexes_snapshot {
912 if spec.edge_type != e.edge_type {
913 continue;
914 }
915 if let Some(values) = encode_index_tuple(&e.properties, &spec.properties) {
916 batch.delete_cf(
917 edge_prop_cf,
918 edge_property_index_composite_key(
919 &spec.edge_type,
920 &spec.properties,
921 &values,
922 *edge_id,
923 ),
924 );
925 }
926 }
927 if let Some(point_cf) = edge_point_cf_opt {
928 for spec in &edge_point_indexes_snapshot {
929 if spec.edge_type != e.edge_type {
930 continue;
931 }
932 if let Some(p) = extract_indexable_point(&e.properties, &spec.property) {
933 let cell = point_cell(p.srid, p.x, p.y);
934 batch.delete_cf(
935 point_cf,
936 point_index_key(
937 &spec.edge_type,
938 &spec.property,
939 p.srid,
940 cell,
941 edge_id.as_bytes(),
942 ),
943 );
944 }
945 }
946 }
947 }
948 batch.delete_cf(edges_cf, edge_id.as_bytes());
949 batch.delete_cf(out_cf, adj_key(id, *edge_id));
950 batch.delete_cf(in_cf, adj_key(*target, *edge_id));
951 }
952 for (edge_id, source) in &incoming {
953 if let Some(e) = self.get_edge(*edge_id)? {
954 batch.delete_cf(type_cf, type_index_key(&e.edge_type, *edge_id));
955 for spec in &edge_indexes_snapshot {
956 if spec.edge_type != e.edge_type {
957 continue;
958 }
959 if let Some(values) = encode_index_tuple(&e.properties, &spec.properties) {
960 batch.delete_cf(
961 edge_prop_cf,
962 edge_property_index_composite_key(
963 &spec.edge_type,
964 &spec.properties,
965 &values,
966 *edge_id,
967 ),
968 );
969 }
970 }
971 if let Some(point_cf) = edge_point_cf_opt {
972 for spec in &edge_point_indexes_snapshot {
973 if spec.edge_type != e.edge_type {
974 continue;
975 }
976 if let Some(p) = extract_indexable_point(&e.properties, &spec.property) {
977 let cell = point_cell(p.srid, p.x, p.y);
978 batch.delete_cf(
979 point_cf,
980 point_index_key(
981 &spec.edge_type,
982 &spec.property,
983 p.srid,
984 cell,
985 edge_id.as_bytes(),
986 ),
987 );
988 }
989 }
990 }
991 }
992 batch.delete_cf(edges_cf, edge_id.as_bytes());
993 batch.delete_cf(out_cf, adj_key(*source, *edge_id));
994 batch.delete_cf(in_cf, adj_key(id, *edge_id));
995 }
996 batch.delete_cf(nodes_cf, id.as_bytes());
997 Ok(())
998 }
999
1000 pub fn apply_batch(&self, mutations: &[GraphMutation]) -> Result<()> {
1011 let mut batch = WriteBatch::default();
1012 for m in mutations {
1013 match m {
1014 GraphMutation::PutNode(n) => self.append_put_node(&mut batch, n)?,
1015 GraphMutation::PutEdge(e) => self.append_put_edge(&mut batch, e)?,
1016 GraphMutation::DeleteEdge(id) => {
1017 if let Some(edge) = self.get_edge(*id)? {
1018 self.append_delete_edge(&mut batch, *id, &edge)?;
1019 }
1020 }
1021 GraphMutation::DetachDeleteNode(id) => {
1022 self.append_detach_delete_node(&mut batch, *id)?;
1023 }
1024 }
1025 }
1026 self.db.write(batch)?;
1027 Ok(())
1028 }
1029
1030 pub fn all_nodes(&self) -> Result<Vec<Node>> {
1033 let cf = self.cf(CF_NODES)?;
1034 let mut nodes = Vec::new();
1035 for item in self.db.iterator_cf(cf, IteratorMode::Start) {
1036 let (_, value) = item?;
1037 nodes.push(serde_json::from_slice(&value)?);
1038 }
1039 Ok(nodes)
1040 }
1041
1042 pub fn all_edges(&self) -> Result<Vec<Edge>> {
1044 let cf = self.cf(CF_EDGES)?;
1045 let mut edges = Vec::new();
1046 for item in self.db.iterator_cf(cf, IteratorMode::Start) {
1047 let (_, value) = item?;
1048 edges.push(serde_json::from_slice(&value)?);
1049 }
1050 Ok(edges)
1051 }
1052
1053 pub fn create_checkpoint(&self, path: impl AsRef<Path>) -> Result<()> {
1065 let checkpoint = Checkpoint::new(&self.db)?;
1066 checkpoint.create_checkpoint(path)?;
1067 Ok(())
1068 }
1069
1070 pub fn clear_all(&self) -> Result<()> {
1074 let mut batch = WriteBatch::default();
1075 for cf_name in ALL_CFS {
1076 let cf = self.cf(cf_name)?;
1077 for item in self.db.iterator_cf(cf, IteratorMode::Start) {
1078 let (key, _) = item?;
1079 batch.delete_cf(cf, key);
1080 }
1081 }
1082 self.db.write(batch)?;
1083 Ok(())
1084 }
1085
1086 pub fn all_node_ids(&self) -> Result<Vec<NodeId>> {
1087 let cf = self.cf(CF_NODES)?;
1088 let mut results = Vec::new();
1089 for item in self.db.iterator_cf(cf, IteratorMode::Start) {
1090 let (key, _) = item?;
1091 if key.len() != ID_LEN {
1092 return Err(Error::CorruptBytes {
1093 cf: CF_NODES,
1094 expected: ID_LEN,
1095 actual: key.len(),
1096 });
1097 }
1098 let mut bytes = [0u8; ID_LEN];
1099 bytes.copy_from_slice(&key);
1100 results.push(NodeId::from_bytes(bytes));
1101 }
1102 Ok(results)
1103 }
1104
1105 pub fn outgoing(&self, source: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
1106 self.scan_adj(CF_ADJ_OUT, source)
1107 }
1108
1109 pub fn incoming(&self, target: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
1110 self.scan_adj(CF_ADJ_IN, target)
1111 }
1112
1113 pub fn list_property_indexes(&self) -> Vec<PropertyIndexSpec> {
1116 self.indexes.read().expect("indexes lock poisoned").clone()
1117 }
1118
1119 pub fn create_property_index(&self, label: &str, property: &str) -> Result<()> {
1130 self.create_property_index_composite(label, &[property.to_string()])
1131 }
1132
1133 pub fn create_property_index_composite(
1144 &self,
1145 label: &str,
1146 properties: &[String],
1147 ) -> Result<()> {
1148 assert!(
1149 !properties.is_empty(),
1150 "create_property_index_composite requires at least one property"
1151 );
1152 let spec = PropertyIndexSpec {
1153 label: label.to_string(),
1154 properties: properties.to_vec(),
1155 };
1156 let mut guard = self.indexes.write().expect("indexes lock poisoned");
1157 if guard.contains(&spec) {
1158 return Ok(());
1159 }
1160
1161 let meta_cf = self.cf(CF_INDEX_META)?;
1162 let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
1163 let mut batch = WriteBatch::default();
1164 batch.put_cf(meta_cf, index_meta_key(&spec), EMPTY);
1165
1166 for node_id in self.nodes_by_label(label)? {
1167 let node = match self.get_node(node_id)? {
1168 Some(n) => n,
1169 None => continue,
1170 };
1171 if let Some(values) = encode_index_tuple(&node.properties, &spec.properties) {
1172 batch.put_cf(
1173 prop_cf,
1174 property_index_composite_key(label, properties, &values, node_id),
1175 EMPTY,
1176 );
1177 }
1178 }
1179
1180 self.db.write(batch)?;
1181 guard.push(spec);
1182 Ok(())
1183 }
1184
1185 pub fn create_property_constraint(
1210 &self,
1211 name: Option<&str>,
1212 scope: &ConstraintScope,
1213 properties: &[String],
1214 kind: PropertyConstraintKind,
1215 if_not_exists: bool,
1216 ) -> Result<PropertyConstraintSpec> {
1217 if properties.is_empty() {
1224 return Err(Error::ConstraintArity {
1225 kind: kind.as_string(),
1226 details: "at least one property is required".into(),
1227 });
1228 }
1229 if !kind.allows_multi_property() && properties.len() != 1 {
1230 return Err(Error::ConstraintArity {
1231 kind: kind.as_string(),
1232 details: format!("expects exactly one property, got {}", properties.len()),
1233 });
1234 }
1235 if matches!(kind, PropertyConstraintKind::NodeKey)
1236 && matches!(scope, ConstraintScope::Relationship(_))
1237 {
1238 return Err(Error::ConstraintArity {
1239 kind: kind.as_string(),
1240 details: "NODE KEY cannot be applied to a relationship scope".into(),
1241 });
1242 }
1243 let resolved_name = match name {
1244 Some(n) => n.to_string(),
1245 None => default_constraint_name(scope, properties, kind),
1246 };
1247 let spec = PropertyConstraintSpec {
1248 name: resolved_name.clone(),
1249 scope: scope.clone(),
1250 properties: properties.to_vec(),
1251 kind,
1252 };
1253
1254 {
1258 let guard = self.constraints.read().expect("constraints lock poisoned");
1259 if let Some(existing) = guard.iter().find(|s| s.name == resolved_name) {
1260 if existing == &spec {
1261 return Ok(existing.clone());
1262 }
1263 if if_not_exists {
1264 return Ok(existing.clone());
1265 }
1266 return Err(Error::ConstraintNameConflict {
1267 name: resolved_name,
1268 });
1269 }
1270 }
1271
1272 match (kind, scope) {
1282 (PropertyConstraintKind::Unique, ConstraintScope::Node(label)) => {
1283 self.create_property_index(label, &properties[0])?;
1284 }
1285 (PropertyConstraintKind::Unique, ConstraintScope::Relationship(edge_type)) => {
1286 self.create_edge_property_index(edge_type, &properties[0])?;
1287 }
1288 (PropertyConstraintKind::NodeKey, ConstraintScope::Node(label)) => {
1289 self.create_property_index_composite(label, properties)?;
1290 }
1291 _ => {}
1292 }
1293
1294 validate_existing_data(self, &spec)?;
1299
1300 let meta_cf = self.cf(CF_CONSTRAINT_META)?;
1301 let mut batch = WriteBatch::default();
1302 batch.put_cf(
1303 meta_cf,
1304 resolved_name.as_bytes(),
1305 constraint_meta_encode(&spec),
1306 );
1307 self.db.write(batch)?;
1308
1309 let mut guard = self.constraints.write().expect("constraints lock poisoned");
1310 if let Some(existing) = guard.iter().find(|s| s.name == resolved_name) {
1315 return Ok(existing.clone());
1316 }
1317 guard.push(spec.clone());
1318 Ok(spec)
1319 }
1320
1321 pub fn drop_property_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1326 let mut guard = self.constraints.write().expect("constraints lock poisoned");
1327 let idx = guard.iter().position(|s| s.name == name);
1328 match idx {
1329 None if if_exists => Ok(()),
1330 None => Err(Error::ConstraintNotFound {
1331 name: name.to_string(),
1332 }),
1333 Some(i) => {
1334 let meta_cf = self.cf(CF_CONSTRAINT_META)?;
1335 let mut batch = WriteBatch::default();
1336 batch.delete_cf(meta_cf, name.as_bytes());
1337 self.db.write(batch)?;
1338 guard.remove(i);
1339 Ok(())
1340 }
1341 }
1342 }
1343
1344 pub fn list_property_constraints(&self) -> Vec<PropertyConstraintSpec> {
1347 self.constraints
1348 .read()
1349 .expect("constraints lock poisoned")
1350 .clone()
1351 }
1352
1353 fn enforce_constraints(&self, node: &Node, existing: Option<&Node>) -> Result<()> {
1361 let constraints = self.constraints.read().expect("constraints lock poisoned");
1362 if constraints.is_empty() {
1363 return Ok(());
1364 }
1365 let now_has_label = |label: &str| node.labels.iter().any(|l| l == label);
1366 for spec in constraints.iter() {
1367 let label = match &spec.scope {
1368 ConstraintScope::Node(l) => l,
1369 ConstraintScope::Relationship(_) => continue,
1373 };
1374 if !now_has_label(label) {
1375 continue;
1376 }
1377 let primary = spec.primary_property();
1380 match spec.kind {
1381 PropertyConstraintKind::NotNull => {
1382 let present = node
1383 .properties
1384 .get(primary)
1385 .is_some_and(|v| !matches!(v, Property::Null));
1386 if !present {
1387 return Err(Error::ConstraintViolation {
1388 name: spec.name.clone(),
1389 kind: spec.kind.as_string(),
1390 label: label.clone(),
1391 property: primary.to_string(),
1392 details: format!(
1393 "node {} is missing required property `{}`",
1394 node.id, primary
1395 ),
1396 });
1397 }
1398 }
1399 PropertyConstraintKind::Unique => {
1400 let Some(value) = node.properties.get(primary) else {
1401 continue;
1405 };
1406 if matches!(value, Property::Null) {
1407 continue;
1408 }
1409 if encode_index_value(value).is_none() {
1410 continue;
1411 }
1412 let holders = self.nodes_by_property(label, primary, value)?;
1413 for other in holders {
1414 if other == node.id {
1415 continue;
1416 }
1417 let was_self = existing
1418 .and_then(|n| n.properties.get(primary))
1419 .is_some_and(|v| v == value);
1420 let _ = was_self;
1421 return Err(Error::ConstraintViolation {
1422 name: spec.name.clone(),
1423 kind: spec.kind.as_string(),
1424 label: label.clone(),
1425 property: primary.to_string(),
1426 details: format!("value already held by node {}", other),
1427 });
1428 }
1429 }
1430 PropertyConstraintKind::PropertyType(target) => {
1431 let Some(value) = node.properties.get(primary) else {
1432 continue;
1433 };
1434 if matches!(value, Property::Null) {
1435 continue;
1436 }
1437 if !property_matches_type(value, target) {
1438 return Err(Error::ConstraintViolation {
1439 name: spec.name.clone(),
1440 kind: spec.kind.as_string(),
1441 label: label.clone(),
1442 property: primary.to_string(),
1443 details: format!(
1444 "node {} has value of type {} (expected {})",
1445 node.id,
1446 value.type_name(),
1447 target.as_str()
1448 ),
1449 });
1450 }
1451 }
1452 PropertyConstraintKind::NodeKey => {
1453 let mut my_tuple: Vec<Property> = Vec::with_capacity(spec.properties.len());
1459 for prop in &spec.properties {
1460 match node.properties.get(prop) {
1461 Some(v) if !matches!(v, Property::Null) => my_tuple.push(v.clone()),
1462 _ => {
1463 return Err(Error::ConstraintViolation {
1464 name: spec.name.clone(),
1465 kind: spec.kind.as_string(),
1466 label: label.clone(),
1467 property: prop.clone(),
1468 details: format!(
1469 "node {} is missing required property `{}`",
1470 node.id, prop
1471 ),
1472 });
1473 }
1474 }
1475 }
1476 let hits = self.nodes_by_properties(label, &spec.properties, &my_tuple)?;
1480 if let Some(other_id) = hits.iter().find(|&&id| id != node.id) {
1481 return Err(Error::ConstraintViolation {
1482 name: spec.name.clone(),
1483 kind: spec.kind.as_string(),
1484 label: label.clone(),
1485 property: spec.properties.join(","),
1486 details: format!("tuple already held by node {}", other_id),
1487 });
1488 }
1489 }
1490 }
1491 }
1492 Ok(())
1493 }
1494
1495 fn enforce_edge_constraints(&self, edge: &Edge, existing: Option<&Edge>) -> Result<()> {
1502 let constraints = self.constraints.read().expect("constraints lock poisoned");
1503 if constraints.is_empty() {
1504 return Ok(());
1505 }
1506 for spec in constraints.iter() {
1507 let edge_type = match &spec.scope {
1508 ConstraintScope::Relationship(t) => t,
1509 ConstraintScope::Node(_) => continue,
1511 };
1512 if edge_type != &edge.edge_type {
1513 continue;
1514 }
1515 let primary = spec.primary_property();
1516 match spec.kind {
1517 PropertyConstraintKind::NotNull => {
1518 let present = edge
1519 .properties
1520 .get(primary)
1521 .is_some_and(|v| !matches!(v, Property::Null));
1522 if !present {
1523 return Err(Error::ConstraintViolation {
1524 name: spec.name.clone(),
1525 kind: spec.kind.as_string(),
1526 label: edge_type.clone(),
1527 property: primary.to_string(),
1528 details: format!(
1529 "edge {} is missing required property `{}`",
1530 edge.id, primary
1531 ),
1532 });
1533 }
1534 }
1535 PropertyConstraintKind::Unique => {
1536 let Some(value) = edge.properties.get(primary) else {
1537 continue;
1542 };
1543 if matches!(value, Property::Null) {
1544 continue;
1545 }
1546 if encode_index_value(value).is_none() {
1547 continue;
1553 }
1554 let holders = self.edges_by_property(edge_type, primary, value)?;
1559 for other_id in holders {
1560 if other_id == edge.id {
1561 continue;
1562 }
1563 let was_self = existing
1564 .and_then(|e| e.properties.get(primary))
1565 .is_some_and(|v| v == value);
1566 let _ = was_self;
1567 return Err(Error::ConstraintViolation {
1568 name: spec.name.clone(),
1569 kind: spec.kind.as_string(),
1570 label: edge_type.clone(),
1571 property: primary.to_string(),
1572 details: format!("value already held by edge {}", other_id),
1573 });
1574 }
1575 }
1576 PropertyConstraintKind::PropertyType(target) => {
1577 let Some(value) = edge.properties.get(primary) else {
1578 continue;
1579 };
1580 if matches!(value, Property::Null) {
1581 continue;
1582 }
1583 if !property_matches_type(value, target) {
1584 return Err(Error::ConstraintViolation {
1585 name: spec.name.clone(),
1586 kind: spec.kind.as_string(),
1587 label: edge_type.clone(),
1588 property: primary.to_string(),
1589 details: format!(
1590 "edge {} has value of type {} (expected {})",
1591 edge.id,
1592 value.type_name(),
1593 target.as_str()
1594 ),
1595 });
1596 }
1597 }
1598 PropertyConstraintKind::NodeKey => {
1599 unreachable!(
1602 "NODE KEY on relationship scope should have been rejected at create time"
1603 );
1604 }
1605 }
1606 }
1607 Ok(())
1608 }
1609
1610 pub fn drop_property_index(&self, label: &str, property: &str) -> Result<()> {
1614 self.drop_property_index_composite(label, &[property.to_string()])
1615 }
1616
1617 pub fn drop_property_index_composite(&self, label: &str, properties: &[String]) -> Result<()> {
1618 assert!(
1619 !properties.is_empty(),
1620 "drop_property_index_composite requires at least one property"
1621 );
1622 let spec = PropertyIndexSpec {
1623 label: label.to_string(),
1624 properties: properties.to_vec(),
1625 };
1626 let mut guard = self.indexes.write().expect("indexes lock poisoned");
1627 if !guard.contains(&spec) {
1628 return Ok(());
1629 }
1630
1631 if let Some(constraint) = find_constraint_backing_node_index(
1638 &self.constraints.read().expect("constraints lock poisoned"),
1639 label,
1640 properties,
1641 ) {
1642 return Err(Error::IndexInUse { constraint });
1643 }
1644
1645 let meta_cf = self.cf(CF_INDEX_META)?;
1646 let prop_cf = self.cf(CF_PROPERTY_INDEX)?;
1647 let mut batch = WriteBatch::default();
1648 batch.delete_cf(meta_cf, index_meta_key(&spec));
1649
1650 let label_prefix = property_index_label_prefix(label);
1657 let iter = self.db.iterator_cf(
1658 prop_cf,
1659 IteratorMode::From(&label_prefix, Direction::Forward),
1660 );
1661 for item in iter {
1662 let (key, _) = item?;
1663 if !key.starts_with(&label_prefix) {
1664 break;
1665 }
1666 let Some(parsed) = parse_property_index_entry_props(&key) else {
1667 continue;
1668 };
1669 if parsed.len() == properties.len()
1670 && parsed.iter().zip(properties.iter()).all(|(a, b)| a == b)
1671 {
1672 batch.delete_cf(prop_cf, key);
1673 }
1674 }
1675
1676 self.db.write(batch)?;
1677 guard.retain(|s| s != &spec);
1678 Ok(())
1679 }
1680
1681 pub fn list_edge_property_indexes(&self) -> Vec<EdgePropertyIndexSpec> {
1685 self.edge_indexes
1686 .read()
1687 .expect("edge_indexes lock poisoned")
1688 .clone()
1689 }
1690
1691 pub fn create_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
1701 self.create_edge_property_index_composite(edge_type, &[property.to_string()])
1702 }
1703
1704 pub fn create_edge_property_index_composite(
1705 &self,
1706 edge_type: &str,
1707 properties: &[String],
1708 ) -> Result<()> {
1709 assert!(
1710 !properties.is_empty(),
1711 "create_edge_property_index_composite requires at least one property"
1712 );
1713 let spec = EdgePropertyIndexSpec {
1714 edge_type: edge_type.to_string(),
1715 properties: properties.to_vec(),
1716 };
1717 let mut guard = self
1718 .edge_indexes
1719 .write()
1720 .expect("edge_indexes lock poisoned");
1721 if guard.contains(&spec) {
1722 return Ok(());
1723 }
1724
1725 let meta_cf = self.cf(CF_EDGE_INDEX_META)?;
1726 let prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
1727 let mut batch = WriteBatch::default();
1728 batch.put_cf(meta_cf, edge_index_meta_key(&spec), EMPTY);
1729
1730 for edge_id in self.edges_by_type(edge_type)? {
1731 let edge = match self.get_edge(edge_id)? {
1732 Some(e) => e,
1733 None => continue,
1734 };
1735 if let Some(values) = encode_index_tuple(&edge.properties, &spec.properties) {
1736 batch.put_cf(
1737 prop_cf,
1738 edge_property_index_composite_key(edge_type, properties, &values, edge_id),
1739 EMPTY,
1740 );
1741 }
1742 }
1743
1744 self.db.write(batch)?;
1745 guard.push(spec);
1746 Ok(())
1747 }
1748
1749 pub fn drop_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
1750 self.drop_edge_property_index_composite(edge_type, &[property.to_string()])
1751 }
1752
1753 pub fn drop_edge_property_index_composite(
1754 &self,
1755 edge_type: &str,
1756 properties: &[String],
1757 ) -> Result<()> {
1758 assert!(
1759 !properties.is_empty(),
1760 "drop_edge_property_index_composite requires at least one property"
1761 );
1762 let spec = EdgePropertyIndexSpec {
1763 edge_type: edge_type.to_string(),
1764 properties: properties.to_vec(),
1765 };
1766 let mut guard = self
1767 .edge_indexes
1768 .write()
1769 .expect("edge_indexes lock poisoned");
1770 if !guard.contains(&spec) {
1771 return Ok(());
1772 }
1773
1774 if let Some(constraint) = find_constraint_backing_edge_index(
1780 &self.constraints.read().expect("constraints lock poisoned"),
1781 edge_type,
1782 properties,
1783 ) {
1784 return Err(Error::IndexInUse { constraint });
1785 }
1786
1787 let meta_cf = self.cf(CF_EDGE_INDEX_META)?;
1788 let prop_cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
1789 let mut batch = WriteBatch::default();
1790 batch.delete_cf(meta_cf, edge_index_meta_key(&spec));
1791
1792 let label_prefix = property_index_label_prefix(edge_type);
1795 let iter = self.db.iterator_cf(
1796 prop_cf,
1797 IteratorMode::From(&label_prefix, Direction::Forward),
1798 );
1799 for item in iter {
1800 let (key, _) = item?;
1801 if !key.starts_with(&label_prefix) {
1802 break;
1803 }
1804 let Some(parsed) = parse_property_index_entry_props(&key) else {
1805 continue;
1806 };
1807 if parsed.len() == properties.len()
1808 && parsed.iter().zip(properties.iter()).all(|(a, b)| a == b)
1809 {
1810 batch.delete_cf(prop_cf, key);
1811 }
1812 }
1813
1814 self.db.write(batch)?;
1815 guard.retain(|s| s != &spec);
1816 Ok(())
1817 }
1818
1819 pub fn list_point_indexes(&self) -> Vec<PointIndexSpec> {
1835 self.point_indexes
1836 .read()
1837 .expect("point_indexes lock poisoned")
1838 .clone()
1839 }
1840
1841 pub fn create_point_index(&self, label: &str, property: &str) -> Result<()> {
1848 let spec = PointIndexSpec {
1849 label: label.to_string(),
1850 property: property.to_string(),
1851 };
1852 let mut guard = self
1853 .point_indexes
1854 .write()
1855 .expect("point_indexes lock poisoned");
1856 if guard.contains(&spec) {
1857 return Ok(());
1858 }
1859
1860 let meta_cf = self.cf(CF_POINT_INDEX_META)?;
1861 let point_cf = self.cf(CF_POINT_INDEX)?;
1862 let mut batch = WriteBatch::default();
1863 batch.put_cf(meta_cf, point_index_meta_key(&spec), EMPTY);
1864
1865 for node_id in self.nodes_by_label(label)? {
1866 let node = match self.get_node(node_id)? {
1867 Some(n) => n,
1868 None => continue,
1869 };
1870 if let Some(Property::Point(p)) = node.properties.get(property) {
1871 let cell = point_cell(p.srid, p.x, p.y);
1872 batch.put_cf(
1873 point_cf,
1874 point_index_key(label, property, p.srid, cell, node_id.as_bytes()),
1875 point_index_value(p.x, p.y, p.z),
1876 );
1877 }
1878 }
1879
1880 self.db.write(batch)?;
1881 guard.push(spec);
1882 Ok(())
1883 }
1884
1885 pub fn drop_point_index(&self, label: &str, property: &str) -> Result<()> {
1890 let spec = PointIndexSpec {
1891 label: label.to_string(),
1892 property: property.to_string(),
1893 };
1894 let mut guard = self
1895 .point_indexes
1896 .write()
1897 .expect("point_indexes lock poisoned");
1898 if !guard.contains(&spec) {
1899 return Ok(());
1900 }
1901
1902 let meta_cf = self.cf(CF_POINT_INDEX_META)?;
1903 let point_cf = self.cf(CF_POINT_INDEX)?;
1904 let mut batch = WriteBatch::default();
1905 batch.delete_cf(meta_cf, point_index_meta_key(&spec));
1906
1907 let prefix = point_index_label_prop_prefix(label, property);
1908 let iter = self
1909 .db
1910 .iterator_cf(point_cf, IteratorMode::From(&prefix, Direction::Forward));
1911 for item in iter {
1912 let (key, _) = item?;
1913 if !key.starts_with(&prefix) {
1914 break;
1915 }
1916 batch.delete_cf(point_cf, key);
1917 }
1918
1919 self.db.write(batch)?;
1920 guard.retain(|s| s != &spec);
1921 Ok(())
1922 }
1923
1924 pub fn nodes_in_bbox(
1936 &self,
1937 label: &str,
1938 property: &str,
1939 srid: i32,
1940 xlo: f64,
1941 ylo: f64,
1942 xhi: f64,
1943 yhi: f64,
1944 ) -> Result<Vec<NodeId>> {
1945 let spec = PointIndexSpec {
1946 label: label.to_string(),
1947 property: property.to_string(),
1948 };
1949 {
1950 let guard = self
1951 .point_indexes
1952 .read()
1953 .expect("point_indexes lock poisoned");
1954 if !guard.contains(&spec) {
1955 return Ok(Vec::new());
1956 }
1957 }
1958
1959 let (lo_x, hi_x) = if xlo <= xhi { (xlo, xhi) } else { (xhi, xlo) };
1960 let (lo_y, hi_y) = if ylo <= yhi { (ylo, yhi) } else { (yhi, ylo) };
1961 let (min_cell, max_cell) = point_cell_range(srid, lo_x, lo_y, hi_x, hi_y);
1962
1963 let prefix = point_index_srid_prefix(label, property, srid);
1964 let header_len = prefix.len();
1965 let mut seek = prefix.clone();
1966 seek.extend_from_slice(&min_cell.to_be_bytes());
1967
1968 let cf = self.cf(CF_POINT_INDEX)?;
1969 let iter = self
1970 .db
1971 .iterator_cf(cf, IteratorMode::From(&seek, Direction::Forward));
1972 let mut results = Vec::new();
1973 for item in iter {
1974 let (key, value) = item?;
1975 if !key.starts_with(&prefix) {
1976 break;
1977 }
1978 let cell = cell_from_point_index_key(CF_POINT_INDEX, &key, header_len)?;
1979 if cell > max_cell {
1980 break;
1981 }
1982 let (x, y, _z) = decode_point_index_value(CF_POINT_INDEX, &value)?;
1983 if x < lo_x || x > hi_x || y < lo_y || y > hi_y {
1984 continue;
1985 }
1986 let id_bytes = node_id_from_point_index_key(CF_POINT_INDEX, &key)?;
1987 results.push(NodeId::from_bytes(id_bytes));
1988 }
1989 results.sort();
1990 Ok(results)
1991 }
1992
1993 pub fn list_edge_point_indexes(&self) -> Vec<EdgePointIndexSpec> {
2004 self.edge_point_indexes
2005 .read()
2006 .expect("edge_point_indexes lock poisoned")
2007 .clone()
2008 }
2009
2010 pub fn create_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
2015 let spec = EdgePointIndexSpec {
2016 edge_type: edge_type.to_string(),
2017 property: property.to_string(),
2018 };
2019 let mut guard = self
2020 .edge_point_indexes
2021 .write()
2022 .expect("edge_point_indexes lock poisoned");
2023 if guard.contains(&spec) {
2024 return Ok(());
2025 }
2026
2027 let meta_cf = self.cf(CF_EDGE_POINT_INDEX_META)?;
2028 let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
2029 let mut batch = WriteBatch::default();
2030 batch.put_cf(meta_cf, edge_point_index_meta_key(&spec), EMPTY);
2031
2032 for edge_id in self.edges_by_type(edge_type)? {
2033 let edge = match self.get_edge(edge_id)? {
2034 Some(e) => e,
2035 None => continue,
2036 };
2037 if let Some(Property::Point(p)) = edge.properties.get(property) {
2038 let cell = point_cell(p.srid, p.x, p.y);
2039 batch.put_cf(
2040 point_cf,
2041 point_index_key(edge_type, property, p.srid, cell, edge_id.as_bytes()),
2047 point_index_value(p.x, p.y, p.z),
2048 );
2049 }
2050 }
2051
2052 self.db.write(batch)?;
2053 guard.push(spec);
2054 Ok(())
2055 }
2056
2057 pub fn drop_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
2059 let spec = EdgePointIndexSpec {
2060 edge_type: edge_type.to_string(),
2061 property: property.to_string(),
2062 };
2063 let mut guard = self
2064 .edge_point_indexes
2065 .write()
2066 .expect("edge_point_indexes lock poisoned");
2067 if !guard.contains(&spec) {
2068 return Ok(());
2069 }
2070
2071 let meta_cf = self.cf(CF_EDGE_POINT_INDEX_META)?;
2072 let point_cf = self.cf(CF_EDGE_POINT_INDEX)?;
2073 let mut batch = WriteBatch::default();
2074 batch.delete_cf(meta_cf, edge_point_index_meta_key(&spec));
2075
2076 let prefix = point_index_label_prop_prefix(edge_type, property);
2077 let iter = self
2078 .db
2079 .iterator_cf(point_cf, IteratorMode::From(&prefix, Direction::Forward));
2080 for item in iter {
2081 let (key, _) = item?;
2082 if !key.starts_with(&prefix) {
2083 break;
2084 }
2085 batch.delete_cf(point_cf, key);
2086 }
2087
2088 self.db.write(batch)?;
2089 guard.retain(|s| s != &spec);
2090 Ok(())
2091 }
2092
2093 pub fn edges_in_bbox(
2098 &self,
2099 edge_type: &str,
2100 property: &str,
2101 srid: i32,
2102 xlo: f64,
2103 ylo: f64,
2104 xhi: f64,
2105 yhi: f64,
2106 ) -> Result<Vec<EdgeId>> {
2107 let spec = EdgePointIndexSpec {
2108 edge_type: edge_type.to_string(),
2109 property: property.to_string(),
2110 };
2111 {
2112 let guard = self
2113 .edge_point_indexes
2114 .read()
2115 .expect("edge_point_indexes lock poisoned");
2116 if !guard.contains(&spec) {
2117 return Ok(Vec::new());
2118 }
2119 }
2120
2121 let (lo_x, hi_x) = if xlo <= xhi { (xlo, xhi) } else { (xhi, xlo) };
2122 let (lo_y, hi_y) = if ylo <= yhi { (ylo, yhi) } else { (yhi, ylo) };
2123 let (min_cell, max_cell) = point_cell_range(srid, lo_x, lo_y, hi_x, hi_y);
2124
2125 let prefix = point_index_srid_prefix(edge_type, property, srid);
2126 let header_len = prefix.len();
2127 let mut seek = prefix.clone();
2128 seek.extend_from_slice(&min_cell.to_be_bytes());
2129
2130 let cf = self.cf(CF_EDGE_POINT_INDEX)?;
2131 let iter = self
2132 .db
2133 .iterator_cf(cf, IteratorMode::From(&seek, Direction::Forward));
2134 let mut results = Vec::new();
2135 for item in iter {
2136 let (key, value) = item?;
2137 if !key.starts_with(&prefix) {
2138 break;
2139 }
2140 let cell = cell_from_point_index_key(CF_EDGE_POINT_INDEX, &key, header_len)?;
2141 if cell > max_cell {
2142 break;
2143 }
2144 let (x, y, _z) = decode_point_index_value(CF_EDGE_POINT_INDEX, &value)?;
2145 if x < lo_x || x > hi_x || y < lo_y || y > hi_y {
2146 continue;
2147 }
2148 let id_bytes = node_id_from_point_index_key(CF_EDGE_POINT_INDEX, &key)?;
2149 results.push(EdgeId::from_bytes(id_bytes));
2150 }
2151 results.sort();
2152 Ok(results)
2153 }
2154
2155 pub fn edges_by_property(
2162 &self,
2163 edge_type: &str,
2164 property: &str,
2165 value: &Property,
2166 ) -> Result<Vec<EdgeId>> {
2167 self.edges_by_properties(
2168 edge_type,
2169 std::slice::from_ref(&property.to_string()),
2170 std::slice::from_ref(value),
2171 )
2172 }
2173
2174 pub fn edges_by_properties(
2178 &self,
2179 edge_type: &str,
2180 properties: &[String],
2181 values: &[Property],
2182 ) -> Result<Vec<EdgeId>> {
2183 assert_eq!(
2184 properties.len(),
2185 values.len(),
2186 "composite edge seek: properties and values must have equal length"
2187 );
2188 let mut encoded: Vec<Vec<u8>> = Vec::with_capacity(values.len());
2189 for (p, v) in properties.iter().zip(values.iter()) {
2190 let bytes = encode_index_value(v).ok_or_else(|| Error::UnindexableValue {
2191 property: p.clone(),
2192 kind: v.type_name(),
2193 })?;
2194 encoded.push(bytes);
2195 }
2196 let cf = self.cf(CF_EDGE_PROPERTY_INDEX)?;
2197 let prefix = edge_property_index_composite_value_prefix(edge_type, properties, &encoded);
2198 let mut results = Vec::new();
2199 let iter = self
2200 .db
2201 .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2202 for item in iter {
2203 let (key, _) = item?;
2204 if !key.starts_with(&prefix) {
2205 break;
2206 }
2207 let bytes = edge_id_from_property_index_key(&key, prefix.len())?;
2208 results.push(EdgeId::from_bytes(bytes));
2209 }
2210 Ok(results)
2211 }
2212
2213 pub fn nodes_by_property(
2223 &self,
2224 label: &str,
2225 property: &str,
2226 value: &Property,
2227 ) -> Result<Vec<NodeId>> {
2228 self.nodes_by_properties(
2229 label,
2230 std::slice::from_ref(&property.to_string()),
2231 std::slice::from_ref(value),
2232 )
2233 }
2234
2235 pub fn nodes_by_properties(
2243 &self,
2244 label: &str,
2245 properties: &[String],
2246 values: &[Property],
2247 ) -> Result<Vec<NodeId>> {
2248 assert_eq!(
2249 properties.len(),
2250 values.len(),
2251 "composite seek: properties and values must have equal length"
2252 );
2253 let mut encoded: Vec<Vec<u8>> = Vec::with_capacity(values.len());
2254 for (p, v) in properties.iter().zip(values.iter()) {
2255 let bytes = encode_index_value(v).ok_or_else(|| Error::UnindexableValue {
2256 property: p.clone(),
2257 kind: v.type_name(),
2258 })?;
2259 encoded.push(bytes);
2260 }
2261 let cf = self.cf(CF_PROPERTY_INDEX)?;
2262 let prefix = property_index_composite_value_prefix(label, properties, &encoded);
2263 let mut results = Vec::new();
2264 let iter = self
2265 .db
2266 .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2267 for item in iter {
2268 let (key, _) = item?;
2269 if !key.starts_with(&prefix) {
2270 break;
2271 }
2272 let bytes = node_id_from_property_index_key(&key, prefix.len())?;
2273 results.push(NodeId::from_bytes(bytes));
2274 }
2275 Ok(results)
2276 }
2277
2278 pub fn nodes_by_label(&self, label: &str) -> Result<Vec<NodeId>> {
2279 let cf = self.cf(CF_LABEL_INDEX)?;
2280 let prefix = label_index_prefix(label);
2281 let mut results = Vec::new();
2282 let iter = self
2283 .db
2284 .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2285 for item in iter {
2286 let (key, _) = item?;
2287 if !key.starts_with(&prefix) {
2288 break;
2289 }
2290 let bytes = id_from_str_index_key(CF_LABEL_INDEX, &key, label.len())?;
2291 results.push(NodeId::from_bytes(bytes));
2292 }
2293 Ok(results)
2294 }
2295
2296 pub fn edges_by_type(&self, edge_type: &str) -> Result<Vec<EdgeId>> {
2297 let cf = self.cf(CF_TYPE_INDEX)?;
2298 let prefix = type_index_prefix(edge_type);
2299 let mut results = Vec::new();
2300 let iter = self
2301 .db
2302 .iterator_cf(cf, IteratorMode::From(&prefix, Direction::Forward));
2303 for item in iter {
2304 let (key, _) = item?;
2305 if !key.starts_with(&prefix) {
2306 break;
2307 }
2308 let bytes = id_from_str_index_key(CF_TYPE_INDEX, &key, edge_type.len())?;
2309 results.push(EdgeId::from_bytes(bytes));
2310 }
2311 Ok(results)
2312 }
2313
2314 fn scan_adj(&self, cf_name: &'static str, node: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
2315 let cf = self.cf(cf_name)?;
2316 let prefix: &[u8] = node.as_bytes();
2317 let mut results = Vec::new();
2318 let iter = self
2319 .db
2320 .iterator_cf(cf, IteratorMode::From(prefix, Direction::Forward));
2321 for item in iter {
2322 let (key, value) = item?;
2323 if !key.starts_with(prefix) {
2324 break;
2325 }
2326 let edge_id = edge_from_adj_key(cf_name, &key)?;
2327 let other = node_from_adj_value(cf_name, &value)?;
2328 results.push((edge_id, other));
2329 }
2330 Ok(results)
2331 }
2332}
2333
2334fn extract_indexable_point(
2342 properties: &std::collections::HashMap<String, Property>,
2343 key: &str,
2344) -> Option<meshdb_core::Point> {
2345 match properties.get(key)? {
2346 Property::Point(p) => Some(*p),
2347 _ => None,
2348 }
2349}
2350
2351fn default_constraint_name(
2358 scope: &ConstraintScope,
2359 properties: &[String],
2360 kind: PropertyConstraintKind,
2361) -> String {
2362 let joined = properties.join("_");
2363 format!(
2364 "constraint_{scope_tag}_{target}_{joined}_{kind_tag}",
2365 scope_tag = scope.name_tag(),
2366 target = scope.target(),
2367 kind_tag = kind.name_tag(),
2368 )
2369}
2370
2371fn find_constraint_backing_node_index(
2378 constraints: &[PropertyConstraintSpec],
2379 label: &str,
2380 properties: &[String],
2381) -> Option<String> {
2382 constraints
2383 .iter()
2384 .filter(|c| matches!(&c.scope, ConstraintScope::Node(l) if l == label))
2385 .filter(|c| {
2386 matches!(
2387 c.kind,
2388 PropertyConstraintKind::Unique | PropertyConstraintKind::NodeKey
2389 )
2390 })
2391 .find(|c| c.properties.as_slice() == properties)
2392 .map(|c| c.name.clone())
2393}
2394
2395fn find_constraint_backing_edge_index(
2399 constraints: &[PropertyConstraintSpec],
2400 edge_type: &str,
2401 properties: &[String],
2402) -> Option<String> {
2403 constraints
2404 .iter()
2405 .filter(|c| matches!(&c.scope, ConstraintScope::Relationship(t) if t == edge_type))
2406 .filter(|c| matches!(c.kind, PropertyConstraintKind::Unique))
2407 .find(|c| c.properties.as_slice() == properties)
2408 .map(|c| c.name.clone())
2409}
2410
2411fn validate_existing_data(
2417 engine: &RocksDbStorageEngine,
2418 spec: &PropertyConstraintSpec,
2419) -> Result<()> {
2420 match &spec.scope {
2421 ConstraintScope::Node(label) => validate_existing_nodes(engine, spec, label),
2422 ConstraintScope::Relationship(edge_type) => {
2423 validate_existing_edges(engine, spec, edge_type)
2424 }
2425 }
2426}
2427
2428fn validate_existing_nodes(
2429 engine: &RocksDbStorageEngine,
2430 spec: &PropertyConstraintSpec,
2431 label: &str,
2432) -> Result<()> {
2433 let label_members = engine.nodes_by_label(label)?;
2434 let primary = spec.primary_property();
2435 match spec.kind {
2436 PropertyConstraintKind::NotNull => {
2437 for id in label_members {
2438 let node = match engine.get_node(id)? {
2439 Some(n) => n,
2440 None => continue,
2441 };
2442 let present = node
2443 .properties
2444 .get(primary)
2445 .is_some_and(|v| !matches!(v, Property::Null));
2446 if !present {
2447 return Err(Error::ConstraintViolation {
2448 name: spec.name.clone(),
2449 kind: spec.kind.as_string(),
2450 label: label.to_string(),
2451 property: primary.to_string(),
2452 details: format!("node {id} is missing required property"),
2453 });
2454 }
2455 }
2456 }
2457 PropertyConstraintKind::Unique => {
2458 use std::collections::HashMap;
2459 let mut seen: HashMap<Vec<u8>, meshdb_core::NodeId> = HashMap::new();
2460 for id in label_members {
2461 let node = match engine.get_node(id)? {
2462 Some(n) => n,
2463 None => continue,
2464 };
2465 let Some(value) = node.properties.get(primary) else {
2466 continue;
2467 };
2468 let Some(encoded) = encode_index_value(value) else {
2469 continue;
2470 };
2471 if let Some(&first) = seen.get(&encoded) {
2472 return Err(Error::ConstraintViolation {
2473 name: spec.name.clone(),
2474 kind: spec.kind.as_string(),
2475 label: label.to_string(),
2476 property: primary.to_string(),
2477 details: format!("duplicate value held by nodes {first} and {id}"),
2478 });
2479 }
2480 seen.insert(encoded, id);
2481 }
2482 }
2483 PropertyConstraintKind::PropertyType(target) => {
2484 for id in label_members {
2485 let node = match engine.get_node(id)? {
2486 Some(n) => n,
2487 None => continue,
2488 };
2489 let Some(value) = node.properties.get(primary) else {
2490 continue;
2491 };
2492 if matches!(value, Property::Null) {
2493 continue;
2494 }
2495 if !property_matches_type(value, target) {
2496 return Err(Error::ConstraintViolation {
2497 name: spec.name.clone(),
2498 kind: spec.kind.as_string(),
2499 label: label.to_string(),
2500 property: primary.to_string(),
2501 details: format!(
2502 "node {id} has value of type {} (expected {})",
2503 value.type_name(),
2504 target.as_str()
2505 ),
2506 });
2507 }
2508 }
2509 }
2510 PropertyConstraintKind::NodeKey => {
2511 use std::collections::HashMap;
2512 let mut seen: HashMap<Vec<Vec<u8>>, meshdb_core::NodeId> = HashMap::new();
2513 for id in label_members {
2514 let node = match engine.get_node(id)? {
2515 Some(n) => n,
2516 None => continue,
2517 };
2518 let mut tuple: Vec<Vec<u8>> = Vec::with_capacity(spec.properties.len());
2519 let mut missing: Option<&str> = None;
2520 for prop in &spec.properties {
2521 match node.properties.get(prop) {
2522 Some(v) if !matches!(v, Property::Null) => {
2523 if let Some(encoded) = encode_index_value(v) {
2524 tuple.push(encoded);
2525 } else {
2526 missing = Some(prop);
2527 break;
2528 }
2529 }
2530 _ => {
2531 missing = Some(prop.as_str());
2532 break;
2533 }
2534 }
2535 }
2536 if let Some(prop) = missing {
2537 return Err(Error::ConstraintViolation {
2538 name: spec.name.clone(),
2539 kind: spec.kind.as_string(),
2540 label: label.to_string(),
2541 property: prop.to_string(),
2542 details: format!("node {id} is missing required property `{prop}`"),
2543 });
2544 }
2545 if let Some(&first) = seen.get(&tuple) {
2546 return Err(Error::ConstraintViolation {
2547 name: spec.name.clone(),
2548 kind: spec.kind.as_string(),
2549 label: label.to_string(),
2550 property: spec.properties.join(","),
2551 details: format!("duplicate tuple held by nodes {first} and {id}"),
2552 });
2553 }
2554 seen.insert(tuple, id);
2555 }
2556 }
2557 }
2558 Ok(())
2559}
2560
2561fn validate_existing_edges(
2566 engine: &RocksDbStorageEngine,
2567 spec: &PropertyConstraintSpec,
2568 edge_type: &str,
2569) -> Result<()> {
2570 let edge_ids = engine.edges_by_type(edge_type)?;
2571 let primary = spec.primary_property();
2572 match spec.kind {
2573 PropertyConstraintKind::NotNull => {
2574 for id in edge_ids {
2575 let edge = match engine.get_edge(id)? {
2576 Some(e) => e,
2577 None => continue,
2578 };
2579 let present = edge
2580 .properties
2581 .get(primary)
2582 .is_some_and(|v| !matches!(v, Property::Null));
2583 if !present {
2584 return Err(Error::ConstraintViolation {
2585 name: spec.name.clone(),
2586 kind: spec.kind.as_string(),
2587 label: edge_type.to_string(),
2588 property: primary.to_string(),
2589 details: format!("edge {id} is missing required property"),
2590 });
2591 }
2592 }
2593 }
2594 PropertyConstraintKind::Unique => {
2595 use std::collections::HashMap;
2596 let mut seen: HashMap<Vec<u8>, meshdb_core::EdgeId> = HashMap::new();
2597 for id in edge_ids {
2598 let edge = match engine.get_edge(id)? {
2599 Some(e) => e,
2600 None => continue,
2601 };
2602 let Some(value) = edge.properties.get(primary) else {
2603 continue;
2604 };
2605 let Some(encoded) = encode_index_value(value) else {
2606 continue;
2607 };
2608 if let Some(&first) = seen.get(&encoded) {
2609 return Err(Error::ConstraintViolation {
2610 name: spec.name.clone(),
2611 kind: spec.kind.as_string(),
2612 label: edge_type.to_string(),
2613 property: primary.to_string(),
2614 details: format!("duplicate value held by edges {first} and {id}"),
2615 });
2616 }
2617 seen.insert(encoded, id);
2618 }
2619 }
2620 PropertyConstraintKind::PropertyType(target) => {
2621 for id in edge_ids {
2622 let edge = match engine.get_edge(id)? {
2623 Some(e) => e,
2624 None => continue,
2625 };
2626 let Some(value) = edge.properties.get(primary) else {
2627 continue;
2628 };
2629 if matches!(value, Property::Null) {
2630 continue;
2631 }
2632 if !property_matches_type(value, target) {
2633 return Err(Error::ConstraintViolation {
2634 name: spec.name.clone(),
2635 kind: spec.kind.as_string(),
2636 label: edge_type.to_string(),
2637 property: primary.to_string(),
2638 details: format!(
2639 "edge {id} has value of type {} (expected {})",
2640 value.type_name(),
2641 target.as_str()
2642 ),
2643 });
2644 }
2645 }
2646 }
2647 PropertyConstraintKind::NodeKey => {
2648 unreachable!("NODE KEY on relationship scope rejected at create time")
2649 }
2650 }
2651 Ok(())
2652}
2653
2654fn property_matches_type(value: &Property, target: PropertyType) -> bool {
2660 matches!(
2661 (target, value),
2662 (PropertyType::String, Property::String(_))
2663 | (PropertyType::Integer, Property::Int64(_))
2664 | (PropertyType::Float, Property::Float64(_))
2665 | (PropertyType::Boolean, Property::Bool(_))
2666 )
2667}
2668
2669fn load_constraint_meta(db: &DB) -> Result<Vec<PropertyConstraintSpec>> {
2674 let cf = db
2675 .cf_handle(CF_CONSTRAINT_META)
2676 .ok_or(Error::MissingColumnFamily(CF_CONSTRAINT_META))?;
2677 let mut specs = Vec::new();
2678 for item in db.iterator_cf(cf, IteratorMode::Start) {
2679 let (key, value) = item?;
2680 let name = std::str::from_utf8(&key)
2681 .map_err(|_| Error::CorruptBytes {
2682 cf: CF_CONSTRAINT_META,
2683 expected: key.len(),
2684 actual: key.len(),
2685 })?
2686 .to_string();
2687 specs.push(constraint_meta_decode(CF_CONSTRAINT_META, name, &value)?);
2688 }
2689 Ok(specs)
2690}
2691
2692fn load_index_meta(db: &DB) -> Result<Vec<PropertyIndexSpec>> {
2696 let cf = db
2697 .cf_handle(CF_INDEX_META)
2698 .ok_or(Error::MissingColumnFamily(CF_INDEX_META))?;
2699 let mut specs = Vec::new();
2700 for item in db.iterator_cf(cf, IteratorMode::Start) {
2701 let (key, _) = item?;
2702 specs.push(index_meta_key_decode(&key)?);
2703 }
2704 Ok(specs)
2705}
2706
2707fn load_edge_index_meta(db: &DB) -> Result<Vec<EdgePropertyIndexSpec>> {
2711 let cf = db
2712 .cf_handle(CF_EDGE_INDEX_META)
2713 .ok_or(Error::MissingColumnFamily(CF_EDGE_INDEX_META))?;
2714 let mut specs = Vec::new();
2715 for item in db.iterator_cf(cf, IteratorMode::Start) {
2716 let (key, _) = item?;
2717 specs.push(edge_index_meta_key_decode(&key)?);
2718 }
2719 Ok(specs)
2720}
2721
2722fn load_point_index_meta(db: &DB) -> Result<Vec<PointIndexSpec>> {
2725 let cf = db
2726 .cf_handle(CF_POINT_INDEX_META)
2727 .ok_or(Error::MissingColumnFamily(CF_POINT_INDEX_META))?;
2728 let mut specs = Vec::new();
2729 for item in db.iterator_cf(cf, IteratorMode::Start) {
2730 let (key, _) = item?;
2731 specs.push(point_index_meta_key_decode(&key)?);
2732 }
2733 Ok(specs)
2734}
2735
2736fn load_edge_point_index_meta(db: &DB) -> Result<Vec<EdgePointIndexSpec>> {
2738 let cf = db
2739 .cf_handle(CF_EDGE_POINT_INDEX_META)
2740 .ok_or(Error::MissingColumnFamily(CF_EDGE_POINT_INDEX_META))?;
2741 let mut specs = Vec::new();
2742 for item in db.iterator_cf(cf, IteratorMode::Start) {
2743 let (key, _) = item?;
2744 specs.push(edge_point_index_meta_key_decode(&key)?);
2745 }
2746 Ok(specs)
2747}
2748
2749impl StorageEngine for RocksDbStorageEngine {
2750 fn put_node(&self, node: &Node) -> Result<()> {
2751 RocksDbStorageEngine::put_node(self, node)
2752 }
2753
2754 fn get_node(&self, id: NodeId) -> Result<Option<Node>> {
2755 RocksDbStorageEngine::get_node(self, id)
2756 }
2757
2758 fn detach_delete_node(&self, id: NodeId) -> Result<()> {
2759 RocksDbStorageEngine::detach_delete_node(self, id)
2760 }
2761
2762 fn put_edge(&self, edge: &Edge) -> Result<()> {
2763 RocksDbStorageEngine::put_edge(self, edge)
2764 }
2765
2766 fn get_edge(&self, id: EdgeId) -> Result<Option<Edge>> {
2767 RocksDbStorageEngine::get_edge(self, id)
2768 }
2769
2770 fn delete_edge(&self, id: EdgeId) -> Result<()> {
2771 RocksDbStorageEngine::delete_edge(self, id)
2772 }
2773
2774 fn apply_batch(&self, mutations: &[GraphMutation]) -> Result<()> {
2775 RocksDbStorageEngine::apply_batch(self, mutations)
2776 }
2777
2778 fn all_nodes(&self) -> Result<Vec<Node>> {
2779 RocksDbStorageEngine::all_nodes(self)
2780 }
2781
2782 fn all_edges(&self) -> Result<Vec<Edge>> {
2783 RocksDbStorageEngine::all_edges(self)
2784 }
2785
2786 fn all_node_ids(&self) -> Result<Vec<NodeId>> {
2787 RocksDbStorageEngine::all_node_ids(self)
2788 }
2789
2790 fn outgoing(&self, source: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
2791 RocksDbStorageEngine::outgoing(self, source)
2792 }
2793
2794 fn incoming(&self, target: NodeId) -> Result<Vec<(EdgeId, NodeId)>> {
2795 RocksDbStorageEngine::incoming(self, target)
2796 }
2797
2798 fn nodes_by_label(&self, label: &str) -> Result<Vec<NodeId>> {
2799 RocksDbStorageEngine::nodes_by_label(self, label)
2800 }
2801
2802 fn edges_by_type(&self, edge_type: &str) -> Result<Vec<EdgeId>> {
2803 RocksDbStorageEngine::edges_by_type(self, edge_type)
2804 }
2805
2806 fn nodes_by_property(
2807 &self,
2808 label: &str,
2809 property: &str,
2810 value: &Property,
2811 ) -> Result<Vec<NodeId>> {
2812 RocksDbStorageEngine::nodes_by_property(self, label, property, value)
2813 }
2814
2815 fn nodes_by_properties(
2816 &self,
2817 label: &str,
2818 properties: &[String],
2819 values: &[Property],
2820 ) -> Result<Vec<NodeId>> {
2821 RocksDbStorageEngine::nodes_by_properties(self, label, properties, values)
2822 }
2823
2824 fn edges_by_property(
2825 &self,
2826 edge_type: &str,
2827 property: &str,
2828 value: &Property,
2829 ) -> Result<Vec<EdgeId>> {
2830 RocksDbStorageEngine::edges_by_property(self, edge_type, property, value)
2831 }
2832
2833 fn create_property_index(&self, label: &str, property: &str) -> Result<()> {
2834 RocksDbStorageEngine::create_property_index(self, label, property)
2835 }
2836
2837 fn drop_property_index(&self, label: &str, property: &str) -> Result<()> {
2838 RocksDbStorageEngine::drop_property_index(self, label, property)
2839 }
2840
2841 fn create_property_index_composite(&self, label: &str, properties: &[String]) -> Result<()> {
2842 RocksDbStorageEngine::create_property_index_composite(self, label, properties)
2843 }
2844
2845 fn drop_property_index_composite(&self, label: &str, properties: &[String]) -> Result<()> {
2846 RocksDbStorageEngine::drop_property_index_composite(self, label, properties)
2847 }
2848
2849 fn list_property_indexes(&self) -> Vec<PropertyIndexSpec> {
2850 RocksDbStorageEngine::list_property_indexes(self)
2851 }
2852
2853 fn create_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
2854 RocksDbStorageEngine::create_edge_property_index(self, edge_type, property)
2855 }
2856
2857 fn drop_edge_property_index(&self, edge_type: &str, property: &str) -> Result<()> {
2858 RocksDbStorageEngine::drop_edge_property_index(self, edge_type, property)
2859 }
2860
2861 fn create_edge_property_index_composite(
2862 &self,
2863 edge_type: &str,
2864 properties: &[String],
2865 ) -> Result<()> {
2866 RocksDbStorageEngine::create_edge_property_index_composite(self, edge_type, properties)
2867 }
2868
2869 fn drop_edge_property_index_composite(
2870 &self,
2871 edge_type: &str,
2872 properties: &[String],
2873 ) -> Result<()> {
2874 RocksDbStorageEngine::drop_edge_property_index_composite(self, edge_type, properties)
2875 }
2876
2877 fn list_edge_property_indexes(&self) -> Vec<EdgePropertyIndexSpec> {
2878 RocksDbStorageEngine::list_edge_property_indexes(self)
2879 }
2880
2881 fn create_point_index(&self, label: &str, property: &str) -> Result<()> {
2882 RocksDbStorageEngine::create_point_index(self, label, property)
2883 }
2884
2885 fn drop_point_index(&self, label: &str, property: &str) -> Result<()> {
2886 RocksDbStorageEngine::drop_point_index(self, label, property)
2887 }
2888
2889 fn list_point_indexes(&self) -> Vec<PointIndexSpec> {
2890 RocksDbStorageEngine::list_point_indexes(self)
2891 }
2892
2893 fn nodes_in_bbox(
2894 &self,
2895 label: &str,
2896 property: &str,
2897 srid: i32,
2898 xlo: f64,
2899 ylo: f64,
2900 xhi: f64,
2901 yhi: f64,
2902 ) -> Result<Vec<NodeId>> {
2903 RocksDbStorageEngine::nodes_in_bbox(self, label, property, srid, xlo, ylo, xhi, yhi)
2904 }
2905
2906 fn create_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
2907 RocksDbStorageEngine::create_edge_point_index(self, edge_type, property)
2908 }
2909
2910 fn drop_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
2911 RocksDbStorageEngine::drop_edge_point_index(self, edge_type, property)
2912 }
2913
2914 fn list_edge_point_indexes(&self) -> Vec<EdgePointIndexSpec> {
2915 RocksDbStorageEngine::list_edge_point_indexes(self)
2916 }
2917
2918 fn edges_in_bbox(
2919 &self,
2920 edge_type: &str,
2921 property: &str,
2922 srid: i32,
2923 xlo: f64,
2924 ylo: f64,
2925 xhi: f64,
2926 yhi: f64,
2927 ) -> Result<Vec<EdgeId>> {
2928 RocksDbStorageEngine::edges_in_bbox(self, edge_type, property, srid, xlo, ylo, xhi, yhi)
2929 }
2930
2931 fn create_property_constraint(
2932 &self,
2933 name: Option<&str>,
2934 scope: &ConstraintScope,
2935 properties: &[String],
2936 kind: PropertyConstraintKind,
2937 if_not_exists: bool,
2938 ) -> Result<PropertyConstraintSpec> {
2939 RocksDbStorageEngine::create_property_constraint(
2940 self,
2941 name,
2942 scope,
2943 properties,
2944 kind,
2945 if_not_exists,
2946 )
2947 }
2948
2949 fn drop_property_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
2950 RocksDbStorageEngine::drop_property_constraint(self, name, if_exists)
2951 }
2952
2953 fn list_property_constraints(&self) -> Vec<PropertyConstraintSpec> {
2954 RocksDbStorageEngine::list_property_constraints(self)
2955 }
2956
2957 fn put_trigger(&self, name: &str, value: &[u8]) -> Result<()> {
2958 let cf = self.cf(CF_TRIGGER_META)?;
2959 self.db
2960 .put_cf(cf, name.as_bytes(), value)
2961 .map_err(Error::from)
2962 }
2963
2964 fn delete_trigger(&self, name: &str) -> Result<()> {
2965 let cf = self.cf(CF_TRIGGER_META)?;
2966 self.db.delete_cf(cf, name.as_bytes()).map_err(Error::from)
2967 }
2968
2969 fn list_triggers(&self) -> Result<Vec<(String, Vec<u8>)>> {
2970 let cf = self.cf(CF_TRIGGER_META)?;
2971 let mut out: Vec<(String, Vec<u8>)> = Vec::new();
2972 let iter = self.db.iterator_cf(cf, IteratorMode::Start);
2973 for entry in iter {
2974 let (k, v) = entry.map_err(Error::from)?;
2975 let name = String::from_utf8_lossy(&k).into_owned();
2979 out.push((name, v.to_vec()));
2980 }
2981 Ok(out)
2982 }
2983
2984 fn put_pending_tx(&self, key: &[u8], value: &[u8]) -> Result<()> {
2985 let cf = self.cf(CF_PENDING_TX_META)?;
2986 self.db.put_cf(cf, key, value).map_err(Error::from)
2987 }
2988
2989 fn delete_pending_tx(&self, key: &[u8]) -> Result<()> {
2990 let cf = self.cf(CF_PENDING_TX_META)?;
2991 self.db.delete_cf(cf, key).map_err(Error::from)
2992 }
2993
2994 fn list_pending_txs(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
2995 let cf = self.cf(CF_PENDING_TX_META)?;
2996 let mut out: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
2997 let iter = self.db.iterator_cf(cf, IteratorMode::Start);
2998 for entry in iter {
2999 let (k, v) = entry.map_err(Error::from)?;
3000 out.push((k.to_vec(), v.to_vec()));
3001 }
3002 Ok(out)
3003 }
3004
3005 fn create_checkpoint(&self, path: &Path) -> Result<()> {
3006 RocksDbStorageEngine::create_checkpoint(self, path)
3007 }
3008
3009 fn clear_all(&self) -> Result<()> {
3010 RocksDbStorageEngine::clear_all(self)
3011 }
3012}