1use crate::core::edge_type::{MAX_SCHEMA_TYPE_ID, is_schemaless_edge_type, make_schemaless_id};
5use crate::sync::{acquire_read, acquire_write};
6use anyhow::{Result, anyhow};
7use chrono::{DateTime, Utc};
8use object_store::ObjectStore;
9use object_store::local::LocalFileSystem;
10use object_store::path::Path as ObjectStorePath;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::Path;
14use std::sync::{Arc, RwLock};
15
16#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
17#[non_exhaustive]
18pub enum SchemaElementState {
19 Active,
20 Hidden {
21 since: DateTime<Utc>,
22 last_active_snapshot: String, },
24 Tombstone {
25 since: DateTime<Utc>,
26 },
27}
28
29use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
30
31pub fn datetime_struct_fields() -> Fields {
38 Fields::from(vec![
39 Field::new(
40 "nanos_since_epoch",
41 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
42 true,
43 ),
44 Field::new("offset_seconds", ArrowDataType::Int32, true),
45 Field::new("timezone_name", ArrowDataType::Utf8, true),
46 ])
47}
48
49pub fn time_struct_fields() -> Fields {
55 Fields::from(vec![
56 Field::new(
57 "nanos_since_midnight",
58 ArrowDataType::Time64(TimeUnit::Nanosecond),
59 true,
60 ),
61 Field::new("offset_seconds", ArrowDataType::Int32, true),
62 ])
63}
64
65pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
67 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
68}
69
70pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
72 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
73}
74
75#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
76#[non_exhaustive]
77pub enum CrdtType {
78 GCounter,
79 GSet,
80 ORSet,
81 LWWRegister,
82 LWWMap,
83 Rga,
84 VectorClock,
85 VCRegister,
86}
87
88#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
89pub enum PointType {
90 Geographic, Cartesian2D, Cartesian3D, }
94
95#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
96#[non_exhaustive]
97pub enum DataType {
98 String,
99 Int32,
100 Int64,
101 Float32,
102 Float64,
103 Bool,
104 Timestamp,
105 Date,
106 Time,
107 DateTime,
108 Duration,
109 CypherValue,
110 Point(PointType),
111 Vector { dimensions: usize },
112 Btic,
113 Crdt(CrdtType),
114 List(Box<DataType>),
115 Map(Box<DataType>, Box<DataType>),
116}
117
118impl DataType {
119 #[allow(non_upper_case_globals)]
121 pub const Float: DataType = DataType::Float64;
122 #[allow(non_upper_case_globals)]
123 pub const Int: DataType = DataType::Int64;
124
125 pub fn to_arrow(&self) -> ArrowDataType {
126 match self {
127 DataType::String => ArrowDataType::Utf8,
128 DataType::Int32 => ArrowDataType::Int32,
129 DataType::Int64 => ArrowDataType::Int64,
130 DataType::Float32 => ArrowDataType::Float32,
131 DataType::Float64 => ArrowDataType::Float64,
132 DataType::Bool => ArrowDataType::Boolean,
133 DataType::Timestamp => {
134 ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
135 }
136 DataType::Date => ArrowDataType::Date32,
137 DataType::Time => ArrowDataType::Struct(time_struct_fields()),
138 DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
139 DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
142 PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
143 Field::new("latitude", ArrowDataType::Float64, false),
144 Field::new("longitude", ArrowDataType::Float64, false),
145 Field::new("crs", ArrowDataType::Utf8, false),
146 ])),
147 PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
148 Field::new("x", ArrowDataType::Float64, false),
149 Field::new("y", ArrowDataType::Float64, false),
150 Field::new("crs", ArrowDataType::Utf8, false),
151 ])),
152 PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
153 Field::new("x", ArrowDataType::Float64, false),
154 Field::new("y", ArrowDataType::Float64, false),
155 Field::new("z", ArrowDataType::Float64, false),
156 Field::new("crs", ArrowDataType::Utf8, false),
157 ])),
158 },
159 DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
160 Arc::new(Field::new("item", ArrowDataType::Float32, true)),
161 *dimensions as i32,
162 ),
163 DataType::Btic => ArrowDataType::FixedSizeBinary(24),
164 DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
166 ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
167 }
168 DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
169 "item",
170 ArrowDataType::Struct(Fields::from(vec![
171 Field::new("key", key.to_arrow(), false),
172 Field::new("value", value.to_arrow(), true),
173 ])),
174 true,
175 ))),
176 }
177 }
178}
179
180fn default_created_at() -> DateTime<Utc> {
181 Utc::now()
182}
183
184fn default_state() -> SchemaElementState {
185 SchemaElementState::Active
186}
187
188fn default_version_1() -> u32 {
189 1
190}
191
192#[derive(Clone, Debug, Serialize, Deserialize)]
193pub struct PropertyMeta {
194 pub r#type: DataType,
195 pub nullable: bool,
196 #[serde(default = "default_version_1")]
197 pub added_in: u32, #[serde(default = "default_state")]
199 pub state: SchemaElementState,
200 #[serde(default)]
201 pub generation_expression: Option<String>,
202}
203
204#[derive(Clone, Debug, Serialize, Deserialize)]
205pub struct LabelMeta {
206 pub id: u16, #[serde(default = "default_created_at")]
208 pub created_at: DateTime<Utc>,
209 #[serde(default = "default_state")]
210 pub state: SchemaElementState,
211}
212
213#[derive(Clone, Debug, Serialize, Deserialize)]
214pub struct EdgeTypeMeta {
215 pub id: u32,
217 pub src_labels: Vec<String>,
218 pub dst_labels: Vec<String>,
219 #[serde(default = "default_state")]
220 pub state: SchemaElementState,
221}
222
223#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
224#[non_exhaustive]
225pub enum ConstraintType {
226 Unique { properties: Vec<String> },
227 Exists { property: String },
228 Check { expression: String },
229}
230
231#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
232#[non_exhaustive]
233pub enum ConstraintTarget {
234 Label(String),
235 EdgeType(String),
236}
237
238#[derive(Clone, Debug, Serialize, Deserialize)]
239pub struct Constraint {
240 pub name: String,
241 pub constraint_type: ConstraintType,
242 pub target: ConstraintTarget,
243 pub enabled: bool,
244}
245
246#[derive(Clone, Debug, Serialize, Deserialize)]
252pub struct SchemalessEdgeTypeRegistry {
253 name_to_id: HashMap<String, u32>,
254 id_to_name: HashMap<u32, String>,
255 next_local_id: u32,
257}
258
259impl SchemalessEdgeTypeRegistry {
260 pub fn new() -> Self {
261 Self {
262 name_to_id: HashMap::new(),
263 id_to_name: HashMap::new(),
264 next_local_id: 1,
265 }
266 }
267
268 pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
270 if let Some(&id) = self.name_to_id.get(type_name) {
271 return id;
272 }
273
274 let id = make_schemaless_id(self.next_local_id);
275 self.next_local_id += 1;
276
277 self.name_to_id.insert(type_name.to_string(), id);
278 self.id_to_name.insert(id, type_name.to_string());
279
280 id
281 }
282
283 pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
285 self.id_to_name.get(&type_id).map(String::as_str)
286 }
287
288 pub fn contains(&self, type_name: &str) -> bool {
290 self.name_to_id.contains_key(type_name)
291 }
292
293 pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
295 self.name_to_id
296 .iter()
297 .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
298 .map(|(_, &id)| id)
299 }
300
301 pub fn all_type_ids(&self) -> Vec<u32> {
303 self.id_to_name.keys().copied().collect()
304 }
305
306 pub fn is_empty(&self) -> bool {
308 self.name_to_id.is_empty()
309 }
310}
311
312impl Default for SchemalessEdgeTypeRegistry {
313 fn default() -> Self {
314 Self::new()
315 }
316}
317
318#[derive(Clone, Debug, Serialize, Deserialize)]
319pub struct Schema {
320 pub schema_version: u32,
321 pub labels: HashMap<String, LabelMeta>,
322 pub edge_types: HashMap<String, EdgeTypeMeta>,
323 pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
324 #[serde(default)]
325 pub indexes: Vec<IndexDefinition>,
326 #[serde(default)]
327 pub constraints: Vec<Constraint>,
328 #[serde(default)]
330 pub schemaless_registry: SchemalessEdgeTypeRegistry,
331}
332
333impl Default for Schema {
334 fn default() -> Self {
335 Self {
336 schema_version: 1,
337 labels: HashMap::new(),
338 edge_types: HashMap::new(),
339 properties: HashMap::new(),
340 indexes: Vec::new(),
341 constraints: Vec::new(),
342 schemaless_registry: SchemalessEdgeTypeRegistry::new(),
343 }
344 }
345}
346
347impl Schema {
348 pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
353 self.labels
354 .iter()
355 .find(|(_, meta)| meta.id == label_id)
356 .map(|(name, _)| name.as_str())
357 }
358
359 pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
361 self.labels.get(label_name).map(|meta| meta.id)
362 }
363
364 pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
369 self.edge_types
370 .iter()
371 .find(|(_, meta)| meta.id == type_id)
372 .map(|(name, _)| name.as_str())
373 }
374
375 pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
377 self.edge_types.get(type_name).map(|meta| meta.id)
378 }
379
380 pub fn vector_index_for_property(
385 &self,
386 label: &str,
387 property: &str,
388 ) -> Option<&VectorIndexConfig> {
389 self.indexes.iter().find_map(|idx| {
390 if let IndexDefinition::Vector(config) = idx
391 && config.label == label
392 && config.property == property
393 && config.metadata.status == IndexStatus::Online
394 {
395 return Some(config);
396 }
397 None
398 })
399 }
400
401 pub fn fulltext_index_for_property(
406 &self,
407 label: &str,
408 property: &str,
409 ) -> Option<&FullTextIndexConfig> {
410 self.indexes.iter().find_map(|idx| {
411 if let IndexDefinition::FullText(config) = idx
412 && config.label == label
413 && config.properties.iter().any(|p| p == property)
414 && config.metadata.status == IndexStatus::Online
415 {
416 return Some(config);
417 }
418 None
419 })
420 }
421
422 pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
427 self.labels
428 .iter()
429 .find(|(k, _)| k.eq_ignore_ascii_case(name))
430 .map(|(_, v)| v)
431 }
432
433 pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
435 self.get_label_case_insensitive(label_name)
436 .map(|meta| meta.id)
437 }
438
439 pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
444 self.edge_types
445 .iter()
446 .find(|(k, _)| k.eq_ignore_ascii_case(name))
447 .map(|(_, v)| v)
448 }
449
450 pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
452 self.get_edge_type_case_insensitive(type_name)
453 .map(|meta| meta.id)
454 }
455
456 pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
459 self.edge_type_id_by_name_case_insensitive(type_name)
460 .or_else(|| {
461 self.schemaless_registry
462 .id_by_name_case_insensitive(type_name)
463 })
464 }
465
466 pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
472 if let Some(id) = self.edge_type_id_by_name(type_name) {
473 return id;
474 }
475 self.schemaless_registry.get_or_assign_id(type_name)
476 }
477
478 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
482 if is_schemaless_edge_type(type_id) {
483 self.schemaless_registry
484 .type_name_by_id(type_id)
485 .map(str::to_owned)
486 } else {
487 self.edge_type_name_by_id(type_id).map(str::to_owned)
488 }
489 }
490
491 pub fn all_edge_type_ids(&self) -> Vec<u32> {
494 let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
495 ids.extend(self.schemaless_registry.all_type_ids());
496 ids.sort_unstable();
497 ids
498 }
499}
500
501#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
503pub enum IndexStatus {
504 #[default]
506 Online,
507 Building,
509 Stale,
511 Failed,
513}
514
515#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
517pub struct IndexMetadata {
518 #[serde(default)]
520 pub status: IndexStatus,
521 #[serde(default)]
523 pub last_built_at: Option<DateTime<Utc>>,
524 #[serde(default)]
526 pub row_count_at_build: Option<u64>,
527}
528
529#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
530#[serde(tag = "type")]
531#[non_exhaustive]
532pub enum IndexDefinition {
533 Vector(VectorIndexConfig),
534 FullText(FullTextIndexConfig),
535 Scalar(ScalarIndexConfig),
536 Inverted(InvertedIndexConfig),
537 JsonFullText(JsonFtsIndexConfig),
538}
539
540impl IndexDefinition {
541 pub fn name(&self) -> &str {
543 match self {
544 IndexDefinition::Vector(c) => &c.name,
545 IndexDefinition::FullText(c) => &c.name,
546 IndexDefinition::Scalar(c) => &c.name,
547 IndexDefinition::Inverted(c) => &c.name,
548 IndexDefinition::JsonFullText(c) => &c.name,
549 }
550 }
551
552 pub fn label(&self) -> &str {
554 match self {
555 IndexDefinition::Vector(c) => &c.label,
556 IndexDefinition::FullText(c) => &c.label,
557 IndexDefinition::Scalar(c) => &c.label,
558 IndexDefinition::Inverted(c) => &c.label,
559 IndexDefinition::JsonFullText(c) => &c.label,
560 }
561 }
562
563 pub fn metadata(&self) -> &IndexMetadata {
565 match self {
566 IndexDefinition::Vector(c) => &c.metadata,
567 IndexDefinition::FullText(c) => &c.metadata,
568 IndexDefinition::Scalar(c) => &c.metadata,
569 IndexDefinition::Inverted(c) => &c.metadata,
570 IndexDefinition::JsonFullText(c) => &c.metadata,
571 }
572 }
573
574 pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
576 match self {
577 IndexDefinition::Vector(c) => &mut c.metadata,
578 IndexDefinition::FullText(c) => &mut c.metadata,
579 IndexDefinition::Scalar(c) => &mut c.metadata,
580 IndexDefinition::Inverted(c) => &mut c.metadata,
581 IndexDefinition::JsonFullText(c) => &mut c.metadata,
582 }
583 }
584}
585
586#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
587pub struct InvertedIndexConfig {
588 pub name: String,
589 pub label: String,
590 pub property: String,
591 #[serde(default = "default_normalize")]
592 pub normalize: bool,
593 #[serde(default = "default_max_terms_per_doc")]
594 pub max_terms_per_doc: usize,
595 #[serde(default)]
596 pub metadata: IndexMetadata,
597}
598
/// Serde default for `InvertedIndexConfig::normalize`.
fn default_normalize() -> bool {
    true
}

