1use crate::core::edge_type::{MAX_SCHEMA_TYPE_ID, is_schemaless_edge_type, make_schemaless_id};
5use crate::sync::{acquire_read, acquire_write};
6use anyhow::{Result, anyhow};
7use chrono::{DateTime, Utc};
8use object_store::ObjectStore;
9use object_store::local::LocalFileSystem;
10use object_store::path::Path as ObjectStorePath;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::Path;
14use std::sync::{Arc, RwLock};
15
/// Lifecycle state recorded on labels, edge types and properties.
///
/// `Active` is the state assigned when an element is created and the value
/// used when the field is missing from a serialized schema.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
    /// The element is in normal use.
    Active,
    /// The element is hidden.
    // NOTE(review): nothing in this file produces `Hidden`; the exact meaning
    // of `last_active_snapshot` should be confirmed against its producer.
    Hidden {
        /// Time at which the element entered this state.
        since: DateTime<Utc>,
        /// Identifier of a snapshot (presumably the last one in which the
        /// element was still active — confirm).
        last_active_snapshot: String,
    },
    /// The element has been dropped; `drop_label` and `drop_edge_type` set
    /// this state instead of removing the entry.
    Tombstone {
        /// Time at which the element was dropped.
        since: DateTime<Utc>,
    },
}
28
29use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
30
31pub fn datetime_struct_fields() -> Fields {
38 Fields::from(vec![
39 Field::new(
40 "nanos_since_epoch",
41 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
42 true,
43 ),
44 Field::new("offset_seconds", ArrowDataType::Int32, true),
45 Field::new("timezone_name", ArrowDataType::Utf8, true),
46 ])
47}
48
49pub fn time_struct_fields() -> Fields {
55 Fields::from(vec![
56 Field::new(
57 "nanos_since_midnight",
58 ArrowDataType::Time64(TimeUnit::Nanosecond),
59 true,
60 ),
61 Field::new("offset_seconds", ArrowDataType::Int32, true),
62 ])
63}
64
65pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
67 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
68}
69
70pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
72 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
73}
74
/// Kind of CRDT stored in a `DataType::Crdt` property.
///
/// Every variant maps to the same Arrow type (`Binary`) in
/// `DataType::to_arrow`; the variant only records which CRDT the bytes hold.
// NOTE(review): the variant names follow common CRDT terminology (grow-only
// counter/set, observed-remove set, last-writer-wins register/map, ...);
// the concrete encodings are defined outside this file.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    GCounter,
    GSet,
    ORSet,
    LWWRegister,
    LWWMap,
    Rga,
    VectorClock,
    VCRegister,
}
87
/// Coordinate layout of a `DataType::Point` property.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
    /// Stored as `latitude`, `longitude`, `crs`.
    Geographic,
    /// Stored as `x`, `y`, `crs`.
    Cartesian2D,
    /// Stored as `x`, `y`, `z`, `crs`.
    Cartesian3D,
}
94
/// Logical property type recorded in the schema.
///
/// `DataType::to_arrow` defines the physical Arrow representation of each
/// variant.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
    String,
    Int32,
    Int64,
    Float32,
    Float64,
    Bool,
    /// Nanosecond timestamp tagged with the `UTC` timezone in Arrow.
    Timestamp,
    Date,
    /// Encoded as the struct described by `time_struct_fields`.
    Time,
    /// Encoded as the struct described by `datetime_struct_fields`.
    DateTime,
    /// Stored as `LargeBinary` in Arrow.
    Duration,
    /// Stored as `LargeBinary` in Arrow.
    CypherValue,
    /// Point with the given coordinate layout.
    Point(PointType),
    /// Fixed-length list of `Float32` values with `dimensions` entries.
    Vector { dimensions: usize },
    /// CRDT payload, stored as `Binary` in Arrow.
    Crdt(CrdtType),
    /// List of the inner type.
    List(Box<DataType>),
    /// Key/value pairs, stored in Arrow as a list of `{key, value}` structs.
    Map(Box<DataType>, Box<DataType>),
}
116
117impl DataType {
118 #[allow(non_upper_case_globals)]
120 pub const Float: DataType = DataType::Float64;
121 #[allow(non_upper_case_globals)]
122 pub const Int: DataType = DataType::Int64;
123
124 pub fn to_arrow(&self) -> ArrowDataType {
125 match self {
126 DataType::String => ArrowDataType::Utf8,
127 DataType::Int32 => ArrowDataType::Int32,
128 DataType::Int64 => ArrowDataType::Int64,
129 DataType::Float32 => ArrowDataType::Float32,
130 DataType::Float64 => ArrowDataType::Float64,
131 DataType::Bool => ArrowDataType::Boolean,
132 DataType::Timestamp => {
133 ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
134 }
135 DataType::Date => ArrowDataType::Date32,
136 DataType::Time => ArrowDataType::Struct(time_struct_fields()),
137 DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
138 DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
141 PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
142 Field::new("latitude", ArrowDataType::Float64, false),
143 Field::new("longitude", ArrowDataType::Float64, false),
144 Field::new("crs", ArrowDataType::Utf8, false),
145 ])),
146 PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
147 Field::new("x", ArrowDataType::Float64, false),
148 Field::new("y", ArrowDataType::Float64, false),
149 Field::new("crs", ArrowDataType::Utf8, false),
150 ])),
151 PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
152 Field::new("x", ArrowDataType::Float64, false),
153 Field::new("y", ArrowDataType::Float64, false),
154 Field::new("z", ArrowDataType::Float64, false),
155 Field::new("crs", ArrowDataType::Utf8, false),
156 ])),
157 },
158 DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
159 Arc::new(Field::new("item", ArrowDataType::Float32, true)),
160 *dimensions as i32,
161 ),
162 DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
164 ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
165 }
166 DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
167 "item",
168 ArrowDataType::Struct(Fields::from(vec![
169 Field::new("key", key.to_arrow(), false),
170 Field::new("value", value.to_arrow(), true),
171 ])),
172 true,
173 ))),
174 }
175 }
176}
177
/// Serde default for `LabelMeta::created_at`: the current UTC time.
///
/// A label whose serialized form lacks `created_at` therefore receives the
/// time of deserialization.
fn default_created_at() -> DateTime<Utc> {
    Utc::now()
}
181
/// Serde default for `state` fields: elements without a recorded state are
/// treated as [`SchemaElementState::Active`].
fn default_state() -> SchemaElementState {
    SchemaElementState::Active
}
185
/// Serde default for `PropertyMeta::added_in`: schema version 1.
fn default_version_1() -> u32 {
    1
}
189
/// Schema metadata for one property of a label or edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
    /// Logical type of the property.
    pub r#type: DataType,
    /// Whether the property may be null.
    pub nullable: bool,
    /// Value of `Schema::schema_version` at the time the property was added
    /// (1 when absent from the serialized form).
    #[serde(default = "default_version_1")]
    pub added_in: u32,
    /// Lifecycle state (`Active` when absent from the serialized form).
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Expression for generated properties; `None` for regular properties.
    #[serde(default)]
    pub generation_expression: Option<String>,
}
201
/// Schema metadata for a vertex label.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
    /// Numeric label id; `SchemaManager::add_label` assigns `max existing + 1`.
    pub id: u16,
    /// Creation time (deserialization time when absent from the serialized form).
    #[serde(default = "default_created_at")]
    pub created_at: DateTime<Utc>,
    /// Lifecycle state (`Active` when absent from the serialized form).
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
}
210
/// Schema metadata for a declared edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
    /// Numeric edge type id; `SchemaManager::add_edge_type` assigns
    /// `max existing + 1`.
    pub id: u32,
    /// Label names supplied as sources when the edge type was declared.
    pub src_labels: Vec<String>,
    /// Label names supplied as destinations when the edge type was declared.
    pub dst_labels: Vec<String>,
    /// Lifecycle state (`Active` when absent from the serialized form).
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
}
220
/// Kind of a schema constraint together with its parameters.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
    /// Uniqueness constraint over the listed properties.
    Unique { properties: Vec<String> },
    /// Existence constraint on a single property.
    Exists { property: String },
    /// Check constraint described by an expression string.
    Check { expression: String },
}
228
/// Schema element a constraint applies to, identified by name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
    /// A vertex label name.
    Label(String),
    /// An edge type name.
    EdgeType(String),
}
235
/// A named constraint stored in the schema.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
    /// Constraint name; `SchemaManager::add_constraint` rejects duplicates.
    pub name: String,
    /// What the constraint checks.
    pub constraint_type: ConstraintType,
    /// Label or edge type the constraint applies to.
    pub target: ConstraintTarget,
    /// Enabled flag.
    pub enabled: bool,
}
243
/// Bidirectional name/id registry for edge types that are not declared in
/// `Schema::edge_types`.
///
/// Ids are produced by `make_schemaless_id` from a local counter.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    /// Edge type name -> assigned id.
    name_to_id: HashMap<String, u32>,
    /// Assigned id -> edge type name.
    id_to_name: HashMap<u32, String>,
    /// Local counter value used for the next assignment (starts at 1).
    next_local_id: u32,
}
256
257impl SchemalessEdgeTypeRegistry {
258 pub fn new() -> Self {
259 Self {
260 name_to_id: HashMap::new(),
261 id_to_name: HashMap::new(),
262 next_local_id: 1,
263 }
264 }
265
266 pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
268 if let Some(&id) = self.name_to_id.get(type_name) {
269 return id;
270 }
271
272 let id = make_schemaless_id(self.next_local_id);
273 self.next_local_id += 1;
274
275 self.name_to_id.insert(type_name.to_string(), id);
276 self.id_to_name.insert(id, type_name.to_string());
277
278 id
279 }
280
281 pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
283 self.id_to_name.get(&type_id).map(String::as_str)
284 }
285
286 pub fn contains(&self, type_name: &str) -> bool {
288 self.name_to_id.contains_key(type_name)
289 }
290
291 pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
293 self.name_to_id
294 .iter()
295 .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
296 .map(|(_, &id)| id)
297 }
298
299 pub fn all_type_ids(&self) -> Vec<u32> {
301 self.id_to_name.keys().copied().collect()
302 }
303
304 pub fn is_empty(&self) -> bool {
306 self.name_to_id.is_empty()
307 }
308}
309
/// The default registry is the empty registry returned by
/// [`SchemalessEdgeTypeRegistry::new`] (counter starting at 1, not 0).
impl Default for SchemalessEdgeTypeRegistry {
    fn default() -> Self {
        Self::new()
    }
}
315
/// The complete, serializable schema: labels, edge types, properties,
/// indexes, constraints and the schemaless edge type registry.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
    /// Schema version; recorded as `added_in` on newly added properties.
    pub schema_version: u32,
    /// Vertex labels keyed by label name.
    pub labels: HashMap<String, LabelMeta>,
    /// Declared edge types keyed by type name.
    pub edge_types: HashMap<String, EdgeTypeMeta>,
    /// Properties keyed by label or edge type name, then by property name.
    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
    /// Index definitions (empty when absent from the serialized form).
    #[serde(default)]
    pub indexes: Vec<IndexDefinition>,
    /// Constraints (empty when absent from the serialized form).
    #[serde(default)]
    pub constraints: Vec<Constraint>,
    /// Ids for edge types not declared in `edge_types`.
    #[serde(default)]
    pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
