1use crate::core::edge_type::{
5 MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6 is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
20#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
21#[non_exhaustive]
22pub enum SchemaElementState {
23 Active,
24 Hidden {
25 since: DateTime<Utc>,
26 last_active_snapshot: String, },
28 Tombstone {
29 since: DateTime<Utc>,
30 },
31}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35pub fn datetime_struct_fields() -> Fields {
42 Fields::from(vec![
43 Field::new(
44 "nanos_since_epoch",
45 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46 true,
47 ),
48 Field::new("offset_seconds", ArrowDataType::Int32, true),
49 Field::new("timezone_name", ArrowDataType::Utf8, true),
50 ])
51}
52
53pub fn time_struct_fields() -> Fields {
59 Fields::from(vec![
60 Field::new(
61 "nanos_since_midnight",
62 ArrowDataType::Time64(TimeUnit::Nanosecond),
63 true,
64 ),
65 Field::new("offset_seconds", ArrowDataType::Int32, true),
66 ])
67}
68
69pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
/// Returns the Arrow field metadata that tags a field as raw bytes:
/// a single entry mapping `"uni_raw_bytes"` to `"true"`.
pub fn raw_bytes_field_metadata() -> HashMap<String, String> {
    let mut metadata = HashMap::with_capacity(1);
    metadata.insert(String::from("uni_raw_bytes"), String::from("true"));
    metadata
}
89
90#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
91#[non_exhaustive]
92pub enum CrdtType {
93 GCounter,
94 GSet,
95 ORSet,
96 LWWRegister,
97 LWWMap,
98 Rga,
99 VectorClock,
100 VCRegister,
101}
102
103impl CrdtType {
104 #[must_use]
116 pub fn type_name(&self) -> &'static str {
117 match self {
118 CrdtType::GCounter => "GCounter",
119 CrdtType::GSet => "GSet",
120 CrdtType::ORSet => "ORSet",
121 CrdtType::LWWRegister => "LWWRegister",
122 CrdtType::LWWMap => "LWWMap",
123 CrdtType::Rga => "Rga",
124 CrdtType::VectorClock => "VectorClock",
125 CrdtType::VCRegister => "VCRegister",
126 }
127 }
128}
129
130#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
131pub enum PointType {
132 Geographic, Cartesian2D, Cartesian3D, }
136
137#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
138#[non_exhaustive]
139pub enum DataType {
140 String,
141 Int32,
142 Int64,
143 Float32,
144 Float64,
145 Bool,
146 Timestamp,
147 Date,
148 Time,
149 DateTime,
150 Duration,
151 CypherValue,
152 Bytes,
153 Point(PointType),
154 Vector { dimensions: usize },
155 Btic,
156 Crdt(CrdtType),
157 List(Box<DataType>),
158 Map(Box<DataType>, Box<DataType>),
159}
160
161impl DataType {
162 #[allow(non_upper_case_globals)]
164 pub const Float: DataType = DataType::Float64;
165 #[allow(non_upper_case_globals)]
166 pub const Int: DataType = DataType::Int64;
167
168 pub fn to_arrow(&self) -> ArrowDataType {
169 match self {
170 DataType::String => ArrowDataType::Utf8,
171 DataType::Int32 => ArrowDataType::Int32,
172 DataType::Int64 => ArrowDataType::Int64,
173 DataType::Float32 => ArrowDataType::Float32,
174 DataType::Float64 => ArrowDataType::Float64,
175 DataType::Bool => ArrowDataType::Boolean,
176 DataType::Timestamp => {
177 ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
178 }
179 DataType::Date => ArrowDataType::Date32,
180 DataType::Time => ArrowDataType::Struct(time_struct_fields()),
181 DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
182 DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Bytes => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
186 PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
187 Field::new("latitude", ArrowDataType::Float64, false),
188 Field::new("longitude", ArrowDataType::Float64, false),
189 Field::new("crs", ArrowDataType::Utf8, false),
190 ])),
191 PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
192 Field::new("x", ArrowDataType::Float64, false),
193 Field::new("y", ArrowDataType::Float64, false),
194 Field::new("crs", ArrowDataType::Utf8, false),
195 ])),
196 PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
197 Field::new("x", ArrowDataType::Float64, false),
198 Field::new("y", ArrowDataType::Float64, false),
199 Field::new("z", ArrowDataType::Float64, false),
200 Field::new("crs", ArrowDataType::Utf8, false),
201 ])),
202 },
203 DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
204 Arc::new(Field::new("item", ArrowDataType::Float32, true)),
205 *dimensions as i32,
206 ),
207 DataType::Btic => ArrowDataType::FixedSizeBinary(24),
208 DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
210 let item = Field::new("item", inner.to_arrow(), true);
214 let item = if matches!(**inner, DataType::Bytes) {
215 item.with_metadata(raw_bytes_field_metadata())
216 } else {
217 item
218 };
219 ArrowDataType::List(Arc::new(item))
220 }
221 DataType::Map(key, value) => {
222 let value_field = if value.map_value_is_typed() {
229 let f = Field::new("value", value.to_arrow(), true);
230 if matches!(**value, DataType::Bytes) {
231 f.with_metadata(raw_bytes_field_metadata())
232 } else {
233 f
234 }
235 } else {
236 Field::new("value", ArrowDataType::LargeBinary, true)
237 };
238 ArrowDataType::List(Arc::new(Field::new(
239 "item",
240 ArrowDataType::Struct(Fields::from(vec![
241 Field::new("key", key.to_arrow(), false),
242 value_field,
243 ])),
244 true,
245 )))
246 }
247 }
248 }
249
250 pub fn map_value_is_typed(&self) -> bool {
255 matches!(
256 self,
257 DataType::String
258 | DataType::Int64
259 | DataType::Int32
260 | DataType::Float64
261 | DataType::Float32
262 | DataType::Bool
263 | DataType::Bytes
264 )
265 }
266
267 pub fn accepts(&self, value: &crate::value::Value) -> bool {
293 use crate::value::{TemporalValue, Value};
294
295 if matches!(value, Value::Null) {
297 return true;
298 }
299
300 match self {
301 DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
303
304 DataType::String => matches!(value, Value::String(_)),
305 DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
306 DataType::Float32 | DataType::Float64 => {
308 matches!(value, Value::Int(_) | Value::Float(_))
309 }
310 DataType::Bool => matches!(value, Value::Bool(_)),
311
312 DataType::Timestamp => matches!(
315 value,
316 Value::String(_)
317 | Value::Int(_)
318 | Value::Temporal(
319 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
320 )
321 ),
322 DataType::DateTime => matches!(
323 value,
324 Value::Temporal(
325 TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
326 )
327 ),
328 DataType::Date => {
329 matches!(
330 value,
331 Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
332 )
333 }
334 DataType::Time => matches!(
335 value,
336 Value::Int(_)
337 | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
338 ),
339 DataType::Duration => {
340 matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
341 }
342 DataType::Bytes => matches!(value, Value::Bytes(_)),
343 DataType::Btic => matches!(
345 value,
346 Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
347 ),
348 DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
349 DataType::List(_) => matches!(value, Value::List(_)),
350 DataType::Map(_, _) => matches!(value, Value::Map(_)),
351 }
352 }
353}
354
355fn default_created_at() -> DateTime<Utc> {
356 Utc::now()
357}
358
359fn default_state() -> SchemaElementState {
360 SchemaElementState::Active
361}
362
/// Serde default for `PropertyMeta::added_in`: always `1`.
fn default_version_1() -> u32 {
    const DEFAULT_ADDED_IN: u32 = 1;
    DEFAULT_ADDED_IN
}
366
367#[derive(Clone, Debug, Serialize, Deserialize)]
368pub struct PropertyMeta {
369 pub r#type: DataType,
370 pub nullable: bool,
371 #[serde(default = "default_version_1")]
372 pub added_in: u32, #[serde(default = "default_state")]
374 pub state: SchemaElementState,
375 #[serde(default)]
376 pub generation_expression: Option<String>,
377 #[serde(default, skip_serializing_if = "Option::is_none")]
378 pub description: Option<String>,
379}
380
381#[derive(Clone, Debug, Serialize, Deserialize)]
382pub struct LabelMeta {
383 pub id: u16, #[serde(default = "default_created_at")]
385 pub created_at: DateTime<Utc>,
386 #[serde(default = "default_state")]
387 pub state: SchemaElementState,
388 #[serde(default, skip_serializing_if = "Option::is_none")]
389 pub description: Option<String>,
390}
391
392#[derive(Clone, Debug, Serialize, Deserialize)]
393pub struct EdgeTypeMeta {
394 pub id: u32,
396 pub src_labels: Vec<String>,
397 pub dst_labels: Vec<String>,
398 #[serde(default = "default_state")]
399 pub state: SchemaElementState,
400 #[serde(default, skip_serializing_if = "Option::is_none")]
401 pub description: Option<String>,
402}
403
404#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
405#[non_exhaustive]
406pub enum ConstraintType {
407 Unique { properties: Vec<String> },
408 Exists { property: String },
409 Check { expression: String },
410}
411
412#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
413#[non_exhaustive]
414pub enum ConstraintTarget {
415 Label(String),
416 EdgeType(String),
417}
418
419#[derive(Clone, Debug, Serialize, Deserialize)]
420pub struct Constraint {
421 pub name: String,
422 pub constraint_type: ConstraintType,
423 pub target: ConstraintTarget,
424 pub enabled: bool,
425}
426
427#[derive(Clone, Debug, Serialize, Deserialize)]
433pub struct SchemalessEdgeTypeRegistry {
434 name_to_id: HashMap<String, u32>,
435 id_to_name: HashMap<u32, String>,
436 next_local_id: u32,
438}
439
440impl SchemalessEdgeTypeRegistry {
441 pub fn new() -> Self {
442 Self {
443 name_to_id: HashMap::new(),
444 id_to_name: HashMap::new(),
445 next_local_id: 1,
446 }
447 }
448
449 pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
451 if let Some(&id) = self.name_to_id.get(type_name) {
452 return id;
453 }
454
455 let id = make_schemaless_id(self.next_local_id);
456 self.next_local_id += 1;
457
458 self.name_to_id.insert(type_name.to_string(), id);
459 self.id_to_name.insert(id, type_name.to_string());
460
461 id
462 }
463
464 pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
466 self.id_to_name.get(&type_id).map(String::as_str)
467 }
468
469 pub fn contains(&self, type_name: &str) -> bool {
471 self.name_to_id.contains_key(type_name)
472 }
473
474 pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
476 self.name_to_id.get(type_name).copied()
477 }
478
479 pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
481 self.name_to_id
482 .iter()
483 .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
484 .map(|(_, &id)| id)
485 }
486
487 pub fn all_type_ids(&self) -> Vec<u32> {
489 self.id_to_name.keys().copied().collect()
490 }
491
492 pub fn is_empty(&self) -> bool {
494 self.name_to_id.is_empty()
495 }
496}
497
498impl Default for SchemalessEdgeTypeRegistry {
499 fn default() -> Self {
500 Self::new()
501 }
502}
503
/// First label id of the virtual range.
pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
/// Sentinel label id; it is the exclusive upper bound of the virtual range.
pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;

