1use crate::core::edge_type::{
5 MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6 is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
/// Lifecycle state of a schema element.
///
/// Stored in the `state` field of [`PropertyMeta`], [`LabelMeta`] and [`EdgeTypeMeta`]; those
/// fields fall back to `Active` when absent from a serialized schema.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
    /// State assigned to newly added labels, edge types and properties.
    Active,
    /// Hidden element.
    Hidden {
        /// Presumably the time the element became hidden — TODO confirm.
        since: DateTime<Utc>,
        /// Presumably an identifier of the last snapshot in which the element was active —
        /// TODO confirm against the code that constructs this variant.
        last_active_snapshot: String,
    },
    /// Tombstoned element.
    Tombstone {
        /// Presumably the time the element was tombstoned — TODO confirm.
        since: DateTime<Utc>,
    },
}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35pub fn datetime_struct_fields() -> Fields {
42 Fields::from(vec![
43 Field::new(
44 "nanos_since_epoch",
45 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46 true,
47 ),
48 Field::new("offset_seconds", ArrowDataType::Int32, true),
49 Field::new("timezone_name", ArrowDataType::Utf8, true),
50 ])
51}
52
53pub fn time_struct_fields() -> Fields {
59 Fields::from(vec![
60 Field::new(
61 "nanos_since_midnight",
62 ArrowDataType::Time64(TimeUnit::Nanosecond),
63 true,
64 ),
65 Field::new("offset_seconds", ArrowDataType::Int32, true),
66 ])
67}
68
69pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
/// Arrow field metadata (`uni_raw_bytes = "true"`) that tags a child field as raw bytes.
///
/// `DataType::to_arrow` attaches it to list items and map values whose logical type is
/// `DataType::Bytes`.
pub fn raw_bytes_field_metadata() -> HashMap<String, String> {
    let mut metadata = HashMap::with_capacity(1);
    metadata.insert(String::from("uni_raw_bytes"), String::from("true"));
    metadata
}
89
/// Kind of CRDT stored in a [`DataType::Crdt`] property.
///
/// Every CRDT kind maps to Arrow `Binary` (see `DataType::to_arrow`). The value encoding is
/// defined elsewhere.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    GCounter,
    GSet,
    ORSet,
    LWWRegister,
    LWWMap,
    Rga,
    VectorClock,
    VCRegister,
}
102
103impl CrdtType {
104 #[must_use]
116 pub fn type_name(&self) -> &'static str {
117 match self {
118 CrdtType::GCounter => "GCounter",
119 CrdtType::GSet => "GSet",
120 CrdtType::ORSet => "ORSet",
121 CrdtType::LWWRegister => "LWWRegister",
122 CrdtType::LWWMap => "LWWMap",
123 CrdtType::Rga => "Rga",
124 CrdtType::VectorClock => "VectorClock",
125 CrdtType::VCRegister => "VCRegister",
126 }
127 }
128}
129
/// Coordinate system of a [`DataType::Point`] property.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
    /// Stored as a struct of `latitude`, `longitude` (`Float64`) and `crs` (`Utf8`).
    Geographic,
    /// Stored as a struct of `x`, `y` (`Float64`) and `crs` (`Utf8`).
    Cartesian2D,
    /// Stored as a struct of `x`, `y`, `z` (`Float64`) and `crs` (`Utf8`).
    Cartesian3D,
}
136
/// Logical type of a schema property.
///
/// `DataType::to_arrow` defines the Arrow storage type of each variant, and
/// `DataType::accepts` defines which runtime values are admissible.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
    /// Stored as `Utf8`.
    String,
    Int32,
    Int64,
    Float32,
    Float64,
    /// Stored as Arrow `Boolean`.
    Bool,
    /// Stored as a nanosecond timestamp with timezone `"UTC"`.
    Timestamp,
    /// Stored as `Date32`.
    Date,
    /// Stored as a struct with `time_struct_fields()`.
    Time,
    /// Stored as a struct with `datetime_struct_fields()`.
    DateTime,
    /// Stored as opaque `LargeBinary`.
    Duration,
    /// Dynamically-typed value stored as opaque `LargeBinary`.
    CypherValue,
    /// Raw bytes stored as `LargeBinary`.
    Bytes,
    /// Spatial point; the struct layout depends on the [`PointType`].
    Point(PointType),
    /// Fixed-size list of `dimensions` `Float32` items.
    Vector { dimensions: usize },
    /// Stored as `FixedSizeBinary(24)`.
    Btic,
    /// CRDT value stored as `Binary`.
    Crdt(CrdtType),
    /// Arrow `List` of the inner type.
    List(Box<DataType>),
    /// Key/value map, stored as a `List` of `{key, value}` structs.
    Map(Box<DataType>, Box<DataType>),
}
160
161impl DataType {
162 #[allow(non_upper_case_globals)]
164 pub const Float: DataType = DataType::Float64;
165 #[allow(non_upper_case_globals)]
166 pub const Int: DataType = DataType::Int64;
167
168 pub fn to_arrow(&self) -> ArrowDataType {
169 match self {
170 DataType::String => ArrowDataType::Utf8,
171 DataType::Int32 => ArrowDataType::Int32,
172 DataType::Int64 => ArrowDataType::Int64,
173 DataType::Float32 => ArrowDataType::Float32,
174 DataType::Float64 => ArrowDataType::Float64,
175 DataType::Bool => ArrowDataType::Boolean,
176 DataType::Timestamp => {
177 ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
178 }
179 DataType::Date => ArrowDataType::Date32,
180 DataType::Time => ArrowDataType::Struct(time_struct_fields()),
181 DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
182 DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Bytes => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
186 PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
187 Field::new("latitude", ArrowDataType::Float64, false),
188 Field::new("longitude", ArrowDataType::Float64, false),
189 Field::new("crs", ArrowDataType::Utf8, false),
190 ])),
191 PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
192 Field::new("x", ArrowDataType::Float64, false),
193 Field::new("y", ArrowDataType::Float64, false),
194 Field::new("crs", ArrowDataType::Utf8, false),
195 ])),
196 PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
197 Field::new("x", ArrowDataType::Float64, false),
198 Field::new("y", ArrowDataType::Float64, false),
199 Field::new("z", ArrowDataType::Float64, false),
200 Field::new("crs", ArrowDataType::Utf8, false),
201 ])),
202 },
203 DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
204 Arc::new(Field::new("item", ArrowDataType::Float32, true)),
205 *dimensions as i32,
206 ),
207 DataType::Btic => ArrowDataType::FixedSizeBinary(24),
208 DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
210 let item = Field::new("item", inner.to_arrow(), true);
214 let item = if matches!(**inner, DataType::Bytes) {
215 item.with_metadata(raw_bytes_field_metadata())
216 } else {
217 item
218 };
219 ArrowDataType::List(Arc::new(item))
220 }
221 DataType::Map(key, value) => {
222 let value_field = Field::new("value", value.to_arrow(), true);
223 let value_field = if matches!(**value, DataType::Bytes) {
224 value_field.with_metadata(raw_bytes_field_metadata())
225 } else {
226 value_field
227 };
228 ArrowDataType::List(Arc::new(Field::new(
229 "item",
230 ArrowDataType::Struct(Fields::from(vec![
231 Field::new("key", key.to_arrow(), false),
232 value_field,
233 ])),
234 true,
235 )))
236 }
237 }
238 }
239
240 pub fn accepts(&self, value: &crate::value::Value) -> bool {
266 use crate::value::{TemporalValue, Value};
267
268 if matches!(value, Value::Null) {
270 return true;
271 }
272
273 match self {
274 DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
276
277 DataType::String => matches!(value, Value::String(_)),
278 DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
279 DataType::Float32 | DataType::Float64 => {
281 matches!(value, Value::Int(_) | Value::Float(_))
282 }
283 DataType::Bool => matches!(value, Value::Bool(_)),
284
285 DataType::Timestamp => matches!(
288 value,
289 Value::String(_)
290 | Value::Int(_)
291 | Value::Temporal(
292 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
293 )
294 ),
295 DataType::DateTime => matches!(
296 value,
297 Value::Temporal(
298 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
299 )
300 ),
301 DataType::Date => {
302 matches!(
303 value,
304 Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
305 )
306 }
307 DataType::Time => matches!(
308 value,
309 Value::Int(_)
310 | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
311 ),
312 DataType::Duration => {
313 matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
314 }
315 DataType::Bytes => matches!(value, Value::Bytes(_)),
316 DataType::Btic => matches!(
318 value,
319 Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
320 ),
321 DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
322 DataType::List(_) => matches!(value, Value::List(_)),
323 DataType::Map(_, _) => matches!(value, Value::Map(_)),
324 }
325 }
326}
327
328fn default_created_at() -> DateTime<Utc> {
329 Utc::now()
330}
331
332fn default_state() -> SchemaElementState {
333 SchemaElementState::Active
334}
335
/// Serde default for `PropertyMeta::added_in` when the field is absent: version 1.
fn default_version_1() -> u32 {
    const INITIAL_VERSION: u32 = 1;
    INITIAL_VERSION
}
339
/// Metadata for one property of a label or edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
    /// Logical type of the property's values.
    pub r#type: DataType,
    /// Whether the property is declared nullable.
    pub nullable: bool,
    /// `schema_version` at the time the property was added; defaults to 1 when absent.
    #[serde(default = "default_version_1")]
    pub added_in: u32,
    /// Lifecycle state; defaults to `Active` when absent.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Expression for generated properties (set by `SchemaManager::add_generated_property`);
    /// `None` for ordinary properties.
    #[serde(default)]
    pub generation_expression: Option<String>,
    /// Optional description; not serialized when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
