1use crate::core::edge_type::{
5 MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6 is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
20#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
21#[non_exhaustive]
22pub enum SchemaElementState {
23 Active,
24 Hidden {
25 since: DateTime<Utc>,
26 last_active_snapshot: String, },
28 Tombstone {
29 since: DateTime<Utc>,
30 },
31}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35pub fn datetime_struct_fields() -> Fields {
42 Fields::from(vec![
43 Field::new(
44 "nanos_since_epoch",
45 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46 true,
47 ),
48 Field::new("offset_seconds", ArrowDataType::Int32, true),
49 Field::new("timezone_name", ArrowDataType::Utf8, true),
50 ])
51}
52
53pub fn time_struct_fields() -> Fields {
59 Fields::from(vec![
60 Field::new(
61 "nanos_since_midnight",
62 ArrowDataType::Time64(TimeUnit::Nanosecond),
63 true,
64 ),
65 Field::new("offset_seconds", ArrowDataType::Int32, true),
66 ])
67}
68
69pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
79#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
80#[non_exhaustive]
81pub enum CrdtType {
82 GCounter,
83 GSet,
84 ORSet,
85 LWWRegister,
86 LWWMap,
87 Rga,
88 VectorClock,
89 VCRegister,
90}
91
92impl CrdtType {
93 #[must_use]
105 pub fn type_name(&self) -> &'static str {
106 match self {
107 CrdtType::GCounter => "GCounter",
108 CrdtType::GSet => "GSet",
109 CrdtType::ORSet => "ORSet",
110 CrdtType::LWWRegister => "LWWRegister",
111 CrdtType::LWWMap => "LWWMap",
112 CrdtType::Rga => "Rga",
113 CrdtType::VectorClock => "VectorClock",
114 CrdtType::VCRegister => "VCRegister",
115 }
116 }
117}
118
119#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
120pub enum PointType {
121 Geographic, Cartesian2D, Cartesian3D, }
125
126#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
127#[non_exhaustive]
128pub enum DataType {
129 String,
130 Int32,
131 Int64,
132 Float32,
133 Float64,
134 Bool,
135 Timestamp,
136 Date,
137 Time,
138 DateTime,
139 Duration,
140 CypherValue,
141 Bytes,
142 Point(PointType),
143 Vector { dimensions: usize },
144 Btic,
145 Crdt(CrdtType),
146 List(Box<DataType>),
147 Map(Box<DataType>, Box<DataType>),
148}
149
150impl DataType {
151 #[allow(non_upper_case_globals)]
153 pub const Float: DataType = DataType::Float64;
154 #[allow(non_upper_case_globals)]
155 pub const Int: DataType = DataType::Int64;
156
157 pub fn to_arrow(&self) -> ArrowDataType {
158 match self {
159 DataType::String => ArrowDataType::Utf8,
160 DataType::Int32 => ArrowDataType::Int32,
161 DataType::Int64 => ArrowDataType::Int64,
162 DataType::Float32 => ArrowDataType::Float32,
163 DataType::Float64 => ArrowDataType::Float64,
164 DataType::Bool => ArrowDataType::Boolean,
165 DataType::Timestamp => {
166 ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
167 }
168 DataType::Date => ArrowDataType::Date32,
169 DataType::Time => ArrowDataType::Struct(time_struct_fields()),
170 DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
171 DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Bytes => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
175 PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
176 Field::new("latitude", ArrowDataType::Float64, false),
177 Field::new("longitude", ArrowDataType::Float64, false),
178 Field::new("crs", ArrowDataType::Utf8, false),
179 ])),
180 PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
181 Field::new("x", ArrowDataType::Float64, false),
182 Field::new("y", ArrowDataType::Float64, false),
183 Field::new("crs", ArrowDataType::Utf8, false),
184 ])),
185 PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
186 Field::new("x", ArrowDataType::Float64, false),
187 Field::new("y", ArrowDataType::Float64, false),
188 Field::new("z", ArrowDataType::Float64, false),
189 Field::new("crs", ArrowDataType::Utf8, false),
190 ])),
191 },
192 DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
193 Arc::new(Field::new("item", ArrowDataType::Float32, true)),
194 *dimensions as i32,
195 ),
196 DataType::Btic => ArrowDataType::FixedSizeBinary(24),
197 DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
199 ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
200 }
201 DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
202 "item",
203 ArrowDataType::Struct(Fields::from(vec![
204 Field::new("key", key.to_arrow(), false),
205 Field::new("value", value.to_arrow(), true),
206 ])),
207 true,
208 ))),
209 }
210 }
211
212 pub fn accepts(&self, value: &crate::value::Value) -> bool {
238 use crate::value::{TemporalValue, Value};
239
240 if matches!(value, Value::Null) {
242 return true;
243 }
244
245 match self {
246 DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
248
249 DataType::String => matches!(value, Value::String(_)),
250 DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
251 DataType::Float32 | DataType::Float64 => {
253 matches!(value, Value::Int(_) | Value::Float(_))
254 }
255 DataType::Bool => matches!(value, Value::Bool(_)),
256
257 DataType::Timestamp => matches!(
260 value,
261 Value::String(_)
262 | Value::Int(_)
263 | Value::Temporal(
264 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
265 )
266 ),
267 DataType::DateTime => matches!(
268 value,
269 Value::Temporal(
270 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
271 )
272 ),
273 DataType::Date => {
274 matches!(
275 value,
276 Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
277 )
278 }
279 DataType::Time => matches!(
280 value,
281 Value::Int(_)
282 | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
283 ),
284 DataType::Duration => {
285 matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
286 }
287 DataType::Bytes => matches!(value, Value::Bytes(_)),
288 DataType::Btic => matches!(
290 value,
291 Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
292 ),
293 DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
294 DataType::List(_) => matches!(value, Value::List(_)),
295 DataType::Map(_, _) => matches!(value, Value::Map(_)),
296 }
297 }
298}
299
300fn default_created_at() -> DateTime<Utc> {
301 Utc::now()
302}
303
304fn default_state() -> SchemaElementState {
305 SchemaElementState::Active
306}
307
/// Serde default for a missing `added_in` field: schema version 1.
fn default_version_1() -> u32 {
    1_u32
}
311
312#[derive(Clone, Debug, Serialize, Deserialize)]
313pub struct PropertyMeta {
314 pub r#type: DataType,
315 pub nullable: bool,
316 #[serde(default = "default_version_1")]
317 pub added_in: u32, #[serde(default = "default_state")]
319 pub state: SchemaElementState,
320 #[serde(default)]
321 pub generation_expression: Option<String>,
322 #[serde(default, skip_serializing_if = "Option::is_none")]
323 pub description: Option<String>,
324}
325
326#[derive(Clone, Debug, Serialize, Deserialize)]
327pub struct LabelMeta {
328 pub id: u16, #[serde(default = "default_created_at")]
330 pub created_at: DateTime<Utc>,
331 #[serde(default = "default_state")]
332 pub state: SchemaElementState,
333 #[serde(default, skip_serializing_if = "Option::is_none")]
334 pub description: Option<String>,
335}
336
337#[derive(Clone, Debug, Serialize, Deserialize)]
338pub struct EdgeTypeMeta {
339 pub id: u32,
341 pub src_labels: Vec<String>,
342 pub dst_labels: Vec<String>,
343 #[serde(default = "default_state")]
344 pub state: SchemaElementState,
345 #[serde(default, skip_serializing_if = "Option::is_none")]
346 pub description: Option<String>,
347}
348
349#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
350#[non_exhaustive]
351pub enum ConstraintType {
352 Unique { properties: Vec<String> },
353 Exists { property: String },
354 Check { expression: String },
355}
356
357#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
358#[non_exhaustive]
359pub enum ConstraintTarget {
360 Label(String),
361 EdgeType(String),
362}
363
364#[derive(Clone, Debug, Serialize, Deserialize)]
365pub struct Constraint {
366 pub name: String,
367 pub constraint_type: ConstraintType,
368 pub target: ConstraintTarget,
369 pub enabled: bool,
370}
371
372#[derive(Clone, Debug, Serialize, Deserialize)]
378pub struct SchemalessEdgeTypeRegistry {
379 name_to_id: HashMap<String, u32>,
380 id_to_name: HashMap<u32, String>,
381 next_local_id: u32,
383}
384
385impl SchemalessEdgeTypeRegistry {
386 pub fn new() -> Self {
387 Self {
388 name_to_id: HashMap::new(),
389 id_to_name: HashMap::new(),
390 next_local_id: 1,
391 }
392 }
393
394 pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
396 if let Some(&id) = self.name_to_id.get(type_name) {
397 return id;
398 }
399
400 let id = make_schemaless_id(self.next_local_id);
401 self.next_local_id += 1;
402
403 self.name_to_id.insert(type_name.to_string(), id);
404 self.id_to_name.insert(id, type_name.to_string());
405
406 id
407 }
408
409 pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
411 self.id_to_name.get(&type_id).map(String::as_str)
412 }
413
414 pub fn contains(&self, type_name: &str) -> bool {
416 self.name_to_id.contains_key(type_name)
417 }
418
419 pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
421 self.name_to_id
422 .iter()
423 .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
424 .map(|(_, &id)| id)
425 }
426
427 pub fn all_type_ids(&self) -> Vec<u32> {
429 self.id_to_name.keys().copied().collect()
430 }
431
432 pub fn is_empty(&self) -> bool {
434 self.name_to_id.is_empty()
435 }
436}
437
438impl Default for SchemalessEdgeTypeRegistry {
439 fn default() -> Self {
440 Self::new()
441 }
442}
443
/// First label id of the range reserved for catalog-resolved (virtual) labels.
pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
/// Sentinel id just past the virtual range; it is not itself a virtual id.
pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;

