1use crate::core::edge_type::{
5 MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6 is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
/// Lifecycle state of a schema element (label, edge type or property).
///
/// Elements created through the `SchemaManager::add_*` methods start as
/// [`SchemaElementState::Active`], which is also the serde default
/// (`default_state`) for metadata that omits a `state` field.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
    /// The element is live.
    Active,
    /// The element has been hidden.
    Hidden {
        /// When the element entered this state.
        since: DateTime<Utc>,
        /// Identifier of the last snapshot in which the element was active
        /// (format not visible here — TODO confirm).
        last_active_snapshot: String,
    },
    /// The element has been tombstoned.
    Tombstone {
        /// When the element entered this state.
        since: DateTime<Utc>,
    },
}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35pub fn datetime_struct_fields() -> Fields {
42 Fields::from(vec![
43 Field::new(
44 "nanos_since_epoch",
45 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46 true,
47 ),
48 Field::new("offset_seconds", ArrowDataType::Int32, true),
49 Field::new("timezone_name", ArrowDataType::Utf8, true),
50 ])
51}
52
53pub fn time_struct_fields() -> Fields {
59 Fields::from(vec![
60 Field::new(
61 "nanos_since_midnight",
62 ArrowDataType::Time64(TimeUnit::Nanosecond),
63 true,
64 ),
65 Field::new("offset_seconds", ArrowDataType::Int32, true),
66 ])
67}
68
69pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
/// Kind of CRDT held by a [`DataType::Crdt`] property.
///
/// The CRDT state itself is stored as opaque Arrow `Binary`
/// (see `DataType::to_arrow`); [`CrdtType::type_name`] returns the variant
/// name as a string.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    GCounter,
    GSet,
    ORSet,
    LWWRegister,
    LWWMap,
    Rga,
    VectorClock,
    VCRegister,
}
91
92impl CrdtType {
93 #[must_use]
105 pub fn type_name(&self) -> &'static str {
106 match self {
107 CrdtType::GCounter => "GCounter",
108 CrdtType::GSet => "GSet",
109 CrdtType::ORSet => "ORSet",
110 CrdtType::LWWRegister => "LWWRegister",
111 CrdtType::LWWMap => "LWWMap",
112 CrdtType::Rga => "Rga",
113 CrdtType::VectorClock => "VectorClock",
114 CrdtType::VCRegister => "VCRegister",
115 }
116 }
117}
118
/// Coordinate system of a [`DataType::Point`].
///
/// Each variant maps to a different Arrow struct in `DataType::to_arrow`.
/// Every layout carries a non-null `crs` string.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
    /// Stored as `{latitude, longitude, crs}`.
    Geographic,
    /// Stored as `{x, y, crs}`.
    Cartesian2D,
    /// Stored as `{x, y, z, crs}`.
    Cartesian3D,
}
125
/// Logical type of a schema property.
///
/// See `DataType::to_arrow` for the physical Arrow encoding of each variant
/// and `DataType::accepts` for the runtime values each variant admits.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
    /// Arrow `Utf8`.
    String,
    /// Arrow `Int32`.
    Int32,
    /// Arrow `Int64`.
    Int64,
    /// Arrow `Float32`.
    Float32,
    /// Arrow `Float64`.
    Float64,
    /// Arrow `Boolean`.
    Bool,
    /// Nanosecond Arrow timestamp annotated with the `"UTC"` timezone.
    Timestamp,
    /// Arrow `Date32`.
    Date,
    /// Stored as the struct from `time_struct_fields`.
    Time,
    /// Stored as the struct from `datetime_struct_fields`.
    DateTime,
    /// Stored as opaque `LargeBinary`.
    Duration,
    /// Any value; stored as opaque `LargeBinary`.
    CypherValue,
    /// Raw bytes; stored as `LargeBinary`.
    Bytes,
    /// Spatial point; the struct layout depends on the [`PointType`].
    Point(PointType),
    /// `f32` fixed-size list with `dimensions` elements.
    Vector { dimensions: usize },
    /// Stored as 24-byte `FixedSizeBinary`.
    Btic,
    /// CRDT state of the given kind; stored as opaque `Binary`.
    Crdt(CrdtType),
    /// Arrow list whose items are the inner type.
    List(Box<DataType>),
    /// Key type and value type; stored as a list of `{key, value}` structs.
    Map(Box<DataType>, Box<DataType>),
}
149
150impl DataType {
151 #[allow(non_upper_case_globals)]
153 pub const Float: DataType = DataType::Float64;
154 #[allow(non_upper_case_globals)]
155 pub const Int: DataType = DataType::Int64;
156
157 pub fn to_arrow(&self) -> ArrowDataType {
158 match self {
159 DataType::String => ArrowDataType::Utf8,
160 DataType::Int32 => ArrowDataType::Int32,
161 DataType::Int64 => ArrowDataType::Int64,
162 DataType::Float32 => ArrowDataType::Float32,
163 DataType::Float64 => ArrowDataType::Float64,
164 DataType::Bool => ArrowDataType::Boolean,
165 DataType::Timestamp => {
166 ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
167 }
168 DataType::Date => ArrowDataType::Date32,
169 DataType::Time => ArrowDataType::Struct(time_struct_fields()),
170 DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
171 DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Bytes => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
175 PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
176 Field::new("latitude", ArrowDataType::Float64, false),
177 Field::new("longitude", ArrowDataType::Float64, false),
178 Field::new("crs", ArrowDataType::Utf8, false),
179 ])),
180 PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
181 Field::new("x", ArrowDataType::Float64, false),
182 Field::new("y", ArrowDataType::Float64, false),
183 Field::new("crs", ArrowDataType::Utf8, false),
184 ])),
185 PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
186 Field::new("x", ArrowDataType::Float64, false),
187 Field::new("y", ArrowDataType::Float64, false),
188 Field::new("z", ArrowDataType::Float64, false),
189 Field::new("crs", ArrowDataType::Utf8, false),
190 ])),
191 },
192 DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
193 Arc::new(Field::new("item", ArrowDataType::Float32, true)),
194 *dimensions as i32,
195 ),
196 DataType::Btic => ArrowDataType::FixedSizeBinary(24),
197 DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
199 ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
200 }
201 DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
202 "item",
203 ArrowDataType::Struct(Fields::from(vec![
204 Field::new("key", key.to_arrow(), false),
205 Field::new("value", value.to_arrow(), true),
206 ])),
207 true,
208 ))),
209 }
210 }
211
212 pub fn accepts(&self, value: &crate::value::Value) -> bool {
238 use crate::value::{TemporalValue, Value};
239
240 if matches!(value, Value::Null) {
242 return true;
243 }
244
245 match self {
246 DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
248
249 DataType::String => matches!(value, Value::String(_)),
250 DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
251 DataType::Float32 | DataType::Float64 => {
253 matches!(value, Value::Int(_) | Value::Float(_))
254 }
255 DataType::Bool => matches!(value, Value::Bool(_)),
256
257 DataType::Timestamp => matches!(
260 value,
261 Value::String(_)
262 | Value::Int(_)
263 | Value::Temporal(
264 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
265 )
266 ),
267 DataType::DateTime => matches!(
268 value,
269 Value::Temporal(
270 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
271 )
272 ),
273 DataType::Date => {
274 matches!(
275 value,
276 Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
277 )
278 }
279 DataType::Time => matches!(
280 value,
281 Value::Int(_)
282 | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
283 ),
284 DataType::Duration => {
285 matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
286 }
287 DataType::Bytes => matches!(value, Value::Bytes(_)),
288 DataType::Btic => matches!(
290 value,
291 Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
292 ),
293 DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
294 DataType::List(_) => matches!(value, Value::List(_)),
295 DataType::Map(_, _) => matches!(value, Value::Map(_)),
296 }
297 }
298}
299
300fn default_created_at() -> DateTime<Utc> {
301 Utc::now()
302}
303
304fn default_state() -> SchemaElementState {
305 SchemaElementState::Active
306}
307
/// Serde default for `PropertyMeta::added_in`.
///
/// Properties serialized without this field are attributed to schema
/// version 1.
fn default_version_1() -> u32 {
    1_u32
}
311
/// Metadata for one property of a label or edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
    /// Declared logical type.
    pub r#type: DataType,
    /// Whether the property was declared nullable.
    pub nullable: bool,
    /// `Schema::schema_version` at the moment the property was added.
    /// Defaults to 1 when absent from serialized metadata.
    #[serde(default = "default_version_1")]
    pub added_in: u32,
    /// Lifecycle state. Defaults to `Active` when absent.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Expression for generated properties (set by
    /// `SchemaManager::add_generated_property`); `None` for regular properties.
    #[serde(default)]
    pub generation_expression: Option<String>,
    /// Optional free-text description. Omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