353
/// Metadata for a node label.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
    /// Numeric label id. `SchemaManager::add_label_with_desc` assigns native ids below
    /// `VIRTUAL_LABEL_ID_START`.
    pub id: u16,
    /// Creation time; when absent from serialized data it defaults to the deserialization
    /// time.
    #[serde(default = "default_created_at")]
    pub created_at: DateTime<Utc>,
    /// Lifecycle state; defaults to `Active` when absent.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional description; not serialized when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
364
/// Metadata for a schema-declared edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
    /// Numeric edge type id. `SchemaManager::add_edge_type_with_desc` assigns native ids
    /// below `VIRTUAL_EDGE_TYPE_ID_START`.
    pub id: u32,
    /// Presumably the labels permitted at the source end — TODO confirm, including what an
    /// empty list means.
    pub src_labels: Vec<String>,
    /// Presumably the labels permitted at the destination end — TODO confirm.
    pub dst_labels: Vec<String>,
    /// Lifecycle state; defaults to `Active` when absent.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional description; not serialized when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
376
/// Kind of a schema constraint. Enforcement is implemented elsewhere.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
    /// Uniqueness over the listed properties (presumably as a composite key — TODO confirm).
    Unique { properties: Vec<String> },
    /// Presumably requires `property` to be present — TODO confirm.
    Exists { property: String },
    /// Check constraint given as an expression string.
    Check { expression: String },
}
384
/// Schema element a constraint applies to, identified by name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
    Label(String),
    EdgeType(String),
}
391
/// A named schema constraint.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
    pub name: String,
    pub constraint_type: ConstraintType,
    pub target: ConstraintTarget,
    /// Presumably whether the constraint is currently enforced — TODO confirm.
    pub enabled: bool,
}
399
/// Bidirectional name <-> id registry for edge types that are not declared in the schema.
///
/// Ids are built with `make_schemaless_id` from `next_local_id`, so they are distinguishable
/// via `is_schemaless_edge_type` (see `Schema::edge_type_name_by_id_unified`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    /// Edge type name -> schemaless id.
    name_to_id: HashMap<String, u32>,
    /// Schemaless id -> edge type name (inverse of `name_to_id`).
    id_to_name: HashMap<u32, String>,
    /// Local counter fed to `make_schemaless_id` for the next assignment.
    next_local_id: u32,
}
412
413impl SchemalessEdgeTypeRegistry {
414 pub fn new() -> Self {
415 Self {
416 name_to_id: HashMap::new(),
417 id_to_name: HashMap::new(),
418 next_local_id: 1,
419 }
420 }
421
422 pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
424 if let Some(&id) = self.name_to_id.get(type_name) {
425 return id;
426 }
427
428 let id = make_schemaless_id(self.next_local_id);
429 self.next_local_id += 1;
430
431 self.name_to_id.insert(type_name.to_string(), id);
432 self.id_to_name.insert(id, type_name.to_string());
433
434 id
435 }
436
437 pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
439 self.id_to_name.get(&type_id).map(String::as_str)
440 }
441
442 pub fn contains(&self, type_name: &str) -> bool {
444 self.name_to_id.contains_key(type_name)
445 }
446
447 pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
449 self.name_to_id.get(type_name).copied()
450 }
451
452 pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
454 self.name_to_id
455 .iter()
456 .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
457 .map(|(_, &id)| id)
458 }
459
460 pub fn all_type_ids(&self) -> Vec<u32> {
462 self.id_to_name.keys().copied().collect()
463 }
464
465 pub fn is_empty(&self) -> bool {
467 self.name_to_id.is_empty()
468 }
469}
470
471impl Default for SchemalessEdgeTypeRegistry {
472 fn default() -> Self {
473 Self::new()
474 }
475}
476
/// First label id of the range reserved for virtual (catalog-resolved) labels.
pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
/// Exclusive upper bound of the virtual label range. The value itself is not a virtual id.
pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;

/// Maximum length, in bytes, of a label or edge type name.
const MAX_SCHEMA_NAME_LEN: usize = 255;

/// Returns `true` when `id` falls in `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL`.
#[inline]
pub fn is_virtual_label_id(id: u16) -> bool {
    matches!(id, VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL)
}
497
/// The full graph schema: labels, edge types, properties, indexes and constraints.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
    /// Version counter, starting at 1 and incremented by schema mutations.
    pub schema_version: u32,
    /// Label name -> metadata.
    pub labels: HashMap<String, LabelMeta>,
    /// Declared edge type name -> metadata.
    pub edge_types: HashMap<String, EdgeTypeMeta>,
    /// Owner (label or edge type) name -> property name -> metadata.
    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
    /// Index definitions; empty when absent from serialized data.
    #[serde(default)]
    pub indexes: Vec<IndexDefinition>,
    /// Constraints; empty when absent from serialized data.
    #[serde(default)]
    pub constraints: Vec<Constraint>,
    /// Ids assigned to edge types that are not declared in `edge_types`; empty when absent.
    #[serde(default)]
    pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
