1use crate::core::edge_type::{
5 MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6 is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
/// Lifecycle state attached to a schema element (label, edge type or property).
///
/// The value is serialized with serde as part of the persisted schema. The
/// enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
    /// The element is in normal use. This is the state the metadata structs in
    /// this file fall back to when the field is missing on deserialization.
    Active,
    /// The element has been hidden.
    Hidden {
        /// Instant at which the element entered this state.
        since: DateTime<Utc>,
        /// NOTE(review): presumably identifies the last snapshot in which the
        /// element was still active — confirm the identifier format with the
        /// snapshot code, which is not visible here.
        last_active_snapshot: String,
    },
    /// NOTE(review): presumably marks an element that has been removed for
    /// good — confirm against the code that performs the transition.
    Tombstone {
        /// Instant at which the element entered this state.
        since: DateTime<Utc>,
    },
}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35pub fn datetime_struct_fields() -> Fields {
42 Fields::from(vec![
43 Field::new(
44 "nanos_since_epoch",
45 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46 true,
47 ),
48 Field::new("offset_seconds", ArrowDataType::Int32, true),
49 Field::new("timezone_name", ArrowDataType::Utf8, true),
50 ])
51}
52
53pub fn time_struct_fields() -> Fields {
59 Fields::from(vec![
60 Field::new(
61 "nanos_since_midnight",
62 ArrowDataType::Time64(TimeUnit::Nanosecond),
63 true,
64 ),
65 Field::new("offset_seconds", ArrowDataType::Int32, true),
66 ])
67}
68
69pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
79pub fn sparse_vector_struct_fields() -> Fields {
86 Fields::from(vec![
87 Field::new(
88 "indices",
89 ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::UInt32, true))),
90 false,
91 ),
92 Field::new(
93 "values",
94 ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Float32, true))),
95 false,
96 ),
97 ])
98}
99
100pub fn is_sparse_vector_struct(arrow_dt: &ArrowDataType) -> bool {
102 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == sparse_vector_struct_fields())
103}
104
/// Returns the Arrow field metadata that tags a field as carrying raw bytes.
///
/// The map holds exactly one entry: `"uni_raw_bytes"` mapped to `"true"`.
/// [`DataType::to_arrow`] attaches it to list items and map values whose
/// logical type is [`DataType::Bytes`].
pub fn raw_bytes_field_metadata() -> HashMap<String, String> {
    let mut metadata = HashMap::with_capacity(1);
    metadata.insert(String::from("uni_raw_bytes"), String::from("true"));
    metadata
}
115
/// Kind of CRDT held by a [`DataType::Crdt`] property.
///
/// NOTE(review): the variant names appear to follow the usual CRDT vocabulary
/// (grow-only counter/set, observed-remove set, last-writer-wins register/map,
/// replicated growable array, vector clock). The implementations are not
/// visible from this file, so confirm before relying on that reading.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    GCounter,
    GSet,
    ORSet,
    LWWRegister,
    LWWMap,
    Rga,
    VectorClock,
    VCRegister,
}
128
129impl CrdtType {
130 #[must_use]
142 pub fn type_name(&self) -> &'static str {
143 match self {
144 CrdtType::GCounter => "GCounter",
145 CrdtType::GSet => "GSet",
146 CrdtType::ORSet => "ORSet",
147 CrdtType::LWWRegister => "LWWRegister",
148 CrdtType::LWWMap => "LWWMap",
149 CrdtType::Rga => "Rga",
150 CrdtType::VectorClock => "VectorClock",
151 CrdtType::VCRegister => "VCRegister",
152 }
153 }
154}
155
/// Coordinate system of a [`DataType::Point`] property.
///
/// The variant selects the Arrow struct layout produced by
/// `DataType::to_arrow`; every layout ends with a `crs` string field.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
    /// Stored as `latitude` / `longitude` (`Float64`) plus `crs`.
    Geographic,
    /// Stored as `x` / `y` (`Float64`) plus `crs`.
    Cartesian2D,
    /// Stored as `x` / `y` / `z` (`Float64`) plus `crs`.
    Cartesian3D,
}
162
/// Logical type of a schema property.
///
/// `DataType::to_arrow` maps each variant to its Arrow storage type and
/// `DataType::accepts` decides which runtime values may be written to a
/// property of that type. The enum is `#[non_exhaustive]`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
    /// UTF-8 text (Arrow `Utf8`).
    String,
    /// 32-bit signed integer.
    Int32,
    /// 64-bit signed integer.
    Int64,
    /// 32-bit float.
    Float32,
    /// 64-bit float.
    Float64,
    /// Boolean.
    Bool,
    /// Nanosecond timestamp stored with the `"UTC"` time zone.
    Timestamp,
    /// Calendar date (Arrow `Date32`).
    Date,
    /// Time of day, stored with the layout from `time_struct_fields`.
    Time,
    /// Date-time, stored with the layout from `datetime_struct_fields`.
    DateTime,
    /// Duration, stored as `LargeBinary`.
    Duration,
    /// Value stored as `LargeBinary`; `accepts` lets any value through.
    CypherValue,
    /// Raw bytes, stored as `LargeBinary`.
    Bytes,
    /// Point in the given coordinate system.
    Point(PointType),
    /// Dense vector stored as a fixed-size list of `Float32`.
    Vector {
        /// Length of the fixed-size list.
        dimensions: usize,
    },
    /// Sparse vector, stored with the layout from `sparse_vector_struct_fields`.
    SparseVector {
        /// NOTE(review): not used by `to_arrow`; presumably the logical
        /// dimensionality of the vector space — confirm with the writers.
        dimensions: usize,
    },
    /// Stored as a 24-byte `FixedSizeBinary`.
    /// NOTE(review): the meaning of "Btic" is not visible from this file.
    Btic,
    /// CRDT of the given kind, stored as `Binary`.
    Crdt(CrdtType),
    /// List of the inner type.
    List(Box<DataType>),
    /// Map from the first type to the second, stored as a list of
    /// `key` / `value` structs.
    Map(Box<DataType>, Box<DataType>),
}
193
194impl DataType {
195 #[allow(non_upper_case_globals)]
197 pub const Float: DataType = DataType::Float64;
198 #[allow(non_upper_case_globals)]
199 pub const Int: DataType = DataType::Int64;
200
201 pub fn to_arrow(&self) -> ArrowDataType {
202 match self {
203 DataType::String => ArrowDataType::Utf8,
204 DataType::Int32 => ArrowDataType::Int32,
205 DataType::Int64 => ArrowDataType::Int64,
206 DataType::Float32 => ArrowDataType::Float32,
207 DataType::Float64 => ArrowDataType::Float64,
208 DataType::Bool => ArrowDataType::Boolean,
209 DataType::Timestamp => {
210 ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
211 }
212 DataType::Date => ArrowDataType::Date32,
213 DataType::Time => ArrowDataType::Struct(time_struct_fields()),
214 DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
215 DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Bytes => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
219 PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
220 Field::new("latitude", ArrowDataType::Float64, false),
221 Field::new("longitude", ArrowDataType::Float64, false),
222 Field::new("crs", ArrowDataType::Utf8, false),
223 ])),
224 PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
225 Field::new("x", ArrowDataType::Float64, false),
226 Field::new("y", ArrowDataType::Float64, false),
227 Field::new("crs", ArrowDataType::Utf8, false),
228 ])),
229 PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
230 Field::new("x", ArrowDataType::Float64, false),
231 Field::new("y", ArrowDataType::Float64, false),
232 Field::new("z", ArrowDataType::Float64, false),
233 Field::new("crs", ArrowDataType::Utf8, false),
234 ])),
235 },
236 DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
237 Arc::new(Field::new("item", ArrowDataType::Float32, true)),
238 *dimensions as i32,
239 ),
240 DataType::SparseVector { .. } => ArrowDataType::Struct(sparse_vector_struct_fields()),
241 DataType::Btic => ArrowDataType::FixedSizeBinary(24),
242 DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
244 let item = Field::new("item", inner.to_arrow(), true);
248 let item = if matches!(**inner, DataType::Bytes) {
249 item.with_metadata(raw_bytes_field_metadata())
250 } else {
251 item
252 };
253 ArrowDataType::List(Arc::new(item))
254 }
255 DataType::Map(key, value) => {
256 let value_field = if value.map_value_is_typed() {
263 let f = Field::new("value", value.to_arrow(), true);
264 if matches!(**value, DataType::Bytes) {
265 f.with_metadata(raw_bytes_field_metadata())
266 } else {
267 f
268 }
269 } else {
270 Field::new("value", ArrowDataType::LargeBinary, true)
271 };
272 ArrowDataType::List(Arc::new(Field::new(
273 "item",
274 ArrowDataType::Struct(Fields::from(vec![
275 Field::new("key", key.to_arrow(), false),
276 value_field,
277 ])),
278 true,
279 )))
280 }
281 }
282 }
283
284 pub fn map_value_is_typed(&self) -> bool {
289 matches!(
290 self,
291 DataType::String
292 | DataType::Int64
293 | DataType::Int32
294 | DataType::Float64
295 | DataType::Float32
296 | DataType::Bool
297 | DataType::Bytes
298 )
299 }
300
301 pub fn accepts(&self, value: &crate::value::Value) -> bool {
327 use crate::value::{TemporalValue, Value};
328
329 if matches!(value, Value::Null) {
331 return true;
332 }
333
334 match self {
335 DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
337
338 DataType::String => matches!(value, Value::String(_)),
339 DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
340 DataType::Float32 | DataType::Float64 => {
342 matches!(value, Value::Int(_) | Value::Float(_))
343 }
344 DataType::Bool => matches!(value, Value::Bool(_)),
345
346 DataType::Timestamp => matches!(
349 value,
350 Value::String(_)
351 | Value::Int(_)
352 | Value::Temporal(
353 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
354 )
355 ),
356 DataType::DateTime => matches!(
357 value,
358 Value::Temporal(
359 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
360 )
361 ),
362 DataType::Date => {
363 matches!(
364 value,
365 Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
366 )
367 }
368 DataType::Time => matches!(
369 value,
370 Value::Int(_)
371 | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
372 ),
373 DataType::Duration => {
374 matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
375 }
376 DataType::Bytes => matches!(value, Value::Bytes(_)),
377 DataType::Btic => matches!(
379 value,
380 Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
381 ),
382 DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
383 DataType::SparseVector { .. } => {
388 matches!(value, Value::SparseVector { .. } | Value::Map(_))
389 }
390 DataType::List(_) => matches!(value, Value::List(_)),
391 DataType::Map(_, _) => matches!(value, Value::Map(_)),
392 }
393 }
394}
395
396fn default_created_at() -> DateTime<Utc> {
397 Utc::now()
398}
399
400fn default_state() -> SchemaElementState {
401 SchemaElementState::Active
402}
403
/// Serde default for `PropertyMeta::added_in`: properties without a stored
/// value are attributed to schema version 1.
fn default_version_1() -> u32 {
    const FIRST_SCHEMA_VERSION: u32 = 1;
    FIRST_SCHEMA_VERSION
}
407
/// Metadata stored for one property of a label or edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
    /// Logical type of the property.
    pub r#type: DataType,
    /// Whether the property is declared nullable.
    pub nullable: bool,
    /// Schema version in which the property was added; defaults to 1 when the
    /// field is missing from the serialized form.
    #[serde(default = "default_version_1")]
    pub added_in: u32,
    /// Lifecycle state; defaults to `Active` when missing.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// NOTE(review): presumably the expression a generated property is
    /// computed from (`None` for ordinary properties) — confirm with the code
    /// that evaluates it.
    #[serde(default)]
    pub generation_expression: Option<String>,
    /// Optional free-text description; omitted from the serialized form when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