325
/// Metadata for a label.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
    /// Numeric label id. `SchemaManager::add_label_with_desc` assigns native
    /// ids sequentially and keeps them below `VIRTUAL_LABEL_ID_START`.
    pub id: u16,
    /// Creation time. When absent from serialized metadata it defaults to the
    /// time of deserialization (`default_created_at`).
    #[serde(default = "default_created_at")]
    pub created_at: DateTime<Utc>,
    /// Lifecycle state. Defaults to `Active` when absent.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional free-text description. Omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
336
/// Metadata for a declared (schema-managed) edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
    /// Numeric edge type id. `SchemaManager::add_edge_type_with_desc` assigns
    /// native ids sequentially below `VIRTUAL_EDGE_TYPE_ID_START`.
    pub id: u32,
    /// Label names recorded for the source endpoint (whether they are
    /// enforced is not visible here — TODO confirm).
    pub src_labels: Vec<String>,
    /// Label names recorded for the destination endpoint.
    pub dst_labels: Vec<String>,
    /// Lifecycle state. Defaults to `Active` when absent.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional free-text description. Omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
348
/// Kind of a schema [`Constraint`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
    /// Uniqueness over the listed properties (presumably as a combined key —
    /// TODO confirm).
    Unique { properties: Vec<String> },
    /// The named property must be present.
    Exists { property: String },
    /// A boolean check expression, stored as source text.
    Check { expression: String },
}
356
/// Schema element a [`Constraint`] applies to, identified by name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
    /// A label name.
    Label(String),
    /// An edge type name.
    EdgeType(String),
}
363
/// A named schema constraint.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
    /// Constraint name.
    pub name: String,
    /// What the constraint checks.
    pub constraint_type: ConstraintType,
    /// Which label or edge type it applies to.
    pub target: ConstraintTarget,
    /// Whether the constraint is currently enabled.
    pub enabled: bool,
}
371
/// Id registry for edge types that are used without a declared
/// [`EdgeTypeMeta`] entry.
///
/// Ids are produced by `make_schemaless_id` from a local counter, and
/// `Schema::edge_type_name_by_id_unified` routes ids that satisfy
/// `is_schemaless_edge_type` here.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    /// Type name → assigned id.
    name_to_id: HashMap<String, u32>,
    /// Inverse of `name_to_id`. Both maps are updated together in
    /// `get_or_assign_id`.
    id_to_name: HashMap<u32, String>,
    /// Next local counter value passed to `make_schemaless_id`. Starts at 1.
    next_local_id: u32,
}
384
385impl SchemalessEdgeTypeRegistry {
386 pub fn new() -> Self {
387 Self {
388 name_to_id: HashMap::new(),
389 id_to_name: HashMap::new(),
390 next_local_id: 1,
391 }
392 }
393
394 pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
396 if let Some(&id) = self.name_to_id.get(type_name) {
397 return id;
398 }
399
400 let id = make_schemaless_id(self.next_local_id);
401 self.next_local_id += 1;
402
403 self.name_to_id.insert(type_name.to_string(), id);
404 self.id_to_name.insert(id, type_name.to_string());
405
406 id
407 }
408
409 pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
411 self.id_to_name.get(&type_id).map(String::as_str)
412 }
413
414 pub fn contains(&self, type_name: &str) -> bool {
416 self.name_to_id.contains_key(type_name)
417 }
418
419 pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
421 self.name_to_id.get(type_name).copied()
422 }
423
424 pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
426 self.name_to_id
427 .iter()
428 .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
429 .map(|(_, &id)| id)
430 }
431
432 pub fn all_type_ids(&self) -> Vec<u32> {
434 self.id_to_name.keys().copied().collect()
435 }
436
437 pub fn is_empty(&self) -> bool {
439 self.name_to_id.is_empty()
440 }
441}
442
443impl Default for SchemalessEdgeTypeRegistry {
444 fn default() -> Self {
445 Self::new()
446 }
447}
448
/// First label id of the range reserved for virtual (catalog-resolved) labels.
///
/// Native labels created by `SchemaManager::add_label_with_desc` always get
/// ids strictly below this value.
pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF_00;

/// Sentinel label id (`u16::MAX`). It closes the virtual range and is not
/// itself a virtual id.
pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = u16::MAX;

/// Returns `true` if `id` lies in the half-open virtual label range
/// `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL`.
#[inline]
pub fn is_virtual_label_id(id: u16) -> bool {
    matches!(id, VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL)
}
463
/// The persisted schema: labels, edge types, per-owner properties, indexes,
/// constraints, and the registry of schemaless edge types.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
    /// Incremented (wrapping) by `bump_version` whenever a label, edge type or
    /// property is added through `SchemaManager`.
    pub schema_version: u32,
    /// Label name → metadata.
    pub labels: HashMap<String, LabelMeta>,
    /// Declared edge type name → metadata.
    pub edge_types: HashMap<String, EdgeTypeMeta>,
    /// Owner (label or edge type name) → property name → metadata.
    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
    /// Index definitions. `SchemaManager::load_from_store` collapses entries
    /// that share a name.
    #[serde(default)]
    pub indexes: Vec<IndexDefinition>,
    /// Declared constraints.
    #[serde(default)]
    pub constraints: Vec<Constraint>,
    /// Ids for edge types used without an `edge_types` entry.
    #[serde(default)]
    pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