512
513impl Default for Schema {
514 fn default() -> Self {
515 Self {
516 schema_version: 1,
517 labels: HashMap::new(),
518 edge_types: HashMap::new(),
519 properties: HashMap::new(),
520 indexes: Vec::new(),
521 constraints: Vec::new(),
522 schemaless_registry: SchemalessEdgeTypeRegistry::new(),
523 }
524 }
525}
526
527impl Schema {
528 fn bump_version(&mut self) {
537 self.schema_version = self.schema_version.wrapping_add(1);
538 }
539
540 pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
545 self.labels
546 .iter()
547 .find(|(_, meta)| meta.id == label_id)
548 .map(|(name, _)| name.as_str())
549 }
550
551 pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
553 self.labels.get(label_name).map(|meta| meta.id)
554 }
555
556 pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
561 self.edge_types
562 .iter()
563 .find(|(_, meta)| meta.id == type_id)
564 .map(|(name, _)| name.as_str())
565 }
566
567 pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
569 self.edge_types.get(type_name).map(|meta| meta.id)
570 }
571
572 pub fn vector_index_for_property(
577 &self,
578 label: &str,
579 property: &str,
580 ) -> Option<&VectorIndexConfig> {
581 self.indexes.iter().find_map(|idx| {
582 if let IndexDefinition::Vector(config) = idx
583 && config.label == label
584 && config.property == property
585 && config.metadata.status == IndexStatus::Online
586 {
587 return Some(config);
588 }
589 None
590 })
591 }
592
593 pub fn fulltext_index_for_property(
598 &self,
599 label: &str,
600 property: &str,
601 ) -> Option<&FullTextIndexConfig> {
602 self.indexes.iter().find_map(|idx| {
603 if let IndexDefinition::FullText(config) = idx
604 && config.label == label
605 && config.properties.iter().any(|p| p == property)
606 && config.metadata.status == IndexStatus::Online
607 {
608 return Some(config);
609 }
610 None
611 })
612 }
613
614 pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
619 self.labels
620 .iter()
621 .find(|(k, _)| k.eq_ignore_ascii_case(name))
622 .map(|(_, v)| v)
623 }
624
625 pub fn canonical_label_name(&self, name: &str) -> Option<String> {
632 self.labels
633 .iter()
634 .find(|(k, _)| k.eq_ignore_ascii_case(name))
635 .map(|(k, _)| k.clone())
636 }
637
638 pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
640 self.get_label_case_insensitive(label_name)
641 .map(|meta| meta.id)
642 }
643
644 pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
649 self.edge_types
650 .iter()
651 .find(|(k, _)| k.eq_ignore_ascii_case(name))
652 .map(|(_, v)| v)
653 }
654
655 pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
657 self.get_edge_type_case_insensitive(type_name)
658 .map(|meta| meta.id)
659 }
660
661 pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
664 self.edge_type_id_by_name_case_insensitive(type_name)
665 .or_else(|| {
666 self.schemaless_registry
667 .id_by_name_case_insensitive(type_name)
668 })
669 }
670
671 pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
677 if let Some(id) = self.edge_type_id_unified(type_name) {
678 return id;
679 }
680 let id = self.schemaless_registry.get_or_assign_id(type_name);
688 self.bump_version();
689 id
690 }
691
692 pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
699 self.edge_type_id_by_name(type_name)
700 .or_else(|| self.schemaless_registry.id_by_name(type_name))
701 }
702
703 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
707 if is_schemaless_edge_type(type_id) {
708 self.schemaless_registry
709 .type_name_by_id(type_id)
710 .map(str::to_owned)
711 } else {
712 self.edge_type_name_by_id(type_id).map(str::to_owned)
713 }
714 }
715
716 pub fn all_edge_type_ids(&self) -> Vec<u32> {
719 let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
720 ids.extend(self.schemaless_registry.all_type_ids());
721 ids.sort_unstable();
722 ids
723 }
724}
725
/// Build status of an index.
///
/// `Schema::vector_index_for_property` and `Schema::fulltext_index_for_property` only return
/// indexes whose status is `Online`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
    /// Default status, also used when metadata is deserialized without one.
    #[default]
    Online,
    /// Presumably an index whose build is in progress — TODO confirm.
    Building,
    /// Presumably an index that lags the underlying data — TODO confirm.
    Stale,
    /// Presumably an index whose build failed — TODO confirm.
    Failed,
}
739
/// Bookkeeping shared by every index kind. Every field falls back to its default when
/// absent from serialized data.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
    /// Current build status (defaults to `Online`).
    #[serde(default)]
    pub status: IndexStatus,
    /// Presumably the time of the last completed build — TODO confirm.
    #[serde(default)]
    pub last_built_at: Option<DateTime<Utc>>,
    /// Presumably the number of rows indexed at that build — TODO confirm.
    #[serde(default)]
    pub row_count_at_build: Option<u64>,
}
753
/// An index definition of any supported kind.
///
/// Serialized as an internally tagged enum: the variant name is stored in a `type` field
/// next to the config's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    Vector(VectorIndexConfig),
    FullText(FullTextIndexConfig),
    Scalar(ScalarIndexConfig),
    Inverted(InvertedIndexConfig),
    JsonFullText(JsonFtsIndexConfig),
}
764
765impl IndexDefinition {
766 pub fn name(&self) -> &str {
768 match self {
769 IndexDefinition::Vector(c) => &c.name,
770 IndexDefinition::FullText(c) => &c.name,
771 IndexDefinition::Scalar(c) => &c.name,
772 IndexDefinition::Inverted(c) => &c.name,
773 IndexDefinition::JsonFullText(c) => &c.name,
774 }
775 }
776
777 pub fn label(&self) -> &str {
779 match self {
780 IndexDefinition::Vector(c) => &c.label,
781 IndexDefinition::FullText(c) => &c.label,
782 IndexDefinition::Scalar(c) => &c.label,
783 IndexDefinition::Inverted(c) => &c.label,
784 IndexDefinition::JsonFullText(c) => &c.label,
785 }
786 }
787
788 pub fn metadata(&self) -> &IndexMetadata {
790 match self {
791 IndexDefinition::Vector(c) => &c.metadata,
792 IndexDefinition::FullText(c) => &c.metadata,
793 IndexDefinition::Scalar(c) => &c.metadata,
794 IndexDefinition::Inverted(c) => &c.metadata,
795 IndexDefinition::JsonFullText(c) => &c.metadata,
796 }
797 }
798
799 pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
801 match self {
802 IndexDefinition::Vector(c) => &mut c.metadata,
803 IndexDefinition::FullText(c) => &mut c.metadata,
804 IndexDefinition::Scalar(c) => &mut c.metadata,
805 IndexDefinition::Inverted(c) => &mut c.metadata,
806 IndexDefinition::JsonFullText(c) => &mut c.metadata,
807 }
808 }
809}
810
/// Configuration of an inverted index over a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
    pub name: String,
    pub label: String,
    pub property: String,
    /// Presumably whether terms are normalized before indexing — TODO confirm. Defaults to
    /// `true` when absent.
    #[serde(default = "default_normalize")]
    pub normalize: bool,
    /// Per-document term limit; defaults to 10,000 when absent.
    #[serde(default = "default_max_terms_per_doc")]
    pub max_terms_per_doc: usize,
    #[serde(default)]
    pub metadata: IndexMetadata,
}
823
/// Serde default for `InvertedIndexConfig::normalize`: enabled.
fn default_normalize() -> bool {
    const NORMALIZE_BY_DEFAULT: bool = true;
    NORMALIZE_BY_DEFAULT
}

