1use crate::core::edge_type::{
5 MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6 is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
/// Lifecycle state of a schema element (property, label, or edge type).
///
/// Elements created through `SchemaManager` start out `Active`; the other
/// states carry the time they were entered.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
    /// The element is live and visible.
    Active,
    /// The element has been hidden.
    // NOTE(review): exact visibility semantics are not defined in this file — confirm.
    Hidden {
        /// When the element entered this state.
        since: DateTime<Utc>,
        /// Identifier of the last snapshot in which the element was still active
        /// (presumably a snapshot id string — confirm).
        last_active_snapshot: String,
    },
    /// The element has been logically deleted.
    Tombstone {
        /// When the element was tombstoned.
        since: DateTime<Utc>,
    },
}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35pub fn datetime_struct_fields() -> Fields {
42 Fields::from(vec![
43 Field::new(
44 "nanos_since_epoch",
45 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46 true,
47 ),
48 Field::new("offset_seconds", ArrowDataType::Int32, true),
49 Field::new("timezone_name", ArrowDataType::Utf8, true),
50 ])
51}
52
53pub fn time_struct_fields() -> Fields {
59 Fields::from(vec![
60 Field::new(
61 "nanos_since_midnight",
62 ArrowDataType::Time64(TimeUnit::Nanosecond),
63 true,
64 ),
65 Field::new("offset_seconds", ArrowDataType::Int32, true),
66 ])
67}
68
69pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
/// Kind of CRDT value a property can hold.
///
/// Every variant is stored in an Arrow `Binary` column (see `DataType::to_arrow`).
/// Variant names follow common CRDT nomenclature; the merge semantics live
/// outside this file.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    GCounter,
    GSet,
    ORSet,
    LWWRegister,
    LWWMap,
    Rga,
    VectorClock,
    VCRegister,
}
91
92impl CrdtType {
93 #[must_use]
105 pub fn type_name(&self) -> &'static str {
106 match self {
107 CrdtType::GCounter => "GCounter",
108 CrdtType::GSet => "GSet",
109 CrdtType::ORSet => "ORSet",
110 CrdtType::LWWRegister => "LWWRegister",
111 CrdtType::LWWMap => "LWWMap",
112 CrdtType::Rga => "Rga",
113 CrdtType::VectorClock => "VectorClock",
114 CrdtType::VCRegister => "VCRegister",
115 }
116 }
117}
118
/// Coordinate system of a `DataType::Point` property.
///
/// Each variant maps to a different Arrow struct layout in `DataType::to_arrow`.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
    /// Stored as `{ latitude, longitude, crs }`.
    Geographic,
    /// Stored as `{ x, y, crs }`.
    Cartesian2D,
    /// Stored as `{ x, y, z, crs }`.
    Cartesian3D,
}
125
/// Logical type of a schema property.
///
/// `DataType::to_arrow` gives the physical Arrow representation of each variant;
/// `DataType::accepts` decides which runtime values may be written to it.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
    String,
    Int32,
    Int64,
    Float32,
    Float64,
    Bool,
    /// Nanosecond timestamp in UTC.
    Timestamp,
    Date,
    /// Stored as the struct described by `time_struct_fields`.
    Time,
    /// Stored as the struct described by `datetime_struct_fields`.
    DateTime,
    /// Stored as opaque `LargeBinary`.
    Duration,
    /// Arbitrary encoded value stored as `LargeBinary`.
    CypherValue,
    Bytes,
    Point(PointType),
    /// Fixed-length `Float32` vector with `dimensions` elements.
    Vector { dimensions: usize },
    /// Stored as 24-byte `FixedSizeBinary`.
    Btic,
    /// CRDT payload stored as `Binary`.
    Crdt(CrdtType),
    List(Box<DataType>),
    /// Key/value map stored as a list of `{ key, value }` structs.
    Map(Box<DataType>, Box<DataType>),
}
149
150impl DataType {
151 #[allow(non_upper_case_globals)]
153 pub const Float: DataType = DataType::Float64;
154 #[allow(non_upper_case_globals)]
155 pub const Int: DataType = DataType::Int64;
156
157 pub fn to_arrow(&self) -> ArrowDataType {
158 match self {
159 DataType::String => ArrowDataType::Utf8,
160 DataType::Int32 => ArrowDataType::Int32,
161 DataType::Int64 => ArrowDataType::Int64,
162 DataType::Float32 => ArrowDataType::Float32,
163 DataType::Float64 => ArrowDataType::Float64,
164 DataType::Bool => ArrowDataType::Boolean,
165 DataType::Timestamp => {
166 ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
167 }
168 DataType::Date => ArrowDataType::Date32,
169 DataType::Time => ArrowDataType::Struct(time_struct_fields()),
170 DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
171 DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Bytes => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
175 PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
176 Field::new("latitude", ArrowDataType::Float64, false),
177 Field::new("longitude", ArrowDataType::Float64, false),
178 Field::new("crs", ArrowDataType::Utf8, false),
179 ])),
180 PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
181 Field::new("x", ArrowDataType::Float64, false),
182 Field::new("y", ArrowDataType::Float64, false),
183 Field::new("crs", ArrowDataType::Utf8, false),
184 ])),
185 PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
186 Field::new("x", ArrowDataType::Float64, false),
187 Field::new("y", ArrowDataType::Float64, false),
188 Field::new("z", ArrowDataType::Float64, false),
189 Field::new("crs", ArrowDataType::Utf8, false),
190 ])),
191 },
192 DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
193 Arc::new(Field::new("item", ArrowDataType::Float32, true)),
194 *dimensions as i32,
195 ),
196 DataType::Btic => ArrowDataType::FixedSizeBinary(24),
197 DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
199 ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
200 }
201 DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
202 "item",
203 ArrowDataType::Struct(Fields::from(vec![
204 Field::new("key", key.to_arrow(), false),
205 Field::new("value", value.to_arrow(), true),
206 ])),
207 true,
208 ))),
209 }
210 }
211
212 pub fn accepts(&self, value: &crate::value::Value) -> bool {
238 use crate::value::{TemporalValue, Value};
239
240 if matches!(value, Value::Null) {
242 return true;
243 }
244
245 match self {
246 DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
248
249 DataType::String => matches!(value, Value::String(_)),
250 DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
251 DataType::Float32 | DataType::Float64 => {
253 matches!(value, Value::Int(_) | Value::Float(_))
254 }
255 DataType::Bool => matches!(value, Value::Bool(_)),
256
257 DataType::Timestamp => matches!(
260 value,
261 Value::String(_)
262 | Value::Int(_)
263 | Value::Temporal(
264 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
265 )
266 ),
267 DataType::DateTime => matches!(
268 value,
269 Value::Temporal(
270 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
271 )
272 ),
273 DataType::Date => {
274 matches!(
275 value,
276 Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
277 )
278 }
279 DataType::Time => matches!(
280 value,
281 Value::Int(_)
282 | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
283 ),
284 DataType::Duration => {
285 matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
286 }
287 DataType::Bytes => matches!(value, Value::Bytes(_)),
288 DataType::Btic => matches!(
290 value,
291 Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
292 ),
293 DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
294 DataType::List(_) => matches!(value, Value::List(_)),
295 DataType::Map(_, _) => matches!(value, Value::Map(_)),
296 }
297 }
298}
299
300fn default_created_at() -> DateTime<Utc> {
301 Utc::now()
302}
303
304fn default_state() -> SchemaElementState {
305 SchemaElementState::Active
306}
307
/// Serde default for `PropertyMeta::added_in`: schema version 1.
fn default_version_1() -> u32 {
    const FIRST_SCHEMA_VERSION: u32 = 1;
    FIRST_SCHEMA_VERSION
}
311
/// Metadata describing one property of a label or edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
    /// Logical type of the property.
    pub r#type: DataType,
    /// Whether the property may be null.
    pub nullable: bool,
    /// `schema_version` at the moment the property was added. Defaults to 1 when
    /// absent from serialized data.
    #[serde(default = "default_version_1")]
    pub added_in: u32,
    /// Lifecycle state; defaults to `Active` when absent.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Source expression for generated properties; `None` for ordinary ones.
    #[serde(default)]
    pub generation_expression: Option<String>,
    /// Optional human-readable description (omitted from JSON when `None`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
