1use crate::core::edge_type::{MAX_SCHEMA_TYPE_ID, is_schemaless_edge_type, make_schemaless_id};
5use crate::sync::{acquire_read, acquire_write};
6use anyhow::{Result, anyhow};
7use chrono::{DateTime, Utc};
8use object_store::ObjectStore;
9use object_store::local::LocalFileSystem;
10use object_store::path::Path as ObjectStorePath;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::Path;
14use std::sync::{Arc, RwLock};
15
/// Lifecycle state of a schema element (label, edge type or property).
///
/// Newly created elements are [`SchemaElementState::Active`], which is also the serde default
/// for persisted elements that carry no state. `SchemaManager::drop_label` and
/// `SchemaManager::drop_edge_type` do not delete entries; they move them to
/// [`SchemaElementState::Tombstone`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
    /// The element is live.
    Active,
    /// NOTE(review): nothing in this file produces this state; presumably a soft-hidden
    /// element that may be restored later — confirm against callers.
    Hidden {
        /// When the element entered this state.
        since: DateTime<Utc>,
        /// Identifier of the last snapshot in which the element was active (format not
        /// defined in this file).
        last_active_snapshot: String,
    },
    /// The element was dropped; `since` records the drop time.
    Tombstone {
        since: DateTime<Utc>,
    },
}
28
29use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
30
31pub fn datetime_struct_fields() -> Fields {
38 Fields::from(vec![
39 Field::new(
40 "nanos_since_epoch",
41 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
42 true,
43 ),
44 Field::new("offset_seconds", ArrowDataType::Int32, true),
45 Field::new("timezone_name", ArrowDataType::Utf8, true),
46 ])
47}
48
49pub fn time_struct_fields() -> Fields {
55 Fields::from(vec![
56 Field::new(
57 "nanos_since_midnight",
58 ArrowDataType::Time64(TimeUnit::Nanosecond),
59 true,
60 ),
61 Field::new("offset_seconds", ArrowDataType::Int32, true),
62 ])
63}
64
65pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
67 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
68}
69
70pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
72 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
73}
74
/// Kind of CRDT held by a [`DataType::Crdt`] property.
///
/// Every kind maps to Arrow `Binary` in [`DataType::to_arrow`]. The byte encoding of each kind
/// is not defined in this file. The variant descriptions below follow conventional CRDT naming.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    /// Grow-only counter.
    GCounter,
    /// Grow-only set.
    GSet,
    /// Observed-remove set.
    ORSet,
    /// Last-writer-wins register.
    LWWRegister,
    /// Last-writer-wins map.
    LWWMap,
    /// Replicated growable array (sequence CRDT).
    Rga,
    /// Vector clock.
    VectorClock,
    /// NOTE(review): presumably a register versioned by a vector clock — confirm.
    VCRegister,
}
87
/// Coordinate system of a [`DataType::Point`] property.
///
/// [`DataType::to_arrow`] stores each variant as a struct of non-nullable `Float64`
/// coordinates plus a non-nullable `crs` string.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
    /// Stored as `latitude`, `longitude`, `crs`.
    Geographic,
    /// Stored as `x`, `y`, `crs`.
    Cartesian2D,
    /// Stored as `x`, `y`, `z`, `crs`.
    Cartesian3D,
}
94
/// Logical type of a property.
///
/// [`DataType::to_arrow`] defines the physical Arrow column type of each variant.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
    /// Arrow `Utf8`.
    String,
    Int32,
    Int64,
    Float32,
    Float64,
    /// Arrow `Boolean`.
    Bool,
    /// Nanosecond timestamp tagged with the `"UTC"` timezone.
    Timestamp,
    /// Arrow `Date32`.
    Date,
    /// Struct described by [`time_struct_fields`].
    Time,
    /// Struct described by [`datetime_struct_fields`].
    DateTime,
    /// Opaque bytes (`LargeBinary`); the encoding is defined outside this file.
    Duration,
    /// Opaque bytes (`LargeBinary`); the encoding is defined outside this file.
    CypherValue,
    /// Point struct whose layout depends on the [`PointType`].
    Point(PointType),
    /// Fixed-size list of `dimensions` nullable `Float32` items.
    Vector { dimensions: usize },
    /// Opaque bytes (`Binary`) holding a CRDT of the given kind.
    Crdt(CrdtType),
    /// Variable-length list of nullable items of the inner type.
    List(Box<DataType>),
    /// Stored as a list of `{key, value}` structs (non-nullable key, nullable value) rather than
    /// an Arrow `Map`.
    Map(Box<DataType>, Box<DataType>),
}
116
117impl DataType {
118 #[allow(non_upper_case_globals)]
120 pub const Float: DataType = DataType::Float64;
121 #[allow(non_upper_case_globals)]
122 pub const Int: DataType = DataType::Int64;
123
124 pub fn to_arrow(&self) -> ArrowDataType {
125 match self {
126 DataType::String => ArrowDataType::Utf8,
127 DataType::Int32 => ArrowDataType::Int32,
128 DataType::Int64 => ArrowDataType::Int64,
129 DataType::Float32 => ArrowDataType::Float32,
130 DataType::Float64 => ArrowDataType::Float64,
131 DataType::Bool => ArrowDataType::Boolean,
132 DataType::Timestamp => {
133 ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
134 }
135 DataType::Date => ArrowDataType::Date32,
136 DataType::Time => ArrowDataType::Struct(time_struct_fields()),
137 DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
138 DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
141 PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
142 Field::new("latitude", ArrowDataType::Float64, false),
143 Field::new("longitude", ArrowDataType::Float64, false),
144 Field::new("crs", ArrowDataType::Utf8, false),
145 ])),
146 PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
147 Field::new("x", ArrowDataType::Float64, false),
148 Field::new("y", ArrowDataType::Float64, false),
149 Field::new("crs", ArrowDataType::Utf8, false),
150 ])),
151 PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
152 Field::new("x", ArrowDataType::Float64, false),
153 Field::new("y", ArrowDataType::Float64, false),
154 Field::new("z", ArrowDataType::Float64, false),
155 Field::new("crs", ArrowDataType::Utf8, false),
156 ])),
157 },
158 DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
159 Arc::new(Field::new("item", ArrowDataType::Float32, true)),
160 *dimensions as i32,
161 ),
162 DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
164 ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
165 }
166 DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
167 "item",
168 ArrowDataType::Struct(Fields::from(vec![
169 Field::new("key", key.to_arrow(), false),
170 Field::new("value", value.to_arrow(), true),
171 ])),
172 true,
173 ))),
174 }
175 }
176}
177
178fn default_created_at() -> DateTime<Utc> {
179 Utc::now()
180}
181
182fn default_state() -> SchemaElementState {
183 SchemaElementState::Active
184}
185
/// Serde default for [`PropertyMeta::added_in`]: properties persisted without a version are
/// attributed to schema version 1.
fn default_version_1() -> u32 {
    const FIRST_SCHEMA_VERSION: u32 = 1;
    FIRST_SCHEMA_VERSION
}
189
/// Definition of a single property on a label or edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
    /// Declared logical type.
    pub r#type: DataType,
    /// Whether the property may be null. Generated properties are always created nullable.
    pub nullable: bool,
    /// `Schema::schema_version` at the time the property was added. Defaults to 1 when
    /// missing from a persisted schema.
    #[serde(default = "default_version_1")]
    pub added_in: u32,
    /// Lifecycle state; defaults to `Active` when missing from a persisted schema.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Source expression for generated properties; `None` for regular properties.
    #[serde(default)]
    pub generation_expression: Option<String>,
}
201
/// Metadata of a node label.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
    /// Numeric label id. `SchemaManager::add_label` assigns one more than the highest existing
    /// id (tombstoned labels included).
    pub id: u16,
    /// Creation time; defaults to "now" when missing from a persisted schema.
    #[serde(default = "default_created_at")]
    pub created_at: DateTime<Utc>,
    /// Lifecycle state; defaults to `Active` when missing from a persisted schema.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
}
210
/// Metadata of a declared (schema) edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
    /// Numeric edge-type id. `SchemaManager::add_edge_type` keeps it below
    /// `MAX_SCHEMA_TYPE_ID`.
    pub id: u32,
    /// Labels allowed at the source end. These are not validated against the schema's labels
    /// in this file.
    pub src_labels: Vec<String>,
    /// Labels allowed at the destination end. These are not validated in this file either.
    pub dst_labels: Vec<String>,
    /// Lifecycle state; defaults to `Active` when missing from a persisted schema.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
}
220
/// Kind of a schema constraint. Enforcement is not implemented in this file.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
    /// Uniqueness over the listed properties (presumably jointly, for several — confirm).
    Unique { properties: Vec<String> },
    /// The named property must be present.
    Exists { property: String },
    /// A boolean expression that must hold (syntax defined by the query layer).
    Check { expression: String },
}
228
/// What a constraint applies to, identified by label or edge-type name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
    Label(String),
    EdgeType(String),
}
235
/// A named constraint stored in the schema.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
    /// Unique within the schema (enforced by `SchemaManager::add_constraint`).
    pub name: String,
    pub constraint_type: ConstraintType,
    pub target: ConstraintTarget,
    /// NOTE(review): presumably disabled constraints are kept but not enforced — confirm.
    pub enabled: bool,
}
243
/// Bidirectional name <-> id map for edge types that are used without being declared in
/// [`Schema::edge_types`].
///
/// Ids are produced by `make_schemaless_id` from a local counter. The registry is persisted as
/// part of the schema, so assignments survive reloads.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    /// Type name -> assigned id.
    name_to_id: HashMap<String, u32>,
    /// Assigned id -> type name (inverse of `name_to_id`).
    id_to_name: HashMap<u32, String>,
    /// Local counter handed to `make_schemaless_id` for the next new name; starts at 1.
    next_local_id: u32,
}
256
257impl SchemalessEdgeTypeRegistry {
258 pub fn new() -> Self {
259 Self {
260 name_to_id: HashMap::new(),
261 id_to_name: HashMap::new(),
262 next_local_id: 1,
263 }
264 }
265
266 pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
268 if let Some(&id) = self.name_to_id.get(type_name) {
269 return id;
270 }
271
272 let id = make_schemaless_id(self.next_local_id);
273 self.next_local_id += 1;
274
275 self.name_to_id.insert(type_name.to_string(), id);
276 self.id_to_name.insert(id, type_name.to_string());
277
278 id
279 }
280
281 pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
283 self.id_to_name.get(&type_id).map(String::as_str)
284 }
285
286 pub fn contains(&self, type_name: &str) -> bool {
288 self.name_to_id.contains_key(type_name)
289 }
290
291 pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
293 self.name_to_id
294 .iter()
295 .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
296 .map(|(_, &id)| id)
297 }
298
299 pub fn all_type_ids(&self) -> Vec<u32> {
301 self.id_to_name.keys().copied().collect()
302 }
303
304 pub fn is_empty(&self) -> bool {
306 self.name_to_id.is_empty()
307 }
308}
309
310impl Default for SchemalessEdgeTypeRegistry {
311 fn default() -> Self {
312 Self::new()
313 }
314}
315
/// The complete, serializable graph schema.
///
/// `SchemaManager::save` persists it as pretty-printed JSON. Fields marked `#[serde(default)]`
/// may be absent from a persisted file; they then start empty.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
    /// Schema version, recorded in [`PropertyMeta::added_in`] for new properties. Nothing in
    /// this file increments it.
    pub schema_version: u32,
    /// Node labels keyed by name.
    pub labels: HashMap<String, LabelMeta>,
    /// Declared edge types keyed by name.
    pub edge_types: HashMap<String, EdgeTypeMeta>,
    /// Property definitions keyed by label/edge-type name, then by property name.
    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
    /// Declared indexes, in insertion order.
    #[serde(default)]
    pub indexes: Vec<IndexDefinition>,
    /// Declared constraints, in insertion order.
    #[serde(default)]
    pub constraints: Vec<Constraint>,
    /// Ids for edge types used without a declaration in `edge_types`.
    #[serde(default)]
    pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