/// Serde default for `InvertedIndexConfig::max_terms_per_doc`: 10,000 terms.
fn default_max_terms_per_doc() -> usize {
    const TERM_LIMIT: usize = 10_000;
    TERM_LIMIT
}
831
/// Configuration of a vector index over a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
    pub name: String,
    pub label: String,
    pub property: String,
    /// Index structure and its tuning parameters.
    pub index_type: VectorIndexType,
    /// Distance function used for similarity.
    pub metric: DistanceMetric,
    /// Optional configuration for automatically computed embeddings.
    pub embedding_config: Option<EmbeddingConfig>,
    #[serde(default)]
    pub metadata: IndexMetadata,
}
843
/// Configuration for generating embeddings that feed a vector index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
    /// Presumably the alias of the embedding model/provider — TODO confirm.
    pub alias: String,
    /// Presumably the properties whose values are embedded — TODO confirm.
    pub source_properties: Vec<String>,
    /// Presumably the number of items per embedding request — TODO confirm.
    pub batch_size: usize,
    /// Optional prefix, presumably prepended to documents before embedding — TODO confirm.
    /// Defaults to `None`.
    #[serde(default)]
    pub document_prefix: Option<String>,
    /// Optional prefix, presumably prepended to queries before embedding — TODO confirm.
    /// Defaults to `None`.
    #[serde(default)]
    pub query_prefix: Option<String>,
}
859
/// Vector index structure and its tuning parameters.
///
/// Fields marked `#[serde(default)]` may be omitted from serialized configs and then
/// deserialize as `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
    /// Presumably exhaustive (brute-force) search — TODO confirm.
    Flat,
    IvfFlat {
        num_partitions: u32,
    },
    IvfPq {
        num_partitions: u32,
        num_sub_vectors: u32,
        bits_per_subvector: u8,
    },
    IvfSq {
        num_partitions: u32,
    },
    IvfRq {
        num_partitions: u32,
        #[serde(default)]
        num_bits: Option<u8>,
    },
    HnswFlat {
        m: u32,
        ef_construction: u32,
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    HnswSq {
        m: u32,
        ef_construction: u32,
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    HnswPq {
        m: u32,
        ef_construction: u32,
        num_sub_vectors: u32,
        #[serde(default)]
        num_partitions: Option<u32>,
    },
}
900
/// Distance function for vector comparison. See `DistanceMetric::compute_distance` for the
/// exact formulas.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
    /// `1 - cosine similarity`.
    Cosine,
    /// Squared Euclidean distance (no square root is taken).
    L2,
    /// Negated dot product.
    Dot,
}
908
909impl DistanceMetric {
910 pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
922 assert_eq!(a.len(), b.len(), "vector dimension mismatch");
923 match self {
924 DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
925 DistanceMetric::Cosine => {
926 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
927 let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
928 let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
929 let denom = norm_a * norm_b;
930 if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
931 }
932 DistanceMetric::Dot => {
933 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
934 -dot
935 }
936 }
937 }
938}
939
/// Configuration of a full-text index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
    pub name: String,
    pub label: String,
    /// Indexed properties; `Schema::fulltext_index_for_property` matches against this list.
    pub properties: Vec<String>,
    pub tokenizer: TokenizerConfig,
    /// Presumably whether token positions are stored (e.g. for phrase queries) — TODO
    /// confirm.
    pub with_positions: bool,
    #[serde(default)]
    pub metadata: IndexMetadata,
}
950
/// Tokenizer used by a full-text index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
    Standard,
    Whitespace,
    /// N-gram tokenizer with gram lengths `min..=max` (presumably inclusive — TODO confirm).
    Ngram { min: u8, max: u8 },
    /// Tokenizer referenced by name.
    Custom { name: String },
}
959
/// Configuration of a full-text index over a JSON column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
    pub name: String,
    pub label: String,
    /// Column holding the JSON documents.
    pub column: String,
    /// Presumably the JSON paths to index, with empty meaning all — TODO confirm. Defaults to
    /// empty.
    #[serde(default)]
    pub paths: Vec<String>,
    /// Defaults to `false`.
    #[serde(default)]
    pub with_positions: bool,
    #[serde(default)]
    pub metadata: IndexMetadata,
}
972
/// Configuration of a scalar index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
    pub name: String,
    pub label: String,
    pub properties: Vec<String>,
    pub index_type: ScalarIndexType,
    /// Optional filter predicate, presumably making this a partial index — TODO confirm.
    pub where_clause: Option<String>,
    #[serde(default)]
    pub metadata: IndexMetadata,
}
983
/// Physical structure of a scalar index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
    BTree,
    Hash,
    Bitmap,
    LabelList,
}
992
/// Owns the current [`Schema`] and persists it as JSON in an object store.
///
/// Readers get cheap `Arc<Schema>` snapshots. Writers take the lock and mutate via
/// `Arc::make_mut`, which clones the schema first if snapshots are still outstanding.
pub struct SchemaManager {
    /// Store the schema file lives in.
    store: Arc<dyn ObjectStore>,
    /// Location of the schema file within `store`.
    path: ObjectStorePath,
    /// Current schema, swapped or copy-on-write mutated under the lock.
    schema: RwLock<Arc<Schema>>,
}
998
999impl SchemaManager {
1000 pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
1001 let path = path.as_ref();
1002 let parent = path
1003 .parent()
1004 .ok_or_else(|| anyhow!("Invalid schema path"))?;
1005 let filename = path
1006 .file_name()
1007 .ok_or_else(|| anyhow!("Invalid schema filename"))?
1008 .to_str()
1009 .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
1010
1011 let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
1012 let obj_path = ObjectStorePath::from(filename);
1013
1014 Self::load_from_store(store, &obj_path).await
1015 }
1016
1017 pub async fn load_from_store(
1018 store: Arc<dyn ObjectStore>,
1019 path: &ObjectStorePath,
1020 ) -> Result<Self> {
1021 match store.get(path).await {
1022 Ok(result) => {
1023 let bytes = result.bytes().await?;
1024 let content = String::from_utf8(bytes.to_vec())?;
1025 let mut schema: Schema = serde_json::from_str(&content)?;
1026 let original_len = schema.indexes.len();
1034 if original_len > 0 {
1035 let mut seen: std::collections::HashSet<String> =
1036 std::collections::HashSet::with_capacity(original_len);
1037 let mut dedup: Vec<IndexDefinition> = schema
1038 .indexes
1039 .iter()
1040 .rev()
1041 .filter(|idx| seen.insert(idx.name().to_string()))
1042 .cloned()
1043 .collect();
1044 dedup.reverse();
1045 if dedup.len() != original_len {
1046 tracing::warn!(
1047 collapsed = original_len - dedup.len(),
1048 kept = dedup.len(),
1049 "schema.indexes: collapsed duplicate entries on load (issue #63)"
1050 );
1051 schema.indexes = dedup;
1052 }
1053 }
1054 Ok(Self {
1055 store,
1056 path: path.clone(),
1057 schema: RwLock::new(Arc::new(schema)),
1058 })
1059 }
1060 Err(object_store::Error::NotFound { .. }) => Ok(Self {
1061 store,
1062 path: path.clone(),
1063 schema: RwLock::new(Arc::new(Schema::default())),
1064 }),
1065 Err(e) => Err(anyhow::Error::from(e)),
1066 }
1067 }
1068
1069 pub async fn save(&self) -> Result<()> {
1070 let content = {
1071 let schema_guard = acquire_read(&self.schema, "schema")?;
1072 serde_json::to_string_pretty(&**schema_guard)?
1073 };
1074 self.store
1075 .put(&self.path, content.into())
1076 .await
1077 .map_err(anyhow::Error::from)?;
1078 Ok(())
1079 }
1080
1081 pub fn path(&self) -> &ObjectStorePath {
1082 &self.path
1083 }
1084
1085 pub fn schema(&self) -> Arc<Schema> {
1086 self.schema
1087 .read()
1088 .expect("Schema lock poisoned - a thread panicked while holding it")
1089 .clone()
1090 }
1091
1092 fn normalize_function_names(expr: &str) -> String {
1095 let mut result = String::with_capacity(expr.len());
1096 let mut chars = expr.chars().peekable();
1097
1098 while let Some(ch) = chars.next() {
1099 if ch.is_alphabetic() {
1100 let mut ident = String::new();
1102 ident.push(ch);
1103
1104 while let Some(&next) = chars.peek() {
1105 if next.is_alphanumeric() || next == '_' {
1106 ident.push(chars.next().unwrap());
1107 } else {
1108 break;
1109 }
1110 }
1111
1112 if chars.peek() == Some(&'(') {
1114 result.push_str(&ident.to_uppercase());
1115 } else {
1116 result.push_str(&ident); }
1118 } else {
1119 result.push(ch);
1120 }
1121 }
1122
1123 result
1124 }
1125
1126 pub fn generated_column_name(expr: &str) -> String {
1134 let normalized = Self::normalize_function_names(expr);
1136
1137 let sanitized = normalized
1138 .replace(|c: char| !c.is_alphanumeric(), "_")
1139 .trim_matches('_')
1140 .to_string();
1141
1142 const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1144 const FNV_PRIME: u64 = 1099511628211;
1145
1146 let mut hash = FNV_OFFSET_BASIS;
1147 for byte in normalized.as_bytes() {
1148 hash ^= *byte as u64;
1149 hash = hash.wrapping_mul(FNV_PRIME);
1150 }
1151
1152 format!("_gen_{}_{:x}", sanitized, hash)
1153 }
1154
1155 pub fn replace_schema(&self, new_schema: Schema) {
1156 let mut schema = self
1157 .schema
1158 .write()
1159 .expect("Schema lock poisoned - a thread panicked while holding it");
1160 *schema = Arc::new(new_schema);
1161 }
1162
1163 #[must_use]
1176 pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1177 let primary = self.schema();
1178 let merged = if overlay.is_empty() {
1179 (*primary).clone()
1180 } else {
1181 let mut merged = (*primary).clone();
1182 for (name, label) in &overlay.added_labels {
1183 merged.labels.insert(name.clone(), label.clone());
1184 }
1185 for (name, edge_type) in &overlay.added_edge_types {
1186 merged.edge_types.insert(name.clone(), edge_type.clone());
1187 }
1188 for addition in &overlay.added_properties {
1189 let props = merged.properties.entry(addition.owner.clone()).or_default();
1190 props.insert(
1191 addition.property.clone(),
1192 PropertyMeta {
1193 r#type: addition.data_type.clone(),
1194 nullable: addition.nullable,
1195 added_in: merged.schema_version,
1196 state: SchemaElementState::Active,
1197 generation_expression: None,
1198 description: None,
1199 },
1200 );
1201 }
1202 merged
1203 };
1204
1205 Arc::new(Self {
1206 store: self.store.clone(),
1207 path: self.path.clone(),
1208 schema: RwLock::new(Arc::new(merged)),
1209 })
1210 }
1211
1212 pub fn next_label_id(&self) -> u16 {
1213 self.schema()
1214 .labels
1215 .values()
1216 .map(|l| l.id)
1217 .max()
1218 .unwrap_or(0)
1219 + 1
1220 }
1221
1222 pub fn next_type_id(&self) -> u32 {
1223 let max_schema_id = self
1224 .schema()
1225 .edge_types
1226 .values()
1227 .map(|t| t.id)
1228 .max()
1229 .unwrap_or(0);
1230
1231 if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1233 panic!("Schema edge type ID exhaustion");
1234 }
1235
1236 max_schema_id + 1
1237 }
1238
1239 pub fn validate_schema_element_name(kind: &str, name: &str) -> Result<()> {
1257 if name.is_empty() || name.chars().all(char::is_whitespace) {
1258 return Err(anyhow!(
1259 "{kind} name must be non-empty and not all whitespace"
1260 ));
1261 }
1262 if name.len() > MAX_SCHEMA_NAME_LEN {
1263 return Err(anyhow!("{kind} name exceeds {MAX_SCHEMA_NAME_LEN} bytes"));
1264 }
1265 if let Some(c) = name
1266 .chars()
1267 .find(|c| c.is_control() || c.is_whitespace() || matches!(c, '/' | '\\'))
1268 {
1269 return Err(anyhow!(
1270 "{kind} name '{name}' contains an unsafe character ({c:?})"
1271 ));
1272 }
1273 Ok(())
1274 }
1275
1276 pub fn add_label(&self, name: &str) -> Result<u16> {
1277 self.add_label_with_desc(name, None)
1278 }
1279
1280 pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1281 Self::validate_schema_element_name("Label", name)?;
1282 let mut guard = acquire_write(&self.schema, "schema")?;
1283 let schema = Arc::make_mut(&mut *guard);
1284 if schema.labels.contains_key(name) {
1285 return Err(anyhow!("Label '{}' already exists", name));
1286 }
1287
1288 let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1289 if id >= VIRTUAL_LABEL_ID_START {
1290 return Err(anyhow!(
1291 "Native label space exhausted (next id {id:#x} would enter the \
1292 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1293 reserved for catalog-resolved labels)"
1294 ));
1295 }
1296 schema.labels.insert(
1297 name.to_string(),
1298 LabelMeta {
1299 id,
1300 created_at: Utc::now(),
1301 state: SchemaElementState::Active,
1302 description,
1303 },
1304 );
1305 schema.bump_version();
1306 Ok(id)
1307 }
1308
1309 pub fn add_edge_type(
1310 &self,
1311 name: &str,
1312 src_labels: Vec<String>,
1313 dst_labels: Vec<String>,
1314 ) -> Result<u32> {
1315 self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1316 }
1317
1318 pub fn add_edge_type_with_desc(
1319 &self,
1320 name: &str,
1321 src_labels: Vec<String>,
1322 dst_labels: Vec<String>,
1323 description: Option<String>,
1324 ) -> Result<u32> {
1325 Self::validate_schema_element_name("Edge type", name)?;
1326 let mut guard = acquire_write(&self.schema, "schema")?;
1327 let schema = Arc::make_mut(&mut *guard);
1328 if schema.edge_types.contains_key(name) {
1329 return Err(anyhow!("Edge type '{}' already exists", name));
1330 }
1331
1332 let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1333
1334 if id >= VIRTUAL_EDGE_TYPE_ID_START {
1339 return Err(anyhow!(
1340 "Native edge type space exhausted (next id {id:#x} would enter the \
1341 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1342 reserved for catalog-resolved edge types)"
1343 ));
1344 }
1345
1346 schema.edge_types.insert(
1347 name.to_string(),
1348 EdgeTypeMeta {
1349 id,
1350 src_labels,
1351 dst_labels,
1352 state: SchemaElementState::Active,
1353 description,
1354 },
1355 );
1356 schema.bump_version();
1357 Ok(id)
1358 }
1359
1360 pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1369 {
1370 let guard = acquire_read(&self.schema, "schema")
1371 .expect("Schema lock poisoned - a thread panicked while holding it");
1372 if let Some(id) = guard.edge_type_id_unified(type_name) {
1373 return id;
1374 }
1375 }
1376 let mut guard = acquire_write(&self.schema, "schema")
1377 .expect("Schema lock poisoned - a thread panicked while holding it");
1378 let schema = Arc::make_mut(&mut *guard);
1379 schema.get_or_assign_edge_type_id(type_name)
1380 }
1381
1382 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1384 let schema = acquire_read(&self.schema, "schema")
1385 .expect("Schema lock poisoned - a thread panicked while holding it");
1386 schema.edge_type_name_by_id_unified(type_id)
1387 }
1388
1389 pub fn add_property(
1390 &self,
1391 label_or_type: &str,
1392 prop_name: &str,
1393 data_type: DataType,
1394 nullable: bool,
1395 ) -> Result<()> {
1396 self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1397 }
1398
1399 pub fn add_property_with_desc(
1400 &self,
1401 label_or_type: &str,
1402 prop_name: &str,
1403 data_type: DataType,
1404 nullable: bool,
1405 description: Option<String>,
1406 ) -> Result<()> {
1407 validate_property_name(prop_name)?;
1408 let mut guard = acquire_write(&self.schema, "schema")?;
1409 let schema = Arc::make_mut(&mut *guard);
1410 let version = schema.schema_version;
1411 let props = schema
1412 .properties
1413 .entry(label_or_type.to_string())
1414 .or_default();
1415
1416 if props.contains_key(prop_name) {
1417 return Err(anyhow!(
1418 "Property '{}' already exists for '{}'",
1419 prop_name,
1420 label_or_type
1421 ));
1422 }
1423
1424 props.insert(
1425 prop_name.to_string(),
1426 PropertyMeta {
1427 r#type: data_type,
1428 nullable,
1429 added_in: version,
1430 state: SchemaElementState::Active,
1431 generation_expression: None,
1432 description,
1433 },
1434 );
1435 schema.bump_version();
1437 Ok(())
1438 }
1439
1440 pub fn add_generated_property(
1441 &self,
1442 label_or_type: &str,
1443 prop_name: &str,
1444 data_type: DataType,
1445 expr: String,
1446 ) -> Result<()> {
1447 validate_reserved_property_name(prop_name)?;
1450 let mut guard = acquire_write(&self.schema, "schema")?;
1451 let schema = Arc::make_mut(&mut *guard);
1452 let version = schema.schema_version;
1453 let props = schema
1454 .properties
1455 .entry(label_or_type.to_string())
1456 .or_default();
1457
1458 if props.contains_key(prop_name) {
1459 return Err(anyhow!("Property '{}' already exists", prop_name));
1460 }
1461
1462 props.insert(
1463 prop_name.to_string(),
1464 PropertyMeta {
1465 r#type: data_type,
1466 nullable: true,
1467 added_in: version,
1468 state: SchemaElementState::Active,
1469 generation_expression: Some(expr),
1470 description: None,
1471 },
1472 );
1473 schema.bump_version();
1475 Ok(())
1476 }
1477
1478 pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1479 let mut guard = acquire_write(&self.schema, "schema")?;
1480 let schema = Arc::make_mut(&mut *guard);
1481 let meta = schema
1482 .labels
1483 .get_mut(name)
1484 .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1485 meta.description = description;
1486 Ok(())
1487 }
1488
1489 pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1490 let mut guard = acquire_write(&self.schema, "schema")?;
1491 let schema = Arc::make_mut(&mut *guard);
1492 let meta = schema
1493 .edge_types
1494 .get_mut(name)
1495 .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1496 meta.description = description;
1497 Ok(())
1498 }
1499
1500 pub fn set_property_description(
1501 &self,
1502 entity: &str,
1503 prop_name: &str,
1504 description: Option<String>,
1505 ) -> Result<()> {
1506 let mut guard = acquire_write(&self.schema, "schema")?;
1507 let schema = Arc::make_mut(&mut *guard);
1508 let props = schema
1509 .properties
1510 .get_mut(entity)
1511 .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1512 let meta = props
1513 .get_mut(prop_name)
1514 .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
1515 meta.description = description;
1516 Ok(())
1517 }
1518
1519 pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1528 let mut guard = acquire_write(&self.schema, "schema")?;
1529 let schema = Arc::make_mut(&mut *guard);
1530 if let Some(existing) = schema
1531 .indexes
1532 .iter_mut()
1533 .find(|i| i.name() == index_def.name())
1534 {
1535 *existing = index_def;
1536 } else {
1537 schema.indexes.push(index_def);
1538 }
1539 schema.bump_version();
1540 Ok(())
1541 }
1542
1543 pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1544 let schema = self.schema.read().expect("Schema lock poisoned");
1545 schema.indexes.iter().find(|i| i.name() == name).cloned()
1546 }
1547
1548 pub fn update_index_metadata(
1553 &self,
1554 index_name: &str,
1555 f: impl FnOnce(&mut IndexMetadata),
1556 ) -> Result<()> {
1557 let mut guard = acquire_write(&self.schema, "schema")?;
1558 let schema = Arc::make_mut(&mut *guard);
1559 let idx = schema
1560 .indexes
1561 .iter_mut()
1562 .find(|i| i.name() == index_name)
1563 .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1564 f(idx.metadata_mut());
1565 Ok(())
1566 }
1567
1568 pub fn remove_index(&self, name: &str) -> Result<()> {
1569 let mut guard = acquire_write(&self.schema, "schema")?;
1570 let schema = Arc::make_mut(&mut *guard);
1571 if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1572 schema.indexes.remove(pos);
1573 schema.bump_version();
1574 Ok(())
1575 } else {
1576 Err(anyhow!("Index '{}' not found", name))
1577 }
1578 }
1579
1580 pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1581 let mut guard = acquire_write(&self.schema, "schema")?;
1582 let schema = Arc::make_mut(&mut *guard);
1583 if schema.constraints.iter().any(|c| c.name == constraint.name) {
1584 return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1585 }
1586 schema.constraints.push(constraint);
1587 schema.bump_version();
1588 Ok(())
1589 }
1590
1591 pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1592 let mut guard = acquire_write(&self.schema, "schema")?;
1593 let schema = Arc::make_mut(&mut *guard);
1594 if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1595 schema.constraints.remove(pos);
1596 schema.bump_version();
1597 Ok(())
1598 } else if if_exists {
1599 Ok(())
1600 } else {
1601 Err(anyhow!("Constraint '{}' not found", name))
1602 }
1603 }
1604
1605 pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1606 let mut guard = acquire_write(&self.schema, "schema")?;
1607 let schema = Arc::make_mut(&mut *guard);
1608 let Some(props) = schema.properties.get_mut(label_or_type) else {
1609 return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1610 };
1611 if props.remove(prop_name).is_none() {
1612 return Err(anyhow!(
1613 "Property '{}' not found for '{}'",
1614 prop_name,
1615 label_or_type
1616 ));
1617 }
1618 schema.bump_version();
1619 Ok(())
1620 }
1621
1622 pub fn rename_property(
1623 &self,
1624 label_or_type: &str,
1625 old_name: &str,
1626 new_name: &str,
1627 ) -> Result<()> {
1628 let mut guard = acquire_write(&self.schema, "schema")?;
1629 let schema = Arc::make_mut(&mut *guard);
1630 let Some(props) = schema.properties.get_mut(label_or_type) else {
1631 return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1632 };
1633 let Some(meta) = props.remove(old_name) else {
1634 return Err(anyhow!(
1635 "Property '{}' not found for '{}'",
1636 old_name,
1637 label_or_type
1638 ));
1639 };
1640 if props.contains_key(new_name) {
1641 props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
1644 }
1645 props.insert(new_name.to_string(), meta);
1646 schema.bump_version();
1647 Ok(())
1648 }
1649
1650 pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1651 let mut guard = acquire_write(&self.schema, "schema")?;
1652 let schema = Arc::make_mut(&mut *guard);
1653 if let Some(label_meta) = schema.labels.get_mut(name) {
1654 label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1655 schema.bump_version();
1657 Ok(())
1658 } else if if_exists {
1659 Ok(())
1660 } else {
1661 Err(anyhow!("Label '{}' not found", name))
1662 }
1663 }
1664
1665 pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1666 let mut guard = acquire_write(&self.schema, "schema")?;
1667 let schema = Arc::make_mut(&mut *guard);
1668 if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1669 edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1670 schema.bump_version();
1672 Ok(())
1673 } else if if_exists {
1674 Ok(())
1675 } else {
1676 Err(anyhow!("Edge Type '{}' not found", name))
1677 }
1678 }
1679}
1680
1681pub fn validate_identifier(name: &str) -> Result<()> {
1683 if name.is_empty() || name.len() > 64 {
1685 return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1686 }
1687
1688 let first = name.chars().next().unwrap();
1690 if !first.is_alphabetic() && first != '_' {
1691 return Err(anyhow!(
1692 "Identifier '{}' must start with letter or underscore",
1693 name
1694 ));
1695 }
1696
1697 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1699 return Err(anyhow!(
1700 "Identifier '{}' must contain only alphanumeric and underscore",
1701 name
1702 ));
1703 }
1704
1705 const RESERVED: &[&str] = &[
1707 "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1708 "UNION", "ORDER", "LIMIT",
1709 ];
1710 if RESERVED.contains(&name.to_uppercase().as_str()) {
1711 return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1712 }
1713
1714 Ok(())
1715}
1716
1717pub fn validate_property_name(name: &str) -> Result<()> {
1724 if name.starts_with('_') {
1725 return Err(anyhow!(
1726 "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
1727 name
1728 ));
1729 }
1730 validate_reserved_property_name(name)
1731}
1732
1733fn validate_reserved_property_name(name: &str) -> Result<()> {
1740 const RESERVED_PROPS: &[&str] = &[
1749 "ext_id",
1750 "overflow_json",
1751 "eid",
1752 "src_vid",
1753 "dst_vid",
1754 "op",
1755 "__set_struct__",
1763 ];
1764 if RESERVED_PROPS.contains(&name) {
1765 return Err(anyhow!(
1766 "Property name '{}' is reserved by the storage layer; please choose a different name",
1767 name
1768 ));
1769 }
1770 Ok(())
1771}
1772
#[cfg(test)]
mod tests {
    // Unit tests for the schema types and `SchemaManager` operations in this
    // file. Tests that need persistence use a `LocalFileSystem` object store
    // rooted in a fresh temporary directory.
    use super::*;
    use crate::value::{TemporalValue, Value};
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Exercises `DataType::accepts` across Null handling, exact matches,
    /// permitted widenings/conversions, and rejected coercions.
    #[test]
    fn test_datatype_accepts_matrix() {
        let dt = || TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: None,
        };