330
331impl Default for Schema {
332 fn default() -> Self {
333 Self {
334 schema_version: 1,
335 labels: HashMap::new(),
336 edge_types: HashMap::new(),
337 properties: HashMap::new(),
338 indexes: Vec::new(),
339 constraints: Vec::new(),
340 schemaless_registry: SchemalessEdgeTypeRegistry::new(),
341 }
342 }
343}
344
345impl Schema {
346 pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
351 self.labels
352 .iter()
353 .find(|(_, meta)| meta.id == label_id)
354 .map(|(name, _)| name.as_str())
355 }
356
357 pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
359 self.labels.get(label_name).map(|meta| meta.id)
360 }
361
362 pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
367 self.edge_types
368 .iter()
369 .find(|(_, meta)| meta.id == type_id)
370 .map(|(name, _)| name.as_str())
371 }
372
373 pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
375 self.edge_types.get(type_name).map(|meta| meta.id)
376 }
377
378 pub fn vector_index_for_property(
383 &self,
384 label: &str,
385 property: &str,
386 ) -> Option<&VectorIndexConfig> {
387 self.indexes.iter().find_map(|idx| {
388 if let IndexDefinition::Vector(config) = idx
389 && config.label == label
390 && config.property == property
391 && config.metadata.status == IndexStatus::Online
392 {
393 return Some(config);
394 }
395 None
396 })
397 }
398
399 pub fn fulltext_index_for_property(
404 &self,
405 label: &str,
406 property: &str,
407 ) -> Option<&FullTextIndexConfig> {
408 self.indexes.iter().find_map(|idx| {
409 if let IndexDefinition::FullText(config) = idx
410 && config.label == label
411 && config.properties.iter().any(|p| p == property)
412 && config.metadata.status == IndexStatus::Online
413 {
414 return Some(config);
415 }
416 None
417 })
418 }
419
420 pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
425 self.labels
426 .iter()
427 .find(|(k, _)| k.eq_ignore_ascii_case(name))
428 .map(|(_, v)| v)
429 }
430
431 pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
433 self.get_label_case_insensitive(label_name)
434 .map(|meta| meta.id)
435 }
436
437 pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
442 self.edge_types
443 .iter()
444 .find(|(k, _)| k.eq_ignore_ascii_case(name))
445 .map(|(_, v)| v)
446 }
447
448 pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
450 self.get_edge_type_case_insensitive(type_name)
451 .map(|meta| meta.id)
452 }
453
454 pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
457 self.edge_type_id_by_name_case_insensitive(type_name)
458 .or_else(|| {
459 self.schemaless_registry
460 .id_by_name_case_insensitive(type_name)
461 })
462 }
463
464 pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
470 if let Some(id) = self.edge_type_id_by_name(type_name) {
471 return id;
472 }
473 self.schemaless_registry.get_or_assign_id(type_name)
474 }
475
476 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
480 if is_schemaless_edge_type(type_id) {
481 self.schemaless_registry
482 .type_name_by_id(type_id)
483 .map(str::to_owned)
484 } else {
485 self.edge_type_name_by_id(type_id).map(str::to_owned)
486 }
487 }
488
489 pub fn all_edge_type_ids(&self) -> Vec<u32> {
492 let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
493 ids.extend(self.schemaless_registry.all_type_ids());
494 ids.sort_unstable();
495 ids
496 }
497}
498
/// Build status of an index.
///
/// `Schema::vector_index_for_property` and
/// `Schema::fulltext_index_for_property` only return indexes that are
/// `Online`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
    /// Default status; also used when the status is absent from the
    /// serialized form.
    #[default]
    Online,
    // NOTE(review): the three variants below are not produced anywhere in
    // this file; their meaning is inferred from the names only.
    Building,
    Stale,
    Failed,
}
512
/// Bookkeeping attached to every index definition.
///
/// All fields fall back to their defaults when absent from the serialized
/// form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
    /// Current status (`Online` by default).
    #[serde(default)]
    pub status: IndexStatus,
    /// Time of the last build, if recorded.
    #[serde(default)]
    pub last_built_at: Option<DateTime<Utc>>,
    /// Row count at the time of the last build, if recorded.
    #[serde(default)]
    pub row_count_at_build: Option<u64>,
}
526
/// An index stored in the schema, one variant per index kind.
///
/// Serialized with an internal `"type"` tag naming the variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    Vector(VectorIndexConfig),
    FullText(FullTextIndexConfig),
    Scalar(ScalarIndexConfig),
    Inverted(InvertedIndexConfig),
    JsonFullText(JsonFtsIndexConfig),
}
537
538impl IndexDefinition {
539 pub fn name(&self) -> &str {
541 match self {
542 IndexDefinition::Vector(c) => &c.name,
543 IndexDefinition::FullText(c) => &c.name,
544 IndexDefinition::Scalar(c) => &c.name,
545 IndexDefinition::Inverted(c) => &c.name,
546 IndexDefinition::JsonFullText(c) => &c.name,
547 }
548 }
549
550 pub fn label(&self) -> &str {
552 match self {
553 IndexDefinition::Vector(c) => &c.label,
554 IndexDefinition::FullText(c) => &c.label,
555 IndexDefinition::Scalar(c) => &c.label,
556 IndexDefinition::Inverted(c) => &c.label,
557 IndexDefinition::JsonFullText(c) => &c.label,
558 }
559 }
560
561 pub fn metadata(&self) -> &IndexMetadata {
563 match self {
564 IndexDefinition::Vector(c) => &c.metadata,
565 IndexDefinition::FullText(c) => &c.metadata,
566 IndexDefinition::Scalar(c) => &c.metadata,
567 IndexDefinition::Inverted(c) => &c.metadata,
568 IndexDefinition::JsonFullText(c) => &c.metadata,
569 }
570 }
571
572 pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
574 match self {
575 IndexDefinition::Vector(c) => &mut c.metadata,
576 IndexDefinition::FullText(c) => &mut c.metadata,
577 IndexDefinition::Scalar(c) => &mut c.metadata,
578 IndexDefinition::Inverted(c) => &mut c.metadata,
579 IndexDefinition::JsonFullText(c) => &mut c.metadata,
580 }
581 }
582}
583
/// Configuration of an inverted index over a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Normalization flag (`true` when absent from the serialized form).
    // NOTE(review): what normalization entails is defined by the index
    // implementation, not in this file.
    #[serde(default = "default_normalize")]
    pub normalize: bool,
    /// Per-document term limit (10_000 when absent from the serialized form).
    #[serde(default = "default_max_terms_per_doc")]
    pub max_terms_per_doc: usize,
    /// Build status and statistics.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
