1use crate::core::edge_type::{
5 MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6 is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
/// Lifecycle state of a schema element (label, edge type, or property).
///
/// The metadata structs in this module fall back to `Active` (via
/// `default_state`) when the field is absent from serialized input.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
    /// The element is live.
    Active,
    /// The element is hidden.
    Hidden {
        /// When the element became hidden.
        since: DateTime<Utc>,
        /// Identifier of the last snapshot in which the element was still
        /// active (presumably a snapshot id — confirm against callers).
        last_active_snapshot: String,
    },
    /// The element has been tombstoned.
    Tombstone {
        /// When the element was tombstoned.
        since: DateTime<Utc>,
    },
}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35pub fn datetime_struct_fields() -> Fields {
42 Fields::from(vec![
43 Field::new(
44 "nanos_since_epoch",
45 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46 true,
47 ),
48 Field::new("offset_seconds", ArrowDataType::Int32, true),
49 Field::new("timezone_name", ArrowDataType::Utf8, true),
50 ])
51}
52
53pub fn time_struct_fields() -> Fields {
59 Fields::from(vec![
60 Field::new(
61 "nanos_since_midnight",
62 ArrowDataType::Time64(TimeUnit::Nanosecond),
63 true,
64 ),
65 Field::new("offset_seconds", ArrowDataType::Int32, true),
66 ])
67}
68
69pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
79pub fn sparse_vector_struct_fields() -> Fields {
86 Fields::from(vec![
87 Field::new(
88 "indices",
89 ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::UInt32, true))),
90 false,
91 ),
92 Field::new(
93 "values",
94 ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Float32, true))),
95 false,
96 ),
97 ])
98}
99
100pub fn is_sparse_vector_struct(arrow_dt: &ArrowDataType) -> bool {
102 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == sparse_vector_struct_fields())
103}
104
/// Arrow field metadata marking a child field as holding raw `Bytes` values.
///
/// `DataType::to_arrow` attaches it to `List<Bytes>` items and typed
/// `Map<_, Bytes>` values. The map has the single entry `uni_raw_bytes = "true"`.
pub fn raw_bytes_field_metadata() -> HashMap<String, String> {
    let mut metadata = HashMap::with_capacity(1);
    metadata.insert(String::from("uni_raw_bytes"), String::from("true"));
    metadata
}
115
/// Kind of CRDT stored in a `DataType::Crdt` column.
///
/// `DataType::to_arrow` maps every CRDT kind to Arrow `Binary`, so the
/// encoding is opaque at this layer. Variant names presumably follow the usual
/// CRDT abbreviations (e.g. `GCounter` = grow-only counter, `ORSet` =
/// observed-remove set, `LWW*` = last-writer-wins) — confirm against the CRDT
/// implementation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    GCounter,
    GSet,
    ORSet,
    LWWRegister,
    LWWMap,
    Rga,
    VectorClock,
    VCRegister,
}
128
129impl CrdtType {
130 #[must_use]
142 pub fn type_name(&self) -> &'static str {
143 match self {
144 CrdtType::GCounter => "GCounter",
145 CrdtType::GSet => "GSet",
146 CrdtType::ORSet => "ORSet",
147 CrdtType::LWWRegister => "LWWRegister",
148 CrdtType::LWWMap => "LWWMap",
149 CrdtType::Rga => "Rga",
150 CrdtType::VectorClock => "VectorClock",
151 CrdtType::VCRegister => "VCRegister",
152 }
153 }
154}
155
/// Coordinate system of a `DataType::Point`; each variant selects a different
/// Arrow struct layout in `DataType::to_arrow`.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
    /// Stored as `latitude`, `longitude` (`Float64`) plus `crs` (`Utf8`).
    Geographic,
    /// Stored as `x`, `y` (`Float64`) plus `crs` (`Utf8`).
    Cartesian2D,
    /// Stored as `x`, `y`, `z` (`Float64`) plus `crs` (`Utf8`).
    Cartesian3D,
}
162
/// Logical type of a schema property.
///
/// `DataType::to_arrow` gives the physical Arrow representation of each
/// variant, and `DataType::accepts` gives the runtime values each variant admits.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
    /// Arrow `Utf8`.
    String,
    /// Arrow `Int32`.
    Int32,
    /// Arrow `Int64`.
    Int64,
    /// Arrow `Float32`.
    Float32,
    /// Arrow `Float64`.
    Float64,
    /// Arrow `Boolean`.
    Bool,
    /// Nanosecond timestamp tagged with the `"UTC"` timezone.
    Timestamp,
    /// Arrow `Date32`.
    Date,
    /// Struct laid out by `time_struct_fields`.
    Time,
    /// Struct laid out by `datetime_struct_fields`.
    DateTime,
    /// Opaque `LargeBinary` encoding.
    Duration,
    /// Opaque `LargeBinary` encoding of an arbitrary value.
    CypherValue,
    /// Raw bytes stored as `LargeBinary`.
    Bytes,
    /// Struct of coordinates plus `crs`; layout depends on the [`PointType`].
    Point(PointType),
    /// Dense `Float32` vector stored as a `FixedSizeList` of length `dimensions`.
    Vector {
        dimensions: usize,
    },
    /// Sparse vector stored as the `sparse_vector_struct_fields` struct.
    /// NOTE: `dimensions` is not reflected in the Arrow type.
    SparseVector {
        dimensions: usize,
    },
    /// Stored as a 24-byte `FixedSizeBinary`.
    Btic,
    /// CRDT payload stored as Arrow `Binary`.
    Crdt(CrdtType),
    /// Arrow `List` of the inner type.
    List(Box<DataType>),
    /// Stored as a `List` of `{key, value}` structs (key type, value type).
    Map(Box<DataType>, Box<DataType>),
}
193
194impl DataType {
195 #[allow(non_upper_case_globals)]
197 pub const Float: DataType = DataType::Float64;
198 #[allow(non_upper_case_globals)]
199 pub const Int: DataType = DataType::Int64;
200
201 pub fn to_arrow(&self) -> ArrowDataType {
202 match self {
203 DataType::String => ArrowDataType::Utf8,
204 DataType::Int32 => ArrowDataType::Int32,
205 DataType::Int64 => ArrowDataType::Int64,
206 DataType::Float32 => ArrowDataType::Float32,
207 DataType::Float64 => ArrowDataType::Float64,
208 DataType::Bool => ArrowDataType::Boolean,
209 DataType::Timestamp => {
210 ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
211 }
212 DataType::Date => ArrowDataType::Date32,
213 DataType::Time => ArrowDataType::Struct(time_struct_fields()),
214 DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
215 DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Bytes => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
219 PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
220 Field::new("latitude", ArrowDataType::Float64, false),
221 Field::new("longitude", ArrowDataType::Float64, false),
222 Field::new("crs", ArrowDataType::Utf8, false),
223 ])),
224 PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
225 Field::new("x", ArrowDataType::Float64, false),
226 Field::new("y", ArrowDataType::Float64, false),
227 Field::new("crs", ArrowDataType::Utf8, false),
228 ])),
229 PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
230 Field::new("x", ArrowDataType::Float64, false),
231 Field::new("y", ArrowDataType::Float64, false),
232 Field::new("z", ArrowDataType::Float64, false),
233 Field::new("crs", ArrowDataType::Utf8, false),
234 ])),
235 },
236 DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
237 Arc::new(Field::new("item", ArrowDataType::Float32, true)),
238 *dimensions as i32,
239 ),
240 DataType::SparseVector { .. } => ArrowDataType::Struct(sparse_vector_struct_fields()),
241 DataType::Btic => ArrowDataType::FixedSizeBinary(24),
242 DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
244 let item = Field::new("item", inner.to_arrow(), true);
248 let item = if matches!(**inner, DataType::Bytes) {
249 item.with_metadata(raw_bytes_field_metadata())
250 } else {
251 item
252 };
253 ArrowDataType::List(Arc::new(item))
254 }
255 DataType::Map(key, value) => {
256 let value_field = if value.map_value_is_typed() {
263 let f = Field::new("value", value.to_arrow(), true);
264 if matches!(**value, DataType::Bytes) {
265 f.with_metadata(raw_bytes_field_metadata())
266 } else {
267 f
268 }
269 } else {
270 Field::new("value", ArrowDataType::LargeBinary, true)
271 };
272 ArrowDataType::List(Arc::new(Field::new(
273 "item",
274 ArrowDataType::Struct(Fields::from(vec![
275 Field::new("key", key.to_arrow(), false),
276 value_field,
277 ])),
278 true,
279 )))
280 }
281 }
282 }
283
284 pub fn map_value_is_typed(&self) -> bool {
289 matches!(
290 self,
291 DataType::String
292 | DataType::Int64
293 | DataType::Int32
294 | DataType::Float64
295 | DataType::Float32
296 | DataType::Bool
297 | DataType::Bytes
298 )
299 }
300
301 pub fn accepts(&self, value: &crate::value::Value) -> bool {
327 use crate::value::{TemporalValue, Value};
328
329 if matches!(value, Value::Null) {
331 return true;
332 }
333
334 match self {
335 DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
337
338 DataType::String => matches!(value, Value::String(_)),
339 DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
340 DataType::Float32 | DataType::Float64 => {
342 matches!(value, Value::Int(_) | Value::Float(_))
343 }
344 DataType::Bool => matches!(value, Value::Bool(_)),
345
346 DataType::Timestamp => matches!(
349 value,
350 Value::String(_)
351 | Value::Int(_)
352 | Value::Temporal(
353 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
354 )
355 ),
356 DataType::DateTime => matches!(
357 value,
358 Value::Temporal(
359 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
360 )
361 ),
362 DataType::Date => {
363 matches!(
364 value,
365 Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
366 )
367 }
368 DataType::Time => matches!(
369 value,
370 Value::Int(_)
371 | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
372 ),
373 DataType::Duration => {
374 matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
375 }
376 DataType::Bytes => matches!(value, Value::Bytes(_)),
377 DataType::Btic => matches!(
379 value,
380 Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
381 ),
382 DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
385 DataType::SparseVector { .. } => {
390 matches!(value, Value::SparseVector { .. } | Value::Map(_))
391 }
392 DataType::List(_) => matches!(value, Value::List(_)),
395 DataType::Map(_, _) => matches!(value, Value::Map(_)),
396 }
397 }
398
399 pub fn check_vector_dims(&self, value: &crate::value::Value) -> Result<(), VectorDimError> {
417 use crate::value::Value;
418
419 if matches!(value, Value::Null) {
420 return Ok(());
421 }
422
423 match self {
424 DataType::Vector { dimensions } => check_dense_vector_value(value, *dimensions),
425 DataType::List(inner) => {
426 let DataType::Vector { dimensions } = inner.as_ref() else {
427 return Ok(());
428 };
429 let Value::List(tokens) = value else {
430 return Err(VectorDimError::NotATokenList {
431 actual: value_variant_name(value),
432 });
433 };
434 for (token, token_value) in tokens.iter().enumerate() {
435 check_dense_vector_value(token_value, *dimensions)
436 .map_err(|e| e.for_token(token))?;
437 }
438 Ok(())
439 }
440 _ => Ok(()),
441 }
442 }
443}
444
/// Reasons a value fails a vector-dimension check.
///
/// The plain variants describe a single dense vector. The `Token*` variants
/// are the same failures located at index `token` of a multi-vector
/// (`List<Vector>`) value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VectorDimError {
    /// The vector has `actual` elements instead of `expected`.
    WrongLength {
        /// Declared dimensionality.
        expected: usize,
        /// Observed length.
        actual: usize,
    },
    /// List element at `index` is not numeric.
    NonNumericElement {
        /// Position of the first non-numeric element.
        index: usize,
    },
    /// The value is neither a vector nor a list.
    NotAVector {
        /// Variant name of the offending value.
        actual: &'static str,
    },
    /// Token `token` has the wrong length.
    TokenWrongLength {
        /// Index of the offending token.
        token: usize,
        /// Declared dimensionality.
        expected: usize,
        /// Observed length.
        actual: usize,
    },
    /// Token `token` contains a non-numeric element at `index`.
    TokenNonNumericElement {
        /// Index of the offending token.
        token: usize,
        /// Position of the first non-numeric element within the token.
        index: usize,
    },
    /// Token `token` is not a vector.
    TokenNotAVector {
        /// Index of the offending token.
        token: usize,
        /// Variant name of the offending token value.
        actual: &'static str,
    },
    /// A multi-vector column received a value that is not a list.
    NotATokenList {
        /// Variant name of the offending value.
        actual: &'static str,
    },
}
498
499impl VectorDimError {
500 fn for_token(self, token: usize) -> Self {
502 match self {
503 Self::WrongLength { expected, actual } => Self::TokenWrongLength {
504 token,
505 expected,
506 actual,
507 },
508 Self::NonNumericElement { index } => Self::TokenNonNumericElement { token, index },
509 Self::NotAVector { actual } => Self::TokenNotAVector { token, actual },
510 other => other,
511 }
512 }
513}
514
515impl std::fmt::Display for VectorDimError {
516 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
517 match self {
518 Self::WrongLength { expected, actual } => write!(
519 f,
520 "got a vector of length {actual}, expected {expected} dimensions"
521 ),
522 Self::NonNumericElement { index } => {
523 write!(f, "element {index} is not numeric")
524 }
525 Self::NotAVector { actual } => {
526 write!(f, "got a non-vector value of type {actual}")
527 }
528 Self::TokenWrongLength {
529 token,
530 expected,
531 actual,
532 } => write!(
533 f,
534 "token {token} has {actual} dimensions, expected {expected}"
535 ),
536 Self::TokenNonNumericElement { token, index } => {
537 write!(f, "token {token} element {index} is not numeric")
538 }
539 Self::TokenNotAVector { token, actual } => {
540 write!(f, "token {token} is not a vector (got {actual})")
541 }
542 Self::NotATokenList { actual } => write!(
543 f,
544 "got a non-list value of type {actual} for a multi-vector column"
545 ),
546 }
547 }
548}
549
550impl std::error::Error for VectorDimError {}
551
552pub fn check_dense_vector_value(
566 value: &crate::value::Value,
567 dimensions: usize,
568) -> Result<(), VectorDimError> {
569 use crate::value::Value;
570
571 match value {
572 Value::Null => Ok(()),
573 Value::Vector(v) => {
574 if v.len() == dimensions {
575 Ok(())
576 } else {
577 Err(VectorDimError::WrongLength {
578 expected: dimensions,
579 actual: v.len(),
580 })
581 }
582 }
583 Value::List(items) => {
584 if items.len() != dimensions {
585 return Err(VectorDimError::WrongLength {
586 expected: dimensions,
587 actual: items.len(),
588 });
589 }
590 if let Some(index) = items.iter().position(|e| !e.is_number()) {
591 return Err(VectorDimError::NonNumericElement { index });
592 }
593 Ok(())
594 }
595 other => Err(VectorDimError::NotAVector {
596 actual: value_variant_name(other),
597 }),
598 }
599}
600
601fn value_variant_name(value: &crate::value::Value) -> &'static str {
603 use crate::value::Value;
604
605 match value {
606 Value::Null => "Null",
607 Value::Bool(_) => "Bool",
608 Value::Int(_) => "Int",
609 Value::Float(_) => "Float",
610 Value::String(_) => "String",
611 Value::Bytes(_) => "Bytes",
612 Value::List(_) => "List",
613 Value::Map(_) => "Map",
614 Value::Node(_) => "Node",
615 Value::Edge(_) => "Edge",
616 Value::Path(_) => "Path",
617 Value::Vector(_) => "Vector",
618 Value::SparseVector { .. } => "SparseVector",
619 Value::Temporal(_) => "Temporal",
620 }
621}
622
623fn default_created_at() -> DateTime<Utc> {
624 Utc::now()
625}
626
627fn default_state() -> SchemaElementState {
628 SchemaElementState::Active
629}
630
/// Serde default for `PropertyMeta::added_in`: schema version 1.
fn default_version_1() -> u32 {
    1_u32
}
634
/// Schema metadata for a single property.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
    /// Logical type of the property.
    pub r#type: DataType,
    /// Whether the property may hold null.
    pub nullable: bool,
    /// Schema version in which the property was added (defaults to 1 when absent).
    #[serde(default = "default_version_1")]
    pub added_in: u32,
    /// Lifecycle state (defaults to `Active` when absent).
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Expression for a generated property, if any.
    #[serde(default)]
    pub generation_expression: Option<String>,
    /// Optional human-readable description; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