/// Returns `true` if `id` is in `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL`
/// (start inclusive, sentinel exclusive).
#[inline]
pub fn is_virtual_label_id(id: u16) -> bool {
    let at_or_after_start = id >= VIRTUAL_LABEL_ID_START;
    let at_or_after_sentinel = id >= VIRTUAL_LABEL_ID_SENTINEL;
    at_or_after_start && !at_or_after_sentinel
}
458
459#[derive(Clone, Debug, Serialize, Deserialize)]
460pub struct Schema {
461 pub schema_version: u32,
462 pub labels: HashMap<String, LabelMeta>,
463 pub edge_types: HashMap<String, EdgeTypeMeta>,
464 pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
465 #[serde(default)]
466 pub indexes: Vec<IndexDefinition>,
467 #[serde(default)]
468 pub constraints: Vec<Constraint>,
469 #[serde(default)]
471 pub schemaless_registry: SchemalessEdgeTypeRegistry,
472}
473
474impl Default for Schema {
475 fn default() -> Self {
476 Self {
477 schema_version: 1,
478 labels: HashMap::new(),
479 edge_types: HashMap::new(),
480 properties: HashMap::new(),
481 indexes: Vec::new(),
482 constraints: Vec::new(),
483 schemaless_registry: SchemalessEdgeTypeRegistry::new(),
484 }
485 }
486}
487
488impl Schema {
489 pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
494 self.labels
495 .iter()
496 .find(|(_, meta)| meta.id == label_id)
497 .map(|(name, _)| name.as_str())
498 }
499
500 pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
502 self.labels.get(label_name).map(|meta| meta.id)
503 }
504
505 pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
510 self.edge_types
511 .iter()
512 .find(|(_, meta)| meta.id == type_id)
513 .map(|(name, _)| name.as_str())
514 }
515
516 pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
518 self.edge_types.get(type_name).map(|meta| meta.id)
519 }
520
521 pub fn vector_index_for_property(
526 &self,
527 label: &str,
528 property: &str,
529 ) -> Option<&VectorIndexConfig> {
530 self.indexes.iter().find_map(|idx| {
531 if let IndexDefinition::Vector(config) = idx
532 && config.label == label
533 && config.property == property
534 && config.metadata.status == IndexStatus::Online
535 {
536 return Some(config);
537 }
538 None
539 })
540 }
541
542 pub fn fulltext_index_for_property(
547 &self,
548 label: &str,
549 property: &str,
550 ) -> Option<&FullTextIndexConfig> {
551 self.indexes.iter().find_map(|idx| {
552 if let IndexDefinition::FullText(config) = idx
553 && config.label == label
554 && config.properties.iter().any(|p| p == property)
555 && config.metadata.status == IndexStatus::Online
556 {
557 return Some(config);
558 }
559 None
560 })
561 }
562
563 pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
568 self.labels
569 .iter()
570 .find(|(k, _)| k.eq_ignore_ascii_case(name))
571 .map(|(_, v)| v)
572 }
573
574 pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
576 self.get_label_case_insensitive(label_name)
577 .map(|meta| meta.id)
578 }
579
580 pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
585 self.edge_types
586 .iter()
587 .find(|(k, _)| k.eq_ignore_ascii_case(name))
588 .map(|(_, v)| v)
589 }
590
591 pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
593 self.get_edge_type_case_insensitive(type_name)
594 .map(|meta| meta.id)
595 }
596
597 pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
600 self.edge_type_id_by_name_case_insensitive(type_name)
601 .or_else(|| {
602 self.schemaless_registry
603 .id_by_name_case_insensitive(type_name)
604 })
605 }
606
607 pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
613 if let Some(id) = self.edge_type_id_by_name(type_name) {
614 return id;
615 }
616 self.schemaless_registry.get_or_assign_id(type_name)
617 }
618
619 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
623 if is_schemaless_edge_type(type_id) {
624 self.schemaless_registry
625 .type_name_by_id(type_id)
626 .map(str::to_owned)
627 } else {
628 self.edge_type_name_by_id(type_id).map(str::to_owned)
629 }
630 }
631
632 pub fn all_edge_type_ids(&self) -> Vec<u32> {
635 let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
636 ids.extend(self.schemaless_registry.all_type_ids());
637 ids.sort_unstable();
638 ids
639 }
640}
641
642#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
644pub enum IndexStatus {
645 #[default]
647 Online,
648 Building,
650 Stale,
652 Failed,
654}
655
656#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
658pub struct IndexMetadata {
659 #[serde(default)]
661 pub status: IndexStatus,
662 #[serde(default)]
664 pub last_built_at: Option<DateTime<Utc>>,
665 #[serde(default)]
667 pub row_count_at_build: Option<u64>,
668}
669
670#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
671#[serde(tag = "type")]
672#[non_exhaustive]
673pub enum IndexDefinition {
674 Vector(VectorIndexConfig),
675 FullText(FullTextIndexConfig),
676 Scalar(ScalarIndexConfig),
677 Inverted(InvertedIndexConfig),
678 JsonFullText(JsonFtsIndexConfig),
679}
680
681impl IndexDefinition {
682 pub fn name(&self) -> &str {
684 match self {
685 IndexDefinition::Vector(c) => &c.name,
686 IndexDefinition::FullText(c) => &c.name,
687 IndexDefinition::Scalar(c) => &c.name,
688 IndexDefinition::Inverted(c) => &c.name,
689 IndexDefinition::JsonFullText(c) => &c.name,
690 }
691 }
692
693 pub fn label(&self) -> &str {
695 match self {
696 IndexDefinition::Vector(c) => &c.label,
697 IndexDefinition::FullText(c) => &c.label,
698 IndexDefinition::Scalar(c) => &c.label,
699 IndexDefinition::Inverted(c) => &c.label,
700 IndexDefinition::JsonFullText(c) => &c.label,
701 }
702 }
703
704 pub fn metadata(&self) -> &IndexMetadata {
706 match self {
707 IndexDefinition::Vector(c) => &c.metadata,
708 IndexDefinition::FullText(c) => &c.metadata,
709 IndexDefinition::Scalar(c) => &c.metadata,
710 IndexDefinition::Inverted(c) => &c.metadata,
711 IndexDefinition::JsonFullText(c) => &c.metadata,
712 }
713 }
714
715 pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
717 match self {
718 IndexDefinition::Vector(c) => &mut c.metadata,
719 IndexDefinition::FullText(c) => &mut c.metadata,
720 IndexDefinition::Scalar(c) => &mut c.metadata,
721 IndexDefinition::Inverted(c) => &mut c.metadata,
722 IndexDefinition::JsonFullText(c) => &mut c.metadata,
723 }
724 }
725}
726
727#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
728pub struct InvertedIndexConfig {
729 pub name: String,
730 pub label: String,
731 pub property: String,
732 #[serde(default = "default_normalize")]
733 pub normalize: bool,
734 #[serde(default = "default_max_terms_per_doc")]
735 pub max_terms_per_doc: usize,
736 #[serde(default)]
737 pub metadata: IndexMetadata,
738}
739
/// Serde default for `InvertedIndexConfig::normalize`: enabled.
fn default_normalize() -> bool {
    !false
}
743
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`: ten thousand.
fn default_max_terms_per_doc() -> usize {
    10 * 1_000
}
747
748#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
749pub struct VectorIndexConfig {
750 pub name: String,
751 pub label: String,
752 pub property: String,
753 pub index_type: VectorIndexType,
754 pub metric: DistanceMetric,
755 pub embedding_config: Option<EmbeddingConfig>,
756 #[serde(default)]
757 pub metadata: IndexMetadata,
758}
759
760#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
761pub struct EmbeddingConfig {
762 pub alias: String,
764 pub source_properties: Vec<String>,
765 pub batch_size: usize,
766 #[serde(default)]
769 pub document_prefix: Option<String>,
770 #[serde(default)]
773 pub query_prefix: Option<String>,
774}
775
776#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
777#[non_exhaustive]
778pub enum VectorIndexType {
779 Flat,
780 IvfFlat {
781 num_partitions: u32,
782 },
783 IvfPq {
784 num_partitions: u32,
785 num_sub_vectors: u32,
786 bits_per_subvector: u8,
787 },
788 IvfSq {
789 num_partitions: u32,
790 },
791 IvfRq {
792 num_partitions: u32,
793 #[serde(default)]
794 num_bits: Option<u8>,
795 },
796 HnswFlat {
797 m: u32,
798 ef_construction: u32,
799 #[serde(default)]
800 num_partitions: Option<u32>,
801 },
802 HnswSq {
803 m: u32,
804 ef_construction: u32,
805 #[serde(default)]
806 num_partitions: Option<u32>,
807 },
808 HnswPq {
809 m: u32,
810 ef_construction: u32,
811 num_sub_vectors: u32,
812 #[serde(default)]
813 num_partitions: Option<u32>,
814 },
815}
816
817#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
818#[non_exhaustive]
819pub enum DistanceMetric {
820 Cosine,
821 L2,
822 Dot,
823}
824
825impl DistanceMetric {
826 pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
838 assert_eq!(a.len(), b.len(), "vector dimension mismatch");
839 match self {
840 DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
841 DistanceMetric::Cosine => {
842 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
843 let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
844 let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
845 let denom = norm_a * norm_b;
846 if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
847 }
848 DistanceMetric::Dot => {
849 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
850 -dot
851 }
852 }
853 }
854}
855
856#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
857pub struct FullTextIndexConfig {
858 pub name: String,
859 pub label: String,
860 pub properties: Vec<String>,
861 pub tokenizer: TokenizerConfig,
862 pub with_positions: bool,
863 #[serde(default)]
864 pub metadata: IndexMetadata,
865}
866
867#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
868#[non_exhaustive]
869pub enum TokenizerConfig {
870 Standard,
871 Whitespace,
872 Ngram { min: u8, max: u8 },
873 Custom { name: String },
874}
875
876#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
877pub struct JsonFtsIndexConfig {
878 pub name: String,
879 pub label: String,
880 pub column: String,
881 #[serde(default)]
882 pub paths: Vec<String>,
883 #[serde(default)]
884 pub with_positions: bool,
885 #[serde(default)]
886 pub metadata: IndexMetadata,
887}
888
889#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
890pub struct ScalarIndexConfig {
891 pub name: String,
892 pub label: String,
893 pub properties: Vec<String>,
894 pub index_type: ScalarIndexType,
895 pub where_clause: Option<String>,
896 #[serde(default)]
897 pub metadata: IndexMetadata,
898}
899
900#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
901#[non_exhaustive]
902pub enum ScalarIndexType {
903 BTree,
904 Hash,
905 Bitmap,
906 LabelList,
907}
908
909pub struct SchemaManager {
910 store: Arc<dyn ObjectStore>,
911 path: ObjectStorePath,
912 schema: RwLock<Arc<Schema>>,
913}
914
915impl SchemaManager {
916 pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
917 let path = path.as_ref();
918 let parent = path
919 .parent()
920 .ok_or_else(|| anyhow!("Invalid schema path"))?;
921 let filename = path
922 .file_name()
923 .ok_or_else(|| anyhow!("Invalid schema filename"))?
924 .to_str()
925 .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
926
927 let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
928 let obj_path = ObjectStorePath::from(filename);
929
930 Self::load_from_store(store, &obj_path).await
931 }
932
933 pub async fn load_from_store(
934 store: Arc<dyn ObjectStore>,
935 path: &ObjectStorePath,
936 ) -> Result<Self> {
937 match store.get(path).await {
938 Ok(result) => {
939 let bytes = result.bytes().await?;
940 let content = String::from_utf8(bytes.to_vec())?;
941 let mut schema: Schema = serde_json::from_str(&content)?;
942 let original_len = schema.indexes.len();
950 if original_len > 0 {
951 let mut seen: std::collections::HashSet<String> =
952 std::collections::HashSet::with_capacity(original_len);
953 let mut dedup: Vec<IndexDefinition> = schema
954 .indexes
955 .iter()
956 .rev()
957 .filter(|idx| seen.insert(idx.name().to_string()))
958 .cloned()
959 .collect();
960 dedup.reverse();
961 if dedup.len() != original_len {
962 tracing::warn!(
963 collapsed = original_len - dedup.len(),
964 kept = dedup.len(),
965 "schema.indexes: collapsed duplicate entries on load (issue #63)"
966 );
967 schema.indexes = dedup;
968 }
969 }
970 Ok(Self {
971 store,
972 path: path.clone(),
973 schema: RwLock::new(Arc::new(schema)),
974 })
975 }
976 Err(object_store::Error::NotFound { .. }) => Ok(Self {
977 store,
978 path: path.clone(),
979 schema: RwLock::new(Arc::new(Schema::default())),
980 }),
981 Err(e) => Err(anyhow::Error::from(e)),
982 }
983 }
984
985 pub async fn save(&self) -> Result<()> {
986 let content = {
987 let schema_guard = acquire_read(&self.schema, "schema")?;
988 serde_json::to_string_pretty(&**schema_guard)?
989 };
990 self.store
991 .put(&self.path, content.into())
992 .await
993 .map_err(anyhow::Error::from)?;
994 Ok(())
995 }
996
997 pub fn path(&self) -> &ObjectStorePath {
998 &self.path
999 }
1000
1001 pub fn schema(&self) -> Arc<Schema> {
1002 self.schema
1003 .read()
1004 .expect("Schema lock poisoned - a thread panicked while holding it")
1005 .clone()
1006 }
1007
/// Uppercases every identifier that is immediately followed by `(`.
///
/// An identifier starts with an alphabetic character and continues with
/// alphanumerics or `_`. Other identifiers and all other characters are copied
/// unchanged. The output feeds `generated_column_name`, so it must stay stable.
fn normalize_function_names(expr: &str) -> String {
    let mut out = String::with_capacity(expr.len());
    let mut rest = expr.chars().peekable();

    while let Some(first) = rest.next() {
        if !first.is_alphabetic() {
            out.push(first);
            continue;
        }

        let mut word = String::new();
        word.push(first);
        while let Some(c) = rest.next_if(|c: &char| c.is_alphanumeric() || *c == '_') {
            word.push(c);
        }

        let is_call = rest.peek() == Some(&'(');
        if is_call {
            out.push_str(&word.to_uppercase());
        } else {
            out.push_str(&word);
        }
    }

    out
}
1041
1042 pub fn generated_column_name(expr: &str) -> String {
1050 let normalized = Self::normalize_function_names(expr);
1052
1053 let sanitized = normalized
1054 .replace(|c: char| !c.is_alphanumeric(), "_")
1055 .trim_matches('_')
1056 .to_string();
1057
1058 const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1060 const FNV_PRIME: u64 = 1099511628211;
1061
1062 let mut hash = FNV_OFFSET_BASIS;
1063 for byte in normalized.as_bytes() {
1064 hash ^= *byte as u64;
1065 hash = hash.wrapping_mul(FNV_PRIME);
1066 }
1067
1068 format!("_gen_{}_{:x}", sanitized, hash)
1069 }
1070
1071 pub fn replace_schema(&self, new_schema: Schema) {
1072 let mut schema = self
1073 .schema
1074 .write()
1075 .expect("Schema lock poisoned - a thread panicked while holding it");
1076 *schema = Arc::new(new_schema);
1077 }
1078
1079 #[must_use]
1092 pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1093 let primary = self.schema();
1094 let merged = if overlay.is_empty() {
1095 (*primary).clone()
1096 } else {
1097 let mut merged = (*primary).clone();
1098 for (name, label) in &overlay.added_labels {
1099 merged.labels.insert(name.clone(), label.clone());
1100 }
1101 for (name, edge_type) in &overlay.added_edge_types {
1102 merged.edge_types.insert(name.clone(), edge_type.clone());
1103 }
1104 for addition in &overlay.added_properties {
1105 let props = merged.properties.entry(addition.owner.clone()).or_default();
1106 props.insert(
1107 addition.property.clone(),
1108 PropertyMeta {
1109 r#type: addition.data_type.clone(),
1110 nullable: addition.nullable,
1111 added_in: merged.schema_version,
1112 state: SchemaElementState::Active,
1113 generation_expression: None,
1114 description: None,
1115 },
1116 );
1117 }
1118 merged
1119 };
1120
1121 Arc::new(Self {
1122 store: self.store.clone(),
1123 path: self.path.clone(),
1124 schema: RwLock::new(Arc::new(merged)),
1125 })
1126 }
1127
1128 pub fn next_label_id(&self) -> u16 {
1129 self.schema()
1130 .labels
1131 .values()
1132 .map(|l| l.id)
1133 .max()
1134 .unwrap_or(0)
1135 + 1
1136 }
1137
1138 pub fn next_type_id(&self) -> u32 {
1139 let max_schema_id = self
1140 .schema()
1141 .edge_types
1142 .values()
1143 .map(|t| t.id)
1144 .max()
1145 .unwrap_or(0);
1146
1147 if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1149 panic!("Schema edge type ID exhaustion");
1150 }
1151
1152 max_schema_id + 1
1153 }
1154
1155 pub fn add_label(&self, name: &str) -> Result<u16> {
1156 self.add_label_with_desc(name, None)
1157 }
1158
1159 pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1160 let mut guard = acquire_write(&self.schema, "schema")?;
1161 let schema = Arc::make_mut(&mut *guard);
1162 if schema.labels.contains_key(name) {
1163 return Err(anyhow!("Label '{}' already exists", name));
1164 }
1165
1166 let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1167 if id >= VIRTUAL_LABEL_ID_START {
1168 return Err(anyhow!(
1169 "Native label space exhausted (next id {id:#x} would enter the \
1170 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1171 reserved for catalog-resolved labels)"
1172 ));
1173 }
1174 schema.labels.insert(
1175 name.to_string(),
1176 LabelMeta {
1177 id,
1178 created_at: Utc::now(),
1179 state: SchemaElementState::Active,
1180 description,
1181 },
1182 );
1183 Ok(id)
1184 }
1185
1186 pub fn add_edge_type(
1187 &self,
1188 name: &str,
1189 src_labels: Vec<String>,
1190 dst_labels: Vec<String>,
1191 ) -> Result<u32> {
1192 self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1193 }
1194
1195 pub fn add_edge_type_with_desc(
1196 &self,
1197 name: &str,
1198 src_labels: Vec<String>,
1199 dst_labels: Vec<String>,
1200 description: Option<String>,
1201 ) -> Result<u32> {
1202 let mut guard = acquire_write(&self.schema, "schema")?;
1203 let schema = Arc::make_mut(&mut *guard);
1204 if schema.edge_types.contains_key(name) {
1205 return Err(anyhow!("Edge type '{}' already exists", name));
1206 }
1207
1208 let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1209
1210 if id >= VIRTUAL_EDGE_TYPE_ID_START {
1215 return Err(anyhow!(
1216 "Native edge type space exhausted (next id {id:#x} would enter the \
1217 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1218 reserved for catalog-resolved edge types)"
1219 ));
1220 }
1221
1222 schema.edge_types.insert(
1223 name.to_string(),
1224 EdgeTypeMeta {
1225 id,
1226 src_labels,
1227 dst_labels,
1228 state: SchemaElementState::Active,
1229 description,
1230 },
1231 );
1232 Ok(id)
1233 }
1234
1235 pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1237 let mut guard = acquire_write(&self.schema, "schema")
1238 .expect("Schema lock poisoned - a thread panicked while holding it");
1239 let schema = Arc::make_mut(&mut *guard);
1240 schema.get_or_assign_edge_type_id(type_name)
1241 }
1242
1243 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1245 let schema = acquire_read(&self.schema, "schema")
1246 .expect("Schema lock poisoned - a thread panicked while holding it");
1247 schema.edge_type_name_by_id_unified(type_id)
1248 }
1249
1250 pub fn add_property(
1251 &self,
1252 label_or_type: &str,
1253 prop_name: &str,
1254 data_type: DataType,
1255 nullable: bool,
1256 ) -> Result<()> {
1257 self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1258 }
1259
1260 pub fn add_property_with_desc(
1261 &self,
1262 label_or_type: &str,
1263 prop_name: &str,
1264 data_type: DataType,
1265 nullable: bool,
1266 description: Option<String>,
1267 ) -> Result<()> {
1268 validate_property_name(prop_name)?;
1269 let mut guard = acquire_write(&self.schema, "schema")?;
1270 let schema = Arc::make_mut(&mut *guard);
1271 let version = schema.schema_version;
1272 let props = schema
1273 .properties
1274 .entry(label_or_type.to_string())
1275 .or_default();
1276
1277 if props.contains_key(prop_name) {
1278 return Err(anyhow!(
1279 "Property '{}' already exists for '{}'",
1280 prop_name,
1281 label_or_type
1282 ));
1283 }
1284
1285 props.insert(
1286 prop_name.to_string(),
1287 PropertyMeta {
1288 r#type: data_type,
1289 nullable,
1290 added_in: version,
1291 state: SchemaElementState::Active,
1292 generation_expression: None,
1293 description,
1294 },
1295 );
1296 Ok(())
1297 }
1298
1299 pub fn add_generated_property(
1300 &self,
1301 label_or_type: &str,
1302 prop_name: &str,
1303 data_type: DataType,
1304 expr: String,
1305 ) -> Result<()> {
1306 validate_reserved_property_name(prop_name)?;
1309 let mut guard = acquire_write(&self.schema, "schema")?;
1310 let schema = Arc::make_mut(&mut *guard);
1311 let version = schema.schema_version;
1312 let props = schema
1313 .properties
1314 .entry(label_or_type.to_string())
1315 .or_default();
1316
1317 if props.contains_key(prop_name) {
1318 return Err(anyhow!("Property '{}' already exists", prop_name));
1319 }
1320
1321 props.insert(
1322 prop_name.to_string(),
1323 PropertyMeta {
1324 r#type: data_type,
1325 nullable: true,
1326 added_in: version,
1327 state: SchemaElementState::Active,
1328 generation_expression: Some(expr),
1329 description: None,
1330 },
1331 );
1332 Ok(())
1333 }
1334
1335 pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1336 let mut guard = acquire_write(&self.schema, "schema")?;
1337 let schema = Arc::make_mut(&mut *guard);
1338 let meta = schema
1339 .labels
1340 .get_mut(name)
1341 .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1342 meta.description = description;
1343 Ok(())
1344 }
1345
1346 pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1347 let mut guard = acquire_write(&self.schema, "schema")?;
1348 let schema = Arc::make_mut(&mut *guard);
1349 let meta = schema
1350 .edge_types
1351 .get_mut(name)
1352 .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1353 meta.description = description;
1354 Ok(())
1355 }
1356
1357 pub fn set_property_description(
1358 &self,
1359 entity: &str,
1360 prop_name: &str,
1361 description: Option<String>,
1362 ) -> Result<()> {
1363 let mut guard = acquire_write(&self.schema, "schema")?;
1364 let schema = Arc::make_mut(&mut *guard);
1365 let props = schema
1366 .properties
1367 .get_mut(entity)
1368 .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1369 let meta = props
1370 .get_mut(prop_name)
1371 .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
1372 meta.description = description;
1373 Ok(())
1374 }
1375
1376 pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1385 let mut guard = acquire_write(&self.schema, "schema")?;
1386 let schema = Arc::make_mut(&mut *guard);
1387 if let Some(existing) = schema
1388 .indexes
1389 .iter_mut()
1390 .find(|i| i.name() == index_def.name())
1391 {
1392 *existing = index_def;
1393 } else {
1394 schema.indexes.push(index_def);
1395 }
1396 Ok(())
1397 }
1398
1399 pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1400 let schema = self.schema.read().expect("Schema lock poisoned");
1401 schema.indexes.iter().find(|i| i.name() == name).cloned()
1402 }
1403
1404 pub fn update_index_metadata(
1409 &self,
1410 index_name: &str,
1411 f: impl FnOnce(&mut IndexMetadata),
1412 ) -> Result<()> {
1413 let mut guard = acquire_write(&self.schema, "schema")?;
1414 let schema = Arc::make_mut(&mut *guard);
1415 let idx = schema
1416 .indexes
1417 .iter_mut()
1418 .find(|i| i.name() == index_name)
1419 .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1420 f(idx.metadata_mut());
1421 Ok(())
1422 }
1423
1424 pub fn remove_index(&self, name: &str) -> Result<()> {
1425 let mut guard = acquire_write(&self.schema, "schema")?;
1426 let schema = Arc::make_mut(&mut *guard);
1427 if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1428 schema.indexes.remove(pos);
1429 Ok(())
1430 } else {
1431 Err(anyhow!("Index '{}' not found", name))
1432 }
1433 }
1434
1435 pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1436 let mut guard = acquire_write(&self.schema, "schema")?;
1437 let schema = Arc::make_mut(&mut *guard);
1438 if schema.constraints.iter().any(|c| c.name == constraint.name) {
1439 return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1440 }
1441 schema.constraints.push(constraint);
1442 Ok(())
1443 }
1444
1445 pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1446 let mut guard = acquire_write(&self.schema, "schema")?;
1447 let schema = Arc::make_mut(&mut *guard);
1448 if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1449 schema.constraints.remove(pos);
1450 Ok(())
1451 } else if if_exists {
1452 Ok(())
1453 } else {
1454 Err(anyhow!("Constraint '{}' not found", name))
1455 }
1456 }
1457
1458 pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1459 let mut guard = acquire_write(&self.schema, "schema")?;
1460 let schema = Arc::make_mut(&mut *guard);
1461 if let Some(props) = schema.properties.get_mut(label_or_type) {
1462 if props.remove(prop_name).is_some() {
1463 Ok(())
1464 } else {
1465 Err(anyhow!(
1466 "Property '{}' not found for '{}'",
1467 prop_name,
1468 label_or_type
1469 ))
1470 }
1471 } else {
1472 Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1473 }
1474 }
1475
1476 pub fn rename_property(
1477 &self,
1478 label_or_type: &str,
1479 old_name: &str,
1480 new_name: &str,
1481 ) -> Result<()> {
1482 let mut guard = acquire_write(&self.schema, "schema")?;
1483 let schema = Arc::make_mut(&mut *guard);
1484 if let Some(props) = schema.properties.get_mut(label_or_type) {
1485 if let Some(meta) = props.remove(old_name) {
1486 if props.contains_key(new_name) {
1487 props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
1490 }
1491 props.insert(new_name.to_string(), meta);
1492 Ok(())
1493 } else {
1494 Err(anyhow!(
1495 "Property '{}' not found for '{}'",
1496 old_name,
1497 label_or_type
1498 ))
1499 }
1500 } else {
1501 Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1502 }
1503 }
1504
1505 pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1506 let mut guard = acquire_write(&self.schema, "schema")?;
1507 let schema = Arc::make_mut(&mut *guard);
1508 if let Some(label_meta) = schema.labels.get_mut(name) {
1509 label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1510 Ok(())
1512 } else if if_exists {
1513 Ok(())
1514 } else {
1515 Err(anyhow!("Label '{}' not found", name))
1516 }
1517 }
1518
1519 pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1520 let mut guard = acquire_write(&self.schema, "schema")?;
1521 let schema = Arc::make_mut(&mut *guard);
1522 if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1523 edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1524 Ok(())
1526 } else if if_exists {
1527 Ok(())
1528 } else {
1529 Err(anyhow!("Edge Type '{}' not found", name))
1530 }
1531 }
1532}
1533
1534pub fn validate_identifier(name: &str) -> Result<()> {
1536 if name.is_empty() || name.len() > 64 {
1538 return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1539 }
1540
1541 let first = name.chars().next().unwrap();
1543 if !first.is_alphabetic() && first != '_' {
1544 return Err(anyhow!(
1545 "Identifier '{}' must start with letter or underscore",
1546 name
1547 ));
1548 }
1549
1550 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1552 return Err(anyhow!(
1553 "Identifier '{}' must contain only alphanumeric and underscore",
1554 name
1555 ));
1556 }
1557
1558 const RESERVED: &[&str] = &[
1560 "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1561 "UNION", "ORDER", "LIMIT",
1562 ];
1563 if RESERVED.contains(&name.to_uppercase().as_str()) {
1564 return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1565 }
1566
1567 Ok(())
1568}
1569
1570pub fn validate_property_name(name: &str) -> Result<()> {
1577 if name.starts_with('_') {
1578 return Err(anyhow!(
1579 "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
1580 name
1581 ));
1582 }
1583 validate_reserved_property_name(name)
1584}
1585
1586fn validate_reserved_property_name(name: &str) -> Result<()> {
1593 const RESERVED_PROPS: &[&str] = &[
1602 "ext_id",
1603 "overflow_json",
1604 "eid",
1605 "src_vid",
1606 "dst_vid",
1607 "op",
1608 "__set_struct__",
1616 ];
1617 if RESERVED_PROPS.contains(&name) {
1618 return Err(anyhow!(
1619 "Property name '{}' is reserved by the storage layer; please choose a different name",
1620 name
1621 ));
1622 }
1623 Ok(())
1624}
1625
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::{TemporalValue, Value};
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Pins the `DataType::accepts` compatibility matrix: which runtime `Value`
    /// variants each declared column type admits.
    #[test]
    fn test_datatype_accepts_matrix() {
        // Fresh epoch-zero DateTime value for each use.
        let dt = || TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: None,
        };

        // Null is accepted by every column type in this list.
        for ty in [
            DataType::String,
            DataType::Int64,
            DataType::Bool,
            DataType::DateTime,
            DataType::Float64,
        ] {
            assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
        }

        // Exact-match cases.
        assert!(DataType::String.accepts(&Value::String("x".into())));
        assert!(DataType::Int64.accepts(&Value::Int(1)));
        assert!(DataType::Bool.accepts(&Value::Bool(true)));
        assert!(DataType::DateTime.accepts(&Value::Temporal(dt())));

        // Permitted widenings / conversions.
        assert!(
            DataType::Float64.accepts(&Value::Int(3)),
            "Int widens to Float"
        );
        assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
        assert!(DataType::Timestamp.accepts(&Value::Temporal(dt())));
        assert!(
            DataType::Timestamp.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
            "storage parses strings for non-struct Timestamp columns"
        );

        // Rejections.
        assert!(
            !DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
            "String into a DateTime struct column nulls silently — reject here"
        );
        assert!(!DataType::Bool.accepts(&Value::Int(1)));
        assert!(!DataType::Int64.accepts(&Value::Bool(true)));
        assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
        assert!(
            !DataType::String.accepts(&Value::Int(10)),
            "no implicit stringification"
        );
        assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));

        // CypherValue columns accept a map value.
        assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
    }

    /// Adds a label, property and edge type, rejects duplicates, then checks the
    /// definitions survive `save` plus a fresh `load_from_store`.
    #[tokio::test]
    async fn test_schema_management() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;

        // The first label added to this fresh store gets id 1; re-adding fails.
        let lid = manager.add_label("Person")?;
        assert_eq!(lid, 1);
        assert!(manager.add_label("Person").is_err());

        // Duplicate property on the same label is rejected.
        manager.add_property("Person", "name", DataType::String, false)?;
        assert!(
            manager
                .add_property("Person", "name", DataType::String, false)
                .is_err()
        );

        // The first edge type also gets id 1.
        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
        assert_eq!(tid, 1);

        manager.save().await?;
        assert!(store.get(&path).await.is_ok());

        // A second manager reading the same path sees the persisted definitions.
        let manager2 = SchemaManager::load_from_store(store, &path).await?;
        assert!(manager2.schema().labels.contains_key("Person"));
        assert!(
            manager2
                .schema()
                .properties
                .get("Person")
                .unwrap()
                .contains_key("name")
        );

        Ok(())
    }

    /// Storage-reserved property names must be refused by `add_property` (on
    /// labels and edge types) and by `add_generated_property`, while look-alike
    /// names stay allowed.
    #[tokio::test]
    async fn test_reserved_property_names_rejected() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Tiny")?;

        // Exact reserved column names: rejected, and the error says why.
        for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
            let err = manager
                .add_property("Tiny", reserved, DataType::String, true)
                .expect_err(&format!("expected '{reserved}' to be rejected"));
            assert!(
                err.to_string().contains("reserved"),
                "error for '{reserved}' should mention 'reserved', got: {err}"
            );
        }

        // `__set_struct__` is rejected too.
        let err = manager
            .add_property("Tiny", "__set_struct__", DataType::String, true)
            .expect_err("expected '__set_struct__' to be rejected");
        assert!(
            err.to_string().contains("reserved"),
            "__set_struct__ rejection should mention 'reserved', got: {err}"
        );

        // Underscore-prefixed names are rejected.
        for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
            assert!(
                manager
                    .add_property("Tiny", reserved, DataType::String, true)
                    .is_err(),
                "expected '{reserved}' to be rejected"
            );
        }

        // Matching is exact: names that merely contain a reserved word are fine.
        manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
        manager.add_property("Tiny", "user_op", DataType::String, true)?;
        manager.add_property("Tiny", "type_name", DataType::String, true)?;

        // Edge-type properties are subject to the same check.
        manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
        assert!(
            manager
                .add_property("knows", "src_vid", DataType::Int64, true)
                .is_err()
        );

        // Generated properties are subject to the same check.
        assert!(
            manager
                .add_generated_property(
                    "Tiny",
                    "ext_id",
                    DataType::String,
                    "concat('x', name)".into()
                )
                .is_err()
        );

        Ok(())
    }

    /// `normalize_function_names` upper-cases function names, including nested
    /// calls, and leaves the arguments as written.
    #[test]
    fn test_normalize_function_names() {
        assert_eq!(
            SchemaManager::normalize_function_names("lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("LOWER(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("Lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("trim(lower(email))"),
            "TRIM(LOWER(email))"
        );
    }

    /// Generated column names must not depend on the case of the function name.
    #[test]
    fn test_generated_column_name_case_insensitive() {
        let col1 = SchemaManager::generated_column_name("lower(email)");
        let col2 = SchemaManager::generated_column_name("LOWER(email)");
        let col3 = SchemaManager::generated_column_name("Lower(email)");
        assert_eq!(col1, col2);
        assert_eq!(col2, col3);
        assert!(col1.starts_with("_gen_LOWER_email_"));
    }

    /// JSON for an index definition without a `metadata` field still deserializes,
    /// and the metadata defaults to `Online` with no build information.
    #[test]
    fn test_index_metadata_serde_backward_compat() {
        let json = r#"{
            "type": "Scalar",
            "name": "idx_person_name",
            "label": "Person",
            "properties": ["name"],
            "index_type": "BTree",
            "where_clause": null
        }"#;
        let def: IndexDefinition = serde_json::from_str(json).unwrap();
        let meta = def.metadata();
        assert_eq!(meta.status, IndexStatus::Online);
        assert!(meta.last_built_at.is_none());
        assert!(meta.row_count_at_build.is_none());
    }

    /// Non-default index metadata survives a serialize/deserialize round trip.
    #[test]
    fn test_index_metadata_serde_roundtrip() {
        let now = Utc::now();
        let def = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Test".to_string(),
            properties: vec!["prop".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                last_built_at: Some(now),
                row_count_at_build: Some(42),
            },
        });

        let json = serde_json::to_string(&def).unwrap();
        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.metadata().status, IndexStatus::Building);
        assert_eq!(parsed.metadata().row_count_at_build, Some(42));
        assert!(parsed.metadata().last_built_at.is_some());
    }

    /// `update_index_metadata` mutates the stored metadata in place and errors
    /// on an unknown index name.
    #[tokio::test]
    async fn test_update_index_metadata() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Person")?;
        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: Default::default(),
        });
        manager.add_index(idx)?;

        // Default metadata status is Online.
        let initial = manager.get_index("idx_test").unwrap();
        assert_eq!(initial.metadata().status, IndexStatus::Online);

        manager.update_index_metadata("idx_test", |m| {
            m.status = IndexStatus::Building;
            m.row_count_at_build = Some(100);
        })?;

        let updated = manager.get_index("idx_test").unwrap();
        assert_eq!(updated.metadata().status, IndexStatus::Building);
        assert_eq!(updated.metadata().row_count_at_build, Some(100));

        // Unknown index name is an error.
        assert!(manager.update_index_metadata("nope", |_| {}).is_err());

        Ok(())
    }

    /// `add_index` upserts by name: re-adding the same name replaces the entry
    /// (including its metadata) instead of appending, while a new name appends.
    #[tokio::test]
    async fn test_add_index_is_upsert_by_name() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;
        manager.add_label("Person")?;

        let initial = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                ..Default::default()
            },
        });
        manager.add_index(initial.clone())?;
        assert_eq!(manager.schema().indexes.len(), 1);

        // Same definition again: still exactly one entry.
        manager.add_index(initial.clone())?;
        assert_eq!(
            manager.schema().indexes.len(),
            1,
            "duplicate add_index by name must not append"
        );

        // Same name with new metadata: the stored entry is replaced.
        let mut updated_cfg = match initial {
            IndexDefinition::Scalar(c) => c,
            _ => unreachable!(),
        };
        updated_cfg.metadata.status = IndexStatus::Online;
        updated_cfg.metadata.row_count_at_build = Some(42);
        manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
        assert_eq!(manager.schema().indexes.len(), 1);
        let stored = manager.get_index("idx_test").unwrap();
        assert_eq!(stored.metadata().status, IndexStatus::Online);
        assert_eq!(stored.metadata().row_count_at_build, Some(42));

        // A different name appends.
        let other = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_other".to_string(),
            label: "Person".to_string(),
            properties: vec!["age".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata::default(),
        });
        manager.add_index(other)?;
        assert_eq!(manager.schema().indexes.len(), 2);

        Ok(())
    }

    /// Loading a schema file that contains many same-named index entries collapses
    /// them to one. The test expects the surviving entry to be the `Online` one,
    /// which is also the last one written here.
    #[tokio::test]
    async fn test_load_dedups_bloated_indexes() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");

        // Hand-build a schema with 49 `Building` duplicates plus one `Online` entry.
        let mut schema = Schema::default();
        schema.labels.insert(
            "Person".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
                description: None,
            },
        );
        let make = |status: IndexStatus, count: Option<u64>| {
            IndexDefinition::Scalar(ScalarIndexConfig {
                name: "idx_dup".to_string(),
                label: "Person".to_string(),
                properties: vec!["name".to_string()],
                index_type: ScalarIndexType::BTree,
                where_clause: None,
                metadata: IndexMetadata {
                    status,
                    row_count_at_build: count,
                    ..Default::default()
                },
            })
        };
        for _ in 0..49 {
            schema.indexes.push(make(IndexStatus::Building, None));
        }
        schema.indexes.push(make(IndexStatus::Online, Some(123)));
        let json = serde_json::to_string_pretty(&schema)?;
        store.put(&path, json.into()).await?;

        let manager = SchemaManager::load_from_store(store, &path).await?;
        let schema = manager.schema();
        assert_eq!(
            schema.indexes.len(),
            1,
            "load() must collapse 50 duplicates by name to 1"
        );
        assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
        assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));

        Ok(())
    }

    /// `vector_index_for_property` ignores a vector index whose status is not
    /// `Online` (here `Stale`) and returns it once it becomes `Online`.
    #[test]
    fn test_vector_index_for_property_skips_non_online() {
        let mut schema = Schema::default();
        schema.labels.insert(
            "Document".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
                description: None,
            },
        );

        schema
            .indexes
            .push(IndexDefinition::Vector(VectorIndexConfig {
                name: "vec_doc_embedding".to_string(),
                label: "Document".to_string(),
                property: "embedding".to_string(),
                index_type: VectorIndexType::Flat,
                metric: DistanceMetric::Cosine,
                embedding_config: None,
                metadata: IndexMetadata {
                    status: IndexStatus::Stale,
                    ..Default::default()
                },
            }));

        // Stale index: not returned.
        assert!(
            schema
                .vector_index_for_property("Document", "embedding")
                .is_none()
        );

        // Flip it to Online: now returned.
        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
            cfg.metadata.status = IndexStatus::Online;
        }
        let result = schema.vector_index_for_property("Document", "embedding");
        assert!(result.is_some());
        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
    }

    /// An overlay built from an empty delta starts with the primary's contents,
    /// but later changes to the overlay do not leak back into the primary.
    #[tokio::test]
    async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
        use crate::core::fork::SchemaDelta;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let primary = SchemaManager::load_from_store(store, &path).await?;
        primary.add_label("Person")?;

        let overlay = primary.with_overlay(&SchemaDelta::empty());
        assert_eq!(overlay.schema().labels.len(), 1);

        // Mutating the overlay must leave the primary untouched.
        overlay.add_label("Forked")?;
        assert!(overlay.schema().labels.contains_key("Forked"));
        assert!(!primary.schema().labels.contains_key("Forked"));

        Ok(())
    }

    /// An overlay includes the labels and edge types added by the delta on top of
    /// the primary's own, without changing the primary.
    #[tokio::test]
    async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
        use crate::core::fork::SchemaDelta;
        use chrono::Utc;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let primary = SchemaManager::load_from_store(store, &path).await?;
        primary.add_label("Existing")?;

        let label_meta = LabelMeta {
            id: 99,
            created_at: Utc::now(),
            state: SchemaElementState::Active,
            description: None,
        };
        let edge_meta = EdgeTypeMeta {
            id: 99,
            src_labels: vec!["NewLabel".into()],
            dst_labels: vec!["NewLabel".into()],
            state: SchemaElementState::Active,
            description: None,
        };
        let delta = SchemaDelta {
            added_labels: vec![("NewLabel".to_string(), label_meta)],
            added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
            added_properties: vec![],
        };

        let overlay = primary.with_overlay(&delta);
        let merged = overlay.schema();
        assert!(merged.labels.contains_key("Existing"));
        assert!(merged.labels.contains_key("NewLabel"));
        assert!(merged.edge_types.contains_key("NewEdge"));

        // The primary does not see the delta's additions.
        assert!(!primary.schema().labels.contains_key("NewLabel"));
        Ok(())
    }
}