596
/// Serde default for `InvertedIndexConfig::normalize`.
fn default_normalize() -> bool {
    true
}
600
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`.
fn default_max_terms_per_doc() -> usize {
    10_000
}
604
/// Configuration of a vector index over a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Index algorithm and its parameters.
    pub index_type: VectorIndexType,
    /// Distance metric used by the index.
    pub metric: DistanceMetric,
    /// Optional embedding configuration.
    pub embedding_config: Option<EmbeddingConfig>,
    /// Build status and statistics.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
616
/// Embedding settings attached to a vector index.
// NOTE(review): field meanings below are inferred from names; the consumer
// of this configuration lives outside this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
    /// Alias string (presumably identifying the embedding model — confirm).
    pub alias: String,
    /// Names of the properties the embedding is derived from.
    pub source_properties: Vec<String>,
    /// Batch size.
    pub batch_size: usize,
}
624
/// Vector index algorithm together with its tuning parameters.
// NOTE(review): parameter semantics follow the usual IVF-PQ / HNSW
// terminology; they are interpreted by the index implementation, not here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
    IvfPq {
        num_partitions: u32,
        num_sub_vectors: u32,
        bits_per_subvector: u8,
    },
    Hnsw {
        m: u32,
        ef_construction: u32,
        ef_search: u32,
    },
    /// No tuning parameters.
    Flat,
}
640
/// Distance metric for vector comparison; see
/// `DistanceMetric::compute_distance` for the exact formulas.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
    /// `1 - cos(a, b)`.
    Cosine,
    /// Squared Euclidean distance.
    L2,
    /// Negated dot product.
    Dot,
}
648
649impl DistanceMetric {
650 pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
662 assert_eq!(a.len(), b.len(), "vector dimension mismatch");
663 match self {
664 DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
665 DistanceMetric::Cosine => {
666 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
667 let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
668 let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
669 let denom = norm_a * norm_b;
670 if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
671 }
672 DistanceMetric::Dot => {
673 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
674 -dot
675 }
676 }
677 }
678}
679
/// Configuration of a full-text index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed properties.
    pub properties: Vec<String>,
    /// Tokenizer selection.
    pub tokenizer: TokenizerConfig,
    /// Positions flag.
    // NOTE(review): presumably controls whether token positions are stored — confirm.
    pub with_positions: bool,
    /// Build status and statistics.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
690
/// Tokenizer selection for a full-text index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
    Standard,
    Whitespace,
    /// N-gram tokenizer with its `min` and `max` bounds.
    Ngram { min: u8, max: u8 },
    /// Tokenizer referenced by name.
    Custom { name: String },
}
699
/// Configuration of a full-text index over a JSON column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed column.
    pub column: String,
    /// Path list (empty when absent from the serialized form).
    // NOTE(review): presumably JSON paths restricting what is indexed — confirm.
    #[serde(default)]
    pub paths: Vec<String>,
    /// Positions flag (`false` when absent from the serialized form).
    #[serde(default)]
    pub with_positions: bool,
    /// Build status and statistics.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
712
/// Configuration of a scalar index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed properties.
    pub properties: Vec<String>,
    /// Index structure.
    pub index_type: ScalarIndexType,
    /// Optional `WHERE` clause text.
    pub where_clause: Option<String>,
    /// Build status and statistics.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
723
/// Index structure used by a scalar index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
    BTree,
    Hash,
    Bitmap,
}
731
/// Owns the current [`Schema`] and persists it as JSON in an object store.
///
/// Readers obtain a shared `Arc<Schema>` via `schema()`; mutating methods
/// take the write lock and modify the schema through `Arc::make_mut`.
pub struct SchemaManager {
    /// Object store the schema is loaded from and saved to.
    store: Arc<dyn ObjectStore>,
    /// Location of the schema document inside `store`.
    path: ObjectStorePath,
    /// Current schema behind a reader/writer lock.
    schema: RwLock<Arc<Schema>>,
}
737
738impl SchemaManager {
739 pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
740 let path = path.as_ref();
741 let parent = path
742 .parent()
743 .ok_or_else(|| anyhow!("Invalid schema path"))?;
744 let filename = path
745 .file_name()
746 .ok_or_else(|| anyhow!("Invalid schema filename"))?
747 .to_str()
748 .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
749
750 let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
751 let obj_path = ObjectStorePath::from(filename);
752
753 Self::load_from_store(store, &obj_path).await
754 }
755
756 pub async fn load_from_store(
757 store: Arc<dyn ObjectStore>,
758 path: &ObjectStorePath,
759 ) -> Result<Self> {
760 match store.get(path).await {
761 Ok(result) => {
762 let bytes = result.bytes().await?;
763 let content = String::from_utf8(bytes.to_vec())?;
764 let schema: Schema = serde_json::from_str(&content)?;
765 Ok(Self {
766 store,
767 path: path.clone(),
768 schema: RwLock::new(Arc::new(schema)),
769 })
770 }
771 Err(object_store::Error::NotFound { .. }) => Ok(Self {
772 store,
773 path: path.clone(),
774 schema: RwLock::new(Arc::new(Schema::default())),
775 }),
776 Err(e) => Err(anyhow::Error::from(e)),
777 }
778 }
779
780 pub async fn save(&self) -> Result<()> {
781 let content = {
782 let schema_guard = acquire_read(&self.schema, "schema")?;
783 serde_json::to_string_pretty(&**schema_guard)?
784 };
785 self.store
786 .put(&self.path, content.into())
787 .await
788 .map_err(anyhow::Error::from)?;
789 Ok(())
790 }
791
    /// Returns the object store path the schema is loaded from and saved to.
    pub fn path(&self) -> &ObjectStorePath {
        &self.path
    }
795
796 pub fn schema(&self) -> Arc<Schema> {
797 self.schema
798 .read()
799 .expect("Schema lock poisoned - a thread panicked while holding it")
800 .clone()
801 }
802
803 fn normalize_function_names(expr: &str) -> String {
806 let mut result = String::with_capacity(expr.len());
807 let mut chars = expr.chars().peekable();
808
809 while let Some(ch) = chars.next() {
810 if ch.is_alphabetic() {
811 let mut ident = String::new();
813 ident.push(ch);
814
815 while let Some(&next) = chars.peek() {
816 if next.is_alphanumeric() || next == '_' {
817 ident.push(chars.next().unwrap());
818 } else {
819 break;
820 }
821 }
822
823 if chars.peek() == Some(&'(') {
825 result.push_str(&ident.to_uppercase());
826 } else {
827 result.push_str(&ident); }
829 } else {
830 result.push(ch);
831 }
832 }
833
834 result
835 }
836
837 pub fn generated_column_name(expr: &str) -> String {
845 let normalized = Self::normalize_function_names(expr);
847
848 let sanitized = normalized
849 .replace(|c: char| !c.is_alphanumeric(), "_")
850 .trim_matches('_')
851 .to_string();
852
853 const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
855 const FNV_PRIME: u64 = 1099511628211;
856
857 let mut hash = FNV_OFFSET_BASIS;
858 for byte in normalized.as_bytes() {
859 hash ^= *byte as u64;
860 hash = hash.wrapping_mul(FNV_PRIME);
861 }
862
863 format!("_gen_{}_{:x}", sanitized, hash)
864 }
865
866 pub fn replace_schema(&self, new_schema: Schema) {
867 let mut schema = self
868 .schema
869 .write()
870 .expect("Schema lock poisoned - a thread panicked while holding it");
871 *schema = Arc::new(new_schema);
872 }
873
874 pub fn next_label_id(&self) -> u16 {
875 self.schema()
876 .labels
877 .values()
878 .map(|l| l.id)
879 .max()
880 .unwrap_or(0)
881 + 1
882 }
883
884 pub fn next_type_id(&self) -> u32 {
885 let max_schema_id = self
886 .schema()
887 .edge_types
888 .values()
889 .map(|t| t.id)
890 .max()
891 .unwrap_or(0);
892
893 if max_schema_id >= MAX_SCHEMA_TYPE_ID {
895 panic!("Schema edge type ID exhaustion");
896 }
897
898 max_schema_id + 1
899 }
900
901 pub fn add_label(&self, name: &str) -> Result<u16> {
902 let mut guard = acquire_write(&self.schema, "schema")?;
903 let schema = Arc::make_mut(&mut *guard);
904 if schema.labels.contains_key(name) {
905 return Err(anyhow!("Label '{}' already exists", name));
906 }
907
908 let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
909 schema.labels.insert(
910 name.to_string(),
911 LabelMeta {
912 id,
913 created_at: Utc::now(),
914 state: SchemaElementState::Active,
915 },
916 );
917 Ok(id)
918 }
919
920 pub fn add_edge_type(
921 &self,
922 name: &str,
923 src_labels: Vec<String>,
924 dst_labels: Vec<String>,
925 ) -> Result<u32> {
926 let mut guard = acquire_write(&self.schema, "schema")?;
927 let schema = Arc::make_mut(&mut *guard);
928 if schema.edge_types.contains_key(name) {
929 return Err(anyhow!("Edge type '{}' already exists", name));
930 }
931
932 let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
933
934 if id >= MAX_SCHEMA_TYPE_ID {
936 return Err(anyhow!("Schema edge type ID exhaustion"));
937 }
938
939 schema.edge_types.insert(
940 name.to_string(),
941 EdgeTypeMeta {
942 id,
943 src_labels,
944 dst_labels,
945 state: SchemaElementState::Active,
946 },
947 );
948 Ok(id)
949 }
950
951 pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
953 let mut guard = acquire_write(&self.schema, "schema")
954 .expect("Schema lock poisoned - a thread panicked while holding it");
955 let schema = Arc::make_mut(&mut *guard);
956 schema.get_or_assign_edge_type_id(type_name)
957 }
958
959 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
961 let schema = acquire_read(&self.schema, "schema")
962 .expect("Schema lock poisoned - a thread panicked while holding it");
963 schema.edge_type_name_by_id_unified(type_id)
964 }
965
966 pub fn add_property(
967 &self,
968 label_or_type: &str,
969 prop_name: &str,
970 data_type: DataType,
971 nullable: bool,
972 ) -> Result<()> {
973 let mut guard = acquire_write(&self.schema, "schema")?;
974 let schema = Arc::make_mut(&mut *guard);
975 let version = schema.schema_version;
976 let props = schema
977 .properties
978 .entry(label_or_type.to_string())
979 .or_default();
980
981 if props.contains_key(prop_name) {
982 return Err(anyhow!(
983 "Property '{}' already exists for '{}'",
984 prop_name,
985 label_or_type
986 ));
987 }
988
989 props.insert(
990 prop_name.to_string(),
991 PropertyMeta {
992 r#type: data_type,
993 nullable,
994 added_in: version,
995 state: SchemaElementState::Active,
996 generation_expression: None,
997 },
998 );
999 Ok(())
1000 }
1001
1002 pub fn add_generated_property(
1003 &self,
1004 label_or_type: &str,
1005 prop_name: &str,
1006 data_type: DataType,
1007 expr: String,
1008 ) -> Result<()> {
1009 let mut guard = acquire_write(&self.schema, "schema")?;
1010 let schema = Arc::make_mut(&mut *guard);
1011 let version = schema.schema_version;
1012 let props = schema
1013 .properties
1014 .entry(label_or_type.to_string())
1015 .or_default();
1016
1017 if props.contains_key(prop_name) {
1018 return Err(anyhow!("Property '{}' already exists", prop_name));
1019 }
1020
1021 props.insert(
1022 prop_name.to_string(),
1023 PropertyMeta {
1024 r#type: data_type,
1025 nullable: true,
1026 added_in: version,
1027 state: SchemaElementState::Active,
1028 generation_expression: Some(expr),
1029 },
1030 );
1031 Ok(())
1032 }
1033
1034 pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1035 let mut guard = acquire_write(&self.schema, "schema")?;
1036 let schema = Arc::make_mut(&mut *guard);
1037 schema.indexes.push(index_def);
1038 Ok(())
1039 }
1040
1041 pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1042 let schema = self.schema.read().expect("Schema lock poisoned");
1043 schema.indexes.iter().find(|i| i.name() == name).cloned()
1044 }
1045
1046 pub fn update_index_metadata(
1051 &self,
1052 index_name: &str,
1053 f: impl FnOnce(&mut IndexMetadata),
1054 ) -> Result<()> {
1055 let mut guard = acquire_write(&self.schema, "schema")?;
1056 let schema = Arc::make_mut(&mut *guard);
1057 let idx = schema
1058 .indexes
1059 .iter_mut()
1060 .find(|i| i.name() == index_name)
1061 .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1062 f(idx.metadata_mut());
1063 Ok(())
1064 }
1065
1066 pub fn remove_index(&self, name: &str) -> Result<()> {
1067 let mut guard = acquire_write(&self.schema, "schema")?;
1068 let schema = Arc::make_mut(&mut *guard);
1069 if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1070 schema.indexes.remove(pos);
1071 Ok(())
1072 } else {
1073 Err(anyhow!("Index '{}' not found", name))
1074 }
1075 }
1076
1077 pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1078 let mut guard = acquire_write(&self.schema, "schema")?;
1079 let schema = Arc::make_mut(&mut *guard);
1080 if schema.constraints.iter().any(|c| c.name == constraint.name) {
1081 return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1082 }
1083 schema.constraints.push(constraint);
1084 Ok(())
1085 }
1086
1087 pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1088 let mut guard = acquire_write(&self.schema, "schema")?;
1089 let schema = Arc::make_mut(&mut *guard);
1090 if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1091 schema.constraints.remove(pos);
1092 Ok(())
1093 } else if if_exists {
1094 Ok(())
1095 } else {
1096 Err(anyhow!("Constraint '{}' not found", name))
1097 }
1098 }
1099
1100 pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1101 let mut guard = acquire_write(&self.schema, "schema")?;
1102 let schema = Arc::make_mut(&mut *guard);
1103 if let Some(props) = schema.properties.get_mut(label_or_type) {
1104 if props.remove(prop_name).is_some() {
1105 Ok(())
1106 } else {
1107 Err(anyhow!(
1108 "Property '{}' not found for '{}'",
1109 prop_name,
1110 label_or_type
1111 ))
1112 }
1113 } else {
1114 Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1115 }
1116 }
1117
1118 pub fn rename_property(
1119 &self,
1120 label_or_type: &str,
1121 old_name: &str,
1122 new_name: &str,
1123 ) -> Result<()> {
1124 let mut guard = acquire_write(&self.schema, "schema")?;
1125 let schema = Arc::make_mut(&mut *guard);
1126 if let Some(props) = schema.properties.get_mut(label_or_type) {
1127 if let Some(meta) = props.remove(old_name) {
1128 if props.contains_key(new_name) {
1129 props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
1132 }
1133 props.insert(new_name.to_string(), meta);
1134 Ok(())
1135 } else {
1136 Err(anyhow!(
1137 "Property '{}' not found for '{}'",
1138 old_name,
1139 label_or_type
1140 ))
1141 }
1142 } else {
1143 Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1144 }
1145 }
1146
1147 pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1148 let mut guard = acquire_write(&self.schema, "schema")?;
1149 let schema = Arc::make_mut(&mut *guard);
1150 if let Some(label_meta) = schema.labels.get_mut(name) {
1151 label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1152 Ok(())
1154 } else if if_exists {
1155 Ok(())
1156 } else {
1157 Err(anyhow!("Label '{}' not found", name))
1158 }
1159 }
1160
1161 pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1162 let mut guard = acquire_write(&self.schema, "schema")?;
1163 let schema = Arc::make_mut(&mut *guard);
1164 if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1165 edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1166 Ok(())
1168 } else if if_exists {
1169 Ok(())
1170 } else {
1171 Err(anyhow!("Edge Type '{}' not found", name))
1172 }
1173 }
1174}
1175
1176pub fn validate_identifier(name: &str) -> Result<()> {
1178 if name.is_empty() || name.len() > 64 {
1180 return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1181 }
1182
1183 let first = name.chars().next().unwrap();
1185 if !first.is_alphabetic() && first != '_' {
1186 return Err(anyhow!(
1187 "Identifier '{}' must start with letter or underscore",
1188 name
1189 ));
1190 }
1191
1192 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1194 return Err(anyhow!(
1195 "Identifier '{}' must contain only alphanumeric and underscore",
1196 name
1197 ));
1198 }
1199
1200 const RESERVED: &[&str] = &[
1202 "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1203 "UNION", "ORDER", "LIMIT",
1204 ];
1205 if RESERVED.contains(&name.to_uppercase().as_str()) {
1206 return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1207 }
1208
1209 Ok(())
1210}
1211
1212#[cfg(test)]
1213mod tests {
1214 use super::*;
1215 use object_store::local::LocalFileSystem;
1216 use tempfile::tempdir;
1217
1218 #[tokio::test]
1219 async fn test_schema_management() -> Result<()> {
1220 let dir = tempdir()?;
1221 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1222 let path = ObjectStorePath::from("schema.json");
1223 let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
1224
1225 let lid = manager.add_label("Person")?;
1227 assert_eq!(lid, 1);
1228 assert!(manager.add_label("Person").is_err());
1229
1230 manager.add_property("Person", "name", DataType::String, false)?;
1232 assert!(
1233 manager
1234 .add_property("Person", "name", DataType::String, false)
1235 .is_err()
1236 );
1237
1238 let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
1240 assert_eq!(tid, 1);
1241
1242 manager.save().await?;
1243 assert!(store.get(&path).await.is_ok());
1245
1246 let manager2 = SchemaManager::load_from_store(store, &path).await?;
1247 assert!(manager2.schema().labels.contains_key("Person"));
1248 assert!(
1249 manager2
1250 .schema()
1251 .properties
1252 .get("Person")
1253 .unwrap()
1254 .contains_key("name")
1255 );
1256
1257 Ok(())
1258 }
1259
1260 #[test]
1261 fn test_normalize_function_names() {
1262 assert_eq!(
1263 SchemaManager::normalize_function_names("lower(email)"),
1264 "LOWER(email)"
1265 );
1266 assert_eq!(
1267 SchemaManager::normalize_function_names("LOWER(email)"),
1268 "LOWER(email)"
1269 );
1270 assert_eq!(
1271 SchemaManager::normalize_function_names("Lower(email)"),
1272 "LOWER(email)"
1273 );
1274 assert_eq!(
1275 SchemaManager::normalize_function_names("trim(lower(email))"),
1276 "TRIM(LOWER(email))"
1277 );
1278 }
1279
1280 #[test]
1281 fn test_generated_column_name_case_insensitive() {
1282 let col1 = SchemaManager::generated_column_name("lower(email)");
1283 let col2 = SchemaManager::generated_column_name("LOWER(email)");
1284 let col3 = SchemaManager::generated_column_name("Lower(email)");
1285 assert_eq!(col1, col2);
1286 assert_eq!(col2, col3);
1287 assert!(col1.starts_with("_gen_LOWER_email_"));
1288 }
1289
    // An index definition serialized without a `metadata` field must still
    // deserialize, with every metadata field at its default.
    #[test]
    fn test_index_metadata_serde_backward_compat() {
        // JSON without any `metadata` key.
        let json = r#"{
            "type": "Scalar",
            "name": "idx_person_name",
            "label": "Person",
            "properties": ["name"],
            "index_type": "BTree",
            "where_clause": null
        }"#;
        let def: IndexDefinition = serde_json::from_str(json).unwrap();
        let meta = def.metadata();
        assert_eq!(meta.status, IndexStatus::Online);
        assert!(meta.last_built_at.is_none());
        assert!(meta.row_count_at_build.is_none());
    }
1307
1308 #[test]
1309 fn test_index_metadata_serde_roundtrip() {
1310 let now = Utc::now();
1311 let def = IndexDefinition::Scalar(ScalarIndexConfig {
1312 name: "idx_test".to_string(),
1313 label: "Test".to_string(),
1314 properties: vec!["prop".to_string()],
1315 index_type: ScalarIndexType::BTree,
1316 where_clause: None,
1317 metadata: IndexMetadata {
1318 status: IndexStatus::Building,
1319 last_built_at: Some(now),
1320 row_count_at_build: Some(42),
1321 },
1322 });
1323
1324 let json = serde_json::to_string(&def).unwrap();
1325 let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
1326 assert_eq!(parsed.metadata().status, IndexStatus::Building);
1327 assert_eq!(parsed.metadata().row_count_at_build, Some(42));
1328 assert!(parsed.metadata().last_built_at.is_some());
1329 }
1330
1331 #[tokio::test]
1332 async fn test_update_index_metadata() -> Result<()> {
1333 let dir = tempdir()?;
1334 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1335 let path = ObjectStorePath::from("schema.json");
1336 let manager = SchemaManager::load_from_store(store, &path).await?;
1337
1338 manager.add_label("Person")?;
1339 let idx = IndexDefinition::Scalar(ScalarIndexConfig {
1340 name: "idx_test".to_string(),
1341 label: "Person".to_string(),
1342 properties: vec!["name".to_string()],
1343 index_type: ScalarIndexType::BTree,
1344 where_clause: None,
1345 metadata: Default::default(),
1346 });
1347 manager.add_index(idx)?;
1348
1349 let initial = manager.get_index("idx_test").unwrap();
1351 assert_eq!(initial.metadata().status, IndexStatus::Online);
1352
1353 manager.update_index_metadata("idx_test", |m| {
1355 m.status = IndexStatus::Building;
1356 m.row_count_at_build = Some(100);
1357 })?;
1358
1359 let updated = manager.get_index("idx_test").unwrap();
1360 assert_eq!(updated.metadata().status, IndexStatus::Building);
1361 assert_eq!(updated.metadata().row_count_at_build, Some(100));
1362
1363 assert!(manager.update_index_metadata("nope", |_| {}).is_err());
1365
1366 Ok(())
1367 }
1368
1369 #[test]
1370 fn test_vector_index_for_property_skips_non_online() {
1371 let mut schema = Schema::default();
1372 schema.labels.insert(
1373 "Document".to_string(),
1374 LabelMeta {
1375 id: 1,
1376 created_at: chrono::Utc::now(),
1377 state: SchemaElementState::Active,
1378 },
1379 );
1380
1381 schema
1383 .indexes
1384 .push(IndexDefinition::Vector(VectorIndexConfig {
1385 name: "vec_doc_embedding".to_string(),
1386 label: "Document".to_string(),
1387 property: "embedding".to_string(),
1388 index_type: VectorIndexType::Flat,
1389 metric: DistanceMetric::Cosine,
1390 embedding_config: None,
1391 metadata: IndexMetadata {
1392 status: IndexStatus::Stale,
1393 ..Default::default()
1394 },
1395 }));
1396
1397 assert!(
1399 schema
1400 .vector_index_for_property("Document", "embedding")
1401 .is_none()
1402 );
1403
1404 if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
1406 cfg.metadata.status = IndexStatus::Online;
1407 }
1408 let result = schema.vector_index_for_property("Document", "embedding");
1409 assert!(result.is_some());
1410 assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
1411 }
1412}