325
/// Metadata for a vertex label.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
    /// Numeric label id. Native ids stay below `VIRTUAL_LABEL_ID_START`.
    pub id: u16,
    /// Creation time. When missing from serialized data, it is filled with the
    /// time of deserialization.
    #[serde(default = "default_created_at")]
    pub created_at: DateTime<Utc>,
    /// Lifecycle state; defaults to `Active` when absent.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional human-readable description (omitted from JSON when `None`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
336
/// Metadata for a schema-declared edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
    /// Numeric edge type id. Native ids stay below `VIRTUAL_EDGE_TYPE_ID_START`.
    pub id: u32,
    /// Label names allowed at the source end (presumably; empty semantics unconfirmed).
    pub src_labels: Vec<String>,
    /// Label names allowed at the destination end (presumably; empty semantics unconfirmed).
    pub dst_labels: Vec<String>,
    /// Lifecycle state; defaults to `Active` when absent.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional human-readable description (omitted from JSON when `None`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
348
/// Kind of integrity constraint.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
    /// The listed properties must be unique (jointly, when more than one).
    Unique { properties: Vec<String> },
    /// The property must be present.
    Exists { property: String },
    /// A boolean expression, stored as source text, must hold.
    Check { expression: String },
}
356
/// Schema element a constraint applies to, referenced by name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
    Label(String),
    EdgeType(String),
}
363
/// A named constraint attached to a label or edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
    pub name: String,
    pub constraint_type: ConstraintType,
    pub target: ConstraintTarget,
    /// Whether the constraint is currently enforced.
    pub enabled: bool,
}
371
/// Bidirectional name <-> id registry for edge types that were never declared in
/// the schema.
///
/// Ids are built with `make_schemaless_id`, so they are distinguishable from
/// schema-declared ids via `is_schemaless_edge_type`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    name_to_id: HashMap<String, u32>,
    id_to_name: HashMap<u32, String>,
    /// Next local counter value passed to `make_schemaless_id`; starts at 1.
    next_local_id: u32,
}
384
385impl SchemalessEdgeTypeRegistry {
386 pub fn new() -> Self {
387 Self {
388 name_to_id: HashMap::new(),
389 id_to_name: HashMap::new(),
390 next_local_id: 1,
391 }
392 }
393
394 pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
396 if let Some(&id) = self.name_to_id.get(type_name) {
397 return id;
398 }
399
400 let id = make_schemaless_id(self.next_local_id);
401 self.next_local_id += 1;
402
403 self.name_to_id.insert(type_name.to_string(), id);
404 self.id_to_name.insert(id, type_name.to_string());
405
406 id
407 }
408
409 pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
411 self.id_to_name.get(&type_id).map(String::as_str)
412 }
413
414 pub fn contains(&self, type_name: &str) -> bool {
416 self.name_to_id.contains_key(type_name)
417 }
418
419 pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
421 self.name_to_id.get(type_name).copied()
422 }
423
424 pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
426 self.name_to_id
427 .iter()
428 .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
429 .map(|(_, &id)| id)
430 }
431
432 pub fn all_type_ids(&self) -> Vec<u32> {
434 self.id_to_name.keys().copied().collect()
435 }
436
437 pub fn is_empty(&self) -> bool {
439 self.name_to_id.is_empty()
440 }
441}
442
443impl Default for SchemalessEdgeTypeRegistry {
444 fn default() -> Self {
445 Self::new()
446 }
447}
448
/// First label id reserved for virtual (catalog-resolved) labels. Native labels
/// are never assigned ids at or above this value.
pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
/// Sentinel label id (`u16::MAX`). It is *not* part of the virtual range.
pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;

/// Whether `id` lies in the half-open virtual range
/// `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL`.
#[inline]
pub fn is_virtual_label_id(id: u16) -> bool {
    id >= VIRTUAL_LABEL_ID_START && id < VIRTUAL_LABEL_ID_SENTINEL
}
463
/// The complete, serializable graph schema (persisted as JSON by `SchemaManager`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
    /// Incremented on every structural change (wrapping).
    pub schema_version: u32,
    /// Vertex labels by name.
    pub labels: HashMap<String, LabelMeta>,
    /// Schema-declared edge types by name.
    pub edge_types: HashMap<String, EdgeTypeMeta>,
    /// Properties keyed first by owning label / edge type name, then by property name.
    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
    #[serde(default)]
    pub indexes: Vec<IndexDefinition>,
    #[serde(default)]
    pub constraints: Vec<Constraint>,
    /// Ids for edge types used without being declared in `edge_types`.
    #[serde(default)]
    pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