478
479impl Default for Schema {
480 fn default() -> Self {
481 Self {
482 schema_version: 1,
483 labels: HashMap::new(),
484 edge_types: HashMap::new(),
485 properties: HashMap::new(),
486 indexes: Vec::new(),
487 constraints: Vec::new(),
488 schemaless_registry: SchemalessEdgeTypeRegistry::new(),
489 }
490 }
491}
492
493impl Schema {
494 fn bump_version(&mut self) {
503 self.schema_version = self.schema_version.wrapping_add(1);
504 }
505
506 pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
511 self.labels
512 .iter()
513 .find(|(_, meta)| meta.id == label_id)
514 .map(|(name, _)| name.as_str())
515 }
516
517 pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
519 self.labels.get(label_name).map(|meta| meta.id)
520 }
521
522 pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
527 self.edge_types
528 .iter()
529 .find(|(_, meta)| meta.id == type_id)
530 .map(|(name, _)| name.as_str())
531 }
532
533 pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
535 self.edge_types.get(type_name).map(|meta| meta.id)
536 }
537
538 pub fn vector_index_for_property(
543 &self,
544 label: &str,
545 property: &str,
546 ) -> Option<&VectorIndexConfig> {
547 self.indexes.iter().find_map(|idx| {
548 if let IndexDefinition::Vector(config) = idx
549 && config.label == label
550 && config.property == property
551 && config.metadata.status == IndexStatus::Online
552 {
553 return Some(config);
554 }
555 None
556 })
557 }
558
559 pub fn fulltext_index_for_property(
564 &self,
565 label: &str,
566 property: &str,
567 ) -> Option<&FullTextIndexConfig> {
568 self.indexes.iter().find_map(|idx| {
569 if let IndexDefinition::FullText(config) = idx
570 && config.label == label
571 && config.properties.iter().any(|p| p == property)
572 && config.metadata.status == IndexStatus::Online
573 {
574 return Some(config);
575 }
576 None
577 })
578 }
579
580 pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
585 self.labels
586 .iter()
587 .find(|(k, _)| k.eq_ignore_ascii_case(name))
588 .map(|(_, v)| v)
589 }
590
591 pub fn canonical_label_name(&self, name: &str) -> Option<String> {
598 self.labels
599 .iter()
600 .find(|(k, _)| k.eq_ignore_ascii_case(name))
601 .map(|(k, _)| k.clone())
602 }
603
604 pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
606 self.get_label_case_insensitive(label_name)
607 .map(|meta| meta.id)
608 }
609
610 pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
615 self.edge_types
616 .iter()
617 .find(|(k, _)| k.eq_ignore_ascii_case(name))
618 .map(|(_, v)| v)
619 }
620
621 pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
623 self.get_edge_type_case_insensitive(type_name)
624 .map(|meta| meta.id)
625 }
626
627 pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
630 self.edge_type_id_by_name_case_insensitive(type_name)
631 .or_else(|| {
632 self.schemaless_registry
633 .id_by_name_case_insensitive(type_name)
634 })
635 }
636
637 pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
643 if let Some(id) = self.edge_type_id_unified(type_name) {
644 return id;
645 }
646 self.schemaless_registry.get_or_assign_id(type_name)
647 }
648
649 pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
656 self.edge_type_id_by_name(type_name)
657 .or_else(|| self.schemaless_registry.id_by_name(type_name))
658 }
659
660 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
664 if is_schemaless_edge_type(type_id) {
665 self.schemaless_registry
666 .type_name_by_id(type_id)
667 .map(str::to_owned)
668 } else {
669 self.edge_type_name_by_id(type_id).map(str::to_owned)
670 }
671 }
672
673 pub fn all_edge_type_ids(&self) -> Vec<u32> {
676 let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
677 ids.extend(self.schemaless_registry.all_type_ids());
678 ids.sort_unstable();
679 ids
680 }
681}
682
/// Build status of an index definition.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
    /// The default status. `Schema::vector_index_for_property` and
    /// `Schema::fulltext_index_for_property` only return indexes in this state.
    #[default]
    Online,
    /// Build in progress (by name; transitions are not visible here).
    Building,
    /// Presumably the index is out of date and needs a rebuild — TODO confirm.
    Stale,
    /// Presumably the last build failed — TODO confirm.
    Failed,
}
696
/// Bookkeeping shared by every index definition. Every field falls back to
/// its default when absent from serialized metadata.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
    /// Current build status. Defaults to `Online`.
    #[serde(default)]
    pub status: IndexStatus,
    /// When the index was last built, if recorded.
    #[serde(default)]
    pub last_built_at: Option<DateTime<Utc>>,
    /// Row count observed at the last build, if recorded.
    #[serde(default)]
    pub row_count_at_build: Option<u64>,
}
710
/// A schema index of any supported kind.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `"type"` field next to the config's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    Vector(VectorIndexConfig),
    FullText(FullTextIndexConfig),
    Scalar(ScalarIndexConfig),
    Inverted(InvertedIndexConfig),
    JsonFullText(JsonFtsIndexConfig),
}
721
722impl IndexDefinition {
723 pub fn name(&self) -> &str {
725 match self {
726 IndexDefinition::Vector(c) => &c.name,
727 IndexDefinition::FullText(c) => &c.name,
728 IndexDefinition::Scalar(c) => &c.name,
729 IndexDefinition::Inverted(c) => &c.name,
730 IndexDefinition::JsonFullText(c) => &c.name,
731 }
732 }
733
734 pub fn label(&self) -> &str {
736 match self {
737 IndexDefinition::Vector(c) => &c.label,
738 IndexDefinition::FullText(c) => &c.label,
739 IndexDefinition::Scalar(c) => &c.label,
740 IndexDefinition::Inverted(c) => &c.label,
741 IndexDefinition::JsonFullText(c) => &c.label,
742 }
743 }
744
745 pub fn metadata(&self) -> &IndexMetadata {
747 match self {
748 IndexDefinition::Vector(c) => &c.metadata,
749 IndexDefinition::FullText(c) => &c.metadata,
750 IndexDefinition::Scalar(c) => &c.metadata,
751 IndexDefinition::Inverted(c) => &c.metadata,
752 IndexDefinition::JsonFullText(c) => &c.metadata,
753 }
754 }
755
756 pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
758 match self {
759 IndexDefinition::Vector(c) => &mut c.metadata,
760 IndexDefinition::FullText(c) => &mut c.metadata,
761 IndexDefinition::Scalar(c) => &mut c.metadata,
762 IndexDefinition::Inverted(c) => &mut c.metadata,
763 IndexDefinition::JsonFullText(c) => &mut c.metadata,
764 }
765 }
766}
767
/// Configuration of an inverted index over a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
    /// Index name.
    pub name: String,
    /// Indexed label.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Whether terms are normalized (exact normalization not visible here).
    /// Defaults to `true` when absent.
    #[serde(default = "default_normalize")]
    pub normalize: bool,
    /// Cap on terms indexed per document. Defaults to 10 000 when absent.
    #[serde(default = "default_max_terms_per_doc")]
    pub max_terms_per_doc: usize,
    /// Shared build metadata.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