421
/// Metadata stored for one vertex label.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
    /// Numeric id of the label.
    pub id: u16,
    /// Creation time; when missing from the serialized form it is filled with
    /// the time of deserialization.
    #[serde(default = "default_created_at")]
    pub created_at: DateTime<Utc>,
    /// Lifecycle state; defaults to `Active` when missing.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional free-text description; omitted from the serialized form when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
432
/// Metadata stored for one schema-defined edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
    /// Numeric id of the edge type.
    pub id: u32,
    /// Names of the labels declared as sources for this edge type.
    pub src_labels: Vec<String>,
    /// Names of the labels declared as destinations for this edge type.
    pub dst_labels: Vec<String>,
    /// Lifecycle state; defaults to `Active` when missing.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional free-text description; omitted from the serialized form when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
444
/// Kind of rule a [`Constraint`] expresses.
///
/// Enforcement is not implemented in this file; the variants only carry the
/// data that describes the rule.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
    /// Uniqueness over the listed properties.
    Unique { properties: Vec<String> },
    /// The named property must be present.
    Exists { property: String },
    /// A check described by an expression string.
    Check { expression: String },
}
452
/// Schema element a [`Constraint`] applies to, identified by name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
    /// A vertex label.
    Label(String),
    /// An edge type.
    EdgeType(String),
}
459
/// A named constraint recorded in the schema.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
    /// Name of the constraint.
    pub name: String,
    /// The rule being expressed.
    pub constraint_type: ConstraintType,
    /// The label or edge type the rule applies to.
    pub target: ConstraintTarget,
    /// Whether the constraint is currently enabled.
    pub enabled: bool,
}
467
/// Bidirectional name/id registry for edge types that are not declared in the
/// schema.
///
/// The two maps mirror each other; ids are produced by `make_schemaless_id`
/// from a local counter that starts at 1.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    /// Edge type name to assigned id.
    name_to_id: HashMap<String, u32>,
    /// Assigned id back to edge type name.
    id_to_name: HashMap<u32, String>,
    /// Local counter value that the next assignment will use.
    next_local_id: u32,
}
480
481impl SchemalessEdgeTypeRegistry {
482 pub fn new() -> Self {
483 Self {
484 name_to_id: HashMap::new(),
485 id_to_name: HashMap::new(),
486 next_local_id: 1,
487 }
488 }
489
490 pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
492 if let Some(&id) = self.name_to_id.get(type_name) {
493 return id;
494 }
495
496 let id = make_schemaless_id(self.next_local_id);
497 self.next_local_id += 1;
498
499 self.name_to_id.insert(type_name.to_string(), id);
500 self.id_to_name.insert(id, type_name.to_string());
501
502 id
503 }
504
505 pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
507 self.id_to_name.get(&type_id).map(String::as_str)
508 }
509
510 pub fn contains(&self, type_name: &str) -> bool {
512 self.name_to_id.contains_key(type_name)
513 }
514
515 pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
517 self.name_to_id.get(type_name).copied()
518 }
519
520 pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
522 self.name_to_id
523 .iter()
524 .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
525 .map(|(_, &id)| id)
526 }
527
528 pub fn all_type_ids(&self) -> Vec<u32> {
530 self.id_to_name.keys().copied().collect()
531 }
532
533 pub fn is_empty(&self) -> bool {
535 self.name_to_id.is_empty()
536 }
537}
538
539impl Default for SchemalessEdgeTypeRegistry {
540 fn default() -> Self {
541 Self::new()
542 }
543}
544
/// First label id of the virtual range, which is reserved for
/// catalog-resolved labels.
pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
/// Exclusive upper bound of the virtual label range; the sentinel itself is
/// not treated as a virtual label id.
pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;

/// Maximum length, in bytes, accepted for a label or edge type name.
const MAX_SCHEMA_NAME_LEN: usize = 255;

/// Returns `true` when `id` lies in the half-open virtual label range
/// `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL`.
#[inline]
pub fn is_virtual_label_id(id: u16) -> bool {
    id >= VIRTUAL_LABEL_ID_START && id < VIRTUAL_LABEL_ID_SENTINEL
}
565
/// The complete graph schema as persisted by `SchemaManager`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
    /// Version counter, advanced by one on each mutation made through the
    /// methods in this file.
    pub schema_version: u32,
    /// Vertex labels keyed by label name.
    pub labels: HashMap<String, LabelMeta>,
    /// Schema-defined edge types keyed by type name.
    pub edge_types: HashMap<String, EdgeTypeMeta>,
    /// Property metadata keyed first by owner name, then by property name.
    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
    /// Index definitions; empty when missing from the serialized form.
    #[serde(default)]
    pub indexes: Vec<IndexDefinition>,
    /// Constraints; empty when missing from the serialized form.
    #[serde(default)]
    pub constraints: Vec<Constraint>,
    /// Registry of edge types that are not declared in `edge_types`.
    #[serde(default)]
    pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