648
/// Schema metadata for a node label.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
    /// Numeric label id.
    pub id: u16,
    /// Creation time. When absent from serialized input it is filled with the
    /// time of deserialization (see `default_created_at`).
    #[serde(default = "default_created_at")]
    pub created_at: DateTime<Utc>,
    /// Lifecycle state (defaults to `Active` when absent).
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional human-readable description; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
659
/// Schema metadata for a declared edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
    /// Numeric edge-type id.
    pub id: u32,
    /// Labels allowed on the source node.
    pub src_labels: Vec<String>,
    /// Labels allowed on the destination node.
    pub dst_labels: Vec<String>,
    /// Lifecycle state (defaults to `Active` when absent).
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional human-readable description; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
671
/// Kind of schema constraint.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
    /// The combination of `properties` must be unique.
    Unique { properties: Vec<String> },
    /// `property` must be present.
    Exists { property: String },
    /// `expression` must hold (expression language not visible here).
    Check { expression: String },
}
679
/// Schema element a constraint applies to.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
    /// A node label, by name.
    Label(String),
    /// An edge type, by name.
    EdgeType(String),
}
686
/// A named constraint definition.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
    /// Constraint name.
    pub name: String,
    /// What the constraint requires.
    pub constraint_type: ConstraintType,
    /// Which label or edge type it applies to.
    pub target: ConstraintTarget,
    /// Whether the constraint is currently enabled.
    pub enabled: bool,
}
694
/// Bidirectional name/id mapping for edge types that are not declared in the
/// schema.
///
/// Ids are produced by `make_schemaless_id` from a monotonically increasing
/// local counter.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    // Type name -> assigned id.
    name_to_id: HashMap<String, u32>,
    // Assigned id -> type name (inverse of `name_to_id`).
    id_to_name: HashMap<u32, String>,
    // Next local counter value to pass to `make_schemaless_id`.
    next_local_id: u32,
}
707
708impl SchemalessEdgeTypeRegistry {
709 pub fn new() -> Self {
710 Self {
711 name_to_id: HashMap::new(),
712 id_to_name: HashMap::new(),
713 next_local_id: 1,
714 }
715 }
716
717 pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
719 if let Some(&id) = self.name_to_id.get(type_name) {
720 return id;
721 }
722
723 let id = make_schemaless_id(self.next_local_id);
724 self.next_local_id += 1;
725
726 self.name_to_id.insert(type_name.to_string(), id);
727 self.id_to_name.insert(id, type_name.to_string());
728
729 id
730 }
731
732 pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
734 self.id_to_name.get(&type_id).map(String::as_str)
735 }
736
737 pub fn contains(&self, type_name: &str) -> bool {
739 self.name_to_id.contains_key(type_name)
740 }
741
742 pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
744 self.name_to_id.get(type_name).copied()
745 }
746
747 pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
749 self.name_to_id
750 .iter()
751 .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
752 .map(|(_, &id)| id)
753 }
754
755 pub fn all_type_ids(&self) -> Vec<u32> {
757 self.id_to_name.keys().copied().collect()
758 }
759
760 pub fn is_empty(&self) -> bool {
762 self.name_to_id.is_empty()
763 }
764}
765
766impl Default for SchemalessEdgeTypeRegistry {
767 fn default() -> Self {
768 Self::new()
769 }
770}
771
/// First id of the reserved virtual-label range (`0xFF00`).
pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF_00;
/// Sentinel label id (`0xFFFF`, i.e. `u16::MAX`). It is the exclusive upper
/// bound of the virtual-label range.
pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = u16::MAX;

/// Maximum length of a schema name (unit — bytes or chars — is decided at
/// the use site, which is not visible here).
const MAX_SCHEMA_NAME_LEN: usize = 255;
786
787#[inline]
789pub fn is_virtual_label_id(id: u16) -> bool {
790 (VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL).contains(&id)
791}
792
/// The complete persisted schema: labels, edge types, properties, indexes,
/// constraints, and the registry of schemaless edge types.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
    /// Schema version; incremented (wrapping) by `bump_version`.
    pub schema_version: u32,
    /// Declared labels keyed by name.
    pub labels: HashMap<String, LabelMeta>,
    /// Declared edge types keyed by name.
    pub edge_types: HashMap<String, EdgeTypeMeta>,
    /// Properties keyed by owner name, then property name (owner presumably a
    /// label or edge-type name — confirm).
    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
    /// Index definitions (empty when absent from input).
    #[serde(default)]
    pub indexes: Vec<IndexDefinition>,
    /// Constraint definitions (empty when absent from input).
    #[serde(default)]
    pub constraints: Vec<Constraint>,
    /// Ids for edge types not declared in `edge_types` (empty when absent).
    #[serde(default)]
    pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