478
479impl Default for Schema {
480 fn default() -> Self {
481 Self {
482 schema_version: 1,
483 labels: HashMap::new(),
484 edge_types: HashMap::new(),
485 properties: HashMap::new(),
486 indexes: Vec::new(),
487 constraints: Vec::new(),
488 schemaless_registry: SchemalessEdgeTypeRegistry::new(),
489 }
490 }
491}
492
493impl Schema {
494 fn bump_version(&mut self) {
503 self.schema_version = self.schema_version.wrapping_add(1);
504 }
505
506 pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
511 self.labels
512 .iter()
513 .find(|(_, meta)| meta.id == label_id)
514 .map(|(name, _)| name.as_str())
515 }
516
517 pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
519 self.labels.get(label_name).map(|meta| meta.id)
520 }
521
522 pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
527 self.edge_types
528 .iter()
529 .find(|(_, meta)| meta.id == type_id)
530 .map(|(name, _)| name.as_str())
531 }
532
533 pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
535 self.edge_types.get(type_name).map(|meta| meta.id)
536 }
537
538 pub fn vector_index_for_property(
543 &self,
544 label: &str,
545 property: &str,
546 ) -> Option<&VectorIndexConfig> {
547 self.indexes.iter().find_map(|idx| {
548 if let IndexDefinition::Vector(config) = idx
549 && config.label == label
550 && config.property == property
551 && config.metadata.status == IndexStatus::Online
552 {
553 return Some(config);
554 }
555 None
556 })
557 }
558
559 pub fn fulltext_index_for_property(
564 &self,
565 label: &str,
566 property: &str,
567 ) -> Option<&FullTextIndexConfig> {
568 self.indexes.iter().find_map(|idx| {
569 if let IndexDefinition::FullText(config) = idx
570 && config.label == label
571 && config.properties.iter().any(|p| p == property)
572 && config.metadata.status == IndexStatus::Online
573 {
574 return Some(config);
575 }
576 None
577 })
578 }
579
580 pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
585 self.labels
586 .iter()
587 .find(|(k, _)| k.eq_ignore_ascii_case(name))
588 .map(|(_, v)| v)
589 }
590
591 pub fn canonical_label_name(&self, name: &str) -> Option<String> {
598 self.labels
599 .iter()
600 .find(|(k, _)| k.eq_ignore_ascii_case(name))
601 .map(|(k, _)| k.clone())
602 }
603
604 pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
606 self.get_label_case_insensitive(label_name)
607 .map(|meta| meta.id)
608 }
609
610 pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
615 self.edge_types
616 .iter()
617 .find(|(k, _)| k.eq_ignore_ascii_case(name))
618 .map(|(_, v)| v)
619 }
620
621 pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
623 self.get_edge_type_case_insensitive(type_name)
624 .map(|meta| meta.id)
625 }
626
627 pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
630 self.edge_type_id_by_name_case_insensitive(type_name)
631 .or_else(|| {
632 self.schemaless_registry
633 .id_by_name_case_insensitive(type_name)
634 })
635 }
636
637 pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
643 if let Some(id) = self.edge_type_id_unified(type_name) {
644 return id;
645 }
646 let id = self.schemaless_registry.get_or_assign_id(type_name);
654 self.bump_version();
655 id
656 }
657
658 pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
665 self.edge_type_id_by_name(type_name)
666 .or_else(|| self.schemaless_registry.id_by_name(type_name))
667 }
668
669 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
673 if is_schemaless_edge_type(type_id) {
674 self.schemaless_registry
675 .type_name_by_id(type_id)
676 .map(str::to_owned)
677 } else {
678 self.edge_type_name_by_id(type_id).map(str::to_owned)
679 }
680 }
681
682 pub fn all_edge_type_ids(&self) -> Vec<u32> {
685 let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
686 ids.extend(self.schemaless_registry.all_type_ids());
687 ids.sort_unstable();
688 ids
689 }
690}
691
/// Build status of an index.
///
/// Lookups such as `Schema::vector_index_for_property` only return `Online`
/// indexes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
    /// Usable for queries; also the serde/Default value when unspecified.
    #[default]
    Online,
    /// Being built (presumably not yet queryable — confirm).
    Building,
    /// Out of date relative to the data (semantics defined elsewhere — confirm).
    Stale,
    /// Build failed.
    Failed,
}
705
/// Bookkeeping attached to every index definition.
///
/// Each field defaults when missing from serialized data.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
    /// Current build status (defaults to `Online`).
    #[serde(default)]
    pub status: IndexStatus,
    /// When the index was last built, if known.
    #[serde(default)]
    pub last_built_at: Option<DateTime<Utc>>,
    /// Row count observed at the last build, if recorded.
    #[serde(default)]
    pub row_count_at_build: Option<u64>,
}
719
/// Any index the schema can declare.
///
/// Serialized with an internal `"type"` tag naming the variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    Vector(VectorIndexConfig),
    FullText(FullTextIndexConfig),
    Scalar(ScalarIndexConfig),
    Inverted(InvertedIndexConfig),
    JsonFullText(JsonFtsIndexConfig),
}
730
731impl IndexDefinition {
732 pub fn name(&self) -> &str {
734 match self {
735 IndexDefinition::Vector(c) => &c.name,
736 IndexDefinition::FullText(c) => &c.name,
737 IndexDefinition::Scalar(c) => &c.name,
738 IndexDefinition::Inverted(c) => &c.name,
739 IndexDefinition::JsonFullText(c) => &c.name,
740 }
741 }
742
743 pub fn label(&self) -> &str {
745 match self {
746 IndexDefinition::Vector(c) => &c.label,
747 IndexDefinition::FullText(c) => &c.label,
748 IndexDefinition::Scalar(c) => &c.label,
749 IndexDefinition::Inverted(c) => &c.label,
750 IndexDefinition::JsonFullText(c) => &c.label,
751 }
752 }
753
754 pub fn metadata(&self) -> &IndexMetadata {
756 match self {
757 IndexDefinition::Vector(c) => &c.metadata,
758 IndexDefinition::FullText(c) => &c.metadata,
759 IndexDefinition::Scalar(c) => &c.metadata,
760 IndexDefinition::Inverted(c) => &c.metadata,
761 IndexDefinition::JsonFullText(c) => &c.metadata,
762 }
763 }
764
765 pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
767 match self {
768 IndexDefinition::Vector(c) => &mut c.metadata,
769 IndexDefinition::FullText(c) => &mut c.metadata,
770 IndexDefinition::Scalar(c) => &mut c.metadata,
771 IndexDefinition::Inverted(c) => &mut c.metadata,
772 IndexDefinition::JsonFullText(c) => &mut c.metadata,
773 }
774 }
775}
776
/// Configuration of an inverted index over a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
    pub name: String,
    pub label: String,
    pub property: String,
    /// Whether terms are normalized before indexing (defaults to `true`).
    #[serde(default = "default_normalize")]
    pub normalize: bool,
    /// Cap on indexed terms per document (defaults to 10 000).
    #[serde(default = "default_max_terms_per_doc")]
    pub max_terms_per_doc: usize,
    #[serde(default)]
    pub metadata: IndexMetadata,
}
789
/// Serde default for `InvertedIndexConfig::normalize`: normalization is on.
fn default_normalize() -> bool {
    !false
}
793
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`: 10 000 terms.
fn default_max_terms_per_doc() -> usize {
    const DEFAULT_CAP: usize = 10000;
    DEFAULT_CAP
}
797
/// Configuration of a vector index over one property of a label.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
    pub name: String,
    pub label: String,
    pub property: String,
    pub index_type: VectorIndexType,
    pub metric: DistanceMetric,
    /// Optional automatic-embedding settings; `None` when vectors are supplied directly
    /// (presumably — confirm).
    pub embedding_config: Option<EmbeddingConfig>,
    #[serde(default)]
    pub metadata: IndexMetadata,
}
809
/// Settings for computing embeddings for a vector index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
    /// Name of the embedding model/provider to use (presumably a registered alias — confirm).
    pub alias: String,
    /// Properties whose values are fed to the embedder.
    pub source_properties: Vec<String>,
    /// Number of items per embedding request.
    pub batch_size: usize,
    /// Optional text prepended to documents before embedding.
    #[serde(default)]
    pub document_prefix: Option<String>,
    /// Optional text prepended to queries before embedding.
    #[serde(default)]
    pub query_prefix: Option<String>,
}
825
/// Vector index algorithm and its build parameters.
///
/// Names follow common vector-index nomenclature (IVF partitioning, HNSW
/// graphs, PQ/SQ/RQ quantization). The parameters are interpreted by the index
/// builder outside this file; confirm their exact meaning there.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
    /// Brute-force search.
    Flat,
    IvfFlat {
        num_partitions: u32,
    },
    IvfPq {
        num_partitions: u32,
        num_sub_vectors: u32,
        bits_per_subvector: u8,
    },
    IvfSq {
        num_partitions: u32,
    },
    IvfRq {
        num_partitions: u32,
        /// Optional; omitted in serialized data means `None`.
        #[serde(default)]
        num_bits: Option<u8>,
    },
    HnswFlat {
        m: u32,
        ef_construction: u32,
        /// Optional; omitted in serialized data means `None`.
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    HnswSq {
        m: u32,
        ef_construction: u32,
        /// Optional; omitted in serialized data means `None`.
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    HnswPq {
        m: u32,
        ef_construction: u32,
        num_sub_vectors: u32,
        /// Optional; omitted in serialized data means `None`.
        #[serde(default)]
        num_partitions: Option<u32>,
    },
}
866
/// Distance function used by a vector index; see `DistanceMetric::compute_distance`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
    /// `1 - cosine similarity`.
    Cosine,
    /// Squared Euclidean distance (no square root is taken).
    L2,
    /// Negated dot product, so that smaller means more similar.
    Dot,
}
874
875impl DistanceMetric {
876 pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
888 assert_eq!(a.len(), b.len(), "vector dimension mismatch");
889 match self {
890 DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
891 DistanceMetric::Cosine => {
892 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
893 let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
894 let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
895 let denom = norm_a * norm_b;
896 if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
897 }
898 DistanceMetric::Dot => {
899 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
900 -dot
901 }
902 }
903 }
904}
905
/// Configuration of a full-text index over one or more properties of a label.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
    pub name: String,
    pub label: String,
    /// Indexed properties.
    pub properties: Vec<String>,
    pub tokenizer: TokenizerConfig,
    /// Whether term positions are stored (presumably for phrase queries — confirm).
    pub with_positions: bool,
    #[serde(default)]
    pub metadata: IndexMetadata,
}
916
/// Tokenizer used by a full-text index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
    Standard,
    Whitespace,
    /// N-grams with lengths between `min` and `max`.
    Ngram { min: u8, max: u8 },
    /// A tokenizer registered elsewhere under `name`.
    Custom { name: String },
}
925
/// Configuration of a full-text index over a JSON column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
    pub name: String,
    pub label: String,
    /// Column containing the JSON documents.
    pub column: String,
    /// JSON paths to index (defaults to empty; meaning of empty — presumably
    /// "whole document" — to be confirmed).
    #[serde(default)]
    pub paths: Vec<String>,
    /// Whether term positions are stored (defaults to `false`).
    #[serde(default)]
    pub with_positions: bool,
    #[serde(default)]
    pub metadata: IndexMetadata,
}
938
/// Configuration of a scalar index over one or more properties of a label.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
    pub name: String,
    pub label: String,
    pub properties: Vec<String>,
    pub index_type: ScalarIndexType,
    /// Optional filter predicate, stored as source text (presumably a partial index — confirm).
    pub where_clause: Option<String>,
    #[serde(default)]
    pub metadata: IndexMetadata,
}
949
/// Physical structure of a scalar index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
    BTree,
    Hash,
    Bitmap,
    LabelList,
}
958
/// Owns the in-memory schema and its persisted JSON copy in an object store.
///
/// Readers take a cheap `Arc<Schema>` snapshot through `schema()`. Writers mutate
/// through `Arc::make_mut`, which clones the schema when snapshots are still
/// outstanding, so those snapshots never observe a change.
pub struct SchemaManager {
    /// Backing store the schema JSON is read from and written to.
    store: Arc<dyn ObjectStore>,
    /// Location of the schema JSON inside `store`.
    path: ObjectStorePath,
    /// Current schema; the lock guards swapping/mutating the shared `Arc`.
    schema: RwLock<Arc<Schema>>,
}
964
965impl SchemaManager {
966 pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
967 let path = path.as_ref();
968 let parent = path
969 .parent()
970 .ok_or_else(|| anyhow!("Invalid schema path"))?;
971 let filename = path
972 .file_name()
973 .ok_or_else(|| anyhow!("Invalid schema filename"))?
974 .to_str()
975 .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
976
977 let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
978 let obj_path = ObjectStorePath::from(filename);
979
980 Self::load_from_store(store, &obj_path).await
981 }
982
983 pub async fn load_from_store(
984 store: Arc<dyn ObjectStore>,
985 path: &ObjectStorePath,
986 ) -> Result<Self> {
987 match store.get(path).await {
988 Ok(result) => {
989 let bytes = result.bytes().await?;
990 let content = String::from_utf8(bytes.to_vec())?;
991 let mut schema: Schema = serde_json::from_str(&content)?;
992 let original_len = schema.indexes.len();
1000 if original_len > 0 {
1001 let mut seen: std::collections::HashSet<String> =
1002 std::collections::HashSet::with_capacity(original_len);
1003 let mut dedup: Vec<IndexDefinition> = schema
1004 .indexes
1005 .iter()
1006 .rev()
1007 .filter(|idx| seen.insert(idx.name().to_string()))
1008 .cloned()
1009 .collect();
1010 dedup.reverse();
1011 if dedup.len() != original_len {
1012 tracing::warn!(
1013 collapsed = original_len - dedup.len(),
1014 kept = dedup.len(),
1015 "schema.indexes: collapsed duplicate entries on load (issue #63)"
1016 );
1017 schema.indexes = dedup;
1018 }
1019 }
1020 Ok(Self {
1021 store,
1022 path: path.clone(),
1023 schema: RwLock::new(Arc::new(schema)),
1024 })
1025 }
1026 Err(object_store::Error::NotFound { .. }) => Ok(Self {
1027 store,
1028 path: path.clone(),
1029 schema: RwLock::new(Arc::new(Schema::default())),
1030 }),
1031 Err(e) => Err(anyhow::Error::from(e)),
1032 }
1033 }
1034
1035 pub async fn save(&self) -> Result<()> {
1036 let content = {
1037 let schema_guard = acquire_read(&self.schema, "schema")?;
1038 serde_json::to_string_pretty(&**schema_guard)?
1039 };
1040 self.store
1041 .put(&self.path, content.into())
1042 .await
1043 .map_err(anyhow::Error::from)?;
1044 Ok(())
1045 }
1046
1047 pub fn path(&self) -> &ObjectStorePath {
1048 &self.path
1049 }
1050
1051 pub fn schema(&self) -> Arc<Schema> {
1052 self.schema
1053 .read()
1054 .expect("Schema lock poisoned - a thread panicked while holding it")
1055 .clone()
1056 }
1057
/// Upper-cases identifiers that are immediately followed by `(`, i.e. function
/// calls, and leaves everything else untouched.
///
/// An identifier starts with an alphabetic char and continues with alphanumeric
/// chars or `_`. Whitespace between a name and `(` prevents normalization.
/// String-literal contents are not treated specially. This output feeds
/// `generated_column_name`, so changing it would rename existing generated
/// columns.
fn normalize_function_names(expr: &str) -> String {
    let mut out = String::with_capacity(expr.len());
    let mut stream = expr.chars().peekable();

    while let Some(first) = stream.next() {
        if !first.is_alphabetic() {
            out.push(first);
            continue;
        }

        let mut word = String::from(first);
        while let Some(c) = stream.next_if(|c| c.is_alphanumeric() || *c == '_') {
            word.push(c);
        }

        let is_call = stream.peek() == Some(&'(');
        if is_call {
            out.push_str(&word.to_uppercase());
        } else {
            out.push_str(&word);
        }
    }

    out
}
1091
1092 pub fn generated_column_name(expr: &str) -> String {
1100 let normalized = Self::normalize_function_names(expr);
1102
1103 let sanitized = normalized
1104 .replace(|c: char| !c.is_alphanumeric(), "_")
1105 .trim_matches('_')
1106 .to_string();
1107
1108 const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1110 const FNV_PRIME: u64 = 1099511628211;
1111
1112 let mut hash = FNV_OFFSET_BASIS;
1113 for byte in normalized.as_bytes() {
1114 hash ^= *byte as u64;
1115 hash = hash.wrapping_mul(FNV_PRIME);
1116 }
1117
1118 format!("_gen_{}_{:x}", sanitized, hash)
1119 }
1120
1121 pub fn replace_schema(&self, new_schema: Schema) {
1122 let mut schema = self
1123 .schema
1124 .write()
1125 .expect("Schema lock poisoned - a thread panicked while holding it");
1126 *schema = Arc::new(new_schema);
1127 }
1128
1129 #[must_use]
1142 pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1143 let primary = self.schema();
1144 let merged = if overlay.is_empty() {
1145 (*primary).clone()
1146 } else {
1147 let mut merged = (*primary).clone();
1148 for (name, label) in &overlay.added_labels {
1149 merged.labels.insert(name.clone(), label.clone());
1150 }
1151 for (name, edge_type) in &overlay.added_edge_types {
1152 merged.edge_types.insert(name.clone(), edge_type.clone());
1153 }
1154 for addition in &overlay.added_properties {
1155 let props = merged.properties.entry(addition.owner.clone()).or_default();
1156 props.insert(
1157 addition.property.clone(),
1158 PropertyMeta {
1159 r#type: addition.data_type.clone(),
1160 nullable: addition.nullable,
1161 added_in: merged.schema_version,
1162 state: SchemaElementState::Active,
1163 generation_expression: None,
1164 description: None,
1165 },
1166 );
1167 }
1168 merged
1169 };
1170
1171 Arc::new(Self {
1172 store: self.store.clone(),
1173 path: self.path.clone(),
1174 schema: RwLock::new(Arc::new(merged)),
1175 })
1176 }
1177
1178 pub fn next_label_id(&self) -> u16 {
1179 self.schema()
1180 .labels
1181 .values()
1182 .map(|l| l.id)
1183 .max()
1184 .unwrap_or(0)
1185 + 1
1186 }
1187
1188 pub fn next_type_id(&self) -> u32 {
1189 let max_schema_id = self
1190 .schema()
1191 .edge_types
1192 .values()
1193 .map(|t| t.id)
1194 .max()
1195 .unwrap_or(0);
1196
1197 if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1199 panic!("Schema edge type ID exhaustion");
1200 }
1201
1202 max_schema_id + 1
1203 }
1204
1205 pub fn add_label(&self, name: &str) -> Result<u16> {
1206 self.add_label_with_desc(name, None)
1207 }
1208
1209 pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1210 let mut guard = acquire_write(&self.schema, "schema")?;
1211 let schema = Arc::make_mut(&mut *guard);
1212 if schema.labels.contains_key(name) {
1213 return Err(anyhow!("Label '{}' already exists", name));
1214 }
1215
1216 let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1217 if id >= VIRTUAL_LABEL_ID_START {
1218 return Err(anyhow!(
1219 "Native label space exhausted (next id {id:#x} would enter the \
1220 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1221 reserved for catalog-resolved labels)"
1222 ));
1223 }
1224 schema.labels.insert(
1225 name.to_string(),
1226 LabelMeta {
1227 id,
1228 created_at: Utc::now(),
1229 state: SchemaElementState::Active,
1230 description,
1231 },
1232 );
1233 schema.bump_version();
1234 Ok(id)
1235 }
1236
1237 pub fn add_edge_type(
1238 &self,
1239 name: &str,
1240 src_labels: Vec<String>,
1241 dst_labels: Vec<String>,
1242 ) -> Result<u32> {
1243 self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1244 }
1245
1246 pub fn add_edge_type_with_desc(
1247 &self,
1248 name: &str,
1249 src_labels: Vec<String>,
1250 dst_labels: Vec<String>,
1251 description: Option<String>,
1252 ) -> Result<u32> {
1253 let mut guard = acquire_write(&self.schema, "schema")?;
1254 let schema = Arc::make_mut(&mut *guard);
1255 if schema.edge_types.contains_key(name) {
1256 return Err(anyhow!("Edge type '{}' already exists", name));
1257 }
1258
1259 let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1260
1261 if id >= VIRTUAL_EDGE_TYPE_ID_START {
1266 return Err(anyhow!(
1267 "Native edge type space exhausted (next id {id:#x} would enter the \
1268 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1269 reserved for catalog-resolved edge types)"
1270 ));
1271 }
1272
1273 schema.edge_types.insert(
1274 name.to_string(),
1275 EdgeTypeMeta {
1276 id,
1277 src_labels,
1278 dst_labels,
1279 state: SchemaElementState::Active,
1280 description,
1281 },
1282 );
1283 schema.bump_version();
1284 Ok(id)
1285 }
1286
1287 pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1296 {
1297 let guard = acquire_read(&self.schema, "schema")
1298 .expect("Schema lock poisoned - a thread panicked while holding it");
1299 if let Some(id) = guard.edge_type_id_unified(type_name) {
1300 return id;
1301 }
1302 }
1303 let mut guard = acquire_write(&self.schema, "schema")
1304 .expect("Schema lock poisoned - a thread panicked while holding it");
1305 let schema = Arc::make_mut(&mut *guard);
1306 schema.get_or_assign_edge_type_id(type_name)
1307 }
1308
1309 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1311 let schema = acquire_read(&self.schema, "schema")
1312 .expect("Schema lock poisoned - a thread panicked while holding it");
1313 schema.edge_type_name_by_id_unified(type_id)
1314 }
1315
1316 pub fn add_property(
1317 &self,
1318 label_or_type: &str,
1319 prop_name: &str,
1320 data_type: DataType,
1321 nullable: bool,
1322 ) -> Result<()> {
1323 self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1324 }
1325
1326 pub fn add_property_with_desc(
1327 &self,
1328 label_or_type: &str,
1329 prop_name: &str,
1330 data_type: DataType,
1331 nullable: bool,
1332 description: Option<String>,
1333 ) -> Result<()> {
1334 validate_property_name(prop_name)?;
1335 let mut guard = acquire_write(&self.schema, "schema")?;
1336 let schema = Arc::make_mut(&mut *guard);
1337 let version = schema.schema_version;
1338 let props = schema
1339 .properties
1340 .entry(label_or_type.to_string())
1341 .or_default();
1342
1343 if props.contains_key(prop_name) {
1344 return Err(anyhow!(
1345 "Property '{}' already exists for '{}'",
1346 prop_name,
1347 label_or_type
1348 ));
1349 }
1350
1351 props.insert(
1352 prop_name.to_string(),
1353 PropertyMeta {
1354 r#type: data_type,
1355 nullable,
1356 added_in: version,
1357 state: SchemaElementState::Active,
1358 generation_expression: None,
1359 description,
1360 },
1361 );
1362 schema.bump_version();
1364 Ok(())
1365 }
1366
1367 pub fn add_generated_property(
1368 &self,
1369 label_or_type: &str,
1370 prop_name: &str,
1371 data_type: DataType,
1372 expr: String,
1373 ) -> Result<()> {
1374 validate_reserved_property_name(prop_name)?;
1377 let mut guard = acquire_write(&self.schema, "schema")?;
1378 let schema = Arc::make_mut(&mut *guard);
1379 let version = schema.schema_version;
1380 let props = schema
1381 .properties
1382 .entry(label_or_type.to_string())
1383 .or_default();
1384
1385 if props.contains_key(prop_name) {
1386 return Err(anyhow!("Property '{}' already exists", prop_name));
1387 }
1388
1389 props.insert(
1390 prop_name.to_string(),
1391 PropertyMeta {
1392 r#type: data_type,
1393 nullable: true,
1394 added_in: version,
1395 state: SchemaElementState::Active,
1396 generation_expression: Some(expr),
1397 description: None,
1398 },
1399 );
1400 schema.bump_version();
1402 Ok(())
1403 }
1404
1405 pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1406 let mut guard = acquire_write(&self.schema, "schema")?;
1407 let schema = Arc::make_mut(&mut *guard);
1408 let meta = schema
1409 .labels
1410 .get_mut(name)
1411 .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1412 meta.description = description;
1413 Ok(())
1414 }
1415
1416 pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1417 let mut guard = acquire_write(&self.schema, "schema")?;
1418 let schema = Arc::make_mut(&mut *guard);
1419 let meta = schema
1420 .edge_types
1421 .get_mut(name)
1422 .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1423 meta.description = description;
1424 Ok(())
1425 }
1426
1427 pub fn set_property_description(
1428 &self,
1429 entity: &str,
1430 prop_name: &str,
1431 description: Option<String>,
1432 ) -> Result<()> {
1433 let mut guard = acquire_write(&self.schema, "schema")?;
1434 let schema = Arc::make_mut(&mut *guard);
1435 let props = schema
1436 .properties
1437 .get_mut(entity)
1438 .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1439 let meta = props
1440 .get_mut(prop_name)
1441 .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
1442 meta.description = description;
1443 Ok(())
1444 }
1445
1446 pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1455 let mut guard = acquire_write(&self.schema, "schema")?;
1456 let schema = Arc::make_mut(&mut *guard);
1457 if let Some(existing) = schema
1458 .indexes
1459 .iter_mut()
1460 .find(|i| i.name() == index_def.name())
1461 {
1462 *existing = index_def;
1463 } else {
1464 schema.indexes.push(index_def);
1465 }
1466 schema.bump_version();
1467 Ok(())
1468 }
1469
1470 pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1471 let schema = self.schema.read().expect("Schema lock poisoned");
1472 schema.indexes.iter().find(|i| i.name() == name).cloned()
1473 }
1474
1475 pub fn update_index_metadata(
1480 &self,
1481 index_name: &str,
1482 f: impl FnOnce(&mut IndexMetadata),
1483 ) -> Result<()> {
1484 let mut guard = acquire_write(&self.schema, "schema")?;
1485 let schema = Arc::make_mut(&mut *guard);
1486 let idx = schema
1487 .indexes
1488 .iter_mut()
1489 .find(|i| i.name() == index_name)
1490 .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1491 f(idx.metadata_mut());
1492 Ok(())
1493 }
1494
1495 pub fn remove_index(&self, name: &str) -> Result<()> {
1496 let mut guard = acquire_write(&self.schema, "schema")?;
1497 let schema = Arc::make_mut(&mut *guard);
1498 if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1499 schema.indexes.remove(pos);
1500 schema.bump_version();
1501 Ok(())
1502 } else {
1503 Err(anyhow!("Index '{}' not found", name))
1504 }
1505 }
1506
1507 pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1508 let mut guard = acquire_write(&self.schema, "schema")?;
1509 let schema = Arc::make_mut(&mut *guard);
1510 if schema.constraints.iter().any(|c| c.name == constraint.name) {
1511 return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1512 }
1513 schema.constraints.push(constraint);
1514 schema.bump_version();
1515 Ok(())
1516 }
1517
1518 pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1519 let mut guard = acquire_write(&self.schema, "schema")?;
1520 let schema = Arc::make_mut(&mut *guard);
1521 if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1522 schema.constraints.remove(pos);
1523 schema.bump_version();
1524 Ok(())
1525 } else if if_exists {
1526 Ok(())
1527 } else {
1528 Err(anyhow!("Constraint '{}' not found", name))
1529 }
1530 }
1531
1532 pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1533 let mut guard = acquire_write(&self.schema, "schema")?;
1534 let schema = Arc::make_mut(&mut *guard);
1535 let Some(props) = schema.properties.get_mut(label_or_type) else {
1536 return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1537 };
1538 if props.remove(prop_name).is_none() {
1539 return Err(anyhow!(
1540 "Property '{}' not found for '{}'",
1541 prop_name,
1542 label_or_type
1543 ));
1544 }
1545 schema.bump_version();
1546 Ok(())
1547 }
1548
1549 pub fn rename_property(
1550 &self,
1551 label_or_type: &str,
1552 old_name: &str,
1553 new_name: &str,
1554 ) -> Result<()> {
1555 let mut guard = acquire_write(&self.schema, "schema")?;
1556 let schema = Arc::make_mut(&mut *guard);
1557 let Some(props) = schema.properties.get_mut(label_or_type) else {
1558 return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1559 };
1560 let Some(meta) = props.remove(old_name) else {
1561 return Err(anyhow!(
1562 "Property '{}' not found for '{}'",
1563 old_name,
1564 label_or_type
1565 ));
1566 };
1567 if props.contains_key(new_name) {
1568 props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
1571 }
1572 props.insert(new_name.to_string(), meta);
1573 schema.bump_version();
1574 Ok(())
1575 }
1576
1577 pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1578 let mut guard = acquire_write(&self.schema, "schema")?;
1579 let schema = Arc::make_mut(&mut *guard);
1580 if let Some(label_meta) = schema.labels.get_mut(name) {
1581 label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1582 schema.bump_version();
1584 Ok(())
1585 } else if if_exists {
1586 Ok(())
1587 } else {
1588 Err(anyhow!("Label '{}' not found", name))
1589 }
1590 }
1591
1592 pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1593 let mut guard = acquire_write(&self.schema, "schema")?;
1594 let schema = Arc::make_mut(&mut *guard);
1595 if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1596 edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1597 schema.bump_version();
1599 Ok(())
1600 } else if if_exists {
1601 Ok(())
1602 } else {
1603 Err(anyhow!("Edge Type '{}' not found", name))
1604 }
1605 }
1606}
1607
1608pub fn validate_identifier(name: &str) -> Result<()> {
1610 if name.is_empty() || name.len() > 64 {
1612 return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1613 }
1614
1615 let first = name.chars().next().unwrap();
1617 if !first.is_alphabetic() && first != '_' {
1618 return Err(anyhow!(
1619 "Identifier '{}' must start with letter or underscore",
1620 name
1621 ));
1622 }
1623
1624 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1626 return Err(anyhow!(
1627 "Identifier '{}' must contain only alphanumeric and underscore",
1628 name
1629 ));
1630 }
1631
1632 const RESERVED: &[&str] = &[
1634 "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1635 "UNION", "ORDER", "LIMIT",
1636 ];
1637 if RESERVED.contains(&name.to_uppercase().as_str()) {
1638 return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1639 }
1640
1641 Ok(())
1642}
1643
1644pub fn validate_property_name(name: &str) -> Result<()> {
1651 if name.starts_with('_') {
1652 return Err(anyhow!(
1653 "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
1654 name
1655 ));
1656 }
1657 validate_reserved_property_name(name)
1658}
1659
1660fn validate_reserved_property_name(name: &str) -> Result<()> {
1667 const RESERVED_PROPS: &[&str] = &[
1676 "ext_id",
1677 "overflow_json",
1678 "eid",
1679 "src_vid",
1680 "dst_vid",
1681 "op",
1682 "__set_struct__",
1690 ];
1691 if RESERVED_PROPS.contains(&name) {
1692 return Err(anyhow!(
1693 "Property name '{}' is reserved by the storage layer; please choose a different name",
1694 name
1695 ));
1696 }
1697 Ok(())
1698}
1699
#[cfg(test)]
mod tests {
    // Unit tests for `DataType::accepts`, `SchemaManager` mutators, index
    // metadata serde, overlays, and edge-type id assignment.
    use super::*;
    use crate::value::{TemporalValue, Value};
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Checks `DataType::accepts` against a matrix of value/type pairs: nulls,
    /// exact matches, permitted coercions, and rejected coercions.
    #[test]
    fn test_datatype_accepts_matrix() {
        // Builds a fresh epoch-zero DateTime temporal value for each use.
        let dt = || TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: None,
        };

        // Null must be accepted by every column type listed here.
        for ty in [
            DataType::String,
            DataType::Int64,
            DataType::Bool,
            DataType::DateTime,
            DataType::Float64,
        ] {
            assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
        }

        // Exact type matches.
        assert!(DataType::String.accepts(&Value::String("x".into())));
        assert!(DataType::Int64.accepts(&Value::Int(1)));
        assert!(DataType::Bool.accepts(&Value::Bool(true)));
        assert!(DataType::DateTime.accepts(&Value::Temporal(dt())));

        // Permitted coercions.
        assert!(
            DataType::Float64.accepts(&Value::Int(3)),
            "Int widens to Float"
        );
        assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
        assert!(DataType::Timestamp.accepts(&Value::Temporal(dt())));
        assert!(
            DataType::Timestamp.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
            "storage parses strings for non-struct Timestamp columns"
        );

        // Rejected coercions.
        assert!(
            !DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
            "String into a DateTime struct column nulls silently — reject here"
        );
        assert!(!DataType::Bool.accepts(&Value::Int(1)));
        assert!(!DataType::Int64.accepts(&Value::Bool(true)));
        assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
        assert!(
            !DataType::String.accepts(&Value::Int(10)),
            "no implicit stringification"
        );
        assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));

        // CypherValue accepts a Map value.
        assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
    }

    /// Exercises label / property / edge-type creation, duplicate rejection,
    /// and a save → reload round trip through a local object store.
    #[tokio::test]
    async fn test_schema_management() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;

        // The first label in a fresh schema gets id 1; re-adding it fails.
        let lid = manager.add_label("Person")?;
        assert_eq!(lid, 1);
        assert!(manager.add_label("Person").is_err());

        // A duplicate property name on the same label is rejected.
        manager.add_property("Person", "name", DataType::String, false)?;
        assert!(
            manager
                .add_property("Person", "name", DataType::String, false)
                .is_err()
        );

        // The first declared edge type in a fresh schema gets id 1.
        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
        assert_eq!(tid, 1);

        // Saving writes the schema object at `path`.
        manager.save().await?;
        assert!(store.get(&path).await.is_ok());

        // A new manager loaded from the same path sees the persisted label and property.
        let manager2 = SchemaManager::load_from_store(store, &path).await?;
        assert!(manager2.schema().labels.contains_key("Person"));
        assert!(
            manager2
                .schema()
                .properties
                .get("Person")
                .unwrap()
                .contains_key("name")
        );

        Ok(())
    }

    /// Storage-reserved and `_`-prefixed property names are rejected by
    /// `add_property` / `add_generated_property`; names that merely contain a
    /// reserved word are allowed.
    #[tokio::test]
    async fn test_reserved_property_names_rejected() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Tiny")?;

        // Exact storage column names are rejected with an error mentioning "reserved".
        for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
            let err = manager
                .add_property("Tiny", reserved, DataType::String, true)
                .expect_err(&format!("expected '{reserved}' to be rejected"));
            assert!(
                err.to_string().contains("reserved"),
                "error for '{reserved}' should mention 'reserved', got: {err}"
            );
        }

        // `__set_struct__` is rejected as well.
        let err = manager
            .add_property("Tiny", "__set_struct__", DataType::String, true)
            .expect_err("expected '__set_struct__' to be rejected");
        assert!(
            err.to_string().contains("reserved"),
            "__set_struct__ rejection should mention 'reserved', got: {err}"
        );

        // Any name starting with '_' is rejected.
        for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
            assert!(
                manager
                    .add_property("Tiny", reserved, DataType::String, true)
                    .is_err(),
                "expected '{reserved}' to be rejected"
            );
        }

        // Names that only contain a reserved word as a substring are accepted.
        manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
        manager.add_property("Tiny", "user_op", DataType::String, true)?;
        manager.add_property("Tiny", "type_name", DataType::String, true)?;

        // Reserved names are rejected on edge types too.
        manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
        assert!(
            manager
                .add_property("knows", "src_vid", DataType::Int64, true)
                .is_err()
        );

        // Generated properties are also checked against the reserved list.
        assert!(
            manager
                .add_generated_property(
                    "Tiny",
                    "ext_id",
                    DataType::String,
                    "concat('x', name)".into()
                )
                .is_err()
        );

        Ok(())
    }

    /// `normalize_function_names` upper-cases function names regardless of their
    /// input case (including nested calls) and leaves arguments unchanged.
    #[test]
    fn test_normalize_function_names() {
        assert_eq!(
            SchemaManager::normalize_function_names("lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("LOWER(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("Lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("trim(lower(email))"),
            "TRIM(LOWER(email))"
        );
    }

    /// `generated_column_name` yields the same column for expressions that differ
    /// only in function-name case, with a `_gen_LOWER_email_` prefix.
    #[test]
    fn test_generated_column_name_case_insensitive() {
        let col1 = SchemaManager::generated_column_name("lower(email)");
        let col2 = SchemaManager::generated_column_name("LOWER(email)");
        let col3 = SchemaManager::generated_column_name("Lower(email)");
        assert_eq!(col1, col2);
        assert_eq!(col2, col3);
        assert!(col1.starts_with("_gen_LOWER_email_"));
    }

    /// JSON for an index definition without a `metadata` field still deserializes;
    /// the metadata is expected to be `Online` with no build info.
    #[test]
    fn test_index_metadata_serde_backward_compat() {
        let json = r#"{
            "type": "Scalar",
            "name": "idx_person_name",
            "label": "Person",
            "properties": ["name"],
            "index_type": "BTree",
            "where_clause": null
        }"#;
        let def: IndexDefinition = serde_json::from_str(json).unwrap();
        let meta = def.metadata();
        assert_eq!(meta.status, IndexStatus::Online);
        assert!(meta.last_built_at.is_none());
        assert!(meta.row_count_at_build.is_none());
    }

    /// Index metadata (status, build time, row count) survives a JSON round trip.
    #[test]
    fn test_index_metadata_serde_roundtrip() {
        let now = Utc::now();
        let def = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Test".to_string(),
            properties: vec!["prop".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                last_built_at: Some(now),
                row_count_at_build: Some(42),
            },
        });

        let json = serde_json::to_string(&def).unwrap();
        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.metadata().status, IndexStatus::Building);
        assert_eq!(parsed.metadata().row_count_at_build, Some(42));
        assert!(parsed.metadata().last_built_at.is_some());
    }

    /// `update_index_metadata` mutates a registered index's metadata in place
    /// and errors for an unknown index name.
    #[tokio::test]
    async fn test_update_index_metadata() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Person")?;
        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: Default::default(),
        });
        manager.add_index(idx)?;

        // Default metadata reports `Online`.
        let initial = manager.get_index("idx_test").unwrap();
        assert_eq!(initial.metadata().status, IndexStatus::Online);

        manager.update_index_metadata("idx_test", |m| {
            m.status = IndexStatus::Building;
            m.row_count_at_build = Some(100);
        })?;

        let updated = manager.get_index("idx_test").unwrap();
        assert_eq!(updated.metadata().status, IndexStatus::Building);
        assert_eq!(updated.metadata().row_count_at_build, Some(100));

        // Unknown index name → error.
        assert!(manager.update_index_metadata("nope", |_| {}).is_err());

        Ok(())
    }

    /// `add_index` upserts by name: re-adding a name replaces the stored
    /// definition instead of appending, while a new name appends.
    #[tokio::test]
    async fn test_add_index_is_upsert_by_name() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;
        manager.add_label("Person")?;

        let initial = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                ..Default::default()
            },
        });
        manager.add_index(initial.clone())?;
        assert_eq!(manager.schema().indexes.len(), 1);

        // Identical re-add must not grow the list.
        manager.add_index(initial.clone())?;
        assert_eq!(
            manager.schema().indexes.len(),
            1,
            "duplicate add_index by name must not append"
        );

        // Re-adding the same name with new metadata replaces the stored entry.
        let mut updated_cfg = match initial {
            IndexDefinition::Scalar(c) => c,
            _ => unreachable!(),
        };
        updated_cfg.metadata.status = IndexStatus::Online;
        updated_cfg.metadata.row_count_at_build = Some(42);
        manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
        assert_eq!(manager.schema().indexes.len(), 1);
        let stored = manager.get_index("idx_test").unwrap();
        assert_eq!(stored.metadata().status, IndexStatus::Online);
        assert_eq!(stored.metadata().row_count_at_build, Some(42));

        // A different name appends.
        let other = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_other".to_string(),
            label: "Person".to_string(),
            properties: vec!["age".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata::default(),
        });
        manager.add_index(other)?;
        assert_eq!(manager.schema().indexes.len(), 2);

        Ok(())
    }

    /// A persisted schema holding 50 same-named index entries (49 `Building`,
    /// then one `Online` with row count 123) is collapsed to a single entry on
    /// load, and the surviving entry is the `Online` one.
    #[tokio::test]
    async fn test_load_dedups_bloated_indexes() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");

        // Hand-build a schema and write it directly, bypassing `add_index`'s upsert.
        let mut schema = Schema::default();
        schema.labels.insert(
            "Person".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
                description: None,
            },
        );
        let make = |status: IndexStatus, count: Option<u64>| {
            IndexDefinition::Scalar(ScalarIndexConfig {
                name: "idx_dup".to_string(),
                label: "Person".to_string(),
                properties: vec!["name".to_string()],
                index_type: ScalarIndexType::BTree,
                where_clause: None,
                metadata: IndexMetadata {
                    status,
                    row_count_at_build: count,
                    ..Default::default()
                },
            })
        };
        for _ in 0..49 {
            schema.indexes.push(make(IndexStatus::Building, None));
        }
        schema.indexes.push(make(IndexStatus::Online, Some(123)));
        let json = serde_json::to_string_pretty(&schema)?;
        store.put(&path, json.into()).await?;

        let manager = SchemaManager::load_from_store(store, &path).await?;
        let schema = manager.schema();
        assert_eq!(
            schema.indexes.len(),
            1,
            "load() must collapse 50 duplicates by name to 1"
        );
        assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
        assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));

        Ok(())
    }

    /// `vector_index_for_property` ignores a `Stale` vector index and returns it
    /// once its status is flipped to `Online`.
    #[test]
    fn test_vector_index_for_property_skips_non_online() {
        let mut schema = Schema::default();
        schema.labels.insert(
            "Document".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
                description: None,
            },
        );

        schema
            .indexes
            .push(IndexDefinition::Vector(VectorIndexConfig {
                name: "vec_doc_embedding".to_string(),
                label: "Document".to_string(),
                property: "embedding".to_string(),
                index_type: VectorIndexType::Flat,
                metric: DistanceMetric::Cosine,
                embedding_config: None,
                metadata: IndexMetadata {
                    status: IndexStatus::Stale,
                    ..Default::default()
                },
            }));

        // Stale → not returned.
        assert!(
            schema
                .vector_index_for_property("Document", "embedding")
                .is_none()
        );

        // Online → returned.
        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
            cfg.metadata.status = IndexStatus::Online;
        }
        let result = schema.vector_index_for_property("Document", "embedding");
        assert!(result.is_some());
        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
    }

    /// An overlay built from an empty delta starts with the primary's labels,
    /// and labels added to the overlay do not leak back into the primary.
    #[tokio::test]
    async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
        use crate::core::fork::SchemaDelta;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let primary = SchemaManager::load_from_store(store, &path).await?;
        primary.add_label("Person")?;

        let overlay = primary.with_overlay(&SchemaDelta::empty());
        assert_eq!(overlay.schema().labels.len(), 1);

        overlay.add_label("Forked")?;
        assert!(overlay.schema().labels.contains_key("Forked"));
        assert!(!primary.schema().labels.contains_key("Forked"));

        Ok(())
    }

    /// An overlay merges the delta's added labels and edge types on top of the
    /// primary's schema without modifying the primary.
    #[tokio::test]
    async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
        use crate::core::fork::SchemaDelta;
        use chrono::Utc;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let primary = SchemaManager::load_from_store(store, &path).await?;
        primary.add_label("Existing")?;

        let label_meta = LabelMeta {
            id: 99,
            created_at: Utc::now(),
            state: SchemaElementState::Active,
            description: None,
        };
        let edge_meta = EdgeTypeMeta {
            id: 99,
            src_labels: vec!["NewLabel".into()],
            dst_labels: vec!["NewLabel".into()],
            state: SchemaElementState::Active,
            description: None,
        };
        let delta = SchemaDelta {
            added_labels: vec![("NewLabel".to_string(), label_meta)],
            added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
            added_properties: vec![],
        };

        let overlay = primary.with_overlay(&delta);
        let merged = overlay.schema();
        assert!(merged.labels.contains_key("Existing"));
        assert!(merged.labels.contains_key("NewLabel"));
        assert!(merged.edge_types.contains_key("NewEdge"));

        // The primary is untouched.
        assert!(!primary.schema().labels.contains_key("NewLabel"));
        Ok(())
    }

    /// 16 OS threads race to assign an id to the same new edge-type name; all
    /// must observe the same id. A declared edge type resolves to its declared id.
    #[tokio::test]
    async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);

        let mut handles = Vec::new();
        for _ in 0..16 {
            let m = manager.clone();
            handles.push(std::thread::spawn(move || {
                m.get_or_assign_edge_type_id("RACED")
            }));
        }
        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        assert!(
            ids.iter().all(|&id| id == ids[0]),
            "all racers must observe one id, got {ids:?}"
        );
        // A later lookup returns the same id.
        assert_eq!(manager.get_or_assign_edge_type_id("RACED"), ids[0]);

        manager.add_label("A")?;
        let declared = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
        assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared);
        Ok(())
    }

    /// `Schema::get_or_assign_edge_type_id` bumps `schema_version` only when it
    /// assigns an id to a new name, not when it resolves an existing one.
    #[test]
    fn test_new_schemaless_edge_type_bumps_schema_version() {
        let mut schema = Schema::default();
        let v0 = schema.schema_version;

        let id1 = schema.get_or_assign_edge_type_id("FRESH");
        assert_eq!(
            schema.schema_version,
            v0.wrapping_add(1),
            "minting a new edge type must bump schema_version"
        );

        // Same name again: same id, no version bump.
        let id1_again = schema.get_or_assign_edge_type_id("FRESH");
        assert_eq!(id1, id1_again);
        assert_eq!(
            schema.schema_version,
            v0.wrapping_add(1),
            "resolving an existing edge type must not bump schema_version"
        );

        // A second new name bumps once more.
        let _id2 = schema.get_or_assign_edge_type_id("OTHER");
        assert_eq!(
            schema.schema_version,
            v0.wrapping_add(2),
            "a second new edge type must bump schema_version again"
        );
    }
}