580
581impl Default for Schema {
582 fn default() -> Self {
583 Self {
584 schema_version: 1,
585 labels: HashMap::new(),
586 edge_types: HashMap::new(),
587 properties: HashMap::new(),
588 indexes: Vec::new(),
589 constraints: Vec::new(),
590 schemaless_registry: SchemalessEdgeTypeRegistry::new(),
591 }
592 }
593}
594
595impl Schema {
596 fn bump_version(&mut self) {
605 self.schema_version = self.schema_version.wrapping_add(1);
606 }
607
608 pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
613 self.labels
614 .iter()
615 .find(|(_, meta)| meta.id == label_id)
616 .map(|(name, _)| name.as_str())
617 }
618
619 pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
621 self.labels.get(label_name).map(|meta| meta.id)
622 }
623
624 pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
629 self.edge_types
630 .iter()
631 .find(|(_, meta)| meta.id == type_id)
632 .map(|(name, _)| name.as_str())
633 }
634
635 pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
637 self.edge_types.get(type_name).map(|meta| meta.id)
638 }
639
640 pub fn vector_index_for_property(
645 &self,
646 label: &str,
647 property: &str,
648 ) -> Option<&VectorIndexConfig> {
649 self.indexes.iter().find_map(|idx| {
650 if let IndexDefinition::Vector(config) = idx
651 && config.label == label
652 && config.property == property
653 && config.metadata.status == IndexStatus::Online
654 {
655 return Some(config);
656 }
657 None
658 })
659 }
660
661 pub fn sparse_index_for_property(
663 &self,
664 label: &str,
665 property: &str,
666 ) -> Option<&SparseVectorIndexConfig> {
667 self.indexes.iter().find_map(|idx| {
668 if let IndexDefinition::Sparse(config) = idx
669 && config.label == label
670 && config.property == property
671 && config.metadata.status == IndexStatus::Online
672 {
673 return Some(config);
674 }
675 None
676 })
677 }
678
679 pub fn fulltext_index_for_property(
684 &self,
685 label: &str,
686 property: &str,
687 ) -> Option<&FullTextIndexConfig> {
688 self.indexes.iter().find_map(|idx| {
689 if let IndexDefinition::FullText(config) = idx
690 && config.label == label
691 && config.properties.iter().any(|p| p == property)
692 && config.metadata.status == IndexStatus::Online
693 {
694 return Some(config);
695 }
696 None
697 })
698 }
699
700 pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
705 self.labels
706 .iter()
707 .find(|(k, _)| k.eq_ignore_ascii_case(name))
708 .map(|(_, v)| v)
709 }
710
711 pub fn canonical_label_name(&self, name: &str) -> Option<String> {
718 self.labels
719 .iter()
720 .find(|(k, _)| k.eq_ignore_ascii_case(name))
721 .map(|(k, _)| k.clone())
722 }
723
724 pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
726 self.get_label_case_insensitive(label_name)
727 .map(|meta| meta.id)
728 }
729
730 pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
735 self.edge_types
736 .iter()
737 .find(|(k, _)| k.eq_ignore_ascii_case(name))
738 .map(|(_, v)| v)
739 }
740
741 pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
743 self.get_edge_type_case_insensitive(type_name)
744 .map(|meta| meta.id)
745 }
746
747 pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
750 self.edge_type_id_by_name_case_insensitive(type_name)
751 .or_else(|| {
752 self.schemaless_registry
753 .id_by_name_case_insensitive(type_name)
754 })
755 }
756
757 pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
763 if let Some(id) = self.edge_type_id_unified(type_name) {
764 return id;
765 }
766 let id = self.schemaless_registry.get_or_assign_id(type_name);
774 self.bump_version();
775 id
776 }
777
778 pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
785 self.edge_type_id_by_name(type_name)
786 .or_else(|| self.schemaless_registry.id_by_name(type_name))
787 }
788
789 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
793 if is_schemaless_edge_type(type_id) {
794 self.schemaless_registry
795 .type_name_by_id(type_id)
796 .map(str::to_owned)
797 } else {
798 self.edge_type_name_by_id(type_id).map(str::to_owned)
799 }
800 }
801
802 pub fn all_edge_type_ids(&self) -> Vec<u32> {
805 let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
806 ids.extend(self.schemaless_registry.all_type_ids());
807 ids.sort_unstable();
808 ids
809 }
810}
811
/// Build state recorded for an index.
///
/// The per-property lookups on `Schema` (`vector_index_for_property` and its
/// siblings) only return indexes whose status is `Online`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
    /// The index is usable. This is the default, including when the field is
    /// missing from the serialized form.
    #[default]
    Online,
    /// NOTE(review): presumably a build is in progress — confirm with the
    /// index builder.
    Building,
    /// NOTE(review): presumably the index no longer reflects the data —
    /// confirm with the index builder.
    Stale,
    /// NOTE(review): presumably the last build failed — confirm with the
    /// index builder.
    Failed,
}
825
/// Bookkeeping attached to every index definition.
///
/// All fields fall back to their defaults when missing from the serialized
/// form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
    /// Current status of the index.
    #[serde(default)]
    pub status: IndexStatus,
    /// Time of the last build, if one has been recorded.
    #[serde(default)]
    pub last_built_at: Option<DateTime<Utc>>,
    /// Row count recorded at the last build, if any.
    #[serde(default)]
    pub row_count_at_build: Option<u64>,
}
839
/// An index definition of any supported kind.
///
/// Serialized as an internally tagged enum: the variant name is written to a
/// `"type"` field next to the fields of the wrapped configuration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    /// Dense vector index.
    Vector(VectorIndexConfig),
    /// Full-text index over one or more properties.
    FullText(FullTextIndexConfig),
    /// Scalar index.
    Scalar(ScalarIndexConfig),
    /// Inverted index over a single property.
    Inverted(InvertedIndexConfig),
    /// Full-text index over a JSON column.
    JsonFullText(JsonFtsIndexConfig),
    /// Sparse vector index.
    Sparse(SparseVectorIndexConfig),
}
852
853impl IndexDefinition {
854 pub fn name(&self) -> &str {
856 match self {
857 IndexDefinition::Vector(c) => &c.name,
858 IndexDefinition::FullText(c) => &c.name,
859 IndexDefinition::Scalar(c) => &c.name,
860 IndexDefinition::Inverted(c) => &c.name,
861 IndexDefinition::JsonFullText(c) => &c.name,
862 IndexDefinition::Sparse(c) => &c.name,
863 }
864 }
865
866 pub fn label(&self) -> &str {
868 match self {
869 IndexDefinition::Vector(c) => &c.label,
870 IndexDefinition::FullText(c) => &c.label,
871 IndexDefinition::Scalar(c) => &c.label,
872 IndexDefinition::Inverted(c) => &c.label,
873 IndexDefinition::JsonFullText(c) => &c.label,
874 IndexDefinition::Sparse(c) => &c.label,
875 }
876 }
877
878 pub fn metadata(&self) -> &IndexMetadata {
880 match self {
881 IndexDefinition::Vector(c) => &c.metadata,
882 IndexDefinition::FullText(c) => &c.metadata,
883 IndexDefinition::Scalar(c) => &c.metadata,
884 IndexDefinition::Inverted(c) => &c.metadata,
885 IndexDefinition::JsonFullText(c) => &c.metadata,
886 IndexDefinition::Sparse(c) => &c.metadata,
887 }
888 }
889
890 pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
892 match self {
893 IndexDefinition::Vector(c) => &mut c.metadata,
894 IndexDefinition::FullText(c) => &mut c.metadata,
895 IndexDefinition::Scalar(c) => &mut c.metadata,
896 IndexDefinition::Inverted(c) => &mut c.metadata,
897 IndexDefinition::JsonFullText(c) => &mut c.metadata,
898 IndexDefinition::Sparse(c) => &mut c.metadata,
899 }
900 }
901}
902
/// Configuration of an inverted index over a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
    /// Name of the index.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Defaults to `true` when missing.
    /// NOTE(review): presumably enables term normalization — the exact
    /// normalization is not visible from this file.
    #[serde(default = "default_normalize")]
    pub normalize: bool,
    /// Defaults to 10 000 when missing.
    /// NOTE(review): presumably an upper bound on indexed terms per document —
    /// confirm with the index builder.
    #[serde(default = "default_max_terms_per_doc")]
    pub max_terms_per_doc: usize,
    /// Build bookkeeping; defaults when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
915
/// Serde default for `InvertedIndexConfig::normalize`: on unless stated
/// otherwise.
fn default_normalize() -> bool {
    const NORMALIZE_BY_DEFAULT: bool = true;
    NORMALIZE_BY_DEFAULT
}
919
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`.
fn default_max_terms_per_doc() -> usize {
    const DEFAULT_MAX_TERMS_PER_DOC: usize = 10_000;
    DEFAULT_MAX_TERMS_PER_DOC
}
923
/// Configuration of a sparse-vector index over a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SparseVectorIndexConfig {
    /// Name of the index.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Dimensionality configured for the indexed vectors.
    pub dimensions: usize,
    /// Defaults to `true` when missing.
    /// NOTE(review): presumably enables quantization of stored values — the
    /// scheme is not visible from this file.
    #[serde(default = "default_sparse_quantize")]
    pub quantize: bool,
    /// Optional embedding configuration; `None` when missing.
    #[serde(default)]
    pub embedding_config: Option<EmbeddingConfig>,
    /// Build bookkeeping; defaults when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
948
/// Serde default for `SparseVectorIndexConfig::quantize`: on unless stated
/// otherwise.
fn default_sparse_quantize() -> bool {
    const QUANTIZE_BY_DEFAULT: bool = true;
    QUANTIZE_BY_DEFAULT
}
952
/// Configuration of a dense vector index over a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
    /// Name of the index.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Index structure and its tuning parameters.
    pub index_type: VectorIndexType,
    /// Distance metric used by the index.
    pub metric: DistanceMetric,
    /// Optional embedding configuration. Unlike on the sparse config there is
    /// no `#[serde(default)]` on this field.
    pub embedding_config: Option<EmbeddingConfig>,
    /// Build bookkeeping; defaults when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