807
808impl Default for Schema {
809 fn default() -> Self {
810 Self {
811 schema_version: 1,
812 labels: HashMap::new(),
813 edge_types: HashMap::new(),
814 properties: HashMap::new(),
815 indexes: Vec::new(),
816 constraints: Vec::new(),
817 schemaless_registry: SchemalessEdgeTypeRegistry::new(),
818 }
819 }
820}
821
822impl Schema {
823 fn bump_version(&mut self) {
832 self.schema_version = self.schema_version.wrapping_add(1);
833 }
834
835 pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
840 self.labels
841 .iter()
842 .find(|(_, meta)| meta.id == label_id)
843 .map(|(name, _)| name.as_str())
844 }
845
846 pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
848 self.labels.get(label_name).map(|meta| meta.id)
849 }
850
851 pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
856 self.edge_types
857 .iter()
858 .find(|(_, meta)| meta.id == type_id)
859 .map(|(name, _)| name.as_str())
860 }
861
862 pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
864 self.edge_types.get(type_name).map(|meta| meta.id)
865 }
866
867 pub fn vector_index_for_property(
872 &self,
873 label: &str,
874 property: &str,
875 ) -> Option<&VectorIndexConfig> {
876 self.indexes.iter().find_map(|idx| {
877 if let IndexDefinition::Vector(config) = idx
878 && config.label == label
879 && config.property == property
880 && config.metadata.status == IndexStatus::Online
881 {
882 return Some(config);
883 }
884 None
885 })
886 }
887
888 pub fn sparse_index_for_property(
890 &self,
891 label: &str,
892 property: &str,
893 ) -> Option<&SparseVectorIndexConfig> {
894 self.indexes.iter().find_map(|idx| {
895 if let IndexDefinition::Sparse(config) = idx
896 && config.label == label
897 && config.property == property
898 && config.metadata.status == IndexStatus::Online
899 {
900 return Some(config);
901 }
902 None
903 })
904 }
905
906 pub fn fulltext_index_for_property(
911 &self,
912 label: &str,
913 property: &str,
914 ) -> Option<&FullTextIndexConfig> {
915 self.indexes.iter().find_map(|idx| {
916 if let IndexDefinition::FullText(config) = idx
917 && config.label == label
918 && config.properties.iter().any(|p| p == property)
919 && config.metadata.status == IndexStatus::Online
920 {
921 return Some(config);
922 }
923 None
924 })
925 }
926
927 pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
932 self.labels
933 .iter()
934 .find(|(k, _)| k.eq_ignore_ascii_case(name))
935 .map(|(_, v)| v)
936 }
937
938 pub fn canonical_label_name(&self, name: &str) -> Option<String> {
945 self.labels
946 .iter()
947 .find(|(k, _)| k.eq_ignore_ascii_case(name))
948 .map(|(k, _)| k.clone())
949 }
950
951 pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
953 self.get_label_case_insensitive(label_name)
954 .map(|meta| meta.id)
955 }
956
957 pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
962 self.edge_types
963 .iter()
964 .find(|(k, _)| k.eq_ignore_ascii_case(name))
965 .map(|(_, v)| v)
966 }
967
968 pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
970 self.get_edge_type_case_insensitive(type_name)
971 .map(|meta| meta.id)
972 }
973
974 pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
977 self.edge_type_id_by_name_case_insensitive(type_name)
978 .or_else(|| {
979 self.schemaless_registry
980 .id_by_name_case_insensitive(type_name)
981 })
982 }
983
984 pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
990 if let Some(id) = self.edge_type_id_unified(type_name) {
991 return id;
992 }
993 let id = self.schemaless_registry.get_or_assign_id(type_name);
1001 self.bump_version();
1002 id
1003 }
1004
1005 pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
1012 self.edge_type_id_by_name(type_name)
1013 .or_else(|| self.schemaless_registry.id_by_name(type_name))
1014 }
1015
1016 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1020 if is_schemaless_edge_type(type_id) {
1021 self.schemaless_registry
1022 .type_name_by_id(type_id)
1023 .map(str::to_owned)
1024 } else {
1025 self.edge_type_name_by_id(type_id).map(str::to_owned)
1026 }
1027 }
1028
1029 pub fn all_edge_type_ids(&self) -> Vec<u32> {
1032 let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
1033 ids.extend(self.schemaless_registry.all_type_ids());
1034 ids.sort_unstable();
1035 ids
1036 }
1037}
1038
/// Build/availability status of an index. Only `Online` indexes are returned
/// by the `Schema::*_index_for_property` lookups.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
    /// Usable; the default when no status is recorded.
    #[default]
    Online,
    /// Currently being built.
    Building,
    /// Presumably out of date relative to the data — confirm.
    Stale,
    /// The last build failed.
    Failed,
}
1052
/// Bookkeeping shared by every index definition; all fields default when absent.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
    /// Current status (defaults to `Online`).
    #[serde(default)]
    pub status: IndexStatus,
    /// When the index was last built, if recorded.
    #[serde(default)]
    pub last_built_at: Option<DateTime<Utc>>,
    /// Row count at the last build, if recorded.
    #[serde(default)]
    pub row_count_at_build: Option<u64>,
}
1066
/// One index definition. Serialized internally tagged: the variant name is
/// stored in a `"type"` field alongside the config's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    /// Dense vector index.
    Vector(VectorIndexConfig),
    /// Full-text index over one or more properties.
    FullText(FullTextIndexConfig),
    /// Scalar index (B-tree, hash, bitmap, label list).
    Scalar(ScalarIndexConfig),
    /// Inverted index over a single property.
    Inverted(InvertedIndexConfig),
    /// Full-text index over JSON paths of a column.
    JsonFullText(JsonFtsIndexConfig),
    /// Sparse vector index.
    Sparse(SparseVectorIndexConfig),
}
1079
1080impl IndexDefinition {
1081 pub fn name(&self) -> &str {
1083 match self {
1084 IndexDefinition::Vector(c) => &c.name,
1085 IndexDefinition::FullText(c) => &c.name,
1086 IndexDefinition::Scalar(c) => &c.name,
1087 IndexDefinition::Inverted(c) => &c.name,
1088 IndexDefinition::JsonFullText(c) => &c.name,
1089 IndexDefinition::Sparse(c) => &c.name,
1090 }
1091 }
1092
1093 pub fn label(&self) -> &str {
1095 match self {
1096 IndexDefinition::Vector(c) => &c.label,
1097 IndexDefinition::FullText(c) => &c.label,
1098 IndexDefinition::Scalar(c) => &c.label,
1099 IndexDefinition::Inverted(c) => &c.label,
1100 IndexDefinition::JsonFullText(c) => &c.label,
1101 IndexDefinition::Sparse(c) => &c.label,
1102 }
1103 }
1104
1105 pub fn metadata(&self) -> &IndexMetadata {
1107 match self {
1108 IndexDefinition::Vector(c) => &c.metadata,
1109 IndexDefinition::FullText(c) => &c.metadata,
1110 IndexDefinition::Scalar(c) => &c.metadata,
1111 IndexDefinition::Inverted(c) => &c.metadata,
1112 IndexDefinition::JsonFullText(c) => &c.metadata,
1113 IndexDefinition::Sparse(c) => &c.metadata,
1114 }
1115 }
1116
1117 pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
1119 match self {
1120 IndexDefinition::Vector(c) => &mut c.metadata,
1121 IndexDefinition::FullText(c) => &mut c.metadata,
1122 IndexDefinition::Scalar(c) => &mut c.metadata,
1123 IndexDefinition::Inverted(c) => &mut c.metadata,
1124 IndexDefinition::JsonFullText(c) => &mut c.metadata,
1125 IndexDefinition::Sparse(c) => &mut c.metadata,
1126 }
1127 }
1128}
1129
/// Configuration of an inverted index on a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
    /// Index name.
    pub name: String,
    /// Indexed label.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Whether terms are normalized (defaults to `true`; normalization rule not
    /// visible here).
    #[serde(default = "default_normalize")]
    pub normalize: bool,
    /// Per-document term cap (defaults to 10 000).
    #[serde(default = "default_max_terms_per_doc")]
    pub max_terms_per_doc: usize,
    /// Status/build bookkeeping.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1142