780
/// Serde default for `InvertedIndexConfig::normalize`: enabled.
fn default_normalize() -> bool {
    !false
}
784
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`: 10 000 terms.
fn default_max_terms_per_doc() -> usize {
    const DEFAULT_MAX_TERMS: usize = 10_000;
    DEFAULT_MAX_TERMS
}
788
/// Configuration of a vector similarity index over a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
    /// Index name.
    pub name: String,
    /// Indexed label.
    pub label: String,
    /// Indexed vector property.
    pub property: String,
    /// Index algorithm and its build parameters.
    pub index_type: VectorIndexType,
    /// Distance used for similarity.
    pub metric: DistanceMetric,
    /// Optional automatic-embedding settings.
    pub embedding_config: Option<EmbeddingConfig>,
    /// Shared build metadata.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
800
/// Settings for producing embeddings for a vector index. The consumer is not
/// visible here, so the field semantics below are partly assumptions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
    /// Name of the embedding configuration/model to use (assumption — TODO
    /// confirm against the embedding service).
    pub alias: String,
    /// Properties whose values presumably form the text to embed.
    pub source_properties: Vec<String>,
    /// Number of items per embedding batch.
    pub batch_size: usize,
    /// Optional prefix for document inputs (assumed prepended before
    /// embedding — confirm). `None` when absent.
    #[serde(default)]
    pub document_prefix: Option<String>,
    /// Optional prefix for query inputs (same assumption as
    /// `document_prefix`). `None` when absent.
    #[serde(default)]
    pub query_prefix: Option<String>,
}
816
/// Vector index algorithm and its build parameters.
///
/// The variant names follow the flat / IVF / HNSW index families. The exact
/// meaning of each parameter is defined by the underlying index
/// implementation, which is not visible here. `Option` fields marked
/// `#[serde(default)]` deserialize to `None` when absent.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
    /// Flat index with no build parameters.
    Flat,
    IvfFlat {
        num_partitions: u32,
    },
    IvfPq {
        num_partitions: u32,
        num_sub_vectors: u32,
        bits_per_subvector: u8,
    },
    IvfSq {
        num_partitions: u32,
    },
    IvfRq {
        num_partitions: u32,
        #[serde(default)]
        num_bits: Option<u8>,
    },
    HnswFlat {
        m: u32,
        ef_construction: u32,
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    HnswSq {
        m: u32,
        ef_construction: u32,
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    HnswPq {
        m: u32,
        ef_construction: u32,
        num_sub_vectors: u32,
        #[serde(default)]
        num_partitions: Option<u32>,
    },
}
857
/// Vector distance function. For every variant a smaller value means "closer"
/// (see `DistanceMetric::compute_distance`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
    /// `1 - cosine similarity`.
    Cosine,
    /// Squared Euclidean distance (no square root is taken).
    L2,
    /// Negated dot product.
    Dot,
}
865
866impl DistanceMetric {
867 pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
879 assert_eq!(a.len(), b.len(), "vector dimension mismatch");
880 match self {
881 DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
882 DistanceMetric::Cosine => {
883 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
884 let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
885 let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
886 let denom = norm_a * norm_b;
887 if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
888 }
889 DistanceMetric::Dot => {
890 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
891 -dot
892 }
893 }
894 }
895}
896
/// Configuration of a full-text index over one or more properties of a label.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
    /// Index name.
    pub name: String,
    /// Indexed label.
    pub label: String,
    /// Indexed properties. `Schema::fulltext_index_for_property` matches any
    /// of them.
    pub properties: Vec<String>,
    /// Tokenizer used to split text.
    pub tokenizer: TokenizerConfig,
    /// Whether token positions are recorded (presumably for phrase queries —
    /// confirm).
    pub with_positions: bool,
    /// Shared build metadata.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
907
/// Tokenizer choice for full-text indexes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
    Standard,
    Whitespace,
    /// N-gram tokenizer; `min`/`max` are presumably inclusive gram lengths —
    /// TODO confirm.
    Ngram { min: u8, max: u8 },
    /// Tokenizer looked up by name elsewhere.
    Custom { name: String },
}
916
/// Configuration of a full-text index over a JSON column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
    /// Index name.
    pub name: String,
    /// Indexed label.
    pub label: String,
    /// Column holding the JSON documents.
    pub column: String,
    /// JSON paths to index. Empty when absent (whether empty means "all
    /// paths" is not visible here — TODO confirm).
    #[serde(default)]
    pub paths: Vec<String>,
    /// Whether token positions are recorded. Defaults to `false`.
    #[serde(default)]
    pub with_positions: bool,
    /// Shared build metadata.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
929
/// Configuration of a scalar (non-vector, non-text) index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
    /// Index name.
    pub name: String,
    /// Indexed label.
    pub label: String,
    /// Indexed properties.
    pub properties: Vec<String>,
    /// Index structure.
    pub index_type: ScalarIndexType,
    /// Optional filter predicate source text (presumably a partial-index
    /// condition — TODO confirm).
    pub where_clause: Option<String>,
    /// Shared build metadata.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