        // Null is accepted by every type listed here.
        for ty in [
            DataType::String,
            DataType::Int64,
            DataType::Bool,
            DataType::DateTime,
            DataType::Float64,
        ] {
            assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
        }

        // Exact type matches.
        assert!(DataType::String.accepts(&Value::String("x".into())));
        assert!(DataType::Int64.accepts(&Value::Int(1)));
        assert!(DataType::Bool.accepts(&Value::Bool(true)));
        assert!(DataType::DateTime.accepts(&Value::Temporal(dt())));

        // Permitted widenings / conversions.
        assert!(
            DataType::Float64.accepts(&Value::Int(3)),
            "Int widens to Float"
        );
        assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
        assert!(DataType::Timestamp.accepts(&Value::Temporal(dt())));
        assert!(
            DataType::Timestamp.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
            "storage parses strings for non-struct Timestamp columns"
        );

        // Rejected coercions.
        assert!(
            !DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
            "String into a DateTime struct column nulls silently — reject here"
        );
        assert!(!DataType::Bool.accepts(&Value::Int(1)));
        assert!(!DataType::Int64.accepts(&Value::Bool(true)));
        assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
        assert!(
            !DataType::String.accepts(&Value::Int(10)),
            "no implicit stringification"
        );
        assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));

        // CypherValue accepts an (empty) map value.
        assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
    }

    /// Checks id assignment and duplicate rejection for labels, properties and
    /// edge types on a schema loaded from an empty store, then verifies that the
    /// state survives `save()` followed by a fresh `load_from_store`.
    #[tokio::test]
    async fn test_schema_management() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;

        // The first label gets id 1; adding it again is rejected.
        let lid = manager.add_label("Person")?;
        assert_eq!(lid, 1);
        assert!(manager.add_label("Person").is_err());

        // A duplicate property name on the same label is rejected.
        manager.add_property("Person", "name", DataType::String, false)?;
        assert!(
            manager
                .add_property("Person", "name", DataType::String, false)
                .is_err()
        );

        // The first edge type also gets id 1.
        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
        assert_eq!(tid, 1);

        // Persist, then confirm the file exists in the store.
        manager.save().await?;
        assert!(store.get(&path).await.is_ok());

        // A second manager loaded from the same store sees the saved state.
        let manager2 = SchemaManager::load_from_store(store, &path).await?;
        assert!(manager2.schema().labels.contains_key("Person"));
        assert!(
            manager2
                .schema()
                .properties
                .get("Person")
                .unwrap()
                .contains_key("name")
        );

        Ok(())
    }

    /// Verifies that reserved storage-layer property names are rejected by
    /// `add_property` (on labels and edge types) and by `add_generated_property`,
    /// while names that merely contain a reserved word are accepted.
    #[tokio::test]
    async fn test_reserved_property_names_rejected() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Tiny")?;

        // Explicitly reserved names; the error text must mention "reserved".
        for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
            let err = manager
                .add_property("Tiny", reserved, DataType::String, true)
                .expect_err(&format!("expected '{reserved}' to be rejected"));
            assert!(
                err.to_string().contains("reserved"),
                "error for '{reserved}' should mention 'reserved', got: {err}"
            );
        }

        let err = manager
            .add_property("Tiny", "__set_struct__", DataType::String, true)
            .expect_err("expected '__set_struct__' to be rejected");
        assert!(
            err.to_string().contains("reserved"),
            "__set_struct__ rejection should mention 'reserved', got: {err}"
        );

        // '_'-prefixed names are rejected as well.
        for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
            assert!(
                manager
                    .add_property("Tiny", reserved, DataType::String, true)
                    .is_err(),
                "expected '{reserved}' to be rejected"
            );
        }

        // Only exact matches are reserved; look-alike names are accepted.
        manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
        manager.add_property("Tiny", "user_op", DataType::String, true)?;
        manager.add_property("Tiny", "type_name", DataType::String, true)?;

        // The same rule applies to edge-type properties.
        manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
        assert!(
            manager
                .add_property("knows", "src_vid", DataType::Int64, true)
                .is_err()
        );

        // Generated properties are checked against the reserved list too.
        assert!(
            manager
                .add_generated_property(
                    "Tiny",
                    "ext_id",
                    DataType::String,
                    "concat('x', name)".into()
                )
                .is_err()
        );

        Ok(())
    }

    /// `normalize_function_names` upper-cases function names, including nested
    /// calls, while leaving the argument text as-is.
    #[test]
    fn test_normalize_function_names() {
        assert_eq!(
            SchemaManager::normalize_function_names("lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("LOWER(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("Lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("trim(lower(email))"),
            "TRIM(LOWER(email))"
        );
    }

    /// Expressions differing only in function-name case map to the same
    /// generated column name, which starts with `_gen_LOWER_email_`.
    #[test]
    fn test_generated_column_name_case_insensitive() {
        let col1 = SchemaManager::generated_column_name("lower(email)");
        let col2 = SchemaManager::generated_column_name("LOWER(email)");
        let col3 = SchemaManager::generated_column_name("Lower(email)");
        assert_eq!(col1, col2);
        assert_eq!(col2, col3);
        assert!(col1.starts_with("_gen_LOWER_email_"));
    }

    /// JSON without a metadata field still deserializes; the metadata then
    /// reports `Online` status and no build information.
    #[test]
    fn test_index_metadata_serde_backward_compat() {
        let json = r#"{
            "type": "Scalar",
            "name": "idx_person_name",
            "label": "Person",
            "properties": ["name"],
            "index_type": "BTree",
            "where_clause": null
        }"#;
        let def: IndexDefinition = serde_json::from_str(json).unwrap();
        let meta = def.metadata();
        assert_eq!(meta.status, IndexStatus::Online);
        assert!(meta.last_built_at.is_none());
        assert!(meta.row_count_at_build.is_none());
    }

    /// Index metadata (status, build timestamp, row count) survives a JSON
    /// serialize/deserialize round trip.
    #[test]
    fn test_index_metadata_serde_roundtrip() {
        let now = Utc::now();
        let def = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Test".to_string(),
            properties: vec!["prop".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                last_built_at: Some(now),
                row_count_at_build: Some(42),
            },
        });

        let json = serde_json::to_string(&def).unwrap();
        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.metadata().status, IndexStatus::Building);
        assert_eq!(parsed.metadata().row_count_at_build, Some(42));
        assert!(parsed.metadata().last_built_at.is_some());
    }

    /// `update_index_metadata` mutates an existing index's metadata in place
    /// and errors for an unknown index name.
    #[tokio::test]
    async fn test_update_index_metadata() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Person")?;
        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: Default::default(),
        });
        manager.add_index(idx)?;

        // Default metadata reports Online.
        let initial = manager.get_index("idx_test").unwrap();
        assert_eq!(initial.metadata().status, IndexStatus::Online);

        manager.update_index_metadata("idx_test", |m| {
            m.status = IndexStatus::Building;
            m.row_count_at_build = Some(100);
        })?;

        let updated = manager.get_index("idx_test").unwrap();
        assert_eq!(updated.metadata().status, IndexStatus::Building);
        assert_eq!(updated.metadata().row_count_at_build, Some(100));

        // Unknown index name.
        assert!(manager.update_index_metadata("nope", |_| {}).is_err());

        Ok(())
    }

    /// `add_index` upserts by name:
    /// * re-adding the same definition does not append a duplicate;
    /// * adding a changed definition under the same name replaces the stored one;
    /// * a differently named index is appended.
    #[tokio::test]
    async fn test_add_index_is_upsert_by_name() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;
        manager.add_label("Person")?;

        let initial = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                ..Default::default()
            },
        });
        manager.add_index(initial.clone())?;
        assert_eq!(manager.schema().indexes.len(), 1);

        // Same name again: still one entry.
        manager.add_index(initial.clone())?;
        assert_eq!(
            manager.schema().indexes.len(),
            1,
            "duplicate add_index by name must not append"
        );

        // Same name with new metadata: the stored definition is replaced.
        let mut updated_cfg = match initial {
            IndexDefinition::Scalar(c) => c,
            _ => unreachable!(),
        };
        updated_cfg.metadata.status = IndexStatus::Online;
        updated_cfg.metadata.row_count_at_build = Some(42);
        manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
        assert_eq!(manager.schema().indexes.len(), 1);
        let stored = manager.get_index("idx_test").unwrap();
        assert_eq!(stored.metadata().status, IndexStatus::Online);
        assert_eq!(stored.metadata().row_count_at_build, Some(42));

        // A different name is appended.
        let other = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_other".to_string(),
            label: "Person".to_string(),
            properties: vec!["age".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata::default(),
        });
        manager.add_index(other)?;
        assert_eq!(manager.schema().indexes.len(), 2);

        Ok(())
    }

    /// A persisted schema holding 50 same-named index entries (49 `Building`,
    /// then one `Online`) is collapsed to a single entry on load. The surviving
    /// entry is the `Online` one.
    #[tokio::test]
    async fn test_load_dedups_bloated_indexes() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");

        // Build the bloated schema by hand and write it straight to the store,
        // bypassing `add_index` (which would upsert).
        let mut schema = Schema::default();
        schema.labels.insert(
            "Person".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
                description: None,
            },
        );
        let make = |status: IndexStatus, count: Option<u64>| {
            IndexDefinition::Scalar(ScalarIndexConfig {
                name: "idx_dup".to_string(),
                label: "Person".to_string(),
                properties: vec!["name".to_string()],
                index_type: ScalarIndexType::BTree,
                where_clause: None,
                metadata: IndexMetadata {
                    status,
                    row_count_at_build: count,
                    ..Default::default()
                },
            })
        };
        for _ in 0..49 {
            schema.indexes.push(make(IndexStatus::Building, None));
        }
        schema.indexes.push(make(IndexStatus::Online, Some(123)));
        let json = serde_json::to_string_pretty(&schema)?;
        store.put(&path, json.into()).await?;

        let manager = SchemaManager::load_from_store(store, &path).await?;
        let schema = manager.schema();
        assert_eq!(
            schema.indexes.len(),
            1,
            "load() must collapse 50 duplicates by name to 1"
        );
        assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
        assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));

        Ok(())
    }

    /// `vector_index_for_property` ignores a `Stale` vector index and returns it
    /// once its status is flipped to `Online`.
    #[test]
    fn test_vector_index_for_property_skips_non_online() {
        let mut schema = Schema::default();
        schema.labels.insert(
            "Document".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
                description: None,
            },
        );

        schema
            .indexes
            .push(IndexDefinition::Vector(VectorIndexConfig {
                name: "vec_doc_embedding".to_string(),
                label: "Document".to_string(),
                property: "embedding".to_string(),
                index_type: VectorIndexType::Flat,
                metric: DistanceMetric::Cosine,
                embedding_config: None,
                metadata: IndexMetadata {
                    status: IndexStatus::Stale,
                    ..Default::default()
                },
            }));

        // Stale: not returned.
        assert!(
            schema
                .vector_index_for_property("Document", "embedding")
                .is_none()
        );

        // Online: returned.
        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
            cfg.metadata.status = IndexStatus::Online;
        }
        let result = schema.vector_index_for_property("Document", "embedding");
        assert!(result.is_some());
        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
    }

    /// An overlay built from an empty delta starts with the primary's labels,
    /// and labels added to the overlay do not appear in the primary.
    #[tokio::test]
    async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
        use crate::core::fork::SchemaDelta;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let primary = SchemaManager::load_from_store(store, &path).await?;
        primary.add_label("Person")?;

        let overlay = primary.with_overlay(&SchemaDelta::empty());
        assert_eq!(overlay.schema().labels.len(), 1);

        // Mutating the overlay must not leak back into the primary.
        overlay.add_label("Forked")?;
        assert!(overlay.schema().labels.contains_key("Forked"));
        assert!(!primary.schema().labels.contains_key("Forked"));

        Ok(())
    }

    /// Labels and edge types carried in a delta appear in the overlay alongside
    /// the primary's own labels; the primary itself is unaffected.
    #[tokio::test]
    async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
        use crate::core::fork::SchemaDelta;
        use chrono::Utc;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let primary = SchemaManager::load_from_store(store, &path).await?;
        primary.add_label("Existing")?;

        let label_meta = LabelMeta {
            id: 99,
            created_at: Utc::now(),
            state: SchemaElementState::Active,
            description: None,
        };
        let edge_meta = EdgeTypeMeta {
            id: 99,
            src_labels: vec!["NewLabel".into()],
            dst_labels: vec!["NewLabel".into()],
            state: SchemaElementState::Active,
            description: None,
        };
        let delta = SchemaDelta {
            added_labels: vec![("NewLabel".to_string(), label_meta)],
            added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
            added_properties: vec![],
        };

        let overlay = primary.with_overlay(&delta);
        let merged = overlay.schema();
        assert!(merged.labels.contains_key("Existing"));
        assert!(merged.labels.contains_key("NewLabel"));
        assert!(merged.edge_types.contains_key("NewEdge"));

        assert!(!primary.schema().labels.contains_key("NewLabel"));
        Ok(())
    }

    /// 16 OS threads race `get_or_assign_edge_type_id` for the same new name.
    /// All of them must observe one id, and a later call returns it again.
    /// A declared edge type resolves to the id returned by `add_edge_type`.
    #[tokio::test]
    async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);

        let mut handles = Vec::new();
        for _ in 0..16 {
            let m = manager.clone();
            handles.push(std::thread::spawn(move || {
                m.get_or_assign_edge_type_id("RACED")
            }));
        }
        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        assert!(
            ids.iter().all(|&id| id == ids[0]),
            "all racers must observe one id, got {ids:?}"
        );
        assert_eq!(manager.get_or_assign_edge_type_id("RACED"), ids[0]);

        // Declared edge types resolve through the same entry point.
        manager.add_label("A")?;
        let declared = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
        assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared);
        Ok(())
    }

    /// `Schema::get_or_assign_edge_type_id` bumps `schema_version` exactly once
    /// per newly minted name. Resolving an already known name leaves it unchanged.
    #[test]
    fn test_new_schemaless_edge_type_bumps_schema_version() {
        let mut schema = Schema::default();
        let v0 = schema.schema_version;

        let id1 = schema.get_or_assign_edge_type_id("FRESH");
        assert_eq!(
            schema.schema_version,
            v0.wrapping_add(1),
            "minting a new edge type must bump schema_version"
        );

        // Same name again: same id, no bump.
        let id1_again = schema.get_or_assign_edge_type_id("FRESH");
        assert_eq!(id1, id1_again);
        assert_eq!(
            schema.schema_version,
            v0.wrapping_add(1),
            "resolving an existing edge type must not bump schema_version"
        );

        // A different new name bumps again.
        let _id2 = schema.get_or_assign_edge_type_id("OTHER");
        assert_eq!(
            schema.schema_version,
            v0.wrapping_add(2),
            "a second new edge type must bump schema_version again"
        );
    }

    /// `validate_schema_element_name` rejects empty, whitespace-containing,
    /// slash/backslash-containing, NUL-containing, and over-long names, and
    /// accepts ordinary names (dots and underscores included).
    #[test]
    fn validate_schema_element_name_rejects_unsafe() {
        for bad in ["", " ", "a/b", "a b", "a\nb", "a\\b", "x\0y"] {
            assert!(
                SchemaManager::validate_schema_element_name("Label", bad).is_err(),
                "expected {bad:?} to be rejected"
            );
        }
        for good in ["Person", "My.Label", "edge_2", "KNOWS"] {
            assert!(
                SchemaManager::validate_schema_element_name("Label", good).is_ok(),
                "expected {good:?} to be accepted"
            );
        }
        // One character past the limit.
        let long = "x".repeat(MAX_SCHEMA_NAME_LEN + 1);
        assert!(SchemaManager::validate_schema_element_name("Label", &long).is_err());
    }
}