330
331impl Default for Schema {
332 fn default() -> Self {
333 Self {
334 schema_version: 1,
335 labels: HashMap::new(),
336 edge_types: HashMap::new(),
337 properties: HashMap::new(),
338 indexes: Vec::new(),
339 constraints: Vec::new(),
340 schemaless_registry: SchemalessEdgeTypeRegistry::new(),
341 }
342 }
343}
344
345impl Schema {
346 pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
351 self.labels
352 .iter()
353 .find(|(_, meta)| meta.id == label_id)
354 .map(|(name, _)| name.as_str())
355 }
356
357 pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
359 self.labels.get(label_name).map(|meta| meta.id)
360 }
361
362 pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
367 self.edge_types
368 .iter()
369 .find(|(_, meta)| meta.id == type_id)
370 .map(|(name, _)| name.as_str())
371 }
372
373 pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
375 self.edge_types.get(type_name).map(|meta| meta.id)
376 }
377
378 pub fn vector_index_for_property(
383 &self,
384 label: &str,
385 property: &str,
386 ) -> Option<&VectorIndexConfig> {
387 self.indexes.iter().find_map(|idx| {
388 if let IndexDefinition::Vector(config) = idx
389 && config.label == label
390 && config.property == property
391 {
392 return Some(config);
393 }
394 None
395 })
396 }
397
398 pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
403 self.labels
404 .iter()
405 .find(|(k, _)| k.eq_ignore_ascii_case(name))
406 .map(|(_, v)| v)
407 }
408
409 pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
411 self.get_label_case_insensitive(label_name)
412 .map(|meta| meta.id)
413 }
414
415 pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
420 self.edge_types
421 .iter()
422 .find(|(k, _)| k.eq_ignore_ascii_case(name))
423 .map(|(_, v)| v)
424 }
425
426 pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
428 self.get_edge_type_case_insensitive(type_name)
429 .map(|meta| meta.id)
430 }
431
432 pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
435 self.edge_type_id_by_name_case_insensitive(type_name)
436 .or_else(|| {
437 self.schemaless_registry
438 .id_by_name_case_insensitive(type_name)
439 })
440 }
441
442 pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
448 if let Some(id) = self.edge_type_id_by_name(type_name) {
449 return id;
450 }
451 self.schemaless_registry.get_or_assign_id(type_name)
452 }
453
454 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
458 if is_schemaless_edge_type(type_id) {
459 self.schemaless_registry
460 .type_name_by_id(type_id)
461 .map(str::to_owned)
462 } else {
463 self.edge_type_name_by_id(type_id).map(str::to_owned)
464 }
465 }
466
467 pub fn all_edge_type_ids(&self) -> Vec<u32> {
470 let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
471 ids.extend(self.schemaless_registry.all_type_ids());
472 ids.sort_unstable();
473 ids
474 }
475}
476
/// An index declared in the schema.
///
/// Serialized with an internal `"type"` field that holds the variant name.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    Vector(VectorIndexConfig),
    FullText(FullTextIndexConfig),
    Scalar(ScalarIndexConfig),
    Inverted(InvertedIndexConfig),
    JsonFullText(JsonFtsIndexConfig),
}
487
488impl IndexDefinition {
489 pub fn name(&self) -> &str {
491 match self {
492 IndexDefinition::Vector(c) => &c.name,
493 IndexDefinition::FullText(c) => &c.name,
494 IndexDefinition::Scalar(c) => &c.name,
495 IndexDefinition::Inverted(c) => &c.name,
496 IndexDefinition::JsonFullText(c) => &c.name,
497 }
498 }
499
500 pub fn label(&self) -> &str {
502 match self {
503 IndexDefinition::Vector(c) => &c.label,
504 IndexDefinition::FullText(c) => &c.label,
505 IndexDefinition::Scalar(c) => &c.label,
506 IndexDefinition::Inverted(c) => &c.label,
507 IndexDefinition::JsonFullText(c) => &c.label,
508 }
509 }
510}
511
/// Configuration of an inverted (term) index on a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
    pub name: String,
    pub label: String,
    pub property: String,
    /// Defaults to `true` when missing from a persisted schema. NOTE(review): presumably
    /// controls term normalization — confirm against the index builder.
    #[serde(default = "default_normalize")]
    pub normalize: bool,
    /// Defaults to 10 000 when missing from a persisted schema.
    #[serde(default = "default_max_terms_per_doc")]
    pub max_terms_per_doc: usize,
}
522
/// Serde default for [`InvertedIndexConfig::normalize`].
fn default_normalize() -> bool {
    const NORMALIZE_BY_DEFAULT: bool = true;
    NORMALIZE_BY_DEFAULT
}
526
/// Serde default for [`InvertedIndexConfig::max_terms_per_doc`].
fn default_max_terms_per_doc() -> usize {
    const DEFAULT_MAX_TERMS_PER_DOC: usize = 10_000;
    DEFAULT_MAX_TERMS_PER_DOC
}
530
/// Configuration of a vector-similarity index on a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
    pub name: String,
    pub label: String,
    pub property: String,
    pub index_type: VectorIndexType,
    pub metric: DistanceMetric,
    /// NOTE(review): presumably set when vectors are computed from other properties rather
    /// than supplied directly — confirm.
    pub embedding_config: Option<EmbeddingConfig>,
}
540
/// How embeddings for a vector index are produced (consumed outside this file).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
    /// NOTE(review): presumably names a configured embedding model/provider — confirm.
    pub alias: String,
    /// Properties whose values feed the embedding.
    pub source_properties: Vec<String>,
    /// NOTE(review): presumably the number of items embedded per request — confirm.
    pub batch_size: usize,
}
548
/// Vector index structure and its tuning parameters. The parameters are interpreted by the
/// index builder, which is not in this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
    /// Inverted-file index with product quantization.
    IvfPq {
        num_partitions: u32,
        num_sub_vectors: u32,
        bits_per_subvector: u8,
    },
    /// Hierarchical navigable small-world graph.
    Hnsw {
        m: u32,
        ef_construction: u32,
        ef_search: u32,
    },
    /// No index structure (brute-force search).
    Flat,
}
564
/// Distance function used by a vector index; see [`DistanceMetric::compute_distance`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
    /// `1 - cosine similarity`.
    Cosine,
    /// Squared Euclidean distance.
    L2,
    /// Negated dot product.
    Dot,
}
572
573impl DistanceMetric {
574 pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
586 assert_eq!(a.len(), b.len(), "vector dimension mismatch");
587 match self {
588 DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
589 DistanceMetric::Cosine => {
590 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
591 let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
592 let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
593 let denom = norm_a * norm_b;
594 if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
595 }
596 DistanceMetric::Dot => {
597 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
598 -dot
599 }
600 }
601 }
602}
603
/// Configuration of a full-text index over one or more string properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
    pub name: String,
    pub label: String,
    pub properties: Vec<String>,
    pub tokenizer: TokenizerConfig,
    /// NOTE(review): presumably stores token positions (e.g. for phrase queries) — confirm.
    pub with_positions: bool,
}
612
/// Tokenizer used by a full-text index. Tokenization is implemented outside this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
    Standard,
    Whitespace,
    /// N-grams with lengths between `min` and `max` (presumably inclusive — confirm).
    Ngram { min: u8, max: u8 },
    /// A tokenizer registered elsewhere under `name`.
    Custom { name: String },
}
621
/// Configuration of a full-text index over a JSON column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
    pub name: String,
    pub label: String,
    pub column: String,
    /// JSON paths to index; empty when missing from a persisted schema. NOTE(review): an empty
    /// list presumably means "index everything" — confirm.
    #[serde(default)]
    pub paths: Vec<String>,
    /// Defaults to `false` when missing from a persisted schema.
    #[serde(default)]
    pub with_positions: bool,
}
632
/// Configuration of a scalar (value-lookup) index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
    pub name: String,
    pub label: String,
    pub properties: Vec<String>,
    pub index_type: ScalarIndexType,
    /// Optional filter expression. NOTE(review): presumably makes this a partial index —
    /// confirm.
    pub where_clause: Option<String>,
}
641
/// Physical structure of a scalar index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
    BTree,
    Hash,
    Bitmap,
}
649
/// Owns the current [`Schema`] and persists it to an [`ObjectStore`].
///
/// Readers get cheap `Arc<Schema>` snapshots from `SchemaManager::schema`. Mutating methods
/// take the write lock and edit through `Arc::make_mut`, so outstanding snapshots keep the
/// version they were taken from. Changes stay in memory until `SchemaManager::save` is called.
pub struct SchemaManager {
    /// Store the schema is read from and written to.
    store: Arc<dyn ObjectStore>,
    /// Object key of the schema file inside `store`.
    path: ObjectStorePath,
    /// The current schema.
    schema: RwLock<Arc<Schema>>,
}
655
656impl SchemaManager {
657 pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
658 let path = path.as_ref();
659 let parent = path
660 .parent()
661 .ok_or_else(|| anyhow!("Invalid schema path"))?;
662 let filename = path
663 .file_name()
664 .ok_or_else(|| anyhow!("Invalid schema filename"))?
665 .to_str()
666 .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
667
668 let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
669 let obj_path = ObjectStorePath::from(filename);
670
671 Self::load_from_store(store, &obj_path).await
672 }
673
674 pub async fn load_from_store(
675 store: Arc<dyn ObjectStore>,
676 path: &ObjectStorePath,
677 ) -> Result<Self> {
678 match store.get(path).await {
679 Ok(result) => {
680 let bytes = result.bytes().await?;
681 let content = String::from_utf8(bytes.to_vec())?;
682 let schema: Schema = serde_json::from_str(&content)?;
683 Ok(Self {
684 store,
685 path: path.clone(),
686 schema: RwLock::new(Arc::new(schema)),
687 })
688 }
689 Err(object_store::Error::NotFound { .. }) => Ok(Self {
690 store,
691 path: path.clone(),
692 schema: RwLock::new(Arc::new(Schema::default())),
693 }),
694 Err(e) => Err(anyhow::Error::from(e)),
695 }
696 }
697
698 pub async fn save(&self) -> Result<()> {
699 let content = {
700 let schema_guard = acquire_read(&self.schema, "schema")?;
701 serde_json::to_string_pretty(&**schema_guard)?
702 };
703 self.store
704 .put(&self.path, content.into())
705 .await
706 .map_err(anyhow::Error::from)?;
707 Ok(())
708 }
709
710 pub fn path(&self) -> &ObjectStorePath {
711 &self.path
712 }
713
714 pub fn schema(&self) -> Arc<Schema> {
715 self.schema
716 .read()
717 .expect("Schema lock poisoned - a thread panicked while holding it")
718 .clone()
719 }
720
721 fn normalize_function_names(expr: &str) -> String {
724 let mut result = String::with_capacity(expr.len());
725 let mut chars = expr.chars().peekable();
726
727 while let Some(ch) = chars.next() {
728 if ch.is_alphabetic() {
729 let mut ident = String::new();
731 ident.push(ch);
732
733 while let Some(&next) = chars.peek() {
734 if next.is_alphanumeric() || next == '_' {
735 ident.push(chars.next().unwrap());
736 } else {
737 break;
738 }
739 }
740
741 if chars.peek() == Some(&'(') {
743 result.push_str(&ident.to_uppercase());
744 } else {
745 result.push_str(&ident); }
747 } else {
748 result.push(ch);
749 }
750 }
751
752 result
753 }
754
755 pub fn generated_column_name(expr: &str) -> String {
763 let normalized = Self::normalize_function_names(expr);
765
766 let sanitized = normalized
767 .replace(|c: char| !c.is_alphanumeric(), "_")
768 .trim_matches('_')
769 .to_string();
770
771 const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
773 const FNV_PRIME: u64 = 1099511628211;
774
775 let mut hash = FNV_OFFSET_BASIS;
776 for byte in normalized.as_bytes() {
777 hash ^= *byte as u64;
778 hash = hash.wrapping_mul(FNV_PRIME);
779 }
780
781 format!("_gen_{}_{:x}", sanitized, hash)
782 }
783
784 pub fn replace_schema(&self, new_schema: Schema) {
785 let mut schema = self
786 .schema
787 .write()
788 .expect("Schema lock poisoned - a thread panicked while holding it");
789 *schema = Arc::new(new_schema);
790 }
791
792 pub fn next_label_id(&self) -> u16 {
793 self.schema()
794 .labels
795 .values()
796 .map(|l| l.id)
797 .max()
798 .unwrap_or(0)
799 + 1
800 }
801
802 pub fn next_type_id(&self) -> u32 {
803 let max_schema_id = self
804 .schema()
805 .edge_types
806 .values()
807 .map(|t| t.id)
808 .max()
809 .unwrap_or(0);
810
811 if max_schema_id >= MAX_SCHEMA_TYPE_ID {
813 panic!("Schema edge type ID exhaustion");
814 }
815
816 max_schema_id + 1
817 }
818
819 pub fn add_label(&self, name: &str) -> Result<u16> {
820 let mut guard = acquire_write(&self.schema, "schema")?;
821 let schema = Arc::make_mut(&mut *guard);
822 if schema.labels.contains_key(name) {
823 return Err(anyhow!("Label '{}' already exists", name));
824 }
825
826 let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
827 schema.labels.insert(
828 name.to_string(),
829 LabelMeta {
830 id,
831 created_at: Utc::now(),
832 state: SchemaElementState::Active,
833 },
834 );
835 Ok(id)
836 }
837
838 pub fn add_edge_type(
839 &self,
840 name: &str,
841 src_labels: Vec<String>,
842 dst_labels: Vec<String>,
843 ) -> Result<u32> {
844 let mut guard = acquire_write(&self.schema, "schema")?;
845 let schema = Arc::make_mut(&mut *guard);
846 if schema.edge_types.contains_key(name) {
847 return Err(anyhow!("Edge type '{}' already exists", name));
848 }
849
850 let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
851
852 if id >= MAX_SCHEMA_TYPE_ID {
854 return Err(anyhow!("Schema edge type ID exhaustion"));
855 }
856
857 schema.edge_types.insert(
858 name.to_string(),
859 EdgeTypeMeta {
860 id,
861 src_labels,
862 dst_labels,
863 state: SchemaElementState::Active,
864 },
865 );
866 Ok(id)
867 }
868
869 pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
871 let mut guard = acquire_write(&self.schema, "schema")
872 .expect("Schema lock poisoned - a thread panicked while holding it");
873 let schema = Arc::make_mut(&mut *guard);
874 schema.get_or_assign_edge_type_id(type_name)
875 }
876
877 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
879 let schema = acquire_read(&self.schema, "schema")
880 .expect("Schema lock poisoned - a thread panicked while holding it");
881 schema.edge_type_name_by_id_unified(type_id)
882 }
883
884 pub fn add_property(
885 &self,
886 label_or_type: &str,
887 prop_name: &str,
888 data_type: DataType,
889 nullable: bool,
890 ) -> Result<()> {
891 let mut guard = acquire_write(&self.schema, "schema")?;
892 let schema = Arc::make_mut(&mut *guard);
893 let version = schema.schema_version;
894 let props = schema
895 .properties
896 .entry(label_or_type.to_string())
897 .or_default();
898
899 if props.contains_key(prop_name) {
900 return Err(anyhow!(
901 "Property '{}' already exists for '{}'",
902 prop_name,
903 label_or_type
904 ));
905 }
906
907 props.insert(
908 prop_name.to_string(),
909 PropertyMeta {
910 r#type: data_type,
911 nullable,
912 added_in: version,
913 state: SchemaElementState::Active,
914 generation_expression: None,
915 },
916 );
917 Ok(())
918 }
919
920 pub fn add_generated_property(
921 &self,
922 label_or_type: &str,
923 prop_name: &str,
924 data_type: DataType,
925 expr: String,
926 ) -> Result<()> {
927 let mut guard = acquire_write(&self.schema, "schema")?;
928 let schema = Arc::make_mut(&mut *guard);
929 let version = schema.schema_version;
930 let props = schema
931 .properties
932 .entry(label_or_type.to_string())
933 .or_default();
934
935 if props.contains_key(prop_name) {
936 return Err(anyhow!("Property '{}' already exists", prop_name));
937 }
938
939 props.insert(
940 prop_name.to_string(),
941 PropertyMeta {
942 r#type: data_type,
943 nullable: true,
944 added_in: version,
945 state: SchemaElementState::Active,
946 generation_expression: Some(expr),
947 },
948 );
949 Ok(())
950 }
951
952 pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
953 let mut guard = acquire_write(&self.schema, "schema")?;
954 let schema = Arc::make_mut(&mut *guard);
955 schema.indexes.push(index_def);
956 Ok(())
957 }
958
959 pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
960 let schema = self.schema.read().expect("Schema lock poisoned");
961 schema.indexes.iter().find(|i| i.name() == name).cloned()
962 }
963
964 pub fn remove_index(&self, name: &str) -> Result<()> {
965 let mut guard = acquire_write(&self.schema, "schema")?;
966 let schema = Arc::make_mut(&mut *guard);
967 if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
968 schema.indexes.remove(pos);
969 Ok(())
970 } else {
971 Err(anyhow!("Index '{}' not found", name))
972 }
973 }
974
975 pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
976 let mut guard = acquire_write(&self.schema, "schema")?;
977 let schema = Arc::make_mut(&mut *guard);
978 if schema.constraints.iter().any(|c| c.name == constraint.name) {
979 return Err(anyhow!("Constraint '{}' already exists", constraint.name));
980 }
981 schema.constraints.push(constraint);
982 Ok(())
983 }
984
985 pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
986 let mut guard = acquire_write(&self.schema, "schema")?;
987 let schema = Arc::make_mut(&mut *guard);
988 if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
989 schema.constraints.remove(pos);
990 Ok(())
991 } else if if_exists {
992 Ok(())
993 } else {
994 Err(anyhow!("Constraint '{}' not found", name))
995 }
996 }
997
998 pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
999 let mut guard = acquire_write(&self.schema, "schema")?;
1000 let schema = Arc::make_mut(&mut *guard);
1001 if let Some(props) = schema.properties.get_mut(label_or_type) {
1002 if props.remove(prop_name).is_some() {
1003 Ok(())
1004 } else {
1005 Err(anyhow!(
1006 "Property '{}' not found for '{}'",
1007 prop_name,
1008 label_or_type
1009 ))
1010 }
1011 } else {
1012 Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1013 }
1014 }
1015
1016 pub fn rename_property(
1017 &self,
1018 label_or_type: &str,
1019 old_name: &str,
1020 new_name: &str,
1021 ) -> Result<()> {
1022 let mut guard = acquire_write(&self.schema, "schema")?;
1023 let schema = Arc::make_mut(&mut *guard);
1024 if let Some(props) = schema.properties.get_mut(label_or_type) {
1025 if let Some(meta) = props.remove(old_name) {
1026 if props.contains_key(new_name) {
1027 props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
1030 }
1031 props.insert(new_name.to_string(), meta);
1032 Ok(())
1033 } else {
1034 Err(anyhow!(
1035 "Property '{}' not found for '{}'",
1036 old_name,
1037 label_or_type
1038 ))
1039 }
1040 } else {
1041 Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1042 }
1043 }
1044
1045 pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1046 let mut guard = acquire_write(&self.schema, "schema")?;
1047 let schema = Arc::make_mut(&mut *guard);
1048 if let Some(label_meta) = schema.labels.get_mut(name) {
1049 label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1050 Ok(())
1052 } else if if_exists {
1053 Ok(())
1054 } else {
1055 Err(anyhow!("Label '{}' not found", name))
1056 }
1057 }
1058
1059 pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1060 let mut guard = acquire_write(&self.schema, "schema")?;
1061 let schema = Arc::make_mut(&mut *guard);
1062 if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1063 edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1064 Ok(())
1066 } else if if_exists {
1067 Ok(())
1068 } else {
1069 Err(anyhow!("Edge Type '{}' not found", name))
1070 }
1071 }
1072}
1073
1074pub fn validate_identifier(name: &str) -> Result<()> {
1076 if name.is_empty() || name.len() > 64 {
1078 return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1079 }
1080
1081 let first = name.chars().next().unwrap();
1083 if !first.is_alphabetic() && first != '_' {
1084 return Err(anyhow!(
1085 "Identifier '{}' must start with letter or underscore",
1086 name
1087 ));
1088 }
1089
1090 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1092 return Err(anyhow!(
1093 "Identifier '{}' must contain only alphanumeric and underscore",
1094 name
1095 ));
1096 }
1097
1098 const RESERVED: &[&str] = &[
1100 "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1101 "UNION", "ORDER", "LIMIT",
1102 ];
1103 if RESERVED.contains(&name.to_uppercase().as_str()) {
1104 return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1105 }
1106
1107 Ok(())
1108}
1109
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// End-to-end: create labels, properties and edge types, persist them, and reload.
    #[tokio::test]
    async fn test_schema_management() -> Result<()> {
        let tmp = tempdir()?;
        let blob_store = Arc::new(LocalFileSystem::new_with_prefix(tmp.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        // Nothing is stored yet, so the manager starts from the default schema.
        let manager = SchemaManager::load_from_store(blob_store.clone(), &schema_path).await?;

        // Label ids start at 1 and a label cannot be declared twice.
        assert_eq!(manager.add_label("Person")?, 1);
        assert!(manager.add_label("Person").is_err());

        // Property names are unique per label.
        manager.add_property("Person", "name", DataType::String, false)?;
        let duplicate = manager.add_property("Person", "name", DataType::String, false);
        assert!(duplicate.is_err());

        // Edge types have their own id sequence.
        let knows_id =
            manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
        assert_eq!(knows_id, 1);

        // Persist, then reload through a fresh manager.
        manager.save().await?;
        assert!(blob_store.get(&schema_path).await.is_ok());

        let reloaded = SchemaManager::load_from_store(blob_store, &schema_path).await?;
        let snapshot = reloaded.schema();
        assert!(snapshot.labels.contains_key("Person"));
        let person_props = snapshot.properties.get("Person").unwrap();
        assert!(person_props.contains_key("name"));

        Ok(())
    }

    /// Function names are upper-cased regardless of input case; arguments are untouched.
    #[test]
    fn test_normalize_function_names() {
        let cases = [
            ("lower(email)", "LOWER(email)"),
            ("LOWER(email)", "LOWER(email)"),
            ("Lower(email)", "LOWER(email)"),
            ("trim(lower(email))", "TRIM(LOWER(email))"),
        ];
        for (input, expected) in cases {
            assert_eq!(SchemaManager::normalize_function_names(input), expected);
        }
    }

    /// Expressions that differ only in function-name case map to the same column.
    #[test]
    fn test_generated_column_name_case_insensitive() {
        let names: Vec<String> = ["lower(email)", "LOWER(email)", "Lower(email)"]
            .iter()
            .map(|expr| SchemaManager::generated_column_name(expr))
            .collect();
        assert_eq!(names[0], names[1]);
        assert_eq!(names[1], names[2]);
        assert!(names[0].starts_with("_gen_LOWER_email_"));
    }
}