940
/// Structure used by a [`ScalarIndexConfig`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
    BTree,
    Hash,
    Bitmap,
    /// Presumably an index over list-valued properties — TODO confirm.
    LabelList,
}
949
/// Owns the persisted [`Schema`] and mediates all access to it.
///
/// The current schema is an `Arc<Schema>` behind an `RwLock`. `schema()` hands
/// out cheap `Arc` snapshots, and mutating methods go through `Arc::make_mut`,
/// which clones the schema when a snapshot is still shared, so outstanding
/// snapshots are never modified in place.
pub struct SchemaManager {
    /// Object store the schema JSON is read from and written to.
    store: Arc<dyn ObjectStore>,
    /// Key of the schema JSON object inside `store`.
    path: ObjectStorePath,
    /// Current schema snapshot.
    schema: RwLock<Arc<Schema>>,
}
955
956impl SchemaManager {
957 pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
958 let path = path.as_ref();
959 let parent = path
960 .parent()
961 .ok_or_else(|| anyhow!("Invalid schema path"))?;
962 let filename = path
963 .file_name()
964 .ok_or_else(|| anyhow!("Invalid schema filename"))?
965 .to_str()
966 .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
967
968 let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
969 let obj_path = ObjectStorePath::from(filename);
970
971 Self::load_from_store(store, &obj_path).await
972 }
973
974 pub async fn load_from_store(
975 store: Arc<dyn ObjectStore>,
976 path: &ObjectStorePath,
977 ) -> Result<Self> {
978 match store.get(path).await {
979 Ok(result) => {
980 let bytes = result.bytes().await?;
981 let content = String::from_utf8(bytes.to_vec())?;
982 let mut schema: Schema = serde_json::from_str(&content)?;
983 let original_len = schema.indexes.len();
991 if original_len > 0 {
992 let mut seen: std::collections::HashSet<String> =
993 std::collections::HashSet::with_capacity(original_len);
994 let mut dedup: Vec<IndexDefinition> = schema
995 .indexes
996 .iter()
997 .rev()
998 .filter(|idx| seen.insert(idx.name().to_string()))
999 .cloned()
1000 .collect();
1001 dedup.reverse();
1002 if dedup.len() != original_len {
1003 tracing::warn!(
1004 collapsed = original_len - dedup.len(),
1005 kept = dedup.len(),
1006 "schema.indexes: collapsed duplicate entries on load (issue #63)"
1007 );
1008 schema.indexes = dedup;
1009 }
1010 }
1011 Ok(Self {
1012 store,
1013 path: path.clone(),
1014 schema: RwLock::new(Arc::new(schema)),
1015 })
1016 }
1017 Err(object_store::Error::NotFound { .. }) => Ok(Self {
1018 store,
1019 path: path.clone(),
1020 schema: RwLock::new(Arc::new(Schema::default())),
1021 }),
1022 Err(e) => Err(anyhow::Error::from(e)),
1023 }
1024 }
1025
1026 pub async fn save(&self) -> Result<()> {
1027 let content = {
1028 let schema_guard = acquire_read(&self.schema, "schema")?;
1029 serde_json::to_string_pretty(&**schema_guard)?
1030 };
1031 self.store
1032 .put(&self.path, content.into())
1033 .await
1034 .map_err(anyhow::Error::from)?;
1035 Ok(())
1036 }
1037
1038 pub fn path(&self) -> &ObjectStorePath {
1039 &self.path
1040 }
1041
1042 pub fn schema(&self) -> Arc<Schema> {
1043 self.schema
1044 .read()
1045 .expect("Schema lock poisoned - a thread panicked while holding it")
1046 .clone()
1047 }
1048
1049 fn normalize_function_names(expr: &str) -> String {
1052 let mut result = String::with_capacity(expr.len());
1053 let mut chars = expr.chars().peekable();
1054
1055 while let Some(ch) = chars.next() {
1056 if ch.is_alphabetic() {
1057 let mut ident = String::new();
1059 ident.push(ch);
1060
1061 while let Some(&next) = chars.peek() {
1062 if next.is_alphanumeric() || next == '_' {
1063 ident.push(chars.next().unwrap());
1064 } else {
1065 break;
1066 }
1067 }
1068
1069 if chars.peek() == Some(&'(') {
1071 result.push_str(&ident.to_uppercase());
1072 } else {
1073 result.push_str(&ident); }
1075 } else {
1076 result.push(ch);
1077 }
1078 }
1079
1080 result
1081 }
1082
1083 pub fn generated_column_name(expr: &str) -> String {
1091 let normalized = Self::normalize_function_names(expr);
1093
1094 let sanitized = normalized
1095 .replace(|c: char| !c.is_alphanumeric(), "_")
1096 .trim_matches('_')
1097 .to_string();
1098
1099 const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1101 const FNV_PRIME: u64 = 1099511628211;
1102
1103 let mut hash = FNV_OFFSET_BASIS;
1104 for byte in normalized.as_bytes() {
1105 hash ^= *byte as u64;
1106 hash = hash.wrapping_mul(FNV_PRIME);
1107 }
1108
1109 format!("_gen_{}_{:x}", sanitized, hash)
1110 }
1111
1112 pub fn replace_schema(&self, new_schema: Schema) {
1113 let mut schema = self
1114 .schema
1115 .write()
1116 .expect("Schema lock poisoned - a thread panicked while holding it");
1117 *schema = Arc::new(new_schema);
1118 }
1119
1120 #[must_use]
1133 pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1134 let primary = self.schema();
1135 let merged = if overlay.is_empty() {
1136 (*primary).clone()
1137 } else {
1138 let mut merged = (*primary).clone();
1139 for (name, label) in &overlay.added_labels {
1140 merged.labels.insert(name.clone(), label.clone());
1141 }
1142 for (name, edge_type) in &overlay.added_edge_types {
1143 merged.edge_types.insert(name.clone(), edge_type.clone());
1144 }
1145 for addition in &overlay.added_properties {
1146 let props = merged.properties.entry(addition.owner.clone()).or_default();
1147 props.insert(
1148 addition.property.clone(),
1149 PropertyMeta {
1150 r#type: addition.data_type.clone(),
1151 nullable: addition.nullable,
1152 added_in: merged.schema_version,
1153 state: SchemaElementState::Active,
1154 generation_expression: None,
1155 description: None,
1156 },
1157 );
1158 }
1159 merged
1160 };
1161
1162 Arc::new(Self {
1163 store: self.store.clone(),
1164 path: self.path.clone(),
1165 schema: RwLock::new(Arc::new(merged)),
1166 })
1167 }
1168
1169 pub fn next_label_id(&self) -> u16 {
1170 self.schema()
1171 .labels
1172 .values()
1173 .map(|l| l.id)
1174 .max()
1175 .unwrap_or(0)
1176 + 1
1177 }
1178
1179 pub fn next_type_id(&self) -> u32 {
1180 let max_schema_id = self
1181 .schema()
1182 .edge_types
1183 .values()
1184 .map(|t| t.id)
1185 .max()
1186 .unwrap_or(0);
1187
1188 if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1190 panic!("Schema edge type ID exhaustion");
1191 }
1192
1193 max_schema_id + 1
1194 }
1195
1196 pub fn add_label(&self, name: &str) -> Result<u16> {
1197 self.add_label_with_desc(name, None)
1198 }
1199
1200 pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1201 let mut guard = acquire_write(&self.schema, "schema")?;
1202 let schema = Arc::make_mut(&mut *guard);
1203 if schema.labels.contains_key(name) {
1204 return Err(anyhow!("Label '{}' already exists", name));
1205 }
1206
1207 let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1208 if id >= VIRTUAL_LABEL_ID_START {
1209 return Err(anyhow!(
1210 "Native label space exhausted (next id {id:#x} would enter the \
1211 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1212 reserved for catalog-resolved labels)"
1213 ));
1214 }
1215 schema.labels.insert(
1216 name.to_string(),
1217 LabelMeta {
1218 id,
1219 created_at: Utc::now(),
1220 state: SchemaElementState::Active,
1221 description,
1222 },
1223 );
1224 schema.bump_version();
1225 Ok(id)
1226 }
1227
1228 pub fn add_edge_type(
1229 &self,
1230 name: &str,
1231 src_labels: Vec<String>,
1232 dst_labels: Vec<String>,
1233 ) -> Result<u32> {
1234 self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1235 }
1236
1237 pub fn add_edge_type_with_desc(
1238 &self,
1239 name: &str,
1240 src_labels: Vec<String>,
1241 dst_labels: Vec<String>,
1242 description: Option<String>,
1243 ) -> Result<u32> {
1244 let mut guard = acquire_write(&self.schema, "schema")?;
1245 let schema = Arc::make_mut(&mut *guard);
1246 if schema.edge_types.contains_key(name) {
1247 return Err(anyhow!("Edge type '{}' already exists", name));
1248 }
1249
1250 let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1251
1252 if id >= VIRTUAL_EDGE_TYPE_ID_START {
1257 return Err(anyhow!(
1258 "Native edge type space exhausted (next id {id:#x} would enter the \
1259 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1260 reserved for catalog-resolved edge types)"
1261 ));
1262 }
1263
1264 schema.edge_types.insert(
1265 name.to_string(),
1266 EdgeTypeMeta {
1267 id,
1268 src_labels,
1269 dst_labels,
1270 state: SchemaElementState::Active,
1271 description,
1272 },
1273 );
1274 schema.bump_version();
1275 Ok(id)
1276 }
1277
1278 pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1287 {
1288 let guard = acquire_read(&self.schema, "schema")
1289 .expect("Schema lock poisoned - a thread panicked while holding it");
1290 if let Some(id) = guard.edge_type_id_unified(type_name) {
1291 return id;
1292 }
1293 }
1294 let mut guard = acquire_write(&self.schema, "schema")
1295 .expect("Schema lock poisoned - a thread panicked while holding it");
1296 let schema = Arc::make_mut(&mut *guard);
1297 schema.get_or_assign_edge_type_id(type_name)
1298 }
1299
1300 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1302 let schema = acquire_read(&self.schema, "schema")
1303 .expect("Schema lock poisoned - a thread panicked while holding it");
1304 schema.edge_type_name_by_id_unified(type_id)
1305 }
1306
1307 pub fn add_property(
1308 &self,
1309 label_or_type: &str,
1310 prop_name: &str,
1311 data_type: DataType,
1312 nullable: bool,
1313 ) -> Result<()> {
1314 self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1315 }
1316
1317 pub fn add_property_with_desc(
1318 &self,
1319 label_or_type: &str,
1320 prop_name: &str,
1321 data_type: DataType,
1322 nullable: bool,
1323 description: Option<String>,
1324 ) -> Result<()> {
1325 validate_property_name(prop_name)?;
1326 let mut guard = acquire_write(&self.schema, "schema")?;
1327 let schema = Arc::make_mut(&mut *guard);
1328 let version = schema.schema_version;
1329 let props = schema
1330 .properties
1331 .entry(label_or_type.to_string())
1332 .or_default();
1333
1334 if props.contains_key(prop_name) {
1335 return Err(anyhow!(
1336 "Property '{}' already exists for '{}'",
1337 prop_name,
1338 label_or_type
1339 ));
1340 }
1341
1342 props.insert(
1343 prop_name.to_string(),
1344 PropertyMeta {
1345 r#type: data_type,
1346 nullable,
1347 added_in: version,
1348 state: SchemaElementState::Active,
1349 generation_expression: None,
1350 description,
1351 },
1352 );
1353 schema.bump_version();
1355 Ok(())
1356 }
1357
1358 pub fn add_generated_property(
1359 &self,
1360 label_or_type: &str,
1361 prop_name: &str,
1362 data_type: DataType,
1363 expr: String,
1364 ) -> Result<()> {
1365 validate_reserved_property_name(prop_name)?;
1368 let mut guard = acquire_write(&self.schema, "schema")?;
1369 let schema = Arc::make_mut(&mut *guard);
1370 let version = schema.schema_version;
1371 let props = schema
1372 .properties
1373 .entry(label_or_type.to_string())
1374 .or_default();
1375
1376 if props.contains_key(prop_name) {
1377 return Err(anyhow!("Property '{}' already exists", prop_name));
1378 }
1379
1380 props.insert(
1381 prop_name.to_string(),
1382 PropertyMeta {
1383 r#type: data_type,
1384 nullable: true,
1385 added_in: version,
1386 state: SchemaElementState::Active,
1387 generation_expression: Some(expr),
1388 description: None,
1389 },
1390 );
1391 schema.bump_version();
1393 Ok(())
1394 }
1395
1396 pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1397 let mut guard = acquire_write(&self.schema, "schema")?;
1398 let schema = Arc::make_mut(&mut *guard);
1399 let meta = schema
1400 .labels
1401 .get_mut(name)
1402 .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1403 meta.description = description;
1404 Ok(())
1405 }
1406
1407 pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1408 let mut guard = acquire_write(&self.schema, "schema")?;
1409 let schema = Arc::make_mut(&mut *guard);
1410 let meta = schema
1411 .edge_types
1412 .get_mut(name)
1413 .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1414 meta.description = description;
1415 Ok(())
1416 }
1417
1418 pub fn set_property_description(
1419 &self,
1420 entity: &str,
1421 prop_name: &str,
1422 description: Option<String>,
1423 ) -> Result<()> {
1424 let mut guard = acquire_write(&self.schema, "schema")?;
1425 let schema = Arc::make_mut(&mut *guard);
1426 let props = schema
1427 .properties
1428 .get_mut(entity)
1429 .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1430 let meta = props
1431 .get_mut(prop_name)
1432 .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
1433 meta.description = description;
1434 Ok(())
1435 }
1436
1437 pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1446 let mut guard = acquire_write(&self.schema, "schema")?;
1447 let schema = Arc::make_mut(&mut *guard);
1448 if let Some(existing) = schema
1449 .indexes
1450 .iter_mut()
1451 .find(|i| i.name() == index_def.name())
1452 {
1453 *existing = index_def;
1454 } else {
1455 schema.indexes.push(index_def);
1456 }
1457 schema.bump_version();
1458 Ok(())
1459 }
1460
1461 pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1462 let schema = self.schema.read().expect("Schema lock poisoned");
1463 schema.indexes.iter().find(|i| i.name() == name).cloned()
1464 }
1465
1466 pub fn update_index_metadata(
1471 &self,
1472 index_name: &str,
1473 f: impl FnOnce(&mut IndexMetadata),
1474 ) -> Result<()> {
1475 let mut guard = acquire_write(&self.schema, "schema")?;
1476 let schema = Arc::make_mut(&mut *guard);
1477 let idx = schema
1478 .indexes
1479 .iter_mut()
1480 .find(|i| i.name() == index_name)
1481 .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1482 f(idx.metadata_mut());
1483 Ok(())
1484 }
1485
1486 pub fn remove_index(&self, name: &str) -> Result<()> {
1487 let mut guard = acquire_write(&self.schema, "schema")?;
1488 let schema = Arc::make_mut(&mut *guard);
1489 if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1490 schema.indexes.remove(pos);
1491 schema.bump_version();
1492 Ok(())
1493 } else {
1494 Err(anyhow!("Index '{}' not found", name))
1495 }
1496 }
1497
1498 pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1499 let mut guard = acquire_write(&self.schema, "schema")?;
1500 let schema = Arc::make_mut(&mut *guard);
1501 if schema.constraints.iter().any(|c| c.name == constraint.name) {
1502 return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1503 }
1504 schema.constraints.push(constraint);
1505 schema.bump_version();
1506 Ok(())
1507 }
1508
1509 pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1510 let mut guard = acquire_write(&self.schema, "schema")?;
1511 let schema = Arc::make_mut(&mut *guard);
1512 if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1513 schema.constraints.remove(pos);
1514 schema.bump_version();
1515 Ok(())
1516 } else if if_exists {
1517 Ok(())
1518 } else {
1519 Err(anyhow!("Constraint '{}' not found", name))
1520 }
1521 }
1522
1523 pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1524 let mut guard = acquire_write(&self.schema, "schema")?;
1525 let schema = Arc::make_mut(&mut *guard);
1526 let Some(props) = schema.properties.get_mut(label_or_type) else {
1527 return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1528 };
1529 if props.remove(prop_name).is_none() {
1530 return Err(anyhow!(
1531 "Property '{}' not found for '{}'",
1532 prop_name,
1533 label_or_type
1534 ));
1535 }
1536 schema.bump_version();
1537 Ok(())
1538 }
1539
1540 pub fn rename_property(
1541 &self,
1542 label_or_type: &str,
1543 old_name: &str,
1544 new_name: &str,
1545 ) -> Result<()> {
1546 let mut guard = acquire_write(&self.schema, "schema")?;
1547 let schema = Arc::make_mut(&mut *guard);
1548 let Some(props) = schema.properties.get_mut(label_or_type) else {
1549 return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1550 };
1551 let Some(meta) = props.remove(old_name) else {
1552 return Err(anyhow!(
1553 "Property '{}' not found for '{}'",
1554 old_name,
1555 label_or_type
1556 ));
1557 };
1558 if props.contains_key(new_name) {
1559 props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
1562 }
1563 props.insert(new_name.to_string(), meta);
1564 schema.bump_version();
1565 Ok(())
1566 }
1567
1568 pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1569 let mut guard = acquire_write(&self.schema, "schema")?;
1570 let schema = Arc::make_mut(&mut *guard);
1571 if let Some(label_meta) = schema.labels.get_mut(name) {
1572 label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1573 schema.bump_version();
1575 Ok(())
1576 } else if if_exists {
1577 Ok(())
1578 } else {
1579 Err(anyhow!("Label '{}' not found", name))
1580 }
1581 }
1582
1583 pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1584 let mut guard = acquire_write(&self.schema, "schema")?;
1585 let schema = Arc::make_mut(&mut *guard);
1586 if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1587 edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1588 schema.bump_version();
1590 Ok(())
1591 } else if if_exists {
1592 Ok(())
1593 } else {
1594 Err(anyhow!("Edge Type '{}' not found", name))
1595 }
1596 }
1597}
1598
1599pub fn validate_identifier(name: &str) -> Result<()> {
1601 if name.is_empty() || name.len() > 64 {
1603 return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1604 }
1605
1606 let first = name.chars().next().unwrap();
1608 if !first.is_alphabetic() && first != '_' {
1609 return Err(anyhow!(
1610 "Identifier '{}' must start with letter or underscore",
1611 name
1612 ));
1613 }
1614
1615 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1617 return Err(anyhow!(
1618 "Identifier '{}' must contain only alphanumeric and underscore",
1619 name
1620 ));
1621 }
1622
1623 const RESERVED: &[&str] = &[
1625 "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1626 "UNION", "ORDER", "LIMIT",
1627 ];
1628 if RESERVED.contains(&name.to_uppercase().as_str()) {
1629 return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1630 }
1631
1632 Ok(())
1633}
1634
1635pub fn validate_property_name(name: &str) -> Result<()> {
1642 if name.starts_with('_') {
1643 return Err(anyhow!(
1644 "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
1645 name
1646 ));
1647 }
1648 validate_reserved_property_name(name)
1649}
1650
1651fn validate_reserved_property_name(name: &str) -> Result<()> {
1658 const RESERVED_PROPS: &[&str] = &[
1667 "ext_id",
1668 "overflow_json",
1669 "eid",
1670 "src_vid",
1671 "dst_vid",
1672 "op",
1673 "__set_struct__",
1681 ];
1682 if RESERVED_PROPS.contains(&name) {
1683 return Err(anyhow!(
1684 "Property name '{}' is reserved by the storage layer; please choose a different name",
1685 name
1686 ));
1687 }
1688 Ok(())
1689}
1690
#[cfg(test)]
mod tests {
    // Unit tests for `DataType::accepts`, `SchemaManager` mutations, persistence
    // round-trips, index metadata handling and schema overlays.
    use super::*;
    use crate::value::{TemporalValue, Value};
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Checks the `DataType::accepts` matrix: which runtime `Value`s each declared
    /// column type admits.
    #[test]
    fn test_datatype_accepts_matrix() {
        // Minimal DateTime temporal value: epoch, zero offset, no zone name.
        let dt = || TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: None,
        };