/// Maximum length, in bytes, of a label or edge type name.
const MAX_SCHEMA_NAME_LEN: usize = 255;

/// Returns `true` when `id` lies in the virtual label range, i.e.
/// `VIRTUAL_LABEL_ID_START <= id < VIRTUAL_LABEL_ID_SENTINEL`.
#[inline]
pub fn is_virtual_label_id(id: u16) -> bool {
    let at_or_after_start = id >= VIRTUAL_LABEL_ID_START;
    let before_sentinel = id < VIRTUAL_LABEL_ID_SENTINEL;
    at_or_after_start && before_sentinel
}
524
525#[derive(Clone, Debug, Serialize, Deserialize)]
526pub struct Schema {
527 pub schema_version: u32,
528 pub labels: HashMap<String, LabelMeta>,
529 pub edge_types: HashMap<String, EdgeTypeMeta>,
530 pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
531 #[serde(default)]
532 pub indexes: Vec<IndexDefinition>,
533 #[serde(default)]
534 pub constraints: Vec<Constraint>,
535 #[serde(default)]
537 pub schemaless_registry: SchemalessEdgeTypeRegistry,
538}
539
540impl Default for Schema {
541 fn default() -> Self {
542 Self {
543 schema_version: 1,
544 labels: HashMap::new(),
545 edge_types: HashMap::new(),
546 properties: HashMap::new(),
547 indexes: Vec::new(),
548 constraints: Vec::new(),
549 schemaless_registry: SchemalessEdgeTypeRegistry::new(),
550 }
551 }
552}
553
554impl Schema {
555 fn bump_version(&mut self) {
564 self.schema_version = self.schema_version.wrapping_add(1);
565 }
566
567 pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
572 self.labels
573 .iter()
574 .find(|(_, meta)| meta.id == label_id)
575 .map(|(name, _)| name.as_str())
576 }
577
578 pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
580 self.labels.get(label_name).map(|meta| meta.id)
581 }
582
583 pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
588 self.edge_types
589 .iter()
590 .find(|(_, meta)| meta.id == type_id)
591 .map(|(name, _)| name.as_str())
592 }
593
594 pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
596 self.edge_types.get(type_name).map(|meta| meta.id)
597 }
598
599 pub fn vector_index_for_property(
604 &self,
605 label: &str,
606 property: &str,
607 ) -> Option<&VectorIndexConfig> {
608 self.indexes.iter().find_map(|idx| {
609 if let IndexDefinition::Vector(config) = idx
610 && config.label == label
611 && config.property == property
612 && config.metadata.status == IndexStatus::Online
613 {
614 return Some(config);
615 }
616 None
617 })
618 }
619
620 pub fn fulltext_index_for_property(
625 &self,
626 label: &str,
627 property: &str,
628 ) -> Option<&FullTextIndexConfig> {
629 self.indexes.iter().find_map(|idx| {
630 if let IndexDefinition::FullText(config) = idx
631 && config.label == label
632 && config.properties.iter().any(|p| p == property)
633 && config.metadata.status == IndexStatus::Online
634 {
635 return Some(config);
636 }
637 None
638 })
639 }
640
641 pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
646 self.labels
647 .iter()
648 .find(|(k, _)| k.eq_ignore_ascii_case(name))
649 .map(|(_, v)| v)
650 }
651
652 pub fn canonical_label_name(&self, name: &str) -> Option<String> {
659 self.labels
660 .iter()
661 .find(|(k, _)| k.eq_ignore_ascii_case(name))
662 .map(|(k, _)| k.clone())
663 }
664
665 pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
667 self.get_label_case_insensitive(label_name)
668 .map(|meta| meta.id)
669 }
670
671 pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
676 self.edge_types
677 .iter()
678 .find(|(k, _)| k.eq_ignore_ascii_case(name))
679 .map(|(_, v)| v)
680 }
681
682 pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
684 self.get_edge_type_case_insensitive(type_name)
685 .map(|meta| meta.id)
686 }
687
688 pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
691 self.edge_type_id_by_name_case_insensitive(type_name)
692 .or_else(|| {
693 self.schemaless_registry
694 .id_by_name_case_insensitive(type_name)
695 })
696 }
697
698 pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
704 if let Some(id) = self.edge_type_id_unified(type_name) {
705 return id;
706 }
707 let id = self.schemaless_registry.get_or_assign_id(type_name);
715 self.bump_version();
716 id
717 }
718
719 pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
726 self.edge_type_id_by_name(type_name)
727 .or_else(|| self.schemaless_registry.id_by_name(type_name))
728 }
729
730 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
734 if is_schemaless_edge_type(type_id) {
735 self.schemaless_registry
736 .type_name_by_id(type_id)
737 .map(str::to_owned)
738 } else {
739 self.edge_type_name_by_id(type_id).map(str::to_owned)
740 }
741 }
742
743 pub fn all_edge_type_ids(&self) -> Vec<u32> {
746 let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
747 ids.extend(self.schemaless_registry.all_type_ids());
748 ids.sort_unstable();
749 ids
750 }
751}
752
753#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
755pub enum IndexStatus {
756 #[default]
758 Online,
759 Building,
761 Stale,
763 Failed,
765}
766
767#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
769pub struct IndexMetadata {
770 #[serde(default)]
772 pub status: IndexStatus,
773 #[serde(default)]
775 pub last_built_at: Option<DateTime<Utc>>,
776 #[serde(default)]
778 pub row_count_at_build: Option<u64>,
779}
780
781#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
782#[serde(tag = "type")]
783#[non_exhaustive]
784pub enum IndexDefinition {
785 Vector(VectorIndexConfig),
786 FullText(FullTextIndexConfig),
787 Scalar(ScalarIndexConfig),
788 Inverted(InvertedIndexConfig),
789 JsonFullText(JsonFtsIndexConfig),
790}
791
792impl IndexDefinition {
793 pub fn name(&self) -> &str {
795 match self {
796 IndexDefinition::Vector(c) => &c.name,
797 IndexDefinition::FullText(c) => &c.name,
798 IndexDefinition::Scalar(c) => &c.name,
799 IndexDefinition::Inverted(c) => &c.name,
800 IndexDefinition::JsonFullText(c) => &c.name,
801 }
802 }
803
804 pub fn label(&self) -> &str {
806 match self {
807 IndexDefinition::Vector(c) => &c.label,
808 IndexDefinition::FullText(c) => &c.label,
809 IndexDefinition::Scalar(c) => &c.label,
810 IndexDefinition::Inverted(c) => &c.label,
811 IndexDefinition::JsonFullText(c) => &c.label,
812 }
813 }
814
815 pub fn metadata(&self) -> &IndexMetadata {
817 match self {
818 IndexDefinition::Vector(c) => &c.metadata,
819 IndexDefinition::FullText(c) => &c.metadata,
820 IndexDefinition::Scalar(c) => &c.metadata,
821 IndexDefinition::Inverted(c) => &c.metadata,
822 IndexDefinition::JsonFullText(c) => &c.metadata,
823 }
824 }
825
826 pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
828 match self {
829 IndexDefinition::Vector(c) => &mut c.metadata,
830 IndexDefinition::FullText(c) => &mut c.metadata,
831 IndexDefinition::Scalar(c) => &mut c.metadata,
832 IndexDefinition::Inverted(c) => &mut c.metadata,
833 IndexDefinition::JsonFullText(c) => &mut c.metadata,
834 }
835 }
836}
837
838#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
839pub struct InvertedIndexConfig {
840 pub name: String,
841 pub label: String,
842 pub property: String,
843 #[serde(default = "default_normalize")]
844 pub normalize: bool,
845 #[serde(default = "default_max_terms_per_doc")]
846 pub max_terms_per_doc: usize,
847 #[serde(default)]
848 pub metadata: IndexMetadata,
849}
850
/// Serde default for `InvertedIndexConfig::normalize`: enabled.
fn default_normalize() -> bool {
    const NORMALIZE_BY_DEFAULT: bool = true;
    NORMALIZE_BY_DEFAULT
}
854
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`: 10 000.
fn default_max_terms_per_doc() -> usize {
    const DEFAULT_MAX_TERMS_PER_DOC: usize = 10_000;
    DEFAULT_MAX_TERMS_PER_DOC
}
858
859#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
860pub struct VectorIndexConfig {
861 pub name: String,
862 pub label: String,
863 pub property: String,
864 pub index_type: VectorIndexType,
865 pub metric: DistanceMetric,
866 pub embedding_config: Option<EmbeddingConfig>,
867 #[serde(default)]
868 pub metadata: IndexMetadata,
869}
870
871#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
872pub struct EmbeddingConfig {
873 pub alias: String,
875 pub source_properties: Vec<String>,
876 pub batch_size: usize,
877 #[serde(default)]
880 pub document_prefix: Option<String>,
881 #[serde(default)]
884 pub query_prefix: Option<String>,
885}
886
887#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
888#[non_exhaustive]
889pub enum VectorIndexType {
890 Flat,
891 IvfFlat {
892 num_partitions: u32,
893 },
894 IvfPq {
895 num_partitions: u32,
896 num_sub_vectors: u32,
897 bits_per_subvector: u8,
898 },
899 IvfSq {
900 num_partitions: u32,
901 },
902 IvfRq {
903 num_partitions: u32,
904 #[serde(default)]
905 num_bits: Option<u8>,
906 },
907 HnswFlat {
908 m: u32,
909 ef_construction: u32,
910 #[serde(default)]
911 num_partitions: Option<u32>,
912 },
913 HnswSq {
914 m: u32,
915 ef_construction: u32,
916 #[serde(default)]
917 num_partitions: Option<u32>,
918 },
919 HnswPq {
920 m: u32,
921 ef_construction: u32,
922 num_sub_vectors: u32,
923 #[serde(default)]
924 num_partitions: Option<u32>,
925 },
926 Muvera {
933 k_sim: u32,
935 reps: u32,
937 d_proj: u32,
939 seed: u64,
941 inner: Box<VectorIndexType>,
943 },
944}
945
946#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
947#[non_exhaustive]
948pub enum DistanceMetric {
949 Cosine,
950 L2,
951 Dot,
952}
953
954impl DistanceMetric {
955 pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
967 assert_eq!(a.len(), b.len(), "vector dimension mismatch");
968 match self {
969 DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
970 DistanceMetric::Cosine => {
971 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
972 let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
973 let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
974 let denom = norm_a * norm_b;
975 if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
976 }
977 DistanceMetric::Dot => {
978 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
979 -dot
980 }
981 }
982 }
983}
984
985#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
986pub struct FullTextIndexConfig {
987 pub name: String,
988 pub label: String,
989 pub properties: Vec<String>,
990 pub tokenizer: TokenizerConfig,
991 pub with_positions: bool,
992 #[serde(default)]
993 pub metadata: IndexMetadata,
994}
995
996#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
997#[non_exhaustive]
998pub enum TokenizerConfig {
999 Standard,
1000 Whitespace,
1001 Ngram { min: u8, max: u8 },
1002 Custom { name: String },
1003}
1004
1005#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1006pub struct JsonFtsIndexConfig {
1007 pub name: String,
1008 pub label: String,
1009 pub column: String,
1010 #[serde(default)]
1011 pub paths: Vec<String>,
1012 #[serde(default)]
1013 pub with_positions: bool,
1014 #[serde(default)]
1015 pub metadata: IndexMetadata,
1016}
1017
1018#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1019pub struct ScalarIndexConfig {
1020 pub name: String,
1021 pub label: String,
1022 pub properties: Vec<String>,
1023 pub index_type: ScalarIndexType,
1024 pub where_clause: Option<String>,
1025 #[serde(default)]
1026 pub metadata: IndexMetadata,
1027}
1028
1029#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1030#[non_exhaustive]
1031pub enum ScalarIndexType {
1032 BTree,
1033 Hash,
1034 Bitmap,
1035 LabelList,
1036}
1037
1038pub struct SchemaManager {
1039 store: Arc<dyn ObjectStore>,
1040 path: ObjectStorePath,
1041 schema: RwLock<Arc<Schema>>,
1042}
1043
1044impl SchemaManager {
1045 pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
1046 let path = path.as_ref();
1047 let parent = path
1048 .parent()
1049 .ok_or_else(|| anyhow!("Invalid schema path"))?;
1050 let filename = path
1051 .file_name()
1052 .ok_or_else(|| anyhow!("Invalid schema filename"))?
1053 .to_str()
1054 .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
1055
1056 let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
1057 let obj_path = ObjectStorePath::from(filename);
1058
1059 Self::load_from_store(store, &obj_path).await
1060 }
1061
1062 pub async fn load_from_store(
1063 store: Arc<dyn ObjectStore>,
1064 path: &ObjectStorePath,
1065 ) -> Result<Self> {
1066 match store.get(path).await {
1067 Ok(result) => {
1068 let bytes = result.bytes().await?;
1069 let content = String::from_utf8(bytes.to_vec())?;
1070 let mut schema: Schema = serde_json::from_str(&content)?;
1071 let original_len = schema.indexes.len();
1079 if original_len > 0 {
1080 let mut seen: std::collections::HashSet<String> =
1081 std::collections::HashSet::with_capacity(original_len);
1082 let mut dedup: Vec<IndexDefinition> = schema
1083 .indexes
1084 .iter()
1085 .rev()
1086 .filter(|idx| seen.insert(idx.name().to_string()))
1087 .cloned()
1088 .collect();
1089 dedup.reverse();
1090 if dedup.len() != original_len {
1091 tracing::warn!(
1092 collapsed = original_len - dedup.len(),
1093 kept = dedup.len(),
1094 "schema.indexes: collapsed duplicate entries on load (issue #63)"
1095 );
1096 schema.indexes = dedup;
1097 }
1098 }
1099 Ok(Self {
1100 store,
1101 path: path.clone(),
1102 schema: RwLock::new(Arc::new(schema)),
1103 })
1104 }
1105 Err(object_store::Error::NotFound { .. }) => Ok(Self {
1106 store,
1107 path: path.clone(),
1108 schema: RwLock::new(Arc::new(Schema::default())),
1109 }),
1110 Err(e) => Err(anyhow::Error::from(e)),
1111 }
1112 }
1113
1114 pub async fn save(&self) -> Result<()> {
1115 let content = {
1116 let schema_guard = acquire_read(&self.schema, "schema")?;
1117 serde_json::to_string_pretty(&**schema_guard)?
1118 };
1119 self.store
1120 .put(&self.path, content.into())
1121 .await
1122 .map_err(anyhow::Error::from)?;
1123 Ok(())
1124 }
1125
1126 pub fn path(&self) -> &ObjectStorePath {
1127 &self.path
1128 }
1129
1130 pub fn schema(&self) -> Arc<Schema> {
1131 self.schema
1132 .read()
1133 .expect("Schema lock poisoned - a thread panicked while holding it")
1134 .clone()
1135 }
1136
1137 fn normalize_function_names(expr: &str) -> String {
1140 let mut result = String::with_capacity(expr.len());
1141 let mut chars = expr.chars().peekable();
1142
1143 while let Some(ch) = chars.next() {
1144 if ch.is_alphabetic() {
1145 let mut ident = String::new();
1147 ident.push(ch);
1148
1149 while let Some(&next) = chars.peek() {
1150 if next.is_alphanumeric() || next == '_' {
1151 ident.push(chars.next().unwrap());
1152 } else {
1153 break;
1154 }
1155 }
1156
1157 if chars.peek() == Some(&'(') {
1159 result.push_str(&ident.to_uppercase());
1160 } else {
1161 result.push_str(&ident); }
1163 } else {
1164 result.push(ch);
1165 }
1166 }
1167
1168 result
1169 }
1170
1171 pub fn generated_column_name(expr: &str) -> String {
1179 let normalized = Self::normalize_function_names(expr);
1181
1182 let sanitized = normalized
1183 .replace(|c: char| !c.is_alphanumeric(), "_")
1184 .trim_matches('_')
1185 .to_string();
1186
1187 const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1189 const FNV_PRIME: u64 = 1099511628211;
1190
1191 let mut hash = FNV_OFFSET_BASIS;
1192 for byte in normalized.as_bytes() {
1193 hash ^= *byte as u64;
1194 hash = hash.wrapping_mul(FNV_PRIME);
1195 }
1196
1197 format!("_gen_{}_{:x}", sanitized, hash)
1198 }
1199
1200 pub fn replace_schema(&self, new_schema: Schema) {
1201 let mut schema = self
1202 .schema
1203 .write()
1204 .expect("Schema lock poisoned - a thread panicked while holding it");
1205 *schema = Arc::new(new_schema);
1206 }
1207
1208 #[must_use]
1221 pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1222 let primary = self.schema();
1223 let merged = if overlay.is_empty() {
1224 (*primary).clone()
1225 } else {
1226 let mut merged = (*primary).clone();
1227 for (name, label) in &overlay.added_labels {
1228 merged.labels.insert(name.clone(), label.clone());
1229 }
1230 for (name, edge_type) in &overlay.added_edge_types {
1231 merged.edge_types.insert(name.clone(), edge_type.clone());
1232 }
1233 for addition in &overlay.added_properties {
1234 let props = merged.properties.entry(addition.owner.clone()).or_default();
1235 props.insert(
1236 addition.property.clone(),
1237 PropertyMeta {
1238 r#type: addition.data_type.clone(),
1239 nullable: addition.nullable,
1240 added_in: merged.schema_version,
1241 state: SchemaElementState::Active,
1242 generation_expression: None,
1243 description: None,
1244 },
1245 );
1246 }
1247 merged
1248 };
1249
1250 Arc::new(Self {
1251 store: self.store.clone(),
1252 path: self.path.clone(),
1253 schema: RwLock::new(Arc::new(merged)),
1254 })
1255 }
1256
1257 pub fn next_label_id(&self) -> u16 {
1258 self.schema()
1259 .labels
1260 .values()
1261 .map(|l| l.id)
1262 .max()
1263 .unwrap_or(0)
1264 + 1
1265 }
1266
1267 pub fn next_type_id(&self) -> u32 {
1268 let max_schema_id = self
1269 .schema()
1270 .edge_types
1271 .values()
1272 .map(|t| t.id)
1273 .max()
1274 .unwrap_or(0);
1275
1276 if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1278 panic!("Schema edge type ID exhaustion");
1279 }
1280
1281 max_schema_id + 1
1282 }
1283
1284 pub fn validate_schema_element_name(kind: &str, name: &str) -> Result<()> {
1302 if name.is_empty() || name.chars().all(char::is_whitespace) {
1303 return Err(anyhow!(
1304 "{kind} name must be non-empty and not all whitespace"
1305 ));
1306 }
1307 if name.len() > MAX_SCHEMA_NAME_LEN {
1308 return Err(anyhow!("{kind} name exceeds {MAX_SCHEMA_NAME_LEN} bytes"));
1309 }
1310 if let Some(c) = name
1311 .chars()
1312 .find(|c| c.is_control() || c.is_whitespace() || matches!(c, '/' | '\\'))
1313 {
1314 return Err(anyhow!(
1315 "{kind} name '{name}' contains an unsafe character ({c:?})"
1316 ));
1317 }
1318 Ok(())
1319 }
1320
1321 pub fn add_label(&self, name: &str) -> Result<u16> {
1322 self.add_label_with_desc(name, None)
1323 }
1324
1325 pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1326 Self::validate_schema_element_name("Label", name)?;
1327 let mut guard = acquire_write(&self.schema, "schema")?;
1328 let schema = Arc::make_mut(&mut *guard);
1329 if schema.labels.contains_key(name) {
1330 return Err(anyhow!("Label '{}' already exists", name));
1331 }
1332
1333 let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1334 if id >= VIRTUAL_LABEL_ID_START {
1335 return Err(anyhow!(
1336 "Native label space exhausted (next id {id:#x} would enter the \
1337 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1338 reserved for catalog-resolved labels)"
1339 ));
1340 }
1341 schema.labels.insert(
1342 name.to_string(),
1343 LabelMeta {
1344 id,
1345 created_at: Utc::now(),
1346 state: SchemaElementState::Active,
1347 description,
1348 },
1349 );
1350 schema.bump_version();
1351 Ok(id)
1352 }
1353
1354 pub fn add_edge_type(
1355 &self,
1356 name: &str,
1357 src_labels: Vec<String>,
1358 dst_labels: Vec<String>,
1359 ) -> Result<u32> {
1360 self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1361 }
1362
1363 pub fn add_edge_type_with_desc(
1364 &self,
1365 name: &str,
1366 src_labels: Vec<String>,
1367 dst_labels: Vec<String>,
1368 description: Option<String>,
1369 ) -> Result<u32> {
1370 Self::validate_schema_element_name("Edge type", name)?;
1371 let mut guard = acquire_write(&self.schema, "schema")?;
1372 let schema = Arc::make_mut(&mut *guard);
1373 if schema.edge_types.contains_key(name) {
1374 return Err(anyhow!("Edge type '{}' already exists", name));
1375 }
1376
1377 let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1378
1379 if id >= VIRTUAL_EDGE_TYPE_ID_START {
1384 return Err(anyhow!(
1385 "Native edge type space exhausted (next id {id:#x} would enter the \
1386 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1387 reserved for catalog-resolved edge types)"
1388 ));
1389 }
1390
1391 schema.edge_types.insert(
1392 name.to_string(),
1393 EdgeTypeMeta {
1394 id,
1395 src_labels,
1396 dst_labels,
1397 state: SchemaElementState::Active,
1398 description,
1399 },
1400 );
1401 schema.bump_version();
1402 Ok(id)
1403 }
1404
1405 pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1414 {
1415 let guard = acquire_read(&self.schema, "schema")
1416 .expect("Schema lock poisoned - a thread panicked while holding it");
1417 if let Some(id) = guard.edge_type_id_unified(type_name) {
1418 return id;
1419 }
1420 }
1421 let mut guard = acquire_write(&self.schema, "schema")
1422 .expect("Schema lock poisoned - a thread panicked while holding it");
1423 let schema = Arc::make_mut(&mut *guard);
1424 schema.get_or_assign_edge_type_id(type_name)
1425 }
1426
1427 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1429 let schema = acquire_read(&self.schema, "schema")
1430 .expect("Schema lock poisoned - a thread panicked while holding it");
1431 schema.edge_type_name_by_id_unified(type_id)
1432 }
1433
1434 pub fn add_property(
1435 &self,
1436 label_or_type: &str,
1437 prop_name: &str,
1438 data_type: DataType,
1439 nullable: bool,
1440 ) -> Result<()> {
1441 self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1442 }
1443
1444 pub fn add_property_with_desc(
1445 &self,
1446 label_or_type: &str,
1447 prop_name: &str,
1448 data_type: DataType,
1449 nullable: bool,
1450 description: Option<String>,
1451 ) -> Result<()> {
1452 validate_property_name(prop_name)?;
1453 let mut guard = acquire_write(&self.schema, "schema")?;
1454 let schema = Arc::make_mut(&mut *guard);
1455 let version = schema.schema_version;
1456 let props = schema
1457 .properties
1458 .entry(label_or_type.to_string())
1459 .or_default();
1460
1461 if props.contains_key(prop_name) {
1462 return Err(anyhow!(
1463 "Property '{}' already exists for '{}'",
1464 prop_name,
1465 label_or_type
1466 ));
1467 }
1468
1469 props.insert(
1470 prop_name.to_string(),
1471 PropertyMeta {
1472 r#type: data_type,
1473 nullable,
1474 added_in: version,
1475 state: SchemaElementState::Active,
1476 generation_expression: None,
1477 description,
1478 },
1479 );
1480 schema.bump_version();
1482 Ok(())
1483 }
1484
1485 pub fn add_internal_property(
1496 &self,
1497 label_or_type: &str,
1498 prop_name: &str,
1499 data_type: DataType,
1500 nullable: bool,
1501 ) -> Result<bool> {
1502 validate_reserved_property_name(prop_name)?;
1503 let mut guard = acquire_write(&self.schema, "schema")?;
1504 let schema = Arc::make_mut(&mut *guard);
1505 let version = schema.schema_version;
1506 let props = schema
1507 .properties
1508 .entry(label_or_type.to_string())
1509 .or_default();
1510
1511 if let Some(existing) = props.get(prop_name) {
1512 if existing.r#type == data_type {
1513 return Ok(false); }
1515 return Err(anyhow!(
1516 "Internal property '{}' already exists for '{}' with a different type",
1517 prop_name,
1518 label_or_type
1519 ));
1520 }
1521
1522 props.insert(
1523 prop_name.to_string(),
1524 PropertyMeta {
1525 r#type: data_type,
1526 nullable,
1527 added_in: version,
1528 state: SchemaElementState::Active,
1529 generation_expression: None,
1530 description: None,
1531 },
1532 );
1533 schema.bump_version();
1534 Ok(true)
1535 }
1536
1537 pub fn add_generated_property(
1538 &self,
1539 label_or_type: &str,
1540 prop_name: &str,
1541 data_type: DataType,
1542 expr: String,
1543 ) -> Result<()> {
1544 validate_reserved_property_name(prop_name)?;
1547 let mut guard = acquire_write(&self.schema, "schema")?;
1548 let schema = Arc::make_mut(&mut *guard);
1549 let version = schema.schema_version;
1550 let props = schema
1551 .properties
1552 .entry(label_or_type.to_string())
1553 .or_default();
1554
1555 if props.contains_key(prop_name) {
1556 return Err(anyhow!("Property '{}' already exists", prop_name));
1557 }
1558
1559 props.insert(
1560 prop_name.to_string(),
1561 PropertyMeta {
1562 r#type: data_type,
1563 nullable: true,
1564 added_in: version,
1565 state: SchemaElementState::Active,
1566 generation_expression: Some(expr),
1567 description: None,
1568 },
1569 );
1570 schema.bump_version();
1572 Ok(())
1573 }
1574
1575 pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1576 let mut guard = acquire_write(&self.schema, "schema")?;
1577 let schema = Arc::make_mut(&mut *guard);
1578 let meta = schema
1579 .labels
1580 .get_mut(name)
1581 .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1582 meta.description = description;
1583 Ok(())
1584 }
1585
1586 pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1587 let mut guard = acquire_write(&self.schema, "schema")?;
1588 let schema = Arc::make_mut(&mut *guard);
1589 let meta = schema
1590 .edge_types
1591 .get_mut(name)
1592 .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1593 meta.description = description;
1594 Ok(())
1595 }
1596
1597 pub fn set_property_description(
1598 &self,
1599 entity: &str,
1600 prop_name: &str,
1601 description: Option<String>,
1602 ) -> Result<()> {
1603 let mut guard = acquire_write(&self.schema, "schema")?;
1604 let schema = Arc::make_mut(&mut *guard);
1605 let props = schema
1606 .properties
1607 .get_mut(entity)
1608 .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1609 let meta = props
1610 .get_mut(prop_name)
1611 .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
1612 meta.description = description;
1613 Ok(())
1614 }
1615
1616 pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1625 let mut guard = acquire_write(&self.schema, "schema")?;
1626 let schema = Arc::make_mut(&mut *guard);
1627 if let Some(existing) = schema
1628 .indexes
1629 .iter_mut()
1630 .find(|i| i.name() == index_def.name())
1631 {
1632 *existing = index_def;
1633 } else {
1634 schema.indexes.push(index_def);
1635 }
1636 schema.bump_version();
1637 Ok(())
1638 }
1639
1640 pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1641 let schema = self.schema.read().expect("Schema lock poisoned");
1642 schema.indexes.iter().find(|i| i.name() == name).cloned()
1643 }
1644
1645 pub fn update_index_metadata(
1650 &self,
1651 index_name: &str,
1652 f: impl FnOnce(&mut IndexMetadata),
1653 ) -> Result<()> {
1654 let mut guard = acquire_write(&self.schema, "schema")?;
1655 let schema = Arc::make_mut(&mut *guard);
1656 let idx = schema
1657 .indexes
1658 .iter_mut()
1659 .find(|i| i.name() == index_name)
1660 .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1661 f(idx.metadata_mut());
1662 Ok(())
1663 }
1664
1665 pub fn remove_index(&self, name: &str) -> Result<()> {
1666 let mut guard = acquire_write(&self.schema, "schema")?;
1667 let schema = Arc::make_mut(&mut *guard);
1668 if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1669 schema.indexes.remove(pos);
1670 schema.bump_version();
1671 Ok(())
1672 } else {
1673 Err(anyhow!("Index '{}' not found", name))
1674 }
1675 }
1676
1677 pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1678 let mut guard = acquire_write(&self.schema, "schema")?;
1679 let schema = Arc::make_mut(&mut *guard);
1680 if schema.constraints.iter().any(|c| c.name == constraint.name) {
1681 return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1682 }
1683 schema.constraints.push(constraint);
1684 schema.bump_version();
1685 Ok(())
1686 }
1687
1688 pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1689 let mut guard = acquire_write(&self.schema, "schema")?;
1690 let schema = Arc::make_mut(&mut *guard);
1691 if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1692 schema.constraints.remove(pos);
1693 schema.bump_version();
1694 Ok(())
1695 } else if if_exists {
1696 Ok(())
1697 } else {
1698 Err(anyhow!("Constraint '{}' not found", name))
1699 }
1700 }
1701
1702 pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1703 let mut guard = acquire_write(&self.schema, "schema")?;
1704 let schema = Arc::make_mut(&mut *guard);
1705 let Some(props) = schema.properties.get_mut(label_or_type) else {
1706 return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1707 };
1708 if props.remove(prop_name).is_none() {
1709 return Err(anyhow!(
1710 "Property '{}' not found for '{}'",
1711 prop_name,
1712 label_or_type
1713 ));
1714 }
1715 schema.bump_version();
1716 Ok(())
1717 }
1718
1719 pub fn rename_property(
1720 &self,
1721 label_or_type: &str,
1722 old_name: &str,
1723 new_name: &str,
1724 ) -> Result<()> {
1725 let mut guard = acquire_write(&self.schema, "schema")?;
1726 let schema = Arc::make_mut(&mut *guard);
1727 let Some(props) = schema.properties.get_mut(label_or_type) else {
1728 return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1729 };
1730 let Some(meta) = props.remove(old_name) else {
1731 return Err(anyhow!(
1732 "Property '{}' not found for '{}'",
1733 old_name,
1734 label_or_type
1735 ));
1736 };
1737 if props.contains_key(new_name) {
1738 props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
1741 }
1742 props.insert(new_name.to_string(), meta);
1743 schema.bump_version();
1744 Ok(())
1745 }
1746
1747 pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1748 let mut guard = acquire_write(&self.schema, "schema")?;
1749 let schema = Arc::make_mut(&mut *guard);
1750 if let Some(label_meta) = schema.labels.get_mut(name) {
1751 label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1752 schema.bump_version();
1754 Ok(())
1755 } else if if_exists {
1756 Ok(())
1757 } else {
1758 Err(anyhow!("Label '{}' not found", name))
1759 }
1760 }
1761
1762 pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1763 let mut guard = acquire_write(&self.schema, "schema")?;
1764 let schema = Arc::make_mut(&mut *guard);
1765 if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1766 edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1767 schema.bump_version();
1769 Ok(())
1770 } else if if_exists {
1771 Ok(())
1772 } else {
1773 Err(anyhow!("Edge Type '{}' not found", name))
1774 }
1775 }
1776}
1777
1778pub fn validate_identifier(name: &str) -> Result<()> {
1780 if name.is_empty() || name.len() > 64 {
1782 return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1783 }
1784
1785 let first = name.chars().next().unwrap();
1787 if !first.is_alphabetic() && first != '_' {
1788 return Err(anyhow!(
1789 "Identifier '{}' must start with letter or underscore",
1790 name
1791 ));
1792 }
1793
1794 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1796 return Err(anyhow!(
1797 "Identifier '{}' must contain only alphanumeric and underscore",
1798 name
1799 ));
1800 }
1801
1802 const RESERVED: &[&str] = &[
1804 "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1805 "UNION", "ORDER", "LIMIT",
1806 ];
1807 if RESERVED.contains(&name.to_uppercase().as_str()) {
1808 return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1809 }
1810
1811 Ok(())
1812}
1813
1814pub fn validate_property_name(name: &str) -> Result<()> {
1821 if name.starts_with('_') {
1822 return Err(anyhow!(
1823 "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
1824 name
1825 ));
1826 }
1827 validate_reserved_property_name(name)
1828}
1829
1830fn validate_reserved_property_name(name: &str) -> Result<()> {
1837 const RESERVED_PROPS: &[&str] = &[
1846 "ext_id",
1847 "overflow_json",
1848 "eid",
1849 "src_vid",
1850 "dst_vid",
1851 "op",
1852 "__set_struct__",
1860 ];
1861 if RESERVED_PROPS.contains(&name) {
1862 return Err(anyhow!(
1863 "Property name '{}' is reserved by the storage layer; please choose a different name",
1864 name
1865 ));
1866 }
1867 Ok(())
1868}
1869
1870#[cfg(test)]
1871mod tests {
1872 use super::*;
1873 use crate::value::{TemporalValue, Value};
1874 use object_store::local::LocalFileSystem;
1875 use tempfile::tempdir;
1876
/// Exercises `DataType::accepts` across accepted and rejected
/// type/value pairings.
#[test]
fn test_datatype_accepts_matrix() {
    const ISO_INSTANT: &str = "2026-01-01T00:00:00Z";

    // Fresh zero-valued DateTime for every assertion that needs one.
    fn epoch_datetime() -> TemporalValue {
        TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: None,
        }
    }

    // Null is accepted by each of these types.
    let null_tolerant = [
        DataType::String,
        DataType::Int64,
        DataType::Bool,
        DataType::DateTime,
        DataType::Float64,
    ];
    for ty in null_tolerant {
        assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
    }

    // Exact matches.
    assert!(DataType::String.accepts(&Value::String("x".into())));
    assert!(DataType::Int64.accepts(&Value::Int(1)));
    assert!(DataType::Bool.accepts(&Value::Bool(true)));
    assert!(DataType::DateTime.accepts(&Value::Temporal(epoch_datetime())));

    // Accepted non-identical pairings.
    assert!(
        DataType::Float64.accepts(&Value::Int(3)),
        "Int widens to Float"
    );
    assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
    assert!(DataType::Timestamp.accepts(&Value::Temporal(epoch_datetime())));
    assert!(
        DataType::Timestamp.accepts(&Value::String(ISO_INSTANT.into())),
        "storage parses strings for non-struct Timestamp columns"
    );

    // Rejected pairings.
    assert!(
        !DataType::DateTime.accepts(&Value::String(ISO_INSTANT.into())),
        "String into a DateTime struct column nulls silently — reject here"
    );
    assert!(!DataType::Bool.accepts(&Value::Int(1)));
    assert!(!DataType::Int64.accepts(&Value::Bool(true)));
    assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
    assert!(
        !DataType::String.accepts(&Value::Int(10)),
        "no implicit stringification"
    );
    assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));

    // CypherValue accepts a map value.
    assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
}
1931
1932 #[tokio::test]
1933 async fn test_schema_management() -> Result<()> {
1934 let dir = tempdir()?;
1935 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1936 let path = ObjectStorePath::from("schema.json");
1937 let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
1938
1939 let lid = manager.add_label("Person")?;
1941 assert_eq!(lid, 1);
1942 assert!(manager.add_label("Person").is_err());
1943
1944 manager.add_property("Person", "name", DataType::String, false)?;
1946 assert!(
1947 manager
1948 .add_property("Person", "name", DataType::String, false)
1949 .is_err()
1950 );
1951
1952 let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
1954 assert_eq!(tid, 1);
1955
1956 manager.save().await?;
1957 assert!(store.get(&path).await.is_ok());
1959
1960 let manager2 = SchemaManager::load_from_store(store, &path).await?;
1961 assert!(manager2.schema().labels.contains_key("Person"));
1962 assert!(
1963 manager2
1964 .schema()
1965 .properties
1966 .get("Person")
1967 .unwrap()
1968 .contains_key("name")
1969 );
1970
1971 Ok(())
1972 }
1973
1974 #[tokio::test]
1975 async fn test_reserved_property_names_rejected() -> Result<()> {
1976 let dir = tempdir()?;
1977 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1978 let path = ObjectStorePath::from("schema.json");
1979 let manager = SchemaManager::load_from_store(store, &path).await?;
1980
1981 manager.add_label("Tiny")?;
1982
1983 for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
1987 let err = manager
1988 .add_property("Tiny", reserved, DataType::String, true)
1989 .expect_err(&format!("expected '{reserved}' to be rejected"));
1990 assert!(
1991 err.to_string().contains("reserved"),
1992 "error for '{reserved}' should mention 'reserved', got: {err}"
1993 );
1994 }
1995
1996 let err = manager
2001 .add_property("Tiny", "__set_struct__", DataType::String, true)
2002 .expect_err("expected '__set_struct__' to be rejected");
2003 assert!(
2004 err.to_string().contains("reserved"),
2005 "__set_struct__ rejection should mention 'reserved', got: {err}"
2006 );
2007
2008 for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
2010 assert!(
2011 manager
2012 .add_property("Tiny", reserved, DataType::String, true)
2013 .is_err(),
2014 "expected '{reserved}' to be rejected"
2015 );
2016 }
2017
2018 manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
2021 manager.add_property("Tiny", "user_op", DataType::String, true)?;
2022 manager.add_property("Tiny", "type_name", DataType::String, true)?;
2023
2024 manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
2026 assert!(
2027 manager
2028 .add_property("knows", "src_vid", DataType::Int64, true)
2029 .is_err()
2030 );
2031
2032 assert!(
2034 manager
2035 .add_generated_property(
2036 "Tiny",
2037 "ext_id",
2038 DataType::String,
2039 "concat('x', name)".into()
2040 )
2041 .is_err()
2042 );
2043
2044 Ok(())
2045 }
2046
/// `normalize_function_names` upper-cases function names regardless of the
/// input casing, including nested calls, and leaves arguments alone.
#[test]
fn test_normalize_function_names() {
    // (input expression, expected normalized form)
    let cases = [
        ("lower(email)", "LOWER(email)"),
        ("LOWER(email)", "LOWER(email)"),
        ("Lower(email)", "LOWER(email)"),
        ("trim(lower(email))", "TRIM(LOWER(email))"),
    ];

    for (input, expected) in cases {
        assert_eq!(SchemaManager::normalize_function_names(input), expected);
    }
}
2066
/// The generated column name must not depend on the casing of the function
/// name in the expression.
#[test]
fn test_generated_column_name_case_insensitive() {
    let [lower, upper, mixed] = ["lower(email)", "LOWER(email)", "Lower(email)"]
        .map(|expr| SchemaManager::generated_column_name(expr));

    assert_eq!(lower, upper);
    assert_eq!(upper, mixed);
    assert!(lower.starts_with("_gen_LOWER_email_"));
}
2076
/// An index definition serialized without any metadata fields must still
/// deserialize, with the metadata taking its default values.
#[test]
fn test_index_metadata_serde_backward_compat() {
    // Note: no `metadata` key at all.
    let legacy_json = r#"{
        "type": "Scalar",
        "name": "idx_person_name",
        "label": "Person",
        "properties": ["name"],
        "index_type": "BTree",
        "where_clause": null
    }"#;

    let parsed = serde_json::from_str::<IndexDefinition>(legacy_json).unwrap();
    let metadata = parsed.metadata();

    assert_eq!(metadata.status, IndexStatus::Online);
    assert!(metadata.last_built_at.is_none());
    assert!(metadata.row_count_at_build.is_none());
}
2094
/// Non-default index metadata must survive a JSON serialize/deserialize
/// round trip.
#[test]
fn test_index_metadata_serde_roundtrip() {
    let built_at = Utc::now();
    let metadata = IndexMetadata {
        status: IndexStatus::Building,
        last_built_at: Some(built_at),
        row_count_at_build: Some(42),
    };
    let original = IndexDefinition::Scalar(ScalarIndexConfig {
        name: "idx_test".to_string(),
        label: "Test".to_string(),
        properties: vec!["prop".to_string()],
        index_type: ScalarIndexType::BTree,
        where_clause: None,
        metadata,
    });

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: IndexDefinition = serde_json::from_str(&encoded).unwrap();

    let restored = decoded.metadata();
    assert_eq!(restored.status, IndexStatus::Building);
    assert_eq!(restored.row_count_at_build, Some(42));
    assert!(restored.last_built_at.is_some());
}
2117
2118 #[tokio::test]
2119 async fn test_update_index_metadata() -> Result<()> {
2120 let dir = tempdir()?;
2121 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2122 let path = ObjectStorePath::from("schema.json");
2123 let manager = SchemaManager::load_from_store(store, &path).await?;
2124
2125 manager.add_label("Person")?;
2126 let idx = IndexDefinition::Scalar(ScalarIndexConfig {
2127 name: "idx_test".to_string(),
2128 label: "Person".to_string(),
2129 properties: vec!["name".to_string()],
2130 index_type: ScalarIndexType::BTree,
2131 where_clause: None,
2132 metadata: Default::default(),
2133 });
2134 manager.add_index(idx)?;
2135
2136 let initial = manager.get_index("idx_test").unwrap();
2138 assert_eq!(initial.metadata().status, IndexStatus::Online);
2139
2140 manager.update_index_metadata("idx_test", |m| {
2142 m.status = IndexStatus::Building;
2143 m.row_count_at_build = Some(100);
2144 })?;
2145
2146 let updated = manager.get_index("idx_test").unwrap();
2147 assert_eq!(updated.metadata().status, IndexStatus::Building);
2148 assert_eq!(updated.metadata().row_count_at_build, Some(100));
2149
2150 assert!(manager.update_index_metadata("nope", |_| {}).is_err());
2152
2153 Ok(())
2154 }
2155
2156 #[tokio::test]
2161 async fn add_internal_property_reports_newly_added() -> Result<()> {
2162 let dir = tempdir()?;
2163 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2164 let path = ObjectStorePath::from("schema.json");
2165 let manager = SchemaManager::load_from_store(store, &path).await?;
2166 manager.add_label("Doc")?;
2167
2168 let dt = DataType::Vector { dimensions: 16 };
2169 assert!(manager.add_internal_property("Doc", "__fde_x", dt.clone(), true)?);
2171 assert!(!manager.add_internal_property("Doc", "__fde_x", dt.clone(), true)?);
2173 assert!(
2175 manager
2176 .add_internal_property("Doc", "__fde_x", DataType::Vector { dimensions: 8 }, true)
2177 .is_err()
2178 );
2179 Ok(())
2180 }
2181
2182 #[tokio::test]
2187 async fn test_add_index_is_upsert_by_name() -> Result<()> {
2188 let dir = tempdir()?;
2189 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2190 let path = ObjectStorePath::from("schema.json");
2191 let manager = SchemaManager::load_from_store(store, &path).await?;
2192 manager.add_label("Person")?;
2193
2194 let initial = IndexDefinition::Scalar(ScalarIndexConfig {
2195 name: "idx_test".to_string(),
2196 label: "Person".to_string(),
2197 properties: vec!["name".to_string()],
2198 index_type: ScalarIndexType::BTree,
2199 where_clause: None,
2200 metadata: IndexMetadata {
2201 status: IndexStatus::Building,
2202 ..Default::default()
2203 },
2204 });
2205 manager.add_index(initial.clone())?;
2206 assert_eq!(manager.schema().indexes.len(), 1);
2207
2208 manager.add_index(initial.clone())?;
2210 assert_eq!(
2211 manager.schema().indexes.len(),
2212 1,
2213 "duplicate add_index by name must not append"
2214 );
2215
2216 let mut updated_cfg = match initial {
2218 IndexDefinition::Scalar(c) => c,
2219 _ => unreachable!(),
2220 };
2221 updated_cfg.metadata.status = IndexStatus::Online;
2222 updated_cfg.metadata.row_count_at_build = Some(42);
2223 manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
2224 assert_eq!(manager.schema().indexes.len(), 1);
2225 let stored = manager.get_index("idx_test").unwrap();
2226 assert_eq!(stored.metadata().status, IndexStatus::Online);
2227 assert_eq!(stored.metadata().row_count_at_build, Some(42));
2228
2229 let other = IndexDefinition::Scalar(ScalarIndexConfig {
2231 name: "idx_other".to_string(),
2232 label: "Person".to_string(),
2233 properties: vec!["age".to_string()],
2234 index_type: ScalarIndexType::BTree,
2235 where_clause: None,
2236 metadata: IndexMetadata::default(),
2237 });
2238 manager.add_index(other)?;
2239 assert_eq!(manager.schema().indexes.len(), 2);
2240
2241 Ok(())
2242 }
2243
2244 #[tokio::test]
2247 async fn test_load_dedups_bloated_indexes() -> Result<()> {
2248 let dir = tempdir()?;
2249 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2250 let path = ObjectStorePath::from("schema.json");
2251
2252 let mut schema = Schema::default();
2256 schema.labels.insert(
2257 "Person".to_string(),
2258 LabelMeta {
2259 id: 1,
2260 created_at: chrono::Utc::now(),
2261 state: SchemaElementState::Active,
2262 description: None,
2263 },
2264 );
2265 let make = |status: IndexStatus, count: Option<u64>| {
2266 IndexDefinition::Scalar(ScalarIndexConfig {
2267 name: "idx_dup".to_string(),
2268 label: "Person".to_string(),
2269 properties: vec!["name".to_string()],
2270 index_type: ScalarIndexType::BTree,
2271 where_clause: None,
2272 metadata: IndexMetadata {
2273 status,
2274 row_count_at_build: count,
2275 ..Default::default()
2276 },
2277 })
2278 };
2279 for _ in 0..49 {
2280 schema.indexes.push(make(IndexStatus::Building, None));
2281 }
2282 schema.indexes.push(make(IndexStatus::Online, Some(123)));
2283 let json = serde_json::to_string_pretty(&schema)?;
2284 store.put(&path, json.into()).await?;
2285
2286 let manager = SchemaManager::load_from_store(store, &path).await?;
2287 let schema = manager.schema();
2288 assert_eq!(
2289 schema.indexes.len(),
2290 1,
2291 "load() must collapse 50 duplicates by name to 1"
2292 );
2293 assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
2295 assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));
2296
2297 Ok(())
2298 }
2299
/// `vector_index_for_property` must ignore a `Stale` vector index and return
/// it once its status is `Online`.
#[test]
fn test_vector_index_for_property_skips_non_online() {
    let mut schema = Schema::default();
    schema.labels.insert(
        "Document".to_string(),
        LabelMeta {
            id: 1,
            created_at: Utc::now(),
            state: SchemaElementState::Active,
            description: None,
        },
    );

    let stale_index = IndexDefinition::Vector(VectorIndexConfig {
        name: "vec_doc_embedding".to_string(),
        label: "Document".to_string(),
        property: "embedding".to_string(),
        index_type: VectorIndexType::Flat,
        metric: DistanceMetric::Cosine,
        embedding_config: None,
        metadata: IndexMetadata {
            status: IndexStatus::Stale,
            ..Default::default()
        },
    });
    schema.indexes.push(stale_index);

    // While stale, the index is not reported.
    let while_stale = schema.vector_index_for_property("Document", "embedding");
    assert!(while_stale.is_none());

    // Flip the same index to Online.
    if let Some(IndexDefinition::Vector(cfg)) = schema.indexes.first_mut() {
        cfg.metadata.status = IndexStatus::Online;
    }

    let once_online = schema.vector_index_for_property("Document", "embedding");
    assert!(once_online.is_some());
    assert_eq!(once_online.unwrap().metric, DistanceMetric::Cosine);
}
2344
2345 #[tokio::test]
2346 async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
2347 use crate::core::fork::SchemaDelta;
2348
2349 let dir = tempdir()?;
2350 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2351 let path = ObjectStorePath::from("schema.json");
2352 let primary = SchemaManager::load_from_store(store, &path).await?;
2353 primary.add_label("Person")?;
2354
2355 let overlay = primary.with_overlay(&SchemaDelta::empty());
2356 assert_eq!(overlay.schema().labels.len(), 1);
2357
2358 overlay.add_label("Forked")?;
2361 assert!(overlay.schema().labels.contains_key("Forked"));
2362 assert!(!primary.schema().labels.contains_key("Forked"));
2363
2364 Ok(())
2365 }
2366
2367 #[tokio::test]
2368 async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
2369 use crate::core::fork::SchemaDelta;
2370 use chrono::Utc;
2371
2372 let dir = tempdir()?;
2373 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2374 let path = ObjectStorePath::from("schema.json");
2375 let primary = SchemaManager::load_from_store(store, &path).await?;
2376 primary.add_label("Existing")?;
2377
2378 let label_meta = LabelMeta {
2379 id: 99,
2380 created_at: Utc::now(),
2381 state: SchemaElementState::Active,
2382 description: None,
2383 };
2384 let edge_meta = EdgeTypeMeta {
2385 id: 99,
2386 src_labels: vec!["NewLabel".into()],
2387 dst_labels: vec!["NewLabel".into()],
2388 state: SchemaElementState::Active,
2389 description: None,
2390 };
2391 let delta = SchemaDelta {
2392 added_labels: vec![("NewLabel".to_string(), label_meta)],
2393 added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
2394 added_properties: vec![],
2395 };
2396
2397 let overlay = primary.with_overlay(&delta);
2398 let merged = overlay.schema();
2399 assert!(merged.labels.contains_key("Existing"));
2400 assert!(merged.labels.contains_key("NewLabel"));
2401 assert!(merged.edge_types.contains_key("NewEdge"));
2402
2403 assert!(!primary.schema().labels.contains_key("NewLabel"));
2405 Ok(())
2406 }
2407
2408 #[tokio::test]
2413 async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
2414 let dir = tempdir()?;
2415 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2416 let path = ObjectStorePath::from("schema.json");
2417 let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);
2418
2419 let mut handles = Vec::new();
2420 for _ in 0..16 {
2421 let m = manager.clone();
2422 handles.push(std::thread::spawn(move || {
2423 m.get_or_assign_edge_type_id("RACED")
2424 }));
2425 }
2426 let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2427 assert!(
2428 ids.iter().all(|&id| id == ids[0]),
2429 "all racers must observe one id, got {ids:?}"
2430 );
2431 assert_eq!(manager.get_or_assign_edge_type_id("RACED"), ids[0]);
2433
2434 manager.add_label("A")?;
2436 let declared = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
2437 assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared);
2438 Ok(())
2439 }
2440
/// `Schema::get_or_assign_edge_type_id` bumps `schema_version` exactly when
/// it mints a new id, not when it resolves an existing one.
#[test]
fn test_new_schemaless_edge_type_bumps_schema_version() {
    let mut schema = Schema::default();
    let initial_version = schema.schema_version;
    let after_one_bump = initial_version.wrapping_add(1);
    let after_two_bumps = initial_version.wrapping_add(2);

    // New name: version moves once.
    let fresh_id = schema.get_or_assign_edge_type_id("FRESH");
    assert_eq!(
        schema.schema_version,
        after_one_bump,
        "minting a new edge type must bump schema_version"
    );

    // Same name again: same id, version unchanged.
    let fresh_id_again = schema.get_or_assign_edge_type_id("FRESH");
    assert_eq!(fresh_id, fresh_id_again);
    assert_eq!(
        schema.schema_version,
        after_one_bump,
        "resolving an existing edge type must not bump schema_version"
    );

    // Another new name: version moves again.
    let _other_id = schema.get_or_assign_edge_type_id("OTHER");
    assert_eq!(
        schema.schema_version,
        after_two_bumps,
        "a second new edge type must bump schema_version again"
    );
}
2474
/// `validate_schema_element_name` must refuse the listed unsafe names and
/// over-long names, and accept the listed ordinary ones.
#[test]
fn validate_schema_element_name_rejects_unsafe() {
    let rejected = ["", " ", "a/b", "a b", "a\nb", "a\\b", "x\0y"];
    for bad in rejected {
        let outcome = SchemaManager::validate_schema_element_name("Label", bad);
        assert!(outcome.is_err(), "expected {bad:?} to be rejected");
    }

    let accepted = ["Person", "My.Label", "edge_2", "KNOWS"];
    for good in accepted {
        let outcome = SchemaManager::validate_schema_element_name("Label", good);
        assert!(outcome.is_ok(), "expected {good:?} to be accepted");
    }

    // One character over the limit.
    let too_long = "x".repeat(MAX_SCHEMA_NAME_LEN + 1);
    assert!(SchemaManager::validate_schema_element_name("Label", &too_long).is_err());
}
2496}