/// Serde default for `InvertedIndexConfig::max_terms_per_doc`.
fn default_max_terms_per_doc() -> usize {
    10_000
}
606
607#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
608pub struct VectorIndexConfig {
609 pub name: String,
610 pub label: String,
611 pub property: String,
612 pub index_type: VectorIndexType,
613 pub metric: DistanceMetric,
614 pub embedding_config: Option<EmbeddingConfig>,
615 #[serde(default)]
616 pub metadata: IndexMetadata,
617}
618
619#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
620pub struct EmbeddingConfig {
621 pub alias: String,
623 pub source_properties: Vec<String>,
624 pub batch_size: usize,
625}
626
627#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
628#[non_exhaustive]
629pub enum VectorIndexType {
630 IvfPq {
631 num_partitions: u32,
632 num_sub_vectors: u32,
633 bits_per_subvector: u8,
634 },
635 Hnsw {
636 m: u32,
637 ef_construction: u32,
638 ef_search: u32,
639 },
640 Flat,
641}
642
643#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
644#[non_exhaustive]
645pub enum DistanceMetric {
646 Cosine,
647 L2,
648 Dot,
649}
650
651impl DistanceMetric {
652 pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
664 assert_eq!(a.len(), b.len(), "vector dimension mismatch");
665 match self {
666 DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
667 DistanceMetric::Cosine => {
668 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
669 let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
670 let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
671 let denom = norm_a * norm_b;
672 if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
673 }
674 DistanceMetric::Dot => {
675 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
676 -dot
677 }
678 }
679 }
680}
681
682#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
683pub struct FullTextIndexConfig {
684 pub name: String,
685 pub label: String,
686 pub properties: Vec<String>,
687 pub tokenizer: TokenizerConfig,
688 pub with_positions: bool,
689 #[serde(default)]
690 pub metadata: IndexMetadata,
691}
692
693#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
694#[non_exhaustive]
695pub enum TokenizerConfig {
696 Standard,
697 Whitespace,
698 Ngram { min: u8, max: u8 },
699 Custom { name: String },
700}
701
702#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
703pub struct JsonFtsIndexConfig {
704 pub name: String,
705 pub label: String,
706 pub column: String,
707 #[serde(default)]
708 pub paths: Vec<String>,
709 #[serde(default)]
710 pub with_positions: bool,
711 #[serde(default)]
712 pub metadata: IndexMetadata,
713}
714
715#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
716pub struct ScalarIndexConfig {
717 pub name: String,
718 pub label: String,
719 pub properties: Vec<String>,
720 pub index_type: ScalarIndexType,
721 pub where_clause: Option<String>,
722 #[serde(default)]
723 pub metadata: IndexMetadata,
724}
725
726#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
727#[non_exhaustive]
728pub enum ScalarIndexType {
729 BTree,
730 Hash,
731 Bitmap,
732}
733
734pub struct SchemaManager {
735 store: Arc<dyn ObjectStore>,
736 path: ObjectStorePath,
737 schema: RwLock<Arc<Schema>>,
738}
739
740impl SchemaManager {
741 pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
742 let path = path.as_ref();
743 let parent = path
744 .parent()
745 .ok_or_else(|| anyhow!("Invalid schema path"))?;
746 let filename = path
747 .file_name()
748 .ok_or_else(|| anyhow!("Invalid schema filename"))?
749 .to_str()
750 .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
751
752 let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
753 let obj_path = ObjectStorePath::from(filename);
754
755 Self::load_from_store(store, &obj_path).await
756 }
757
758 pub async fn load_from_store(
759 store: Arc<dyn ObjectStore>,
760 path: &ObjectStorePath,
761 ) -> Result<Self> {
762 match store.get(path).await {
763 Ok(result) => {
764 let bytes = result.bytes().await?;
765 let content = String::from_utf8(bytes.to_vec())?;
766 let schema: Schema = serde_json::from_str(&content)?;
767 Ok(Self {
768 store,
769 path: path.clone(),
770 schema: RwLock::new(Arc::new(schema)),
771 })
772 }
773 Err(object_store::Error::NotFound { .. }) => Ok(Self {
774 store,
775 path: path.clone(),
776 schema: RwLock::new(Arc::new(Schema::default())),
777 }),
778 Err(e) => Err(anyhow::Error::from(e)),
779 }
780 }
781
782 pub async fn save(&self) -> Result<()> {
783 let content = {
784 let schema_guard = acquire_read(&self.schema, "schema")?;
785 serde_json::to_string_pretty(&**schema_guard)?
786 };
787 self.store
788 .put(&self.path, content.into())
789 .await
790 .map_err(anyhow::Error::from)?;
791 Ok(())
792 }
793
794 pub fn path(&self) -> &ObjectStorePath {
795 &self.path
796 }
797
798 pub fn schema(&self) -> Arc<Schema> {
799 self.schema
800 .read()
801 .expect("Schema lock poisoned - a thread panicked while holding it")
802 .clone()
803 }
804
805 fn normalize_function_names(expr: &str) -> String {
808 let mut result = String::with_capacity(expr.len());
809 let mut chars = expr.chars().peekable();
810
811 while let Some(ch) = chars.next() {
812 if ch.is_alphabetic() {
813 let mut ident = String::new();
815 ident.push(ch);
816
817 while let Some(&next) = chars.peek() {
818 if next.is_alphanumeric() || next == '_' {
819 ident.push(chars.next().unwrap());
820 } else {
821 break;
822 }
823 }
824
825 if chars.peek() == Some(&'(') {
827 result.push_str(&ident.to_uppercase());
828 } else {
829 result.push_str(&ident); }
831 } else {
832 result.push(ch);
833 }
834 }
835
836 result
837 }
838
839 pub fn generated_column_name(expr: &str) -> String {
847 let normalized = Self::normalize_function_names(expr);
849
850 let sanitized = normalized
851 .replace(|c: char| !c.is_alphanumeric(), "_")
852 .trim_matches('_')
853 .to_string();
854
855 const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
857 const FNV_PRIME: u64 = 1099511628211;
858
859 let mut hash = FNV_OFFSET_BASIS;
860 for byte in normalized.as_bytes() {
861 hash ^= *byte as u64;
862 hash = hash.wrapping_mul(FNV_PRIME);
863 }
864
865 format!("_gen_{}_{:x}", sanitized, hash)
866 }
867
868 pub fn replace_schema(&self, new_schema: Schema) {
869 let mut schema = self
870 .schema
871 .write()
872 .expect("Schema lock poisoned - a thread panicked while holding it");
873 *schema = Arc::new(new_schema);
874 }
875
876 pub fn next_label_id(&self) -> u16 {
877 self.schema()
878 .labels
879 .values()
880 .map(|l| l.id)
881 .max()
882 .unwrap_or(0)
883 + 1
884 }
885
886 pub fn next_type_id(&self) -> u32 {
887 let max_schema_id = self
888 .schema()
889 .edge_types
890 .values()
891 .map(|t| t.id)
892 .max()
893 .unwrap_or(0);
894
895 if max_schema_id >= MAX_SCHEMA_TYPE_ID {
897 panic!("Schema edge type ID exhaustion");
898 }
899
900 max_schema_id + 1
901 }
902
903 pub fn add_label(&self, name: &str) -> Result<u16> {
904 let mut guard = acquire_write(&self.schema, "schema")?;
905 let schema = Arc::make_mut(&mut *guard);
906 if schema.labels.contains_key(name) {
907 return Err(anyhow!("Label '{}' already exists", name));
908 }
909
910 let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
911 schema.labels.insert(
912 name.to_string(),
913 LabelMeta {
914 id,
915 created_at: Utc::now(),
916 state: SchemaElementState::Active,
917 },
918 );
919 Ok(id)
920 }
921
922 pub fn add_edge_type(
923 &self,
924 name: &str,
925 src_labels: Vec<String>,
926 dst_labels: Vec<String>,
927 ) -> Result<u32> {
928 let mut guard = acquire_write(&self.schema, "schema")?;
929 let schema = Arc::make_mut(&mut *guard);
930 if schema.edge_types.contains_key(name) {
931 return Err(anyhow!("Edge type '{}' already exists", name));
932 }
933
934 let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
935
936 if id >= MAX_SCHEMA_TYPE_ID {
938 return Err(anyhow!("Schema edge type ID exhaustion"));
939 }
940
941 schema.edge_types.insert(
942 name.to_string(),
943 EdgeTypeMeta {
944 id,
945 src_labels,
946 dst_labels,
947 state: SchemaElementState::Active,
948 },
949 );
950 Ok(id)
951 }
952
953 pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
955 let mut guard = acquire_write(&self.schema, "schema")
956 .expect("Schema lock poisoned - a thread panicked while holding it");
957 let schema = Arc::make_mut(&mut *guard);
958 schema.get_or_assign_edge_type_id(type_name)
959 }
960
961 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
963 let schema = acquire_read(&self.schema, "schema")
964 .expect("Schema lock poisoned - a thread panicked while holding it");
965 schema.edge_type_name_by_id_unified(type_id)
966 }
967
968 pub fn add_property(
969 &self,
970 label_or_type: &str,
971 prop_name: &str,
972 data_type: DataType,
973 nullable: bool,
974 ) -> Result<()> {
975 let mut guard = acquire_write(&self.schema, "schema")?;
976 let schema = Arc::make_mut(&mut *guard);
977 let version = schema.schema_version;
978 let props = schema
979 .properties
980 .entry(label_or_type.to_string())
981 .or_default();
982
983 if props.contains_key(prop_name) {
984 return Err(anyhow!(
985 "Property '{}' already exists for '{}'",
986 prop_name,
987 label_or_type
988 ));
989 }
990
991 props.insert(
992 prop_name.to_string(),
993 PropertyMeta {
994 r#type: data_type,
995 nullable,
996 added_in: version,
997 state: SchemaElementState::Active,
998 generation_expression: None,
999 },
1000 );
1001 Ok(())
1002 }
1003
1004 pub fn add_generated_property(
1005 &self,
1006 label_or_type: &str,
1007 prop_name: &str,
1008 data_type: DataType,
1009 expr: String,
1010 ) -> Result<()> {
1011 let mut guard = acquire_write(&self.schema, "schema")?;
1012 let schema = Arc::make_mut(&mut *guard);
1013 let version = schema.schema_version;
1014 let props = schema
1015 .properties
1016 .entry(label_or_type.to_string())
1017 .or_default();
1018
1019 if props.contains_key(prop_name) {
1020 return Err(anyhow!("Property '{}' already exists", prop_name));
1021 }
1022
1023 props.insert(
1024 prop_name.to_string(),
1025 PropertyMeta {
1026 r#type: data_type,
1027 nullable: true,
1028 added_in: version,
1029 state: SchemaElementState::Active,
1030 generation_expression: Some(expr),
1031 },
1032 );
1033 Ok(())
1034 }
1035
1036 pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1037 let mut guard = acquire_write(&self.schema, "schema")?;
1038 let schema = Arc::make_mut(&mut *guard);
1039 schema.indexes.push(index_def);
1040 Ok(())
1041 }
1042
1043 pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1044 let schema = self.schema.read().expect("Schema lock poisoned");
1045 schema.indexes.iter().find(|i| i.name() == name).cloned()
1046 }
1047
1048 pub fn update_index_metadata(
1053 &self,
1054 index_name: &str,
1055 f: impl FnOnce(&mut IndexMetadata),
1056 ) -> Result<()> {
1057 let mut guard = acquire_write(&self.schema, "schema")?;
1058 let schema = Arc::make_mut(&mut *guard);
1059 let idx = schema
1060 .indexes
1061 .iter_mut()
1062 .find(|i| i.name() == index_name)
1063 .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1064 f(idx.metadata_mut());
1065 Ok(())
1066 }
1067
1068 pub fn remove_index(&self, name: &str) -> Result<()> {
1069 let mut guard = acquire_write(&self.schema, "schema")?;
1070 let schema = Arc::make_mut(&mut *guard);
1071 if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1072 schema.indexes.remove(pos);
1073 Ok(())
1074 } else {
1075 Err(anyhow!("Index '{}' not found", name))
1076 }
1077 }
1078
1079 pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1080 let mut guard = acquire_write(&self.schema, "schema")?;
1081 let schema = Arc::make_mut(&mut *guard);
1082 if schema.constraints.iter().any(|c| c.name == constraint.name) {
1083 return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1084 }
1085 schema.constraints.push(constraint);
1086 Ok(())
1087 }
1088
1089 pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1090 let mut guard = acquire_write(&self.schema, "schema")?;
1091 let schema = Arc::make_mut(&mut *guard);
1092 if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1093 schema.constraints.remove(pos);
1094 Ok(())
1095 } else if if_exists {
1096 Ok(())
1097 } else {
1098 Err(anyhow!("Constraint '{}' not found", name))
1099 }
1100 }
1101
1102 pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1103 let mut guard = acquire_write(&self.schema, "schema")?;
1104 let schema = Arc::make_mut(&mut *guard);
1105 if let Some(props) = schema.properties.get_mut(label_or_type) {
1106 if props.remove(prop_name).is_some() {
1107 Ok(())
1108 } else {
1109 Err(anyhow!(
1110 "Property '{}' not found for '{}'",
1111 prop_name,
1112 label_or_type
1113 ))
1114 }
1115 } else {
1116 Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1117 }
1118 }
1119
1120 pub fn rename_property(
1121 &self,
1122 label_or_type: &str,
1123 old_name: &str,
1124 new_name: &str,
1125 ) -> Result<()> {
1126 let mut guard = acquire_write(&self.schema, "schema")?;
1127 let schema = Arc::make_mut(&mut *guard);
1128 if let Some(props) = schema.properties.get_mut(label_or_type) {
1129 if let Some(meta) = props.remove(old_name) {
1130 if props.contains_key(new_name) {
1131 props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
1134 }
1135 props.insert(new_name.to_string(), meta);
1136 Ok(())
1137 } else {
1138 Err(anyhow!(
1139 "Property '{}' not found for '{}'",
1140 old_name,
1141 label_or_type
1142 ))
1143 }
1144 } else {
1145 Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1146 }
1147 }
1148
1149 pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1150 let mut guard = acquire_write(&self.schema, "schema")?;
1151 let schema = Arc::make_mut(&mut *guard);
1152 if let Some(label_meta) = schema.labels.get_mut(name) {
1153 label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1154 Ok(())
1156 } else if if_exists {
1157 Ok(())
1158 } else {
1159 Err(anyhow!("Label '{}' not found", name))
1160 }
1161 }
1162
1163 pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1164 let mut guard = acquire_write(&self.schema, "schema")?;
1165 let schema = Arc::make_mut(&mut *guard);
1166 if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1167 edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1168 Ok(())
1170 } else if if_exists {
1171 Ok(())
1172 } else {
1173 Err(anyhow!("Edge Type '{}' not found", name))
1174 }
1175 }
1176}
1177
1178pub fn validate_identifier(name: &str) -> Result<()> {
1180 if name.is_empty() || name.len() > 64 {
1182 return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1183 }
1184
1185 let first = name.chars().next().unwrap();
1187 if !first.is_alphabetic() && first != '_' {
1188 return Err(anyhow!(
1189 "Identifier '{}' must start with letter or underscore",
1190 name
1191 ));
1192 }
1193
1194 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1196 return Err(anyhow!(
1197 "Identifier '{}' must contain only alphanumeric and underscore",
1198 name
1199 ));
1200 }
1201
1202 const RESERVED: &[&str] = &[
1204 "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1205 "UNION", "ORDER", "LIMIT",
1206 ];
1207 if RESERVED.contains(&name.to_uppercase().as_str()) {
1208 return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1209 }
1210
1211 Ok(())
1212}
1213
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Labels, properties and edge types can be added, saved and reloaded.
    #[tokio::test]
    async fn test_schema_management() -> Result<()> {
        let tmp = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(tmp.path())?);
        let schema_path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store.clone(), &schema_path).await?;

        // The first label gets ID 1; adding it again is rejected.
        let label_id = manager.add_label("Person")?;
        assert_eq!(label_id, 1);
        assert!(manager.add_label("Person").is_err());

        // Same for properties: the second insert of the same name fails.
        manager.add_property("Person", "name", DataType::String, false)?;
        let duplicate = manager.add_property("Person", "name", DataType::String, false);
        assert!(duplicate.is_err());

        let type_id =
            manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
        assert_eq!(type_id, 1);

        // Saving must produce an object at the schema path.
        manager.save().await?;
        assert!(store.get(&schema_path).await.is_ok());

        // A fresh manager sees what was persisted.
        let reloaded = SchemaManager::load_from_store(store, &schema_path).await?;
        assert!(reloaded.schema().labels.contains_key("Person"));
        let person_props_has_name = reloaded
            .schema()
            .properties
            .get("Person")
            .unwrap()
            .contains_key("name");
        assert!(person_props_has_name);

        Ok(())
    }

    /// Function names are upper-cased regardless of input case or nesting.
    #[test]
    fn test_normalize_function_names() {
        let cases = [
            ("lower(email)", "LOWER(email)"),
            ("LOWER(email)", "LOWER(email)"),
            ("Lower(email)", "LOWER(email)"),
            ("trim(lower(email))", "TRIM(LOWER(email))"),
        ];
        for (input, expected) in cases {
            assert_eq!(SchemaManager::normalize_function_names(input), expected);
        }
    }

    /// Generated column names do not depend on function-name case.
    #[test]
    fn test_generated_column_name_case_insensitive() {
        let lower = SchemaManager::generated_column_name("lower(email)");
        let upper = SchemaManager::generated_column_name("LOWER(email)");
        let mixed = SchemaManager::generated_column_name("Lower(email)");
        assert_eq!(lower, upper);
        assert_eq!(upper, mixed);
        assert!(lower.starts_with("_gen_LOWER_email_"));
    }

    /// Index JSON without a `metadata` field deserializes to default metadata.
    #[test]
    fn test_index_metadata_serde_backward_compat() {
        let json = r#"{
            "type": "Scalar",
            "name": "idx_person_name",
            "label": "Person",
            "properties": ["name"],
            "index_type": "BTree",
            "where_clause": null
        }"#;
        let def: IndexDefinition = serde_json::from_str(json).unwrap();
        let IndexMetadata {
            status,
            last_built_at,
            row_count_at_build,
        } = def.metadata();
        assert_eq!(*status, IndexStatus::Online);
        assert!(last_built_at.is_none());
        assert!(row_count_at_build.is_none());
    }

    /// Non-default metadata survives a JSON round trip.
    #[test]
    fn test_index_metadata_serde_roundtrip() {
        let built_at = Utc::now();
        let metadata = IndexMetadata {
            status: IndexStatus::Building,
            last_built_at: Some(built_at),
            row_count_at_build: Some(42),
        };
        let def = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Test".to_string(),
            properties: vec!["prop".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata,
        });

        let json = serde_json::to_string(&def).unwrap();
        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
        let parsed_meta = parsed.metadata();
        assert_eq!(parsed_meta.status, IndexStatus::Building);
        assert_eq!(parsed_meta.row_count_at_build, Some(42));
        assert!(parsed_meta.last_built_at.is_some());
    }

    /// `update_index_metadata` mutates the named index and errors on unknown names.
    #[tokio::test]
    async fn test_update_index_metadata() -> Result<()> {
        let tmp = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(tmp.path())?);
        let schema_path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &schema_path).await?;

        manager.add_label("Person")?;
        manager.add_index(IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: Default::default(),
        }))?;

        // A freshly added index starts with default (Online) metadata.
        let before = manager.get_index("idx_test").unwrap();
        assert_eq!(before.metadata().status, IndexStatus::Online);

        manager.update_index_metadata("idx_test", |m| {
            m.status = IndexStatus::Building;
            m.row_count_at_build = Some(100);
        })?;

        let after = manager.get_index("idx_test").unwrap();
        assert_eq!(after.metadata().status, IndexStatus::Building);
        assert_eq!(after.metadata().row_count_at_build, Some(100));

        // Unknown index names are reported as errors.
        assert!(manager.update_index_metadata("nope", |_| {}).is_err());

        Ok(())
    }

    /// Vector index lookup ignores indexes that are not `Online`.
    #[test]
    fn test_vector_index_for_property_skips_non_online() {
        let mut schema = Schema::default();
        schema.labels.insert(
            "Document".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
            },
        );

        // Register a stale vector index.
        let stale_index = IndexDefinition::Vector(VectorIndexConfig {
            name: "vec_doc_embedding".to_string(),
            label: "Document".to_string(),
            property: "embedding".to_string(),
            index_type: VectorIndexType::Flat,
            metric: DistanceMetric::Cosine,
            embedding_config: None,
            metadata: IndexMetadata {
                status: IndexStatus::Stale,
                ..Default::default()
            },
        });
        schema.indexes.push(stale_index);

        // While stale, the index must not be returned.
        let while_stale = schema.vector_index_for_property("Document", "embedding");
        assert!(while_stale.is_none());

        // Once online, the same lookup finds it.
        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
            cfg.metadata.status = IndexStatus::Online;
        }
        let result = schema.vector_index_for_property("Document", "embedding");
        assert!(result.is_some());
        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
    }
}