964
/// Settings describing how embeddings for an index are produced.
///
/// NOTE(review): the field semantics below are inferred from the names; the
/// embedding pipeline is not visible from this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
    /// Presumably the alias of the embedding model or provider to use.
    pub alias: String,
    /// Properties whose content is embedded.
    pub source_properties: Vec<String>,
    /// Batch size used when embedding.
    pub batch_size: usize,
    /// Optional prefix, presumably prepended to documents before embedding;
    /// `None` when missing.
    #[serde(default)]
    pub document_prefix: Option<String>,
    /// Optional prefix, presumably prepended to queries before embedding;
    /// `None` when missing.
    #[serde(default)]
    pub query_prefix: Option<String>,
}
980
/// Structure of a vector index together with its tuning parameters.
///
/// NOTE(review): the variant names follow common ANN-index vocabulary (IVF,
/// HNSW, and the PQ/SQ/RQ quantizers). How each parameter is consumed is
/// decided by the index builder, which is not visible from this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
    /// No tuning parameters.
    Flat,
    /// IVF without quantization.
    IvfFlat {
        num_partitions: u32,
    },
    /// IVF with product quantization.
    IvfPq {
        num_partitions: u32,
        num_sub_vectors: u32,
        bits_per_subvector: u8,
    },
    /// IVF with scalar quantization.
    IvfSq {
        num_partitions: u32,
    },
    /// IVF with the "RQ" quantizer; `num_bits` is `None` when missing.
    IvfRq {
        num_partitions: u32,
        #[serde(default)]
        num_bits: Option<u8>,
    },
    /// HNSW without quantization; `num_partitions` is `None` when missing.
    HnswFlat {
        m: u32,
        ef_construction: u32,
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    /// HNSW with scalar quantization; `num_partitions` is `None` when missing.
    HnswSq {
        m: u32,
        ef_construction: u32,
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    /// HNSW with product quantization; `num_partitions` is `None` when missing.
    HnswPq {
        m: u32,
        ef_construction: u32,
        num_sub_vectors: u32,
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    /// Wrapper that carries its own parameters plus a nested `inner` index
    /// type. NOTE(review): the meaning of `k_sim`, `reps`, `d_proj` and `seed`
    /// is not visible from this file.
    Muvera {
        k_sim: u32,
        reps: u32,
        d_proj: u32,
        seed: u64,
        /// Index type wrapped by this variant.
        inner: Box<VectorIndexType>,
    },
}
1039
/// Distance metric used by vector indexes.
///
/// See `DistanceMetric::compute_distance` for the exact value each variant
/// produces.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
    /// One minus the cosine similarity.
    Cosine,
    /// Sum of squared component differences (no square root is taken).
    L2,
    /// Negated dot product.
    Dot,
}
1047
1048impl DistanceMetric {
1049 pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
1061 assert_eq!(a.len(), b.len(), "vector dimension mismatch");
1062 match self {
1063 DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
1064 DistanceMetric::Cosine => {
1065 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
1066 let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
1067 let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
1068 let denom = norm_a * norm_b;
1069 if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
1070 }
1071 DistanceMetric::Dot => {
1072 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
1073 -dot
1074 }
1075 }
1076 }
1077}
1078
/// Configuration of a full-text index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
    /// Name of the index.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Properties covered by the index.
    pub properties: Vec<String>,
    /// Tokenizer used for the indexed text.
    pub tokenizer: TokenizerConfig,
    /// NOTE(review): presumably whether token positions are stored — confirm
    /// with the index builder.
    pub with_positions: bool,
    /// Build bookkeeping; defaults when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1089
/// Tokenizer selection for a full-text index.
///
/// The tokenizers themselves are implemented elsewhere; this enum only names
/// the choice and its parameters.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
    /// The "standard" tokenizer.
    Standard,
    /// The "whitespace" tokenizer.
    Whitespace,
    /// N-gram tokenizer with minimum and maximum gram sizes.
    Ngram { min: u8, max: u8 },
    /// A tokenizer identified by name.
    Custom { name: String },
}
1098
/// Configuration of a full-text index over a JSON column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
    /// Name of the index.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed column.
    pub column: String,
    /// Empty when missing.
    /// NOTE(review): presumably the JSON paths to index — how an empty list is
    /// treated is not visible from this file.
    #[serde(default)]
    pub paths: Vec<String>,
    /// `false` when missing.
    /// NOTE(review): presumably whether token positions are stored.
    #[serde(default)]
    pub with_positions: bool,
    /// Build bookkeeping; defaults when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1111
/// Configuration of a scalar index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
    /// Name of the index.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Properties covered by the index.
    pub properties: Vec<String>,
    /// Index structure.
    pub index_type: ScalarIndexType,
    /// NOTE(review): presumably a predicate that restricts the index to a
    /// subset of rows (a partial index) — confirm with the index builder.
    pub where_clause: Option<String>,
    /// Build bookkeeping; defaults when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1122