/// Serde default for `InvertedIndexConfig::normalize`: enabled.
fn default_normalize() -> bool {
    const NORMALIZE_BY_DEFAULT: bool = true;
    NORMALIZE_BY_DEFAULT
}
1146
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`: 10 000.
fn default_max_terms_per_doc() -> usize {
    const DEFAULT_CAP: usize = 10_000;
    DEFAULT_CAP
}
1150
/// Configuration of a sparse-vector index on a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SparseVectorIndexConfig {
    /// Index name.
    pub name: String,
    /// Indexed label.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Dimensionality of the sparse vector space.
    pub dimensions: usize,
    /// Whether values are quantized (defaults to `true`).
    #[serde(default = "default_sparse_quantize")]
    pub quantize: bool,
    /// Optional embedding configuration (defaults to `None`).
    #[serde(default)]
    pub embedding_config: Option<EmbeddingConfig>,
    /// Status/build bookkeeping.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1175
/// Serde default for `SparseVectorIndexConfig::quantize`: enabled.
fn default_sparse_quantize() -> bool {
    const QUANTIZE_BY_DEFAULT: bool = true;
    QUANTIZE_BY_DEFAULT
}
1179
/// Configuration of a dense vector index on a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
    /// Index name.
    pub name: String,
    /// Indexed label.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Index algorithm and its build parameters.
    pub index_type: VectorIndexType,
    /// Distance metric used for search.
    pub metric: DistanceMetric,
    /// Optional embedding configuration.
    pub embedding_config: Option<EmbeddingConfig>,
    /// Status/build bookkeeping.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1191
/// Embedding settings attached to a vector or sparse-vector index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
    /// Alias of the embedding model/provider (resolution not visible here).
    pub alias: String,
    /// Properties whose content is embedded.
    pub source_properties: Vec<String>,
    /// Number of items per embedding batch.
    pub batch_size: usize,
    /// Optional text prefix for documents (defaults to `None`).
    #[serde(default)]
    pub document_prefix: Option<String>,
    /// Optional text prefix for queries (defaults to `None`).
    #[serde(default)]
    pub query_prefix: Option<String>,
}
1207
/// Vector index algorithm and build parameters.
///
/// NOTE(review): the abbreviations presumably mean IVF = inverted-file
/// partitioning, PQ = product quantization, SQ = scalar quantization, and
/// HNSW = graph index. The meaning of RQ is not visible here — confirm
/// against the index builder.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
    /// Brute-force (no index structure).
    Flat,
    IvfFlat {
        num_partitions: u32,
    },
    IvfPq {
        num_partitions: u32,
        num_sub_vectors: u32,
        bits_per_subvector: u8,
    },
    IvfSq {
        num_partitions: u32,
    },
    IvfRq {
        num_partitions: u32,
        /// Optional bit width (defaults to `None`).
        #[serde(default)]
        num_bits: Option<u8>,
    },
    HnswFlat {
        m: u32,
        ef_construction: u32,
        /// Optional partition count (defaults to `None`).
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    HnswSq {
        m: u32,
        ef_construction: u32,
        /// Optional partition count (defaults to `None`).
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    HnswPq {
        m: u32,
        ef_construction: u32,
        num_sub_vectors: u32,
        /// Optional partition count (defaults to `None`).
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    /// Multi-vector encoding wrapped around another index type.
    /// NOTE(review): parameters presumably follow the MUVERA fixed-dimensional
    /// encoding (`k_sim` hashing bits, `reps` repetitions, `d_proj` projection
    /// size, `seed` RNG seed) — confirm against the encoder.
    Muvera {
        k_sim: u32,
        reps: u32,
        d_proj: u32,
        seed: u64,
        /// Index type used for the encoded vectors.
        inner: Box<VectorIndexType>,
    },
}
1266
/// Distance metric for vector search; see `DistanceMetric::compute_distance`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
    /// `1 - cosine similarity`.
    Cosine,
    /// Squared Euclidean distance.
    L2,
    /// Negated dot product.
    Dot,
}
1274
1275impl DistanceMetric {
1276 pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
1288 assert_eq!(a.len(), b.len(), "vector dimension mismatch");
1289 match self {
1290 DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
1291 DistanceMetric::Cosine => {
1292 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
1293 let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
1294 let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
1295 let denom = norm_a * norm_b;
1296 if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
1297 }
1298 DistanceMetric::Dot => {
1299 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
1300 -dot
1301 }
1302 }
1303 }
1304}
1305
/// Configuration of a full-text index over one or more properties of a label.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
    /// Index name.
    pub name: String,
    /// Indexed label.
    pub label: String,
    /// Indexed properties.
    pub properties: Vec<String>,
    /// Tokenizer to use.
    pub tokenizer: TokenizerConfig,
    /// Whether token positions are stored.
    pub with_positions: bool,
    /// Status/build bookkeeping.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1316
/// Tokenizer selection for full-text indexes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
    Standard,
    Whitespace,
    /// N-gram tokenizer with gram lengths in `min..=max` (presumably inclusive).
    Ngram { min: u8, max: u8 },
    /// A tokenizer registered under `name`.
    Custom { name: String },
}
1325
/// Configuration of a full-text index over JSON content in a column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
    /// Index name.
    pub name: String,
    /// Indexed label.
    pub label: String,
    /// Column holding the JSON content.
    pub column: String,
    /// JSON paths to index (empty when absent; meaning of empty not visible here).
    #[serde(default)]
    pub paths: Vec<String>,
    /// Whether token positions are stored (defaults to `false`).
    #[serde(default)]
    pub with_positions: bool,
    /// Status/build bookkeeping.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1338
/// Configuration of a scalar index over one or more properties of a label.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
    /// Index name.
    pub name: String,
    /// Indexed label.
    pub label: String,
    /// Indexed properties.
    pub properties: Vec<String>,
    /// Scalar index structure.
    pub index_type: ScalarIndexType,
    /// Optional filter predicate (presumably a partial-index condition).
    pub where_clause: Option<String>,
    /// Status/build bookkeeping.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1349
/// Physical structure of a scalar index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
    BTree,
    Hash,
    Bitmap,
    LabelList,
}
1358
/// Owns the in-memory schema and persists it as JSON in an object store.
pub struct SchemaManager {
    // Backing store the schema file is read from and written to.
    store: Arc<dyn ObjectStore>,
    // Object key of the schema file within `store`.
    path: ObjectStorePath,
    // Current schema. Readers clone the `Arc` (see `schema()`); writers swap
    // it wholesale (see `replace_schema`).
    schema: RwLock<Arc<Schema>>,
}
1364
1365impl SchemaManager {
1366 pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
1367 let path = path.as_ref();
1368 let parent = path
1369 .parent()
1370 .ok_or_else(|| anyhow!("Invalid schema path"))?;
1371 let filename = path
1372 .file_name()
1373 .ok_or_else(|| anyhow!("Invalid schema filename"))?
1374 .to_str()
1375 .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
1376
1377 let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
1378 let obj_path = ObjectStorePath::from(filename);
1379
1380 Self::load_from_store(store, &obj_path).await
1381 }
1382
1383 pub async fn load_from_store(
1384 store: Arc<dyn ObjectStore>,
1385 path: &ObjectStorePath,
1386 ) -> Result<Self> {
1387 match store.get(path).await {
1388 Ok(result) => {
1389 let bytes = result.bytes().await?;
1390 let content = String::from_utf8(bytes.to_vec())?;
1391 let mut schema: Schema = serde_json::from_str(&content)?;
1392 let original_len = schema.indexes.len();
1400 if original_len > 0 {
1401 let mut seen: std::collections::HashSet<String> =
1402 std::collections::HashSet::with_capacity(original_len);
1403 let mut dedup: Vec<IndexDefinition> = schema
1404 .indexes
1405 .iter()
1406 .rev()
1407 .filter(|idx| seen.insert(idx.name().to_string()))
1408 .cloned()
1409 .collect();
1410 dedup.reverse();
1411 if dedup.len() != original_len {
1412 tracing::warn!(
1413 collapsed = original_len - dedup.len(),
1414 kept = dedup.len(),
1415 "schema.indexes: collapsed duplicate entries on load (issue #63)"
1416 );
1417 schema.indexes = dedup;
1418 }
1419 }
1420 Ok(Self {
1421 store,
1422 path: path.clone(),
1423 schema: RwLock::new(Arc::new(schema)),
1424 })
1425 }
1426 Err(object_store::Error::NotFound { .. }) => Ok(Self {
1427 store,
1428 path: path.clone(),
1429 schema: RwLock::new(Arc::new(Schema::default())),
1430 }),
1431 Err(e) => Err(anyhow::Error::from(e)),
1432 }
1433 }
1434
1435 pub async fn save(&self) -> Result<()> {
1436 let content = {
1437 let schema_guard = acquire_read(&self.schema, "schema")?;
1438 serde_json::to_string_pretty(&**schema_guard)?
1439 };
1440 self.store
1441 .put(&self.path, content.into())
1442 .await
1443 .map_err(anyhow::Error::from)?;
1444 Ok(())
1445 }
1446
1447 pub fn path(&self) -> &ObjectStorePath {
1448 &self.path
1449 }
1450
1451 pub fn schema(&self) -> Arc<Schema> {
1452 self.schema
1453 .read()
1454 .expect("Schema lock poisoned - a thread panicked while holding it")
1455 .clone()
1456 }
1457
/// Upper-cases every identifier that is immediately followed by `(`, so that `lower(x)`,
/// `Lower(x)` and `LOWER(x)` all normalize to `LOWER(x)`.
///
/// An identifier starts with an alphabetic character and continues over alphanumerics and
/// `_`. Identifiers not directly followed by `(`, and every other character, are copied
/// unchanged.
///
/// NOTE(review): there is no tokenizer here. A word inside a string literal that directly
/// precedes `(` is upper-cased too, and `lower (x)` (space before the paren) is left as-is.
/// Any change to these rules changes the names produced by `generated_column_name`.
fn normalize_function_names(expr: &str) -> String {
    let mut out = String::with_capacity(expr.len());
    let mut it = expr.chars().peekable();

    while let Some(c) = it.next() {
        if !c.is_alphabetic() {
            out.push(c);
            continue;
        }

        // Greedily consume the rest of the identifier.
        let mut word = String::from(c);
        while let Some(nc) = it.next_if(|n| n.is_alphanumeric() || *n == '_') {
            word.push(nc);
        }

        let is_call = it.peek() == Some(&'(');
        if is_call {
            out.push_str(&word.to_uppercase());
        } else {
            out.push_str(&word);
        }
    }

    out
}
1491
1492 pub fn generated_column_name(expr: &str) -> String {
1500 let normalized = Self::normalize_function_names(expr);
1502
1503 let sanitized = normalized
1504 .replace(|c: char| !c.is_alphanumeric(), "_")
1505 .trim_matches('_')
1506 .to_string();
1507
1508 const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1510 const FNV_PRIME: u64 = 1099511628211;
1511
1512 let mut hash = FNV_OFFSET_BASIS;
1513 for byte in normalized.as_bytes() {
1514 hash ^= *byte as u64;
1515 hash = hash.wrapping_mul(FNV_PRIME);
1516 }
1517
1518 format!("_gen_{}_{:x}", sanitized, hash)
1519 }
1520
1521 pub fn replace_schema(&self, new_schema: Schema) {
1522 let mut schema = self
1523 .schema
1524 .write()
1525 .expect("Schema lock poisoned - a thread panicked while holding it");
1526 *schema = Arc::new(new_schema);
1527 }
1528
1529 #[must_use]
1542 pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1543 let primary = self.schema();
1544 let merged = if overlay.is_empty() {
1545 (*primary).clone()
1546 } else {
1547 let mut merged = (*primary).clone();
1548 for (name, label) in &overlay.added_labels {
1549 merged.labels.insert(name.clone(), label.clone());
1550 }
1551 for (name, edge_type) in &overlay.added_edge_types {
1552 merged.edge_types.insert(name.clone(), edge_type.clone());
1553 }
1554 for addition in &overlay.added_properties {
1555 let props = merged.properties.entry(addition.owner.clone()).or_default();
1556 props.insert(
1557 addition.property.clone(),
1558 PropertyMeta {
1559 r#type: addition.data_type.clone(),
1560 nullable: addition.nullable,
1561 added_in: merged.schema_version,
1562 state: SchemaElementState::Active,
1563 generation_expression: None,
1564 description: None,
1565 },
1566 );
1567 }
1568 merged
1569 };
1570
1571 Arc::new(Self {
1572 store: self.store.clone(),
1573 path: self.path.clone(),
1574 schema: RwLock::new(Arc::new(merged)),
1575 })
1576 }
1577
1578 pub fn next_label_id(&self) -> u16 {
1579 self.schema()
1580 .labels
1581 .values()
1582 .map(|l| l.id)
1583 .max()
1584 .unwrap_or(0)
1585 + 1
1586 }
1587
1588 pub fn next_type_id(&self) -> u32 {
1589 let max_schema_id = self
1590 .schema()
1591 .edge_types
1592 .values()
1593 .map(|t| t.id)
1594 .max()
1595 .unwrap_or(0);
1596
1597 if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1599 panic!("Schema edge type ID exhaustion");
1600 }
1601
1602 max_schema_id + 1
1603 }
1604
1605 pub fn validate_schema_element_name(kind: &str, name: &str) -> Result<()> {
1623 if name.is_empty() || name.chars().all(char::is_whitespace) {
1624 return Err(anyhow!(
1625 "{kind} name must be non-empty and not all whitespace"
1626 ));
1627 }
1628 if name.len() > MAX_SCHEMA_NAME_LEN {
1629 return Err(anyhow!("{kind} name exceeds {MAX_SCHEMA_NAME_LEN} bytes"));
1630 }
1631 if let Some(c) = name
1632 .chars()
1633 .find(|c| c.is_control() || c.is_whitespace() || matches!(c, '/' | '\\'))
1634 {
1635 return Err(anyhow!(
1636 "{kind} name '{name}' contains an unsafe character ({c:?})"
1637 ));
1638 }
1639 Ok(())
1640 }
1641
1642 pub fn add_label(&self, name: &str) -> Result<u16> {
1643 self.add_label_with_desc(name, None)
1644 }
1645
1646 pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1647 Self::validate_schema_element_name("Label", name)?;
1648 let mut guard = acquire_write(&self.schema, "schema")?;
1649 let schema = Arc::make_mut(&mut *guard);
1650 if schema.labels.contains_key(name) {
1651 return Err(anyhow!("Label '{}' already exists", name));
1652 }
1653
1654 let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1655 if id >= VIRTUAL_LABEL_ID_START {
1656 return Err(anyhow!(
1657 "Native label space exhausted (next id {id:#x} would enter the \
1658 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1659 reserved for catalog-resolved labels)"
1660 ));
1661 }
1662 schema.labels.insert(
1663 name.to_string(),
1664 LabelMeta {
1665 id,
1666 created_at: Utc::now(),
1667 state: SchemaElementState::Active,
1668 description,
1669 },
1670 );
1671 schema.bump_version();
1672 Ok(id)
1673 }
1674
1675 pub fn add_edge_type(
1676 &self,
1677 name: &str,
1678 src_labels: Vec<String>,
1679 dst_labels: Vec<String>,
1680 ) -> Result<u32> {
1681 self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1682 }
1683
1684 pub fn add_edge_type_with_desc(
1685 &self,
1686 name: &str,
1687 src_labels: Vec<String>,
1688 dst_labels: Vec<String>,
1689 description: Option<String>,
1690 ) -> Result<u32> {
1691 Self::validate_schema_element_name("Edge type", name)?;
1692 let mut guard = acquire_write(&self.schema, "schema")?;
1693 let schema = Arc::make_mut(&mut *guard);
1694 if schema.edge_types.contains_key(name) {
1695 return Err(anyhow!("Edge type '{}' already exists", name));
1696 }
1697
1698 let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1699
1700 if id >= VIRTUAL_EDGE_TYPE_ID_START {
1705 return Err(anyhow!(
1706 "Native edge type space exhausted (next id {id:#x} would enter the \
1707 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1708 reserved for catalog-resolved edge types)"
1709 ));
1710 }
1711
1712 schema.edge_types.insert(
1713 name.to_string(),
1714 EdgeTypeMeta {
1715 id,
1716 src_labels,
1717 dst_labels,
1718 state: SchemaElementState::Active,
1719 description,
1720 },
1721 );
1722 schema.bump_version();
1723 Ok(id)
1724 }
1725
1726 pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1735 {
1736 let guard = acquire_read(&self.schema, "schema")
1737 .expect("Schema lock poisoned - a thread panicked while holding it");
1738 if let Some(id) = guard.edge_type_id_unified(type_name) {
1739 return id;
1740 }
1741 }
1742 let mut guard = acquire_write(&self.schema, "schema")
1743 .expect("Schema lock poisoned - a thread panicked while holding it");
1744 let schema = Arc::make_mut(&mut *guard);
1745 schema.get_or_assign_edge_type_id(type_name)
1746 }
1747
1748 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1750 let schema = acquire_read(&self.schema, "schema")
1751 .expect("Schema lock poisoned - a thread panicked while holding it");
1752 schema.edge_type_name_by_id_unified(type_id)
1753 }
1754
1755 pub fn add_property(
1756 &self,
1757 label_or_type: &str,
1758 prop_name: &str,
1759 data_type: DataType,
1760 nullable: bool,
1761 ) -> Result<()> {
1762 self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1763 }
1764
1765 pub fn add_property_with_desc(
1766 &self,
1767 label_or_type: &str,
1768 prop_name: &str,
1769 data_type: DataType,
1770 nullable: bool,
1771 description: Option<String>,
1772 ) -> Result<()> {
1773 validate_property_name(prop_name)?;
1774 let mut guard = acquire_write(&self.schema, "schema")?;
1775 let schema = Arc::make_mut(&mut *guard);
1776 let version = schema.schema_version;
1777 let props = schema
1778 .properties
1779 .entry(label_or_type.to_string())
1780 .or_default();
1781
1782 if props.contains_key(prop_name) {
1783 return Err(anyhow!(
1784 "Property '{}' already exists for '{}'",
1785 prop_name,
1786 label_or_type
1787 ));
1788 }
1789
1790 props.insert(
1791 prop_name.to_string(),
1792 PropertyMeta {
1793 r#type: data_type,
1794 nullable,
1795 added_in: version,
1796 state: SchemaElementState::Active,
1797 generation_expression: None,
1798 description,
1799 },
1800 );
1801 schema.bump_version();
1803 Ok(())
1804 }
1805
1806 pub fn declare_property(
1824 &self,
1825 label_or_type: &str,
1826 prop_name: &str,
1827 data_type: DataType,
1828 nullable: bool,
1829 description: Option<String>,
1830 ) -> Result<bool> {
1831 validate_property_name(prop_name)?;
1832 let mut guard = acquire_write(&self.schema, "schema")?;
1833 let schema = Arc::make_mut(&mut *guard);
1834 let version = schema.schema_version;
1835 let props = schema
1836 .properties
1837 .entry(label_or_type.to_string())
1838 .or_default();
1839
1840 if let Some(existing) = props.get(prop_name) {
1841 if existing.r#type == data_type && existing.nullable == nullable {
1842 return Ok(false); }
1844 return Err(anyhow!(
1845 "Property '{}' on '{}' is declared as {:?} (nullable: {}); cannot re-declare \
1846 as {:?} (nullable: {}). Property types are immutable — use a new property \
1847 name or migrate the data",
1848 prop_name,
1849 label_or_type,
1850 existing.r#type,
1851 existing.nullable,
1852 data_type,
1853 nullable
1854 ));
1855 }
1856
1857 props.insert(
1858 prop_name.to_string(),
1859 PropertyMeta {
1860 r#type: data_type,
1861 nullable,
1862 added_in: version,
1863 state: SchemaElementState::Active,
1864 generation_expression: None,
1865 description,
1866 },
1867 );
1868 schema.bump_version();
1870 Ok(true)
1871 }
1872
1873 pub fn add_internal_property(
1884 &self,
1885 label_or_type: &str,
1886 prop_name: &str,
1887 data_type: DataType,
1888 nullable: bool,
1889 ) -> Result<bool> {
1890 validate_reserved_property_name(prop_name)?;
1891 let mut guard = acquire_write(&self.schema, "schema")?;
1892 let schema = Arc::make_mut(&mut *guard);
1893 let version = schema.schema_version;
1894 let props = schema
1895 .properties
1896 .entry(label_or_type.to_string())
1897 .or_default();
1898
1899 if let Some(existing) = props.get(prop_name) {
1900 if existing.r#type == data_type {
1901 return Ok(false); }
1903 return Err(anyhow!(
1904 "Internal property '{}' already exists for '{}' with a different type",
1905 prop_name,
1906 label_or_type
1907 ));
1908 }
1909
1910 props.insert(
1911 prop_name.to_string(),
1912 PropertyMeta {
1913 r#type: data_type,
1914 nullable,
1915 added_in: version,
1916 state: SchemaElementState::Active,
1917 generation_expression: None,
1918 description: None,
1919 },
1920 );
1921 schema.bump_version();
1922 Ok(true)
1923 }
1924
1925 pub fn add_generated_property(
1926 &self,
1927 label_or_type: &str,
1928 prop_name: &str,
1929 data_type: DataType,
1930 expr: String,
1931 ) -> Result<()> {
1932 validate_reserved_property_name(prop_name)?;
1935 let mut guard = acquire_write(&self.schema, "schema")?;
1936 let schema = Arc::make_mut(&mut *guard);
1937 let version = schema.schema_version;
1938 let props = schema
1939 .properties
1940 .entry(label_or_type.to_string())
1941 .or_default();
1942
1943 if props.contains_key(prop_name) {
1944 return Err(anyhow!("Property '{}' already exists", prop_name));
1945 }
1946
1947 props.insert(
1948 prop_name.to_string(),
1949 PropertyMeta {
1950 r#type: data_type,
1951 nullable: true,
1952 added_in: version,
1953 state: SchemaElementState::Active,
1954 generation_expression: Some(expr),
1955 description: None,
1956 },
1957 );
1958 schema.bump_version();
1960 Ok(())
1961 }
1962
1963 pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1964 let mut guard = acquire_write(&self.schema, "schema")?;
1965 let schema = Arc::make_mut(&mut *guard);
1966 let meta = schema
1967 .labels
1968 .get_mut(name)
1969 .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1970 meta.description = description;
1971 Ok(())
1972 }
1973
1974 pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1975 let mut guard = acquire_write(&self.schema, "schema")?;
1976 let schema = Arc::make_mut(&mut *guard);
1977 let meta = schema
1978 .edge_types
1979 .get_mut(name)
1980 .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1981 meta.description = description;
1982 Ok(())
1983 }
1984
1985 pub fn set_property_description(
1986 &self,
1987 entity: &str,
1988 prop_name: &str,
1989 description: Option<String>,
1990 ) -> Result<()> {
1991 let mut guard = acquire_write(&self.schema, "schema")?;
1992 let schema = Arc::make_mut(&mut *guard);
1993 let props = schema
1994 .properties
1995 .get_mut(entity)
1996 .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1997 let meta = props
1998 .get_mut(prop_name)
1999 .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
2000 meta.description = description;
2001 Ok(())
2002 }
2003
2004 pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
2013 let mut guard = acquire_write(&self.schema, "schema")?;
2014 let schema = Arc::make_mut(&mut *guard);
2015 if let Some(existing) = schema
2016 .indexes
2017 .iter_mut()
2018 .find(|i| i.name() == index_def.name())
2019 {
2020 *existing = index_def;
2021 } else {
2022 schema.indexes.push(index_def);
2023 }
2024 schema.bump_version();
2025 Ok(())
2026 }
2027
2028 pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
2029 let schema = self.schema.read().expect("Schema lock poisoned");
2030 schema.indexes.iter().find(|i| i.name() == name).cloned()
2031 }
2032
2033 pub fn update_index_metadata(
2038 &self,
2039 index_name: &str,
2040 f: impl FnOnce(&mut IndexMetadata),
2041 ) -> Result<()> {
2042 let mut guard = acquire_write(&self.schema, "schema")?;
2043 let schema = Arc::make_mut(&mut *guard);
2044 let idx = schema
2045 .indexes
2046 .iter_mut()
2047 .find(|i| i.name() == index_name)
2048 .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
2049 f(idx.metadata_mut());
2050 Ok(())
2051 }
2052
2053 pub fn remove_index(&self, name: &str) -> Result<()> {
2054 let mut guard = acquire_write(&self.schema, "schema")?;
2055 let schema = Arc::make_mut(&mut *guard);
2056 if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
2057 schema.indexes.remove(pos);
2058 schema.bump_version();
2059 Ok(())
2060 } else {
2061 Err(anyhow!("Index '{}' not found", name))
2062 }
2063 }
2064
2065 pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
2066 let mut guard = acquire_write(&self.schema, "schema")?;
2067 let schema = Arc::make_mut(&mut *guard);
2068 if schema.constraints.iter().any(|c| c.name == constraint.name) {
2069 return Err(anyhow!("Constraint '{}' already exists", constraint.name));
2070 }
2071 schema.constraints.push(constraint);
2072 schema.bump_version();
2073 Ok(())
2074 }
2075
2076 pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
2077 let mut guard = acquire_write(&self.schema, "schema")?;
2078 let schema = Arc::make_mut(&mut *guard);
2079 if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
2080 schema.constraints.remove(pos);
2081 schema.bump_version();
2082 Ok(())
2083 } else if if_exists {
2084 Ok(())
2085 } else {
2086 Err(anyhow!("Constraint '{}' not found", name))
2087 }
2088 }
2089
2090 pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
2091 let mut guard = acquire_write(&self.schema, "schema")?;
2092 let schema = Arc::make_mut(&mut *guard);
2093 let Some(props) = schema.properties.get_mut(label_or_type) else {
2094 return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
2095 };
2096 if props.remove(prop_name).is_none() {
2097 return Err(anyhow!(
2098 "Property '{}' not found for '{}'",
2099 prop_name,
2100 label_or_type
2101 ));
2102 }
2103 schema.bump_version();
2104 Ok(())
2105 }
2106
2107 pub fn rename_property(
2108 &self,
2109 label_or_type: &str,
2110 old_name: &str,
2111 new_name: &str,
2112 ) -> Result<()> {
2113 let mut guard = acquire_write(&self.schema, "schema")?;
2114 let schema = Arc::make_mut(&mut *guard);
2115 let Some(props) = schema.properties.get_mut(label_or_type) else {
2116 return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
2117 };
2118 let Some(meta) = props.remove(old_name) else {
2119 return Err(anyhow!(
2120 "Property '{}' not found for '{}'",
2121 old_name,
2122 label_or_type
2123 ));
2124 };
2125 if props.contains_key(new_name) {
2126 props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
2129 }
2130 props.insert(new_name.to_string(), meta);
2131 schema.bump_version();
2132 Ok(())
2133 }
2134
2135 pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
2136 let mut guard = acquire_write(&self.schema, "schema")?;
2137 let schema = Arc::make_mut(&mut *guard);
2138 if let Some(label_meta) = schema.labels.get_mut(name) {
2139 label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
2140 schema.bump_version();
2142 Ok(())
2143 } else if if_exists {
2144 Ok(())
2145 } else {
2146 Err(anyhow!("Label '{}' not found", name))
2147 }
2148 }
2149
2150 pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
2151 let mut guard = acquire_write(&self.schema, "schema")?;
2152 let schema = Arc::make_mut(&mut *guard);
2153 if let Some(edge_meta) = schema.edge_types.get_mut(name) {
2154 edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
2155 schema.bump_version();
2157 Ok(())
2158 } else if if_exists {
2159 Ok(())
2160 } else {
2161 Err(anyhow!("Edge Type '{}' not found", name))
2162 }
2163 }
2164}
2165
2166pub fn validate_identifier(name: &str) -> Result<()> {
2168 if name.is_empty() || name.len() > 64 {
2170 return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
2171 }
2172
2173 let first = name.chars().next().unwrap();
2175 if !first.is_alphabetic() && first != '_' {
2176 return Err(anyhow!(
2177 "Identifier '{}' must start with letter or underscore",
2178 name
2179 ));
2180 }
2181
2182 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
2184 return Err(anyhow!(
2185 "Identifier '{}' must contain only alphanumeric and underscore",
2186 name
2187 ));
2188 }
2189
2190 const RESERVED: &[&str] = &[
2192 "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
2193 "UNION", "ORDER", "LIMIT",
2194 ];
2195 if RESERVED.contains(&name.to_uppercase().as_str()) {
2196 return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
2197 }
2198
2199 Ok(())
2200}
2201
2202pub fn validate_property_name(name: &str) -> Result<()> {
2209 if name.starts_with('_') {
2210 return Err(anyhow!(
2211 "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
2212 name
2213 ));
2214 }
2215 validate_reserved_property_name(name)
2216}
2217
2218fn validate_reserved_property_name(name: &str) -> Result<()> {
2225 const RESERVED_PROPS: &[&str] = &[
2234 "ext_id",
2235 "overflow_json",
2236 "eid",
2237 "src_vid",
2238 "dst_vid",
2239 "op",
2240 "__set_struct__",
2248 ];
2249 if RESERVED_PROPS.contains(&name) {
2250 return Err(anyhow!(
2251 "Property name '{}' is reserved by the storage layer; please choose a different name",
2252 name
2253 ));
2254 }
2255 Ok(())
2256}
2257
2258#[cfg(test)]
2259mod tests {
2260 use super::*;
2261 use crate::value::{TemporalValue, Value};
2262 use object_store::local::LocalFileSystem;
2263 use tempfile::tempdir;
2264
2265 #[test]
2266 fn test_datatype_accepts_matrix() {
2267 let dt = || TemporalValue::DateTime {
2268 nanos_since_epoch: 0,
2269 offset_seconds: 0,
2270 timezone_name: None,
2271 };
2272
2273 for ty in [
2275 DataType::String,
2276 DataType::Int64,
2277 DataType::Bool,
2278 DataType::DateTime,
2279 DataType::Float64,
2280 ] {
2281 assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
2282 }
2283
2284 assert!(DataType::String.accepts(&Value::String("x".into())));
2286 assert!(DataType::Int64.accepts(&Value::Int(1)));
2287 assert!(DataType::Bool.accepts(&Value::Bool(true)));
2288 assert!(DataType::DateTime.accepts(&Value::Temporal(dt())));
2289
2290 assert!(
2292 DataType::Float64.accepts(&Value::Int(3)),
2293 "Int widens to Float"
2294 );
2295 assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
2296 assert!(DataType::Timestamp.accepts(&Value::Temporal(dt())));
2297 assert!(
2298 DataType::Timestamp.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
2299 "storage parses strings for non-struct Timestamp columns"
2300 );
2301
2302 assert!(
2304 !DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
2305 "String into a DateTime struct column nulls silently — reject here"
2306 );
2307 assert!(!DataType::Bool.accepts(&Value::Int(1)));
2308 assert!(!DataType::Int64.accepts(&Value::Bool(true)));
2309 assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
2310 assert!(
2311 !DataType::String.accepts(&Value::Int(10)),
2312 "no implicit stringification"
2313 );
2314 assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));
2315
2316 assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
2318 }
2319
2320 #[test]
2321 fn test_check_vector_dims_matrix() {
2322 let vec3 = DataType::Vector { dimensions: 3 };
2323 let multi2 = DataType::List(Box::new(DataType::Vector { dimensions: 2 }));
2324 let flist = |vals: &[f64]| Value::List(vals.iter().map(|f| Value::Float(*f)).collect());
2325
2326 assert!(vec3.check_vector_dims(&Value::Null).is_ok());
2328 assert!(multi2.check_vector_dims(&Value::Null).is_ok());
2329
2330 assert!(
2332 vec3.check_vector_dims(&Value::Vector(vec![1.0, 2.0, 3.0]))
2333 .is_ok()
2334 );
2335 assert!(vec3.check_vector_dims(&flist(&[1.0, 2.0, 3.0])).is_ok());
2336 assert!(
2337 vec3.check_vector_dims(&Value::List(vec![
2338 Value::Int(1),
2339 Value::Float(2.0),
2340 Value::Int(3)
2341 ]))
2342 .is_ok()
2343 );
2344
2345 assert_eq!(
2347 vec3.check_vector_dims(&Value::Vector(vec![1.0, 2.0])),
2348 Err(VectorDimError::WrongLength {
2349 expected: 3,
2350 actual: 2
2351 })
2352 );
2353 assert_eq!(
2354 vec3.check_vector_dims(&flist(&[1.0, 2.0, 3.0, 4.0, 5.0])),
2355 Err(VectorDimError::WrongLength {
2356 expected: 3,
2357 actual: 5
2358 })
2359 );
2360 assert_eq!(
2361 vec3.check_vector_dims(&Value::List(vec![])),
2362 Err(VectorDimError::WrongLength {
2363 expected: 3,
2364 actual: 0
2365 })
2366 );
2367 assert_eq!(
2368 vec3.check_vector_dims(&Value::List(vec![
2369 Value::Float(1.0),
2370 Value::String("x".into()),
2371 Value::Float(3.0),
2372 ])),
2373 Err(VectorDimError::NonNumericElement { index: 1 })
2374 );
2375 assert_eq!(
2376 vec3.check_vector_dims(&Value::List(vec![
2377 Value::Float(1.0),
2378 Value::Null,
2379 Value::Float(3.0)
2380 ])),
2381 Err(VectorDimError::NonNumericElement { index: 1 })
2382 );
2383 assert_eq!(
2384 vec3.check_vector_dims(&Value::String("not a vector".into())),
2385 Err(VectorDimError::NotAVector { actual: "String" })
2386 );
2387
2388 assert!(multi2.check_vector_dims(&Value::List(vec![])).is_ok());
2391 assert!(
2392 multi2
2393 .check_vector_dims(&Value::List(vec![flist(&[1.0, 2.0]), flist(&[3.0, 4.0])]))
2394 .is_ok()
2395 );
2396 assert_eq!(
2397 multi2.check_vector_dims(&Value::List(vec![
2398 flist(&[1.0, 2.0]),
2399 flist(&[9.0, 9.0, 9.0])
2400 ])),
2401 Err(VectorDimError::TokenWrongLength {
2402 token: 1,
2403 expected: 2,
2404 actual: 3
2405 })
2406 );
2407 assert_eq!(
2408 multi2.check_vector_dims(&Value::List(vec![Value::String("tok".into())])),
2409 Err(VectorDimError::TokenNotAVector {
2410 token: 0,
2411 actual: "String"
2412 })
2413 );
2414 assert_eq!(
2415 multi2.check_vector_dims(&Value::Vector(vec![1.0, 2.0])),
2416 Err(VectorDimError::NotATokenList { actual: "Vector" })
2417 );
2418
2419 assert!(
2421 DataType::Int64
2422 .check_vector_dims(&Value::String("x".into()))
2423 .is_ok()
2424 );
2425 assert!(
2426 DataType::List(Box::new(DataType::Float64))
2427 .check_vector_dims(&Value::List(vec![Value::String("x".into())]))
2428 .is_ok()
2429 );
2430 assert!(
2431 DataType::SparseVector { dimensions: 8 }
2432 .check_vector_dims(&Value::Map(Default::default()))
2433 .is_ok()
2434 );
2435
2436 let msg = VectorDimError::WrongLength {
2438 expected: 4,
2439 actual: 5,
2440 }
2441 .to_string();
2442 assert!(msg.contains('4') && msg.contains('5'), "message: {msg}");
2443 }
2444
2445 #[tokio::test]
2446 async fn test_declare_property_idempotent_and_conflicting() -> Result<()> {
2447 let dir = tempdir()?;
2448 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2449 let path = ObjectStorePath::from("schema.json");
2450 let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
2451
2452 manager.add_label("Doc")?;
2453 let vec4 = DataType::Vector { dimensions: 4 };
2454
2455 assert!(manager.declare_property("Doc", "embedding", vec4.clone(), true, None)?);
2457
2458 assert!(!manager.declare_property("Doc", "embedding", vec4.clone(), true, None)?);
2461 assert!(!manager.declare_property(
2462 "Doc",
2463 "embedding",
2464 vec4.clone(),
2465 true,
2466 Some("new docs".into())
2467 )?);
2468
2469 let err = manager
2472 .declare_property(
2473 "Doc",
2474 "embedding",
2475 DataType::Vector { dimensions: 8 },
2476 true,
2477 None,
2478 )
2479 .unwrap_err()
2480 .to_string();
2481 assert!(err.contains('4') && err.contains('8'), "message: {err}");
2482 assert!(!err.contains("already exists"), "message: {err}");
2483
2484 assert!(
2486 manager
2487 .declare_property("Doc", "embedding", vec4.clone(), false, None)
2488 .is_err()
2489 );
2490
2491 let schema = manager.schema();
2493 let meta = &schema.properties["Doc"]["embedding"];
2494 assert_eq!(meta.r#type, vec4);
2495 assert!(meta.nullable);
2496 Ok(())
2497 }
2498
2499 #[tokio::test]
2500 async fn test_schema_management() -> Result<()> {
2501 let dir = tempdir()?;
2502 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2503 let path = ObjectStorePath::from("schema.json");
2504 let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
2505
2506 let lid = manager.add_label("Person")?;
2508 assert_eq!(lid, 1);
2509 assert!(manager.add_label("Person").is_err());
2510
2511 manager.add_property("Person", "name", DataType::String, false)?;
2513 assert!(
2514 manager
2515 .add_property("Person", "name", DataType::String, false)
2516 .is_err()
2517 );
2518
2519 let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
2521 assert_eq!(tid, 1);
2522
2523 manager.save().await?;
2524 assert!(store.get(&path).await.is_ok());
2526
2527 let manager2 = SchemaManager::load_from_store(store, &path).await?;
2528 assert!(manager2.schema().labels.contains_key("Person"));
2529 assert!(
2530 manager2
2531 .schema()
2532 .properties
2533 .get("Person")
2534 .unwrap()
2535 .contains_key("name")
2536 );
2537
2538 Ok(())
2539 }
2540
2541 #[tokio::test]
2542 async fn test_reserved_property_names_rejected() -> Result<()> {
2543 let dir = tempdir()?;
2544 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2545 let path = ObjectStorePath::from("schema.json");
2546 let manager = SchemaManager::load_from_store(store, &path).await?;
2547
2548 manager.add_label("Tiny")?;
2549
2550 for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
2554 let err = manager
2555 .add_property("Tiny", reserved, DataType::String, true)
2556 .expect_err(&format!("expected '{reserved}' to be rejected"));
2557 assert!(
2558 err.to_string().contains("reserved"),
2559 "error for '{reserved}' should mention 'reserved', got: {err}"
2560 );
2561 }
2562
2563 let err = manager
2568 .add_property("Tiny", "__set_struct__", DataType::String, true)
2569 .expect_err("expected '__set_struct__' to be rejected");
2570 assert!(
2571 err.to_string().contains("reserved"),
2572 "__set_struct__ rejection should mention 'reserved', got: {err}"
2573 );
2574
2575 for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
2577 assert!(
2578 manager
2579 .add_property("Tiny", reserved, DataType::String, true)
2580 .is_err(),
2581 "expected '{reserved}' to be rejected"
2582 );
2583 }
2584
2585 manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
2588 manager.add_property("Tiny", "user_op", DataType::String, true)?;
2589 manager.add_property("Tiny", "type_name", DataType::String, true)?;
2590
2591 manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
2593 assert!(
2594 manager
2595 .add_property("knows", "src_vid", DataType::Int64, true)
2596 .is_err()
2597 );
2598
2599 assert!(
2601 manager
2602 .add_generated_property(
2603 "Tiny",
2604 "ext_id",
2605 DataType::String,
2606 "concat('x', name)".into()
2607 )
2608 .is_err()
2609 );
2610
2611 Ok(())
2612 }
2613
2614 #[test]
2615 fn test_normalize_function_names() {
2616 assert_eq!(
2617 SchemaManager::normalize_function_names("lower(email)"),
2618 "LOWER(email)"
2619 );
2620 assert_eq!(
2621 SchemaManager::normalize_function_names("LOWER(email)"),
2622 "LOWER(email)"
2623 );
2624 assert_eq!(
2625 SchemaManager::normalize_function_names("Lower(email)"),
2626 "LOWER(email)"
2627 );
2628 assert_eq!(
2629 SchemaManager::normalize_function_names("trim(lower(email))"),
2630 "TRIM(LOWER(email))"
2631 );
2632 }
2633
2634 #[test]
2635 fn test_generated_column_name_case_insensitive() {
2636 let col1 = SchemaManager::generated_column_name("lower(email)");
2637 let col2 = SchemaManager::generated_column_name("LOWER(email)");
2638 let col3 = SchemaManager::generated_column_name("Lower(email)");
2639 assert_eq!(col1, col2);
2640 assert_eq!(col2, col3);
2641 assert!(col1.starts_with("_gen_LOWER_email_"));
2642 }
2643
2644 #[test]
2645 fn test_index_metadata_serde_backward_compat() {
2646 let json = r#"{
2648 "type": "Scalar",
2649 "name": "idx_person_name",
2650 "label": "Person",
2651 "properties": ["name"],
2652 "index_type": "BTree",
2653 "where_clause": null
2654 }"#;
2655 let def: IndexDefinition = serde_json::from_str(json).unwrap();
2656 let meta = def.metadata();
2657 assert_eq!(meta.status, IndexStatus::Online);
2658 assert!(meta.last_built_at.is_none());
2659 assert!(meta.row_count_at_build.is_none());
2660 }
2661
2662 #[test]
2663 fn test_index_metadata_serde_roundtrip() {
2664 let now = Utc::now();
2665 let def = IndexDefinition::Scalar(ScalarIndexConfig {
2666 name: "idx_test".to_string(),
2667 label: "Test".to_string(),
2668 properties: vec!["prop".to_string()],
2669 index_type: ScalarIndexType::BTree,
2670 where_clause: None,
2671 metadata: IndexMetadata {
2672 status: IndexStatus::Building,
2673 last_built_at: Some(now),
2674 row_count_at_build: Some(42),
2675 },
2676 });
2677
2678 let json = serde_json::to_string(&def).unwrap();
2679 let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
2680 assert_eq!(parsed.metadata().status, IndexStatus::Building);
2681 assert_eq!(parsed.metadata().row_count_at_build, Some(42));
2682 assert!(parsed.metadata().last_built_at.is_some());
2683 }
2684
2685 #[tokio::test]
2686 async fn test_update_index_metadata() -> Result<()> {
2687 let dir = tempdir()?;
2688 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2689 let path = ObjectStorePath::from("schema.json");
2690 let manager = SchemaManager::load_from_store(store, &path).await?;
2691
2692 manager.add_label("Person")?;
2693 let idx = IndexDefinition::Scalar(ScalarIndexConfig {
2694 name: "idx_test".to_string(),
2695 label: "Person".to_string(),
2696 properties: vec!["name".to_string()],
2697 index_type: ScalarIndexType::BTree,
2698 where_clause: None,
2699 metadata: Default::default(),
2700 });
2701 manager.add_index(idx)?;
2702
2703 let initial = manager.get_index("idx_test").unwrap();
2705 assert_eq!(initial.metadata().status, IndexStatus::Online);
2706
2707 manager.update_index_metadata("idx_test", |m| {
2709 m.status = IndexStatus::Building;
2710 m.row_count_at_build = Some(100);
2711 })?;
2712
2713 let updated = manager.get_index("idx_test").unwrap();
2714 assert_eq!(updated.metadata().status, IndexStatus::Building);
2715 assert_eq!(updated.metadata().row_count_at_build, Some(100));
2716
2717 assert!(manager.update_index_metadata("nope", |_| {}).is_err());
2719
2720 Ok(())
2721 }
2722
2723 #[tokio::test]
2728 async fn add_internal_property_reports_newly_added() -> Result<()> {
2729 let dir = tempdir()?;
2730 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2731 let path = ObjectStorePath::from("schema.json");
2732 let manager = SchemaManager::load_from_store(store, &path).await?;
2733 manager.add_label("Doc")?;
2734
2735 let dt = DataType::Vector { dimensions: 16 };
2736 assert!(manager.add_internal_property("Doc", "__fde_x", dt.clone(), true)?);
2738 assert!(!manager.add_internal_property("Doc", "__fde_x", dt.clone(), true)?);
2740 assert!(
2742 manager
2743 .add_internal_property("Doc", "__fde_x", DataType::Vector { dimensions: 8 }, true)
2744 .is_err()
2745 );
2746 Ok(())
2747 }
2748
2749 #[tokio::test]
2754 async fn test_add_index_is_upsert_by_name() -> Result<()> {
2755 let dir = tempdir()?;
2756 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2757 let path = ObjectStorePath::from("schema.json");
2758 let manager = SchemaManager::load_from_store(store, &path).await?;
2759 manager.add_label("Person")?;
2760
2761 let initial = IndexDefinition::Scalar(ScalarIndexConfig {
2762 name: "idx_test".to_string(),
2763 label: "Person".to_string(),
2764 properties: vec!["name".to_string()],
2765 index_type: ScalarIndexType::BTree,
2766 where_clause: None,
2767 metadata: IndexMetadata {
2768 status: IndexStatus::Building,
2769 ..Default::default()
2770 },
2771 });
2772 manager.add_index(initial.clone())?;
2773 assert_eq!(manager.schema().indexes.len(), 1);
2774
2775 manager.add_index(initial.clone())?;
2777 assert_eq!(
2778 manager.schema().indexes.len(),
2779 1,
2780 "duplicate add_index by name must not append"
2781 );
2782
2783 let mut updated_cfg = match initial {
2785 IndexDefinition::Scalar(c) => c,
2786 _ => unreachable!(),
2787 };
2788 updated_cfg.metadata.status = IndexStatus::Online;
2789 updated_cfg.metadata.row_count_at_build = Some(42);
2790 manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
2791 assert_eq!(manager.schema().indexes.len(), 1);
2792 let stored = manager.get_index("idx_test").unwrap();
2793 assert_eq!(stored.metadata().status, IndexStatus::Online);
2794 assert_eq!(stored.metadata().row_count_at_build, Some(42));
2795
2796 let other = IndexDefinition::Scalar(ScalarIndexConfig {
2798 name: "idx_other".to_string(),
2799 label: "Person".to_string(),
2800 properties: vec!["age".to_string()],
2801 index_type: ScalarIndexType::BTree,
2802 where_clause: None,
2803 metadata: IndexMetadata::default(),
2804 });
2805 manager.add_index(other)?;
2806 assert_eq!(manager.schema().indexes.len(), 2);
2807
2808 Ok(())
2809 }
2810
2811 #[tokio::test]
2814 async fn test_load_dedups_bloated_indexes() -> Result<()> {
2815 let dir = tempdir()?;
2816 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2817 let path = ObjectStorePath::from("schema.json");
2818
2819 let mut schema = Schema::default();
2823 schema.labels.insert(
2824 "Person".to_string(),
2825 LabelMeta {
2826 id: 1,
2827 created_at: chrono::Utc::now(),
2828 state: SchemaElementState::Active,
2829 description: None,
2830 },
2831 );
2832 let make = |status: IndexStatus, count: Option<u64>| {
2833 IndexDefinition::Scalar(ScalarIndexConfig {
2834 name: "idx_dup".to_string(),
2835 label: "Person".to_string(),
2836 properties: vec!["name".to_string()],
2837 index_type: ScalarIndexType::BTree,
2838 where_clause: None,
2839 metadata: IndexMetadata {
2840 status,
2841 row_count_at_build: count,
2842 ..Default::default()
2843 },
2844 })
2845 };
2846 for _ in 0..49 {
2847 schema.indexes.push(make(IndexStatus::Building, None));
2848 }
2849 schema.indexes.push(make(IndexStatus::Online, Some(123)));
2850 let json = serde_json::to_string_pretty(&schema)?;
2851 store.put(&path, json.into()).await?;
2852
2853 let manager = SchemaManager::load_from_store(store, &path).await?;
2854 let schema = manager.schema();
2855 assert_eq!(
2856 schema.indexes.len(),
2857 1,
2858 "load() must collapse 50 duplicates by name to 1"
2859 );
2860 assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
2862 assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));
2863
2864 Ok(())
2865 }
2866
/// `Schema::vector_index_for_property` must skip a vector index whose status is
/// not `Online` (here `Stale`), and must return it once it becomes `Online`.
#[test]
fn test_vector_index_for_property_skips_non_online() {
    let mut schema = Schema::default();
    let document = LabelMeta {
        id: 1,
        created_at: chrono::Utc::now(),
        state: SchemaElementState::Active,
        description: None,
    };
    schema.labels.insert("Document".to_string(), document);

    let stale_embedding_index = VectorIndexConfig {
        name: "vec_doc_embedding".to_string(),
        label: "Document".to_string(),
        property: "embedding".to_string(),
        index_type: VectorIndexType::Flat,
        metric: DistanceMetric::Cosine,
        embedding_config: None,
        metadata: IndexMetadata {
            status: IndexStatus::Stale,
            ..Default::default()
        },
    };
    schema
        .indexes
        .push(IndexDefinition::Vector(stale_embedding_index));

    // The index is present but Stale, so the lookup must not return it.
    assert!(
        schema
            .vector_index_for_property("Document", "embedding")
            .is_none()
    );

    // Promote the same index to Online in place.
    if let IndexDefinition::Vector(vector_cfg) = &mut schema.indexes[0] {
        vector_cfg.metadata.status = IndexStatus::Online;
    }

    let lookup = schema.vector_index_for_property("Document", "embedding");
    assert!(lookup.is_some());
    assert_eq!(lookup.unwrap().metric, DistanceMetric::Cosine);
}
2911
2912 #[tokio::test]
2913 async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
2914 use crate::core::fork::SchemaDelta;
2915
2916 let dir = tempdir()?;
2917 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2918 let path = ObjectStorePath::from("schema.json");
2919 let primary = SchemaManager::load_from_store(store, &path).await?;
2920 primary.add_label("Person")?;
2921
2922 let overlay = primary.with_overlay(&SchemaDelta::empty());
2923 assert_eq!(overlay.schema().labels.len(), 1);
2924
2925 overlay.add_label("Forked")?;
2928 assert!(overlay.schema().labels.contains_key("Forked"));
2929 assert!(!primary.schema().labels.contains_key("Forked"));
2930
2931 Ok(())
2932 }
2933
2934 #[tokio::test]
2935 async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
2936 use crate::core::fork::SchemaDelta;
2937 use chrono::Utc;
2938
2939 let dir = tempdir()?;
2940 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2941 let path = ObjectStorePath::from("schema.json");
2942 let primary = SchemaManager::load_from_store(store, &path).await?;
2943 primary.add_label("Existing")?;
2944
2945 let label_meta = LabelMeta {
2946 id: 99,
2947 created_at: Utc::now(),
2948 state: SchemaElementState::Active,
2949 description: None,
2950 };
2951 let edge_meta = EdgeTypeMeta {
2952 id: 99,
2953 src_labels: vec!["NewLabel".into()],
2954 dst_labels: vec!["NewLabel".into()],
2955 state: SchemaElementState::Active,
2956 description: None,
2957 };
2958 let delta = SchemaDelta {
2959 added_labels: vec![("NewLabel".to_string(), label_meta)],
2960 added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
2961 added_properties: vec![],
2962 };
2963
2964 let overlay = primary.with_overlay(&delta);
2965 let merged = overlay.schema();
2966 assert!(merged.labels.contains_key("Existing"));
2967 assert!(merged.labels.contains_key("NewLabel"));
2968 assert!(merged.edge_types.contains_key("NewEdge"));
2969
2970 assert!(!primary.schema().labels.contains_key("NewLabel"));
2972 Ok(())
2973 }
2974
2975 #[tokio::test]
2980 async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
2981 let dir = tempdir()?;
2982 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2983 let path = ObjectStorePath::from("schema.json");
2984 let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);
2985
2986 let mut handles = Vec::new();
2987 for _ in 0..16 {
2988 let m = manager.clone();
2989 handles.push(std::thread::spawn(move || {
2990 m.get_or_assign_edge_type_id("RACED")
2991 }));
2992 }
2993 let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2994 assert!(
2995 ids.iter().all(|&id| id == ids[0]),
2996 "all racers must observe one id, got {ids:?}"
2997 );
2998 assert_eq!(manager.get_or_assign_edge_type_id("RACED"), ids[0]);
3000
3001 manager.add_label("A")?;
3003 let declared = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
3004 assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared);
3005 Ok(())
3006 }
3007
/// `Schema::get_or_assign_edge_type_id` must bump `schema_version` exactly
/// once per newly minted edge type and leave it unchanged when resolving a
/// known name. Distinct names must also receive distinct ids.
#[test]
fn test_new_schemaless_edge_type_bumps_schema_version() {
    let mut schema = Schema::default();
    let v0 = schema.schema_version;

    let id1 = schema.get_or_assign_edge_type_id("FRESH");
    assert_eq!(
        schema.schema_version,
        v0.wrapping_add(1),
        "minting a new edge type must bump schema_version"
    );

    // Resolving the same name again is a pure lookup.
    let id1_again = schema.get_or_assign_edge_type_id("FRESH");
    assert_eq!(id1, id1_again);
    assert_eq!(
        schema.schema_version,
        v0.wrapping_add(1),
        "resolving an existing edge type must not bump schema_version"
    );

    // The second id used to be discarded (`_id2`), so the test never checked
    // that a different name gets a different id.
    let id2 = schema.get_or_assign_edge_type_id("OTHER");
    assert_eq!(
        schema.schema_version,
        v0.wrapping_add(2),
        "a second new edge type must bump schema_version again"
    );
    assert_ne!(id1, id2, "distinct edge type names must not share an id");
}
3041
/// `validate_schema_element_name` rejects these names:
///
/// - empty or whitespace-only names;
/// - names containing `/`, `\`, a space, a newline or a NUL byte;
/// - names longer than `MAX_SCHEMA_NAME_LEN`.
///
/// Plain identifiers, including ones with dots and underscores, are accepted.
#[test]
fn validate_schema_element_name_rejects_unsafe() {
    const REJECTED: [&str; 7] = ["", " ", "a/b", "a b", "a\nb", "a\\b", "x\0y"];
    const ACCEPTED: [&str; 4] = ["Person", "My.Label", "edge_2", "KNOWS"];

    for bad in REJECTED {
        let verdict = SchemaManager::validate_schema_element_name("Label", bad);
        assert!(verdict.is_err(), "expected {bad:?} to be rejected");
    }
    for good in ACCEPTED {
        let verdict = SchemaManager::validate_schema_element_name("Label", good);
        assert!(verdict.is_ok(), "expected {good:?} to be accepted");
    }

    // One character over the limit must fail.
    let too_long = "x".repeat(MAX_SCHEMA_NAME_LEN + 1);
    let verdict = SchemaManager::validate_schema_element_name("Label", &too_long);
    assert!(verdict.is_err());
}
3063}