        // Null is accepted by every type listed here.
        for ty in [
            DataType::String,
            DataType::Int64,
            DataType::Bool,
            DataType::DateTime,
            DataType::Float64,
        ] {
            assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
        }

        // Exact type matches.
        assert!(DataType::String.accepts(&Value::String("x".into())));
        assert!(DataType::Int64.accepts(&Value::Int(1)));
        assert!(DataType::Bool.accepts(&Value::Bool(true)));
        assert!(DataType::DateTime.accepts(&Value::Temporal(dt())));

        // Permitted coercions.
        assert!(
            DataType::Float64.accepts(&Value::Int(3)),
            "Int widens to Float"
        );
        assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
        assert!(DataType::Timestamp.accepts(&Value::Temporal(dt())));
        assert!(
            DataType::Timestamp.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
            "storage parses strings for non-struct Timestamp columns"
        );

        // Rejected combinations.
        assert!(
            !DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
            "String into a DateTime struct column nulls silently — reject here"
        );
        assert!(!DataType::Bool.accepts(&Value::Int(1)));
        assert!(!DataType::Int64.accepts(&Value::Bool(true)));
        assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
        assert!(
            !DataType::String.accepts(&Value::Int(10)),
            "no implicit stringification"
        );
        assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));

        // A CypherValue column accepts a Map value.
        assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
    }

    /// Labels, properties and edge types can be added and duplicates are rejected.
    /// The schema also survives `save` followed by `load_from_store`.
    #[tokio::test]
    async fn test_schema_management() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;

        // On a fresh (empty temp dir) store the first label gets id 1; re-adding fails.
        let lid = manager.add_label("Person")?;
        assert_eq!(lid, 1);
        assert!(manager.add_label("Person").is_err());

        // A second property with the same name on the same label is rejected.
        manager.add_property("Person", "name", DataType::String, false)?;
        assert!(
            manager
                .add_property("Person", "name", DataType::String, false)
                .is_err()
        );

        // The first declared edge type also gets id 1.
        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
        assert_eq!(tid, 1);

        // Saving writes an object at `path` in the store.
        manager.save().await?;
        assert!(store.get(&path).await.is_ok());

        // A new manager loaded from the same store sees the persisted label and property.
        let manager2 = SchemaManager::load_from_store(store, &path).await?;
        assert!(manager2.schema().labels.contains_key("Person"));
        assert!(
            manager2
                .schema()
                .properties
                .get("Person")
                .unwrap()
                .contains_key("name")
        );

        Ok(())
    }

    /// Storage-reserved property names are rejected by `add_property` and
    /// `add_generated_property`. Names that merely contain a reserved word are allowed.
    #[tokio::test]
    async fn test_reserved_property_names_rejected() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Tiny")?;

        // Exact entries from the reserved list fail, with a message mentioning "reserved".
        for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
            let err = manager
                .add_property("Tiny", reserved, DataType::String, true)
                .expect_err(&format!("expected '{reserved}' to be rejected"));
            assert!(
                err.to_string().contains("reserved"),
                "error for '{reserved}' should mention 'reserved', got: {err}"
            );
        }

        // `__set_struct__` is rejected the same way.
        let err = manager
            .add_property("Tiny", "__set_struct__", DataType::String, true)
            .expect_err("expected '__set_struct__' to be rejected");
        assert!(
            err.to_string().contains("reserved"),
            "__set_struct__ rejection should mention 'reserved', got: {err}"
        );

        // Names with a leading underscore are rejected.
        for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
            assert!(
                manager
                    .add_property("Tiny", reserved, DataType::String, true)
                    .is_err(),
                "expected '{reserved}' to be rejected"
            );
        }

        // Matching is exact: names that only contain a reserved word are accepted.
        manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
        manager.add_property("Tiny", "user_op", DataType::String, true)?;
        manager.add_property("Tiny", "type_name", DataType::String, true)?;

        // The same rule applies to properties on an edge type.
        manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
        assert!(
            manager
                .add_property("knows", "src_vid", DataType::Int64, true)
                .is_err()
        );

        // Generated properties cannot use a reserved-list name either.
        assert!(
            manager
                .add_generated_property(
                    "Tiny",
                    "ext_id",
                    DataType::String,
                    "concat('x', name)".into()
                )
                .is_err()
        );

        Ok(())
    }

    /// Function names in expressions are upper-cased, including nested calls, while
    /// the arguments keep their original case.
    #[test]
    fn test_normalize_function_names() {
        assert_eq!(
            SchemaManager::normalize_function_names("lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("LOWER(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("Lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("trim(lower(email))"),
            "TRIM(LOWER(email))"
        );
    }

    /// Differently-cased spellings of the same function call produce the same
    /// generated column name.
    #[test]
    fn test_generated_column_name_case_insensitive() {
        let col1 = SchemaManager::generated_column_name("lower(email)");
        let col2 = SchemaManager::generated_column_name("LOWER(email)");
        let col3 = SchemaManager::generated_column_name("Lower(email)");
        assert_eq!(col1, col2);
        assert_eq!(col2, col3);
        // The name uses the upper-cased function name after a `_gen_` prefix.
        assert!(col1.starts_with("_gen_LOWER_email_"));
    }

    /// Scalar index JSON with no metadata fields still deserializes. The metadata
    /// defaults to status `Online` with no build timestamp or row count.
    #[test]
    fn test_index_metadata_serde_backward_compat() {
        let json = r#"{
            "type": "Scalar",
            "name": "idx_person_name",
            "label": "Person",
            "properties": ["name"],
            "index_type": "BTree",
            "where_clause": null
        }"#;
        let def: IndexDefinition = serde_json::from_str(json).unwrap();
        let meta = def.metadata();
        assert_eq!(meta.status, IndexStatus::Online);
        assert!(meta.last_built_at.is_none());
        assert!(meta.row_count_at_build.is_none());
    }

    /// All `IndexMetadata` fields survive a JSON serialize/deserialize round trip.
    #[test]
    fn test_index_metadata_serde_roundtrip() {
        let now = Utc::now();
        let def = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Test".to_string(),
            properties: vec!["prop".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                last_built_at: Some(now),
                row_count_at_build: Some(42),
            },
        });

        let json = serde_json::to_string(&def).unwrap();
        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.metadata().status, IndexStatus::Building);
        assert_eq!(parsed.metadata().row_count_at_build, Some(42));
        assert!(parsed.metadata().last_built_at.is_some());
    }

    /// `update_index_metadata` mutates the named index's metadata in place and returns
    /// an error for an unknown index name.
    #[tokio::test]
    async fn test_update_index_metadata() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Person")?;
        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: Default::default(),
        });
        manager.add_index(idx)?;

        // Default metadata reports the index as Online.
        let initial = manager.get_index("idx_test").unwrap();
        assert_eq!(initial.metadata().status, IndexStatus::Online);

        manager.update_index_metadata("idx_test", |m| {
            m.status = IndexStatus::Building;
            m.row_count_at_build = Some(100);
        })?;

        let updated = manager.get_index("idx_test").unwrap();
        assert_eq!(updated.metadata().status, IndexStatus::Building);
        assert_eq!(updated.metadata().row_count_at_build, Some(100));

        // Unknown index name is an error.
        assert!(manager.update_index_metadata("nope", |_| {}).is_err());

        Ok(())
    }

    /// `add_index` upserts by name. Re-adding the same name replaces the entry rather
    /// than appending, while a different name appends.
    #[tokio::test]
    async fn test_add_index_is_upsert_by_name() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;
        manager.add_label("Person")?;

        let initial = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                ..Default::default()
            },
        });
        manager.add_index(initial.clone())?;
        assert_eq!(manager.schema().indexes.len(), 1);

        // Identical re-add keeps a single entry.
        manager.add_index(initial.clone())?;
        assert_eq!(
            manager.schema().indexes.len(),
            1,
            "duplicate add_index by name must not append"
        );

        // Re-add with changed metadata replaces the stored definition.
        let mut updated_cfg = match initial {
            IndexDefinition::Scalar(c) => c,
            _ => unreachable!(),
        };
        updated_cfg.metadata.status = IndexStatus::Online;
        updated_cfg.metadata.row_count_at_build = Some(42);
        manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
        assert_eq!(manager.schema().indexes.len(), 1);
        let stored = manager.get_index("idx_test").unwrap();
        assert_eq!(stored.metadata().status, IndexStatus::Online);
        assert_eq!(stored.metadata().row_count_at_build, Some(42));

        // A differently named index is appended.
        let other = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_other".to_string(),
            label: "Person".to_string(),
            properties: vec!["age".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata::default(),
        });
        manager.add_index(other)?;
        assert_eq!(manager.schema().indexes.len(), 2);

        Ok(())
    }

    /// A persisted schema holding 50 index entries with the same name is collapsed to
    /// one entry on load. The survivor is the `Online` entry with a row count of 123.
    #[tokio::test]
    async fn test_load_dedups_bloated_indexes() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");

        // Build the bloated schema by hand and write it straight to the store.
        let mut schema = Schema::default();
        schema.labels.insert(
            "Person".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
                description: None,
            },
        );
        let make = |status: IndexStatus, count: Option<u64>| {
            IndexDefinition::Scalar(ScalarIndexConfig {
                name: "idx_dup".to_string(),
                label: "Person".to_string(),
                properties: vec!["name".to_string()],
                index_type: ScalarIndexType::BTree,
                where_clause: None,
                metadata: IndexMetadata {
                    status,
                    row_count_at_build: count,
                    ..Default::default()
                },
            })
        };
        // 49 Building copies followed by one Online copy, all sharing a name.
        for _ in 0..49 {
            schema.indexes.push(make(IndexStatus::Building, None));
        }
        schema.indexes.push(make(IndexStatus::Online, Some(123)));
        let json = serde_json::to_string_pretty(&schema)?;
        store.put(&path, json.into()).await?;

        let manager = SchemaManager::load_from_store(store, &path).await?;
        let schema = manager.schema();
        assert_eq!(
            schema.indexes.len(),
            1,
            "load() must collapse 50 duplicates by name to 1"
        );
        assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
        assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));

        Ok(())
    }

    /// `vector_index_for_property` ignores a vector index whose status is not `Online`
    /// and returns it once the status is `Online`.
    #[test]
    fn test_vector_index_for_property_skips_non_online() {
        let mut schema = Schema::default();
        schema.labels.insert(
            "Document".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
                description: None,
            },
        );

        // Start with the index marked Stale.
        schema
            .indexes
            .push(IndexDefinition::Vector(VectorIndexConfig {
                name: "vec_doc_embedding".to_string(),
                label: "Document".to_string(),
                property: "embedding".to_string(),
                index_type: VectorIndexType::Flat,
                metric: DistanceMetric::Cosine,
                embedding_config: None,
                metadata: IndexMetadata {
                    status: IndexStatus::Stale,
                    ..Default::default()
                },
            }));

        assert!(
            schema
                .vector_index_for_property("Document", "embedding")
                .is_none()
        );

        // Flip it to Online and the lookup finds it.
        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
            cfg.metadata.status = IndexStatus::Online;
        }
        let result = schema.vector_index_for_property("Document", "embedding");
        assert!(result.is_some());
        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
    }

    /// An overlay built from an empty delta starts with the primary's labels. Labels
    /// added to the overlay afterwards do not appear in the primary.
    #[tokio::test]
    async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
        use crate::core::fork::SchemaDelta;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let primary = SchemaManager::load_from_store(store, &path).await?;
        primary.add_label("Person")?;

        let overlay = primary.with_overlay(&SchemaDelta::empty());
        assert_eq!(overlay.schema().labels.len(), 1);

        overlay.add_label("Forked")?;
        assert!(overlay.schema().labels.contains_key("Forked"));
        assert!(!primary.schema().labels.contains_key("Forked"));

        Ok(())
    }

    /// Labels and edge types listed in a `SchemaDelta` are visible in the overlay
    /// alongside the primary's own labels, and the primary is not modified.
    #[tokio::test]
    async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
        use crate::core::fork::SchemaDelta;
        use chrono::Utc;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let primary = SchemaManager::load_from_store(store, &path).await?;
        primary.add_label("Existing")?;

        let label_meta = LabelMeta {
            id: 99,
            created_at: Utc::now(),
            state: SchemaElementState::Active,
            description: None,
        };
        let edge_meta = EdgeTypeMeta {
            id: 99,
            src_labels: vec!["NewLabel".into()],
            dst_labels: vec!["NewLabel".into()],
            state: SchemaElementState::Active,
            description: None,
        };
        let delta = SchemaDelta {
            added_labels: vec![("NewLabel".to_string(), label_meta)],
            added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
            added_properties: vec![],
        };

        let overlay = primary.with_overlay(&delta);
        let merged = overlay.schema();
        assert!(merged.labels.contains_key("Existing"));
        assert!(merged.labels.contains_key("NewLabel"));
        assert!(merged.edge_types.contains_key("NewEdge"));

        assert!(!primary.schema().labels.contains_key("NewLabel"));
        Ok(())
    }

    /// Sixteen threads calling `get_or_assign_edge_type_id` for the same new name all
    /// get the same id, and later calls return it too. For a declared edge type, the
    /// call returns the id that `add_edge_type` returned.
    #[tokio::test]
    async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);

        // Race 16 OS threads on the same, previously unseen edge type name.
        let mut handles = Vec::new();
        for _ in 0..16 {
            let m = manager.clone();
            handles.push(std::thread::spawn(move || {
                m.get_or_assign_edge_type_id("RACED")
            }));
        }
        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        assert!(
            ids.iter().all(|&id| id == ids[0]),
            "all racers must observe one id, got {ids:?}"
        );
        assert_eq!(manager.get_or_assign_edge_type_id("RACED"), ids[0]);

        // Declared edge types resolve to their declared id.
        manager.add_label("A")?;
        let declared = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
        assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared);
        Ok(())
    }
}