/// Structure used by a scalar index.
///
/// The structures are implemented elsewhere; this enum only names the choice.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
    BTree,
    Hash,
    Bitmap,
    LabelList,
}
1131
/// Owns the in-memory [`Schema`] and knows where to persist it.
///
/// The schema is held as `RwLock<Arc<Schema>>`: readers obtain a cheap `Arc`
/// clone, while the mutating methods go through `Arc::make_mut` under the
/// write lock.
pub struct SchemaManager {
    /// Object store the schema is read from and written to.
    store: Arc<dyn ObjectStore>,
    /// Location of the schema object inside `store`.
    path: ObjectStorePath,
    /// Current schema.
    schema: RwLock<Arc<Schema>>,
}
1137
1138impl SchemaManager {
1139 pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
1140 let path = path.as_ref();
1141 let parent = path
1142 .parent()
1143 .ok_or_else(|| anyhow!("Invalid schema path"))?;
1144 let filename = path
1145 .file_name()
1146 .ok_or_else(|| anyhow!("Invalid schema filename"))?
1147 .to_str()
1148 .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
1149
1150 let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
1151 let obj_path = ObjectStorePath::from(filename);
1152
1153 Self::load_from_store(store, &obj_path).await
1154 }
1155
1156 pub async fn load_from_store(
1157 store: Arc<dyn ObjectStore>,
1158 path: &ObjectStorePath,
1159 ) -> Result<Self> {
1160 match store.get(path).await {
1161 Ok(result) => {
1162 let bytes = result.bytes().await?;
1163 let content = String::from_utf8(bytes.to_vec())?;
1164 let mut schema: Schema = serde_json::from_str(&content)?;
1165 let original_len = schema.indexes.len();
1173 if original_len > 0 {
1174 let mut seen: std::collections::HashSet<String> =
1175 std::collections::HashSet::with_capacity(original_len);
1176 let mut dedup: Vec<IndexDefinition> = schema
1177 .indexes
1178 .iter()
1179 .rev()
1180 .filter(|idx| seen.insert(idx.name().to_string()))
1181 .cloned()
1182 .collect();
1183 dedup.reverse();
1184 if dedup.len() != original_len {
1185 tracing::warn!(
1186 collapsed = original_len - dedup.len(),
1187 kept = dedup.len(),
1188 "schema.indexes: collapsed duplicate entries on load (issue #63)"
1189 );
1190 schema.indexes = dedup;
1191 }
1192 }
1193 Ok(Self {
1194 store,
1195 path: path.clone(),
1196 schema: RwLock::new(Arc::new(schema)),
1197 })
1198 }
1199 Err(object_store::Error::NotFound { .. }) => Ok(Self {
1200 store,
1201 path: path.clone(),
1202 schema: RwLock::new(Arc::new(Schema::default())),
1203 }),
1204 Err(e) => Err(anyhow::Error::from(e)),
1205 }
1206 }
1207
1208 pub async fn save(&self) -> Result<()> {
1209 let content = {
1210 let schema_guard = acquire_read(&self.schema, "schema")?;
1211 serde_json::to_string_pretty(&**schema_guard)?
1212 };
1213 self.store
1214 .put(&self.path, content.into())
1215 .await
1216 .map_err(anyhow::Error::from)?;
1217 Ok(())
1218 }
1219
1220 pub fn path(&self) -> &ObjectStorePath {
1221 &self.path
1222 }
1223
1224 pub fn schema(&self) -> Arc<Schema> {
1225 self.schema
1226 .read()
1227 .expect("Schema lock poisoned - a thread panicked while holding it")
1228 .clone()
1229 }
1230
1231 fn normalize_function_names(expr: &str) -> String {
1234 let mut result = String::with_capacity(expr.len());
1235 let mut chars = expr.chars().peekable();
1236
1237 while let Some(ch) = chars.next() {
1238 if ch.is_alphabetic() {
1239 let mut ident = String::new();
1241 ident.push(ch);
1242
1243 while let Some(&next) = chars.peek() {
1244 if next.is_alphanumeric() || next == '_' {
1245 ident.push(chars.next().unwrap());
1246 } else {
1247 break;
1248 }
1249 }
1250
1251 if chars.peek() == Some(&'(') {
1253 result.push_str(&ident.to_uppercase());
1254 } else {
1255 result.push_str(&ident); }
1257 } else {
1258 result.push(ch);
1259 }
1260 }
1261
1262 result
1263 }
1264
1265 pub fn generated_column_name(expr: &str) -> String {
1273 let normalized = Self::normalize_function_names(expr);
1275
1276 let sanitized = normalized
1277 .replace(|c: char| !c.is_alphanumeric(), "_")
1278 .trim_matches('_')
1279 .to_string();
1280
1281 const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1283 const FNV_PRIME: u64 = 1099511628211;
1284
1285 let mut hash = FNV_OFFSET_BASIS;
1286 for byte in normalized.as_bytes() {
1287 hash ^= *byte as u64;
1288 hash = hash.wrapping_mul(FNV_PRIME);
1289 }
1290
1291 format!("_gen_{}_{:x}", sanitized, hash)
1292 }
1293
1294 pub fn replace_schema(&self, new_schema: Schema) {
1295 let mut schema = self
1296 .schema
1297 .write()
1298 .expect("Schema lock poisoned - a thread panicked while holding it");
1299 *schema = Arc::new(new_schema);
1300 }
1301
1302 #[must_use]
1315 pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1316 let primary = self.schema();
1317 let merged = if overlay.is_empty() {
1318 (*primary).clone()
1319 } else {
1320 let mut merged = (*primary).clone();
1321 for (name, label) in &overlay.added_labels {
1322 merged.labels.insert(name.clone(), label.clone());
1323 }
1324 for (name, edge_type) in &overlay.added_edge_types {
1325 merged.edge_types.insert(name.clone(), edge_type.clone());
1326 }
1327 for addition in &overlay.added_properties {
1328 let props = merged.properties.entry(addition.owner.clone()).or_default();
1329 props.insert(
1330 addition.property.clone(),
1331 PropertyMeta {
1332 r#type: addition.data_type.clone(),
1333 nullable: addition.nullable,
1334 added_in: merged.schema_version,
1335 state: SchemaElementState::Active,
1336 generation_expression: None,
1337 description: None,
1338 },
1339 );
1340 }
1341 merged
1342 };
1343
1344 Arc::new(Self {
1345 store: self.store.clone(),
1346 path: self.path.clone(),
1347 schema: RwLock::new(Arc::new(merged)),
1348 })
1349 }
1350
1351 pub fn next_label_id(&self) -> u16 {
1352 self.schema()
1353 .labels
1354 .values()
1355 .map(|l| l.id)
1356 .max()
1357 .unwrap_or(0)
1358 + 1
1359 }
1360
1361 pub fn next_type_id(&self) -> u32 {
1362 let max_schema_id = self
1363 .schema()
1364 .edge_types
1365 .values()
1366 .map(|t| t.id)
1367 .max()
1368 .unwrap_or(0);
1369
1370 if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1372 panic!("Schema edge type ID exhaustion");
1373 }
1374
1375 max_schema_id + 1
1376 }
1377
1378 pub fn validate_schema_element_name(kind: &str, name: &str) -> Result<()> {
1396 if name.is_empty() || name.chars().all(char::is_whitespace) {
1397 return Err(anyhow!(
1398 "{kind} name must be non-empty and not all whitespace"
1399 ));
1400 }
1401 if name.len() > MAX_SCHEMA_NAME_LEN {
1402 return Err(anyhow!("{kind} name exceeds {MAX_SCHEMA_NAME_LEN} bytes"));
1403 }
1404 if let Some(c) = name
1405 .chars()
1406 .find(|c| c.is_control() || c.is_whitespace() || matches!(c, '/' | '\\'))
1407 {
1408 return Err(anyhow!(
1409 "{kind} name '{name}' contains an unsafe character ({c:?})"
1410 ));
1411 }
1412 Ok(())
1413 }
1414
1415 pub fn add_label(&self, name: &str) -> Result<u16> {
1416 self.add_label_with_desc(name, None)
1417 }
1418
1419 pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1420 Self::validate_schema_element_name("Label", name)?;
1421 let mut guard = acquire_write(&self.schema, "schema")?;
1422 let schema = Arc::make_mut(&mut *guard);
1423 if schema.labels.contains_key(name) {
1424 return Err(anyhow!("Label '{}' already exists", name));
1425 }
1426
1427 let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1428 if id >= VIRTUAL_LABEL_ID_START {
1429 return Err(anyhow!(
1430 "Native label space exhausted (next id {id:#x} would enter the \
1431 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1432 reserved for catalog-resolved labels)"
1433 ));
1434 }
1435 schema.labels.insert(
1436 name.to_string(),
1437 LabelMeta {
1438 id,
1439 created_at: Utc::now(),
1440 state: SchemaElementState::Active,
1441 description,
1442 },
1443 );
1444 schema.bump_version();
1445 Ok(id)
1446 }
1447
1448 pub fn add_edge_type(
1449 &self,
1450 name: &str,
1451 src_labels: Vec<String>,
1452 dst_labels: Vec<String>,
1453 ) -> Result<u32> {
1454 self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1455 }
1456
1457 pub fn add_edge_type_with_desc(
1458 &self,
1459 name: &str,
1460 src_labels: Vec<String>,
1461 dst_labels: Vec<String>,
1462 description: Option<String>,
1463 ) -> Result<u32> {
1464 Self::validate_schema_element_name("Edge type", name)?;
1465 let mut guard = acquire_write(&self.schema, "schema")?;
1466 let schema = Arc::make_mut(&mut *guard);
1467 if schema.edge_types.contains_key(name) {
1468 return Err(anyhow!("Edge type '{}' already exists", name));
1469 }
1470
1471 let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1472
1473 if id >= VIRTUAL_EDGE_TYPE_ID_START {
1478 return Err(anyhow!(
1479 "Native edge type space exhausted (next id {id:#x} would enter the \
1480 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1481 reserved for catalog-resolved edge types)"
1482 ));
1483 }
1484
1485 schema.edge_types.insert(
1486 name.to_string(),
1487 EdgeTypeMeta {
1488 id,
1489 src_labels,
1490 dst_labels,
1491 state: SchemaElementState::Active,
1492 description,
1493 },
1494 );
1495 schema.bump_version();
1496 Ok(id)
1497 }
1498
1499 pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1508 {
1509 let guard = acquire_read(&self.schema, "schema")
1510 .expect("Schema lock poisoned - a thread panicked while holding it");
1511 if let Some(id) = guard.edge_type_id_unified(type_name) {
1512 return id;
1513 }
1514 }
1515 let mut guard = acquire_write(&self.schema, "schema")
1516 .expect("Schema lock poisoned - a thread panicked while holding it");
1517 let schema = Arc::make_mut(&mut *guard);
1518 schema.get_or_assign_edge_type_id(type_name)
1519 }
1520
1521 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1523 let schema = acquire_read(&self.schema, "schema")
1524 .expect("Schema lock poisoned - a thread panicked while holding it");
1525 schema.edge_type_name_by_id_unified(type_id)
1526 }
1527
1528 pub fn add_property(
1529 &self,
1530 label_or_type: &str,
1531 prop_name: &str,
1532 data_type: DataType,
1533 nullable: bool,
1534 ) -> Result<()> {
1535 self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1536 }
1537
1538 pub fn add_property_with_desc(
1539 &self,
1540 label_or_type: &str,
1541 prop_name: &str,
1542 data_type: DataType,
1543 nullable: bool,
1544 description: Option<String>,
1545 ) -> Result<()> {
1546 validate_property_name(prop_name)?;
1547 let mut guard = acquire_write(&self.schema, "schema")?;
1548 let schema = Arc::make_mut(&mut *guard);
1549 let version = schema.schema_version;
1550 let props = schema
1551 .properties
1552 .entry(label_or_type.to_string())
1553 .or_default();
1554
1555 if props.contains_key(prop_name) {
1556 return Err(anyhow!(
1557 "Property '{}' already exists for '{}'",
1558 prop_name,
1559 label_or_type
1560 ));
1561 }
1562
1563 props.insert(
1564 prop_name.to_string(),
1565 PropertyMeta {
1566 r#type: data_type,
1567 nullable,
1568 added_in: version,
1569 state: SchemaElementState::Active,
1570 generation_expression: None,
1571 description,
1572 },
1573 );
1574 schema.bump_version();
1576 Ok(())
1577 }
1578
1579 pub fn add_internal_property(
1590 &self,
1591 label_or_type: &str,
1592 prop_name: &str,
1593 data_type: DataType,
1594 nullable: bool,
1595 ) -> Result<bool> {
1596 validate_reserved_property_name(prop_name)?;
1597 let mut guard = acquire_write(&self.schema, "schema")?;
1598 let schema = Arc::make_mut(&mut *guard);
1599 let version = schema.schema_version;
1600 let props = schema
1601 .properties
1602 .entry(label_or_type.to_string())
1603 .or_default();
1604
1605 if let Some(existing) = props.get(prop_name) {
1606 if existing.r#type == data_type {
1607 return Ok(false); }
1609 return Err(anyhow!(
1610 "Internal property '{}' already exists for '{}' with a different type",
1611 prop_name,
1612 label_or_type
1613 ));
1614 }
1615
1616 props.insert(
1617 prop_name.to_string(),
1618 PropertyMeta {
1619 r#type: data_type,
1620 nullable,
1621 added_in: version,
1622 state: SchemaElementState::Active,
1623 generation_expression: None,
1624 description: None,
1625 },
1626 );
1627 schema.bump_version();
1628 Ok(true)
1629 }
1630
1631 pub fn add_generated_property(
1632 &self,
1633 label_or_type: &str,
1634 prop_name: &str,
1635 data_type: DataType,
1636 expr: String,
1637 ) -> Result<()> {
1638 validate_reserved_property_name(prop_name)?;
1641 let mut guard = acquire_write(&self.schema, "schema")?;
1642 let schema = Arc::make_mut(&mut *guard);
1643 let version = schema.schema_version;
1644 let props = schema
1645 .properties
1646 .entry(label_or_type.to_string())
1647 .or_default();
1648
1649 if props.contains_key(prop_name) {
1650 return Err(anyhow!("Property '{}' already exists", prop_name));
1651 }
1652
1653 props.insert(
1654 prop_name.to_string(),
1655 PropertyMeta {
1656 r#type: data_type,
1657 nullable: true,
1658 added_in: version,
1659 state: SchemaElementState::Active,
1660 generation_expression: Some(expr),
1661 description: None,
1662 },
1663 );
1664 schema.bump_version();
1666 Ok(())
1667 }
1668
1669 pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1670 let mut guard = acquire_write(&self.schema, "schema")?;
1671 let schema = Arc::make_mut(&mut *guard);
1672 let meta = schema
1673 .labels
1674 .get_mut(name)
1675 .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1676 meta.description = description;
1677 Ok(())
1678 }
1679
1680 pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1681 let mut guard = acquire_write(&self.schema, "schema")?;
1682 let schema = Arc::make_mut(&mut *guard);
1683 let meta = schema
1684 .edge_types
1685 .get_mut(name)
1686 .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1687 meta.description = description;
1688 Ok(())
1689 }
1690
1691 pub fn set_property_description(
1692 &self,
1693 entity: &str,
1694 prop_name: &str,
1695 description: Option<String>,
1696 ) -> Result<()> {
1697 let mut guard = acquire_write(&self.schema, "schema")?;
1698 let schema = Arc::make_mut(&mut *guard);
1699 let props = schema
1700 .properties
1701 .get_mut(entity)
1702 .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1703 let meta = props
1704 .get_mut(prop_name)
1705 .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
1706 meta.description = description;
1707 Ok(())
1708 }
1709
1710 pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1719 let mut guard = acquire_write(&self.schema, "schema")?;
1720 let schema = Arc::make_mut(&mut *guard);
1721 if let Some(existing) = schema
1722 .indexes
1723 .iter_mut()
1724 .find(|i| i.name() == index_def.name())
1725 {
1726 *existing = index_def;
1727 } else {
1728 schema.indexes.push(index_def);
1729 }
1730 schema.bump_version();
1731 Ok(())
1732 }
1733
1734 pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1735 let schema = self.schema.read().expect("Schema lock poisoned");
1736 schema.indexes.iter().find(|i| i.name() == name).cloned()
1737 }
1738
1739 pub fn update_index_metadata(
1744 &self,
1745 index_name: &str,
1746 f: impl FnOnce(&mut IndexMetadata),
1747 ) -> Result<()> {
1748 let mut guard = acquire_write(&self.schema, "schema")?;
1749 let schema = Arc::make_mut(&mut *guard);
1750 let idx = schema
1751 .indexes
1752 .iter_mut()
1753 .find(|i| i.name() == index_name)
1754 .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1755 f(idx.metadata_mut());
1756 Ok(())
1757 }
1758
1759 pub fn remove_index(&self, name: &str) -> Result<()> {
1760 let mut guard = acquire_write(&self.schema, "schema")?;
1761 let schema = Arc::make_mut(&mut *guard);
1762 if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1763 schema.indexes.remove(pos);
1764 schema.bump_version();
1765 Ok(())
1766 } else {
1767 Err(anyhow!("Index '{}' not found", name))
1768 }
1769 }
1770
1771 pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1772 let mut guard = acquire_write(&self.schema, "schema")?;
1773 let schema = Arc::make_mut(&mut *guard);
1774 if schema.constraints.iter().any(|c| c.name == constraint.name) {
1775 return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1776 }
1777 schema.constraints.push(constraint);
1778 schema.bump_version();
1779 Ok(())
1780 }
1781
1782 pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1783 let mut guard = acquire_write(&self.schema, "schema")?;
1784 let schema = Arc::make_mut(&mut *guard);
1785 if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1786 schema.constraints.remove(pos);
1787 schema.bump_version();
1788 Ok(())
1789 } else if if_exists {
1790 Ok(())
1791 } else {
1792 Err(anyhow!("Constraint '{}' not found", name))
1793 }
1794 }
1795
1796 pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1797 let mut guard = acquire_write(&self.schema, "schema")?;
1798 let schema = Arc::make_mut(&mut *guard);
1799 let Some(props) = schema.properties.get_mut(label_or_type) else {
1800 return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1801 };
1802 if props.remove(prop_name).is_none() {
1803 return Err(anyhow!(
1804 "Property '{}' not found for '{}'",
1805 prop_name,
1806 label_or_type
1807 ));
1808 }
1809 schema.bump_version();
1810 Ok(())
1811 }
1812
1813 pub fn rename_property(
1814 &self,
1815 label_or_type: &str,
1816 old_name: &str,
1817 new_name: &str,
1818 ) -> Result<()> {
1819 let mut guard = acquire_write(&self.schema, "schema")?;
1820 let schema = Arc::make_mut(&mut *guard);
1821 let Some(props) = schema.properties.get_mut(label_or_type) else {
1822 return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1823 };
1824 let Some(meta) = props.remove(old_name) else {
1825 return Err(anyhow!(
1826 "Property '{}' not found for '{}'",
1827 old_name,
1828 label_or_type
1829 ));
1830 };
1831 if props.contains_key(new_name) {
1832 props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
1835 }
1836 props.insert(new_name.to_string(), meta);
1837 schema.bump_version();
1838 Ok(())
1839 }
1840
1841 pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1842 let mut guard = acquire_write(&self.schema, "schema")?;
1843 let schema = Arc::make_mut(&mut *guard);
1844 if let Some(label_meta) = schema.labels.get_mut(name) {
1845 label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1846 schema.bump_version();
1848 Ok(())
1849 } else if if_exists {
1850 Ok(())
1851 } else {
1852 Err(anyhow!("Label '{}' not found", name))
1853 }
1854 }
1855
1856 pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1857 let mut guard = acquire_write(&self.schema, "schema")?;
1858 let schema = Arc::make_mut(&mut *guard);
1859 if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1860 edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1861 schema.bump_version();
1863 Ok(())
1864 } else if if_exists {
1865 Ok(())
1866 } else {
1867 Err(anyhow!("Edge Type '{}' not found", name))
1868 }
1869 }
1870}
1871
1872pub fn validate_identifier(name: &str) -> Result<()> {
1874 if name.is_empty() || name.len() > 64 {
1876 return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1877 }
1878
1879 let first = name.chars().next().unwrap();
1881 if !first.is_alphabetic() && first != '_' {
1882 return Err(anyhow!(
1883 "Identifier '{}' must start with letter or underscore",
1884 name
1885 ));
1886 }
1887
1888 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1890 return Err(anyhow!(
1891 "Identifier '{}' must contain only alphanumeric and underscore",
1892 name
1893 ));
1894 }
1895
1896 const RESERVED: &[&str] = &[
1898 "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1899 "UNION", "ORDER", "LIMIT",
1900 ];
1901 if RESERVED.contains(&name.to_uppercase().as_str()) {
1902 return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1903 }
1904
1905 Ok(())
1906}
1907
1908pub fn validate_property_name(name: &str) -> Result<()> {
1915 if name.starts_with('_') {
1916 return Err(anyhow!(
1917 "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
1918 name
1919 ));
1920 }
1921 validate_reserved_property_name(name)
1922}
1923
1924fn validate_reserved_property_name(name: &str) -> Result<()> {
1931 const RESERVED_PROPS: &[&str] = &[
1940 "ext_id",
1941 "overflow_json",
1942 "eid",
1943 "src_vid",
1944 "dst_vid",
1945 "op",
1946 "__set_struct__",
1954 ];
1955 if RESERVED_PROPS.contains(&name) {
1956 return Err(anyhow!(
1957 "Property name '{}' is reserved by the storage layer; please choose a different name",
1958 name
1959 ));
1960 }
1961 Ok(())
1962}
1963
1964#[cfg(test)]
1965mod tests {
1966 use super::*;
1967 use crate::value::{TemporalValue, Value};
1968 use object_store::local::LocalFileSystem;
1969 use tempfile::tempdir;
1970
1971 #[test]
1972 fn test_datatype_accepts_matrix() {
1973 let dt = || TemporalValue::DateTime {
1974 nanos_since_epoch: 0,
1975 offset_seconds: 0,
1976 timezone_name: None,
1977 };
1978
1979 for ty in [
1981 DataType::String,
1982 DataType::Int64,
1983 DataType::Bool,
1984 DataType::DateTime,
1985 DataType::Float64,
1986 ] {
1987 assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
1988 }
1989
1990 assert!(DataType::String.accepts(&Value::String("x".into())));
1992 assert!(DataType::Int64.accepts(&Value::Int(1)));
1993 assert!(DataType::Bool.accepts(&Value::Bool(true)));
1994 assert!(DataType::DateTime.accepts(&Value::Temporal(dt())));
1995
1996 assert!(
1998 DataType::Float64.accepts(&Value::Int(3)),
1999 "Int widens to Float"
2000 );
2001 assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
2002 assert!(DataType::Timestamp.accepts(&Value::Temporal(dt())));
2003 assert!(
2004 DataType::Timestamp.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
2005 "storage parses strings for non-struct Timestamp columns"
2006 );
2007
2008 assert!(
2010 !DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
2011 "String into a DateTime struct column nulls silently — reject here"
2012 );
2013 assert!(!DataType::Bool.accepts(&Value::Int(1)));
2014 assert!(!DataType::Int64.accepts(&Value::Bool(true)));
2015 assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
2016 assert!(
2017 !DataType::String.accepts(&Value::Int(10)),
2018 "no implicit stringification"
2019 );
2020 assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));
2021
2022 assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
2024 }
2025
2026 #[tokio::test]
2027 async fn test_schema_management() -> Result<()> {
2028 let dir = tempdir()?;
2029 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2030 let path = ObjectStorePath::from("schema.json");
2031 let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
2032
2033 let lid = manager.add_label("Person")?;
2035 assert_eq!(lid, 1);
2036 assert!(manager.add_label("Person").is_err());
2037
2038 manager.add_property("Person", "name", DataType::String, false)?;
2040 assert!(
2041 manager
2042 .add_property("Person", "name", DataType::String, false)
2043 .is_err()
2044 );
2045
2046 let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
2048 assert_eq!(tid, 1);
2049
2050 manager.save().await?;
2051 assert!(store.get(&path).await.is_ok());
2053
2054 let manager2 = SchemaManager::load_from_store(store, &path).await?;
2055 assert!(manager2.schema().labels.contains_key("Person"));
2056 assert!(
2057 manager2
2058 .schema()
2059 .properties
2060 .get("Person")
2061 .unwrap()
2062 .contains_key("name")
2063 );
2064
2065 Ok(())
2066 }
2067
2068 #[tokio::test]
2069 async fn test_reserved_property_names_rejected() -> Result<()> {
2070 let dir = tempdir()?;
2071 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2072 let path = ObjectStorePath::from("schema.json");
2073 let manager = SchemaManager::load_from_store(store, &path).await?;
2074
2075 manager.add_label("Tiny")?;
2076
2077 for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
2081 let err = manager
2082 .add_property("Tiny", reserved, DataType::String, true)
2083 .expect_err(&format!("expected '{reserved}' to be rejected"));
2084 assert!(
2085 err.to_string().contains("reserved"),
2086 "error for '{reserved}' should mention 'reserved', got: {err}"
2087 );
2088 }
2089
2090 let err = manager
2095 .add_property("Tiny", "__set_struct__", DataType::String, true)
2096 .expect_err("expected '__set_struct__' to be rejected");
2097 assert!(
2098 err.to_string().contains("reserved"),
2099 "__set_struct__ rejection should mention 'reserved', got: {err}"
2100 );
2101
2102 for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
2104 assert!(
2105 manager
2106 .add_property("Tiny", reserved, DataType::String, true)
2107 .is_err(),
2108 "expected '{reserved}' to be rejected"
2109 );
2110 }
2111
2112 manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
2115 manager.add_property("Tiny", "user_op", DataType::String, true)?;
2116 manager.add_property("Tiny", "type_name", DataType::String, true)?;
2117
2118 manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
2120 assert!(
2121 manager
2122 .add_property("knows", "src_vid", DataType::Int64, true)
2123 .is_err()
2124 );
2125
2126 assert!(
2128 manager
2129 .add_generated_property(
2130 "Tiny",
2131 "ext_id",
2132 DataType::String,
2133 "concat('x', name)".into()
2134 )
2135 .is_err()
2136 );
2137
2138 Ok(())
2139 }
2140
2141 #[test]
2142 fn test_normalize_function_names() {
2143 assert_eq!(
2144 SchemaManager::normalize_function_names("lower(email)"),
2145 "LOWER(email)"
2146 );
2147 assert_eq!(
2148 SchemaManager::normalize_function_names("LOWER(email)"),
2149 "LOWER(email)"
2150 );
2151 assert_eq!(
2152 SchemaManager::normalize_function_names("Lower(email)"),
2153 "LOWER(email)"
2154 );
2155 assert_eq!(
2156 SchemaManager::normalize_function_names("trim(lower(email))"),
2157 "TRIM(LOWER(email))"
2158 );
2159 }
2160
2161 #[test]
2162 fn test_generated_column_name_case_insensitive() {
2163 let col1 = SchemaManager::generated_column_name("lower(email)");
2164 let col2 = SchemaManager::generated_column_name("LOWER(email)");
2165 let col3 = SchemaManager::generated_column_name("Lower(email)");
2166 assert_eq!(col1, col2);
2167 assert_eq!(col2, col3);
2168 assert!(col1.starts_with("_gen_LOWER_email_"));
2169 }
2170
2171 #[test]
2172 fn test_index_metadata_serde_backward_compat() {
2173 let json = r#"{
2175 "type": "Scalar",
2176 "name": "idx_person_name",
2177 "label": "Person",
2178 "properties": ["name"],
2179 "index_type": "BTree",
2180 "where_clause": null
2181 }"#;
2182 let def: IndexDefinition = serde_json::from_str(json).unwrap();
2183 let meta = def.metadata();
2184 assert_eq!(meta.status, IndexStatus::Online);
2185 assert!(meta.last_built_at.is_none());
2186 assert!(meta.row_count_at_build.is_none());
2187 }
2188
2189 #[test]
2190 fn test_index_metadata_serde_roundtrip() {
2191 let now = Utc::now();
2192 let def = IndexDefinition::Scalar(ScalarIndexConfig {
2193 name: "idx_test".to_string(),
2194 label: "Test".to_string(),
2195 properties: vec!["prop".to_string()],
2196 index_type: ScalarIndexType::BTree,
2197 where_clause: None,
2198 metadata: IndexMetadata {
2199 status: IndexStatus::Building,
2200 last_built_at: Some(now),
2201 row_count_at_build: Some(42),
2202 },
2203 });
2204
2205 let json = serde_json::to_string(&def).unwrap();
2206 let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
2207 assert_eq!(parsed.metadata().status, IndexStatus::Building);
2208 assert_eq!(parsed.metadata().row_count_at_build, Some(42));
2209 assert!(parsed.metadata().last_built_at.is_some());
2210 }
2211
2212 #[tokio::test]
2213 async fn test_update_index_metadata() -> Result<()> {
2214 let dir = tempdir()?;
2215 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2216 let path = ObjectStorePath::from("schema.json");
2217 let manager = SchemaManager::load_from_store(store, &path).await?;
2218
2219 manager.add_label("Person")?;
2220 let idx = IndexDefinition::Scalar(ScalarIndexConfig {
2221 name: "idx_test".to_string(),
2222 label: "Person".to_string(),
2223 properties: vec!["name".to_string()],
2224 index_type: ScalarIndexType::BTree,
2225 where_clause: None,
2226 metadata: Default::default(),
2227 });
2228 manager.add_index(idx)?;
2229
2230 let initial = manager.get_index("idx_test").unwrap();
2232 assert_eq!(initial.metadata().status, IndexStatus::Online);
2233
2234 manager.update_index_metadata("idx_test", |m| {
2236 m.status = IndexStatus::Building;
2237 m.row_count_at_build = Some(100);
2238 })?;
2239
2240 let updated = manager.get_index("idx_test").unwrap();
2241 assert_eq!(updated.metadata().status, IndexStatus::Building);
2242 assert_eq!(updated.metadata().row_count_at_build, Some(100));
2243
2244 assert!(manager.update_index_metadata("nope", |_| {}).is_err());
2246
2247 Ok(())
2248 }
2249
2250 #[tokio::test]
2255 async fn add_internal_property_reports_newly_added() -> Result<()> {
2256 let dir = tempdir()?;
2257 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2258 let path = ObjectStorePath::from("schema.json");
2259 let manager = SchemaManager::load_from_store(store, &path).await?;
2260 manager.add_label("Doc")?;
2261
2262 let dt = DataType::Vector { dimensions: 16 };
2263 assert!(manager.add_internal_property("Doc", "__fde_x", dt.clone(), true)?);
2265 assert!(!manager.add_internal_property("Doc", "__fde_x", dt.clone(), true)?);
2267 assert!(
2269 manager
2270 .add_internal_property("Doc", "__fde_x", DataType::Vector { dimensions: 8 }, true)
2271 .is_err()
2272 );
2273 Ok(())
2274 }
2275
2276 #[tokio::test]
2281 async fn test_add_index_is_upsert_by_name() -> Result<()> {
2282 let dir = tempdir()?;
2283 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2284 let path = ObjectStorePath::from("schema.json");
2285 let manager = SchemaManager::load_from_store(store, &path).await?;
2286 manager.add_label("Person")?;
2287
2288 let initial = IndexDefinition::Scalar(ScalarIndexConfig {
2289 name: "idx_test".to_string(),
2290 label: "Person".to_string(),
2291 properties: vec!["name".to_string()],
2292 index_type: ScalarIndexType::BTree,
2293 where_clause: None,
2294 metadata: IndexMetadata {
2295 status: IndexStatus::Building,
2296 ..Default::default()
2297 },
2298 });
2299 manager.add_index(initial.clone())?;
2300 assert_eq!(manager.schema().indexes.len(), 1);
2301
2302 manager.add_index(initial.clone())?;
2304 assert_eq!(
2305 manager.schema().indexes.len(),
2306 1,
2307 "duplicate add_index by name must not append"
2308 );
2309
2310 let mut updated_cfg = match initial {
2312 IndexDefinition::Scalar(c) => c,
2313 _ => unreachable!(),
2314 };
2315 updated_cfg.metadata.status = IndexStatus::Online;
2316 updated_cfg.metadata.row_count_at_build = Some(42);
2317 manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
2318 assert_eq!(manager.schema().indexes.len(), 1);
2319 let stored = manager.get_index("idx_test").unwrap();
2320 assert_eq!(stored.metadata().status, IndexStatus::Online);
2321 assert_eq!(stored.metadata().row_count_at_build, Some(42));
2322
2323 let other = IndexDefinition::Scalar(ScalarIndexConfig {
2325 name: "idx_other".to_string(),
2326 label: "Person".to_string(),
2327 properties: vec!["age".to_string()],
2328 index_type: ScalarIndexType::BTree,
2329 where_clause: None,
2330 metadata: IndexMetadata::default(),
2331 });
2332 manager.add_index(other)?;
2333 assert_eq!(manager.schema().indexes.len(), 2);
2334
2335 Ok(())
2336 }
2337
2338 #[tokio::test]
2341 async fn test_load_dedups_bloated_indexes() -> Result<()> {
2342 let dir = tempdir()?;
2343 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2344 let path = ObjectStorePath::from("schema.json");
2345
2346 let mut schema = Schema::default();
2350 schema.labels.insert(
2351 "Person".to_string(),
2352 LabelMeta {
2353 id: 1,
2354 created_at: chrono::Utc::now(),
2355 state: SchemaElementState::Active,
2356 description: None,
2357 },
2358 );
2359 let make = |status: IndexStatus, count: Option<u64>| {
2360 IndexDefinition::Scalar(ScalarIndexConfig {
2361 name: "idx_dup".to_string(),
2362 label: "Person".to_string(),
2363 properties: vec!["name".to_string()],
2364 index_type: ScalarIndexType::BTree,
2365 where_clause: None,
2366 metadata: IndexMetadata {
2367 status,
2368 row_count_at_build: count,
2369 ..Default::default()
2370 },
2371 })
2372 };
2373 for _ in 0..49 {
2374 schema.indexes.push(make(IndexStatus::Building, None));
2375 }
2376 schema.indexes.push(make(IndexStatus::Online, Some(123)));
2377 let json = serde_json::to_string_pretty(&schema)?;
2378 store.put(&path, json.into()).await?;
2379
2380 let manager = SchemaManager::load_from_store(store, &path).await?;
2381 let schema = manager.schema();
2382 assert_eq!(
2383 schema.indexes.len(),
2384 1,
2385 "load() must collapse 50 duplicates by name to 1"
2386 );
2387 assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
2389 assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));
2390
2391 Ok(())
2392 }
2393
2394 #[test]
2395 fn test_vector_index_for_property_skips_non_online() {
2396 let mut schema = Schema::default();
2397 schema.labels.insert(
2398 "Document".to_string(),
2399 LabelMeta {
2400 id: 1,
2401 created_at: chrono::Utc::now(),
2402 state: SchemaElementState::Active,
2403 description: None,
2404 },
2405 );
2406
2407 schema
2409 .indexes
2410 .push(IndexDefinition::Vector(VectorIndexConfig {
2411 name: "vec_doc_embedding".to_string(),
2412 label: "Document".to_string(),
2413 property: "embedding".to_string(),
2414 index_type: VectorIndexType::Flat,
2415 metric: DistanceMetric::Cosine,
2416 embedding_config: None,
2417 metadata: IndexMetadata {
2418 status: IndexStatus::Stale,
2419 ..Default::default()
2420 },
2421 }));
2422
2423 assert!(
2425 schema
2426 .vector_index_for_property("Document", "embedding")
2427 .is_none()
2428 );
2429
2430 if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
2432 cfg.metadata.status = IndexStatus::Online;
2433 }
2434 let result = schema.vector_index_for_property("Document", "embedding");
2435 assert!(result.is_some());
2436 assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
2437 }
2438
2439 #[tokio::test]
2440 async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
2441 use crate::core::fork::SchemaDelta;
2442
2443 let dir = tempdir()?;
2444 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2445 let path = ObjectStorePath::from("schema.json");
2446 let primary = SchemaManager::load_from_store(store, &path).await?;
2447 primary.add_label("Person")?;
2448
2449 let overlay = primary.with_overlay(&SchemaDelta::empty());
2450 assert_eq!(overlay.schema().labels.len(), 1);
2451
2452 overlay.add_label("Forked")?;
2455 assert!(overlay.schema().labels.contains_key("Forked"));
2456 assert!(!primary.schema().labels.contains_key("Forked"));
2457
2458 Ok(())
2459 }
2460
2461 #[tokio::test]
2462 async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
2463 use crate::core::fork::SchemaDelta;
2464 use chrono::Utc;
2465
2466 let dir = tempdir()?;
2467 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2468 let path = ObjectStorePath::from("schema.json");
2469 let primary = SchemaManager::load_from_store(store, &path).await?;
2470 primary.add_label("Existing")?;
2471
2472 let label_meta = LabelMeta {
2473 id: 99,
2474 created_at: Utc::now(),
2475 state: SchemaElementState::Active,
2476 description: None,
2477 };
2478 let edge_meta = EdgeTypeMeta {
2479 id: 99,
2480 src_labels: vec!["NewLabel".into()],
2481 dst_labels: vec!["NewLabel".into()],
2482 state: SchemaElementState::Active,
2483 description: None,
2484 };
2485 let delta = SchemaDelta {
2486 added_labels: vec![("NewLabel".to_string(), label_meta)],
2487 added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
2488 added_properties: vec![],
2489 };
2490
2491 let overlay = primary.with_overlay(&delta);
2492 let merged = overlay.schema();
2493 assert!(merged.labels.contains_key("Existing"));
2494 assert!(merged.labels.contains_key("NewLabel"));
2495 assert!(merged.edge_types.contains_key("NewEdge"));
2496
2497 assert!(!primary.schema().labels.contains_key("NewLabel"));
2499 Ok(())
2500 }
2501
2502 #[tokio::test]
2507 async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
2508 let dir = tempdir()?;
2509 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2510 let path = ObjectStorePath::from("schema.json");
2511 let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);
2512
2513 let mut handles = Vec::new();
2514 for _ in 0..16 {
2515 let m = manager.clone();
2516 handles.push(std::thread::spawn(move || {
2517 m.get_or_assign_edge_type_id("RACED")
2518 }));
2519 }
2520 let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2521 assert!(
2522 ids.iter().all(|&id| id == ids[0]),
2523 "all racers must observe one id, got {ids:?}"
2524 );
2525 assert_eq!(manager.get_or_assign_edge_type_id("RACED"), ids[0]);
2527
2528 manager.add_label("A")?;
2530 let declared = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
2531 assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared);
2532 Ok(())
2533 }
2534
2535 #[test]
2540 fn test_new_schemaless_edge_type_bumps_schema_version() {
2541 let mut schema = Schema::default();
2542 let v0 = schema.schema_version;
2543
2544 let id1 = schema.get_or_assign_edge_type_id("FRESH");
2545 assert_eq!(
2546 schema.schema_version,
2547 v0.wrapping_add(1),
2548 "minting a new edge type must bump schema_version"
2549 );
2550
2551 let id1_again = schema.get_or_assign_edge_type_id("FRESH");
2553 assert_eq!(id1, id1_again);
2554 assert_eq!(
2555 schema.schema_version,
2556 v0.wrapping_add(1),
2557 "resolving an existing edge type must not bump schema_version"
2558 );
2559
2560 let _id2 = schema.get_or_assign_edge_type_id("OTHER");
2562 assert_eq!(
2563 schema.schema_version,
2564 v0.wrapping_add(2),
2565 "a second new edge type must bump schema_version again"
2566 );
2567 }
2568
2569 #[test]
2573 fn validate_schema_element_name_rejects_unsafe() {
2574 for bad in ["", " ", "a/b", "a b", "a\nb", "a\\b", "x\0y"] {
2575 assert!(
2576 SchemaManager::validate_schema_element_name("Label", bad).is_err(),
2577 "expected {bad:?} to be rejected"
2578 );
2579 }
2580 for good in ["Person", "My.Label", "edge_2", "KNOWS"] {
2581 assert!(
2582 SchemaManager::validate_schema_element_name("Label", good).is_ok(),
2583 "expected {good:?} to be accepted"
2584 );
2585 }
2586 let long = "x".repeat(MAX_SCHEMA_NAME_LEN + 1);
2588 assert!(SchemaManager::validate_schema_element_name("Label", &long).is_err());
2589 }
2590}