1use crate::core::edge_type::{MAX_SCHEMA_TYPE_ID, is_schemaless_edge_type, make_schemaless_id};
5use crate::sync::{acquire_read, acquire_write};
6use anyhow::{Result, anyhow};
7use chrono::{DateTime, Utc};
8use object_store::ObjectStore;
9use object_store::local::LocalFileSystem;
10use object_store::path::Path as ObjectStorePath;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::Path;
14use std::sync::{Arc, RwLock};
15
/// Lifecycle state recorded on schema elements (the `state` field of
/// `PropertyMeta`, `LabelMeta` and `EdgeTypeMeta`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
    /// Element is in normal use; `default_state()` returns this variant.
    Active,
    /// Element is hidden.
    Hidden {
        /// Time at which the element entered this state.
        since: DateTime<Utc>,
        /// Snapshot identifier — presumably the last snapshot in which the
        /// element was still active (inferred from the name; TODO confirm).
        last_active_snapshot: String,
    },
    /// Element has been dropped; `SchemaManager::drop_label` and
    /// `SchemaManager::drop_edge_type` set this state.
    Tombstone {
        /// Time at which the element was tombstoned.
        since: DateTime<Utc>,
    },
}
28
29use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
30
31pub fn datetime_struct_fields() -> Fields {
38 Fields::from(vec![
39 Field::new(
40 "nanos_since_epoch",
41 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
42 true,
43 ),
44 Field::new("offset_seconds", ArrowDataType::Int32, true),
45 Field::new("timezone_name", ArrowDataType::Utf8, true),
46 ])
47}
48
49pub fn time_struct_fields() -> Fields {
55 Fields::from(vec![
56 Field::new(
57 "nanos_since_midnight",
58 ArrowDataType::Time64(TimeUnit::Nanosecond),
59 true,
60 ),
61 Field::new("offset_seconds", ArrowDataType::Int32, true),
62 ])
63}
64
65pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
67 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
68}
69
70pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
72 matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
73}
74
/// Kind of CRDT carried by a `DataType::Crdt` property.
///
/// Every kind is stored as Arrow `Binary` (see `DataType::to_arrow`); the
/// merge semantics of each variant are implemented outside this file.
// NOTE(review): variant names look like the usual CRDT abbreviations
// (grow-only counter/set, observed-remove set, last-writer-wins register/map,
// replicated growable array, vector clock) — confirm against the CRDT module.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    GCounter,
    GSet,
    ORSet,
    LWWRegister,
    LWWMap,
    Rga,
    VectorClock,
    VCRegister,
}
87
/// Coordinate system of a `DataType::Point` property; it decides which fields
/// the Arrow struct produced by `DataType::to_arrow` contains.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
    /// Stored as `latitude`, `longitude`, `crs`.
    Geographic,
    /// Stored as `x`, `y`, `crs`.
    Cartesian2D,
    /// Stored as `x`, `y`, `z`, `crs`.
    Cartesian3D,
}
94
/// Logical type of a property. `DataType::to_arrow` maps every variant to the
/// Arrow type used for storage; the per-variant notes below summarise that mapping.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
    /// Stored as Arrow `Utf8`.
    String,
    /// Stored as Arrow `Int32`.
    Int32,
    /// Stored as Arrow `Int64`.
    Int64,
    /// Stored as Arrow `Float32`.
    Float32,
    /// Stored as Arrow `Float64`.
    Float64,
    /// Stored as Arrow `Boolean`.
    Bool,
    /// Stored as a nanosecond Arrow timestamp tagged with the "UTC" timezone.
    Timestamp,
    /// Stored as Arrow `Date32`.
    Date,
    /// Stored as the struct described by `time_struct_fields`.
    Time,
    /// Stored as the struct described by `datetime_struct_fields`.
    DateTime,
    /// Stored as Arrow `LargeBinary`.
    Duration,
    /// Stored as Arrow `LargeBinary`.
    CypherValue,
    /// Point value; the struct fields depend on the `PointType`.
    Point(PointType),
    /// Fixed-length vector: an Arrow `FixedSizeList` of `Float32` with `dimensions` items.
    Vector { dimensions: usize },
    /// Stored as 24-byte Arrow `FixedSizeBinary`.
    Btic,
    /// CRDT value; stored as Arrow `Binary` whatever the `CrdtType`.
    Crdt(CrdtType),
    /// Stored as an Arrow `List` of the element type.
    List(Box<DataType>),
    /// Key/value map (key type, value type); stored as an Arrow `List` of
    /// `{key, value}` structs.
    Map(Box<DataType>, Box<DataType>),
}
117
118impl DataType {
119 #[allow(non_upper_case_globals)]
121 pub const Float: DataType = DataType::Float64;
122 #[allow(non_upper_case_globals)]
123 pub const Int: DataType = DataType::Int64;
124
125 pub fn to_arrow(&self) -> ArrowDataType {
126 match self {
127 DataType::String => ArrowDataType::Utf8,
128 DataType::Int32 => ArrowDataType::Int32,
129 DataType::Int64 => ArrowDataType::Int64,
130 DataType::Float32 => ArrowDataType::Float32,
131 DataType::Float64 => ArrowDataType::Float64,
132 DataType::Bool => ArrowDataType::Boolean,
133 DataType::Timestamp => {
134 ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
135 }
136 DataType::Date => ArrowDataType::Date32,
137 DataType::Time => ArrowDataType::Struct(time_struct_fields()),
138 DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
139 DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
142 PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
143 Field::new("latitude", ArrowDataType::Float64, false),
144 Field::new("longitude", ArrowDataType::Float64, false),
145 Field::new("crs", ArrowDataType::Utf8, false),
146 ])),
147 PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
148 Field::new("x", ArrowDataType::Float64, false),
149 Field::new("y", ArrowDataType::Float64, false),
150 Field::new("crs", ArrowDataType::Utf8, false),
151 ])),
152 PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
153 Field::new("x", ArrowDataType::Float64, false),
154 Field::new("y", ArrowDataType::Float64, false),
155 Field::new("z", ArrowDataType::Float64, false),
156 Field::new("crs", ArrowDataType::Utf8, false),
157 ])),
158 },
159 DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
160 Arc::new(Field::new("item", ArrowDataType::Float32, true)),
161 *dimensions as i32,
162 ),
163 DataType::Btic => ArrowDataType::FixedSizeBinary(24),
164 DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
166 ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
167 }
168 DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
169 "item",
170 ArrowDataType::Struct(Fields::from(vec![
171 Field::new("key", key.to_arrow(), false),
172 Field::new("value", value.to_arrow(), true),
173 ])),
174 true,
175 ))),
176 }
177 }
178}
179
/// Serde default for `LabelMeta::created_at`: the current UTC time at the
/// moment of deserialization, used when the field is absent from the input.
fn default_created_at() -> DateTime<Utc> {
    Utc::now()
}
183
/// Serde default for the `state` fields in this file: elements whose
/// serialized form has no state are treated as `Active`.
fn default_state() -> SchemaElementState {
    SchemaElementState::Active
}
187
/// Serde default for `PropertyMeta::added_in`: properties serialized without
/// the field are attributed to schema version 1.
fn default_version_1() -> u32 {
    const FIRST_SCHEMA_VERSION: u32 = 1;
    FIRST_SCHEMA_VERSION
}
191
/// Metadata stored for one property of a label or edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
    /// Logical type of the property.
    pub r#type: DataType,
    /// Nullability flag as passed to `SchemaManager::add_property`
    /// (generated properties are always registered as nullable).
    pub nullable: bool,
    /// `schema_version` at the time the property was added; defaults to 1
    /// when missing from the serialized form.
    #[serde(default = "default_version_1")]
    pub added_in: u32,
    /// Lifecycle state; defaults to `Active` when missing.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Expression for generated properties (set by
    /// `SchemaManager::add_generated_property`); `None` for regular properties.
    #[serde(default)]
    pub generation_expression: Option<String>,
}
203
/// Metadata stored for one vertex label.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
    /// Numeric label id; `SchemaManager::add_label` assigns "highest id + 1".
    pub id: u16,
    /// Creation time; defaults to the deserialization time when missing.
    #[serde(default = "default_created_at")]
    pub created_at: DateTime<Utc>,
    /// Lifecycle state; defaults to `Active` when missing.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
}
212
/// Metadata stored for one schema-defined edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
    /// Numeric edge type id; `SchemaManager::add_edge_type` assigns "highest id + 1".
    pub id: u32,
    /// Label names given as the source side when the edge type was added.
    pub src_labels: Vec<String>,
    /// Label names given as the destination side when the edge type was added.
    pub dst_labels: Vec<String>,
    /// Lifecycle state; defaults to `Active` when missing.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
}
222
/// Kind of rule a `Constraint` expresses. Enforcement happens outside this
/// file; the notes below are inferred from the variant names.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
    /// Uniqueness over the listed properties (presumably as a combination — TODO confirm).
    Unique { properties: Vec<String> },
    /// The named property must be present.
    Exists { property: String },
    /// An expression that must hold; stored as its source text.
    Check { expression: String },
}
230
/// Schema element a `Constraint` applies to, identified by name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
    /// A vertex label.
    Label(String),
    /// An edge type.
    EdgeType(String),
}
237
/// A named constraint stored in `Schema::constraints`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
    /// Constraint name; `SchemaManager::add_constraint` rejects duplicates.
    pub name: String,
    /// What the constraint checks.
    pub constraint_type: ConstraintType,
    /// Which label or edge type it applies to.
    pub target: ConstraintTarget,
    /// Enabled flag (how a disabled constraint is treated is decided by the
    /// enforcing code, which is not in this file).
    pub enabled: bool,
}
245
/// Two-way mapping between edge type names that are not declared in
/// `Schema::edge_types` and the ids assigned to them on first use.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    /// Edge type name -> assigned id.
    name_to_id: HashMap<String, u32>,
    /// Assigned id -> edge type name (inverse of `name_to_id`).
    id_to_name: HashMap<u32, String>,
    /// Local counter fed to `make_schemaless_id` for the next new name; starts at 1.
    next_local_id: u32,
}
258
259impl SchemalessEdgeTypeRegistry {
260 pub fn new() -> Self {
261 Self {
262 name_to_id: HashMap::new(),
263 id_to_name: HashMap::new(),
264 next_local_id: 1,
265 }
266 }
267
268 pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
270 if let Some(&id) = self.name_to_id.get(type_name) {
271 return id;
272 }
273
274 let id = make_schemaless_id(self.next_local_id);
275 self.next_local_id += 1;
276
277 self.name_to_id.insert(type_name.to_string(), id);
278 self.id_to_name.insert(id, type_name.to_string());
279
280 id
281 }
282
283 pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
285 self.id_to_name.get(&type_id).map(String::as_str)
286 }
287
288 pub fn contains(&self, type_name: &str) -> bool {
290 self.name_to_id.contains_key(type_name)
291 }
292
293 pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
295 self.name_to_id
296 .iter()
297 .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
298 .map(|(_, &id)| id)
299 }
300
301 pub fn all_type_ids(&self) -> Vec<u32> {
303 self.id_to_name.keys().copied().collect()
304 }
305
306 pub fn is_empty(&self) -> bool {
308 self.name_to_id.is_empty()
309 }
310}
311
312impl Default for SchemalessEdgeTypeRegistry {
313 fn default() -> Self {
314 Self::new()
315 }
316}
317
/// The complete schema document: labels, edge types, their properties,
/// indexes and constraints. This is what `SchemaManager` serializes as JSON.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
    /// Version number of the schema; copied into `PropertyMeta::added_in`
    /// when a property is added.
    pub schema_version: u32,
    /// Vertex labels keyed by label name.
    pub labels: HashMap<String, LabelMeta>,
    /// Schema-defined edge types keyed by type name.
    pub edge_types: HashMap<String, EdgeTypeMeta>,
    /// Properties keyed first by label or edge type name, then by property name.
    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
    /// Index definitions; empty when missing from the serialized form.
    #[serde(default)]
    pub indexes: Vec<IndexDefinition>,
    /// Constraints; empty when missing from the serialized form.
    #[serde(default)]
    pub constraints: Vec<Constraint>,
    /// Ids of edge types used without a schema definition; empty when missing.
    #[serde(default)]
    pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
332
333impl Default for Schema {
334 fn default() -> Self {
335 Self {
336 schema_version: 1,
337 labels: HashMap::new(),
338 edge_types: HashMap::new(),
339 properties: HashMap::new(),
340 indexes: Vec::new(),
341 constraints: Vec::new(),
342 schemaless_registry: SchemalessEdgeTypeRegistry::new(),
343 }
344 }
345}
346
347impl Schema {
348 pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
353 self.labels
354 .iter()
355 .find(|(_, meta)| meta.id == label_id)
356 .map(|(name, _)| name.as_str())
357 }
358
359 pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
361 self.labels.get(label_name).map(|meta| meta.id)
362 }
363
364 pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
369 self.edge_types
370 .iter()
371 .find(|(_, meta)| meta.id == type_id)
372 .map(|(name, _)| name.as_str())
373 }
374
375 pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
377 self.edge_types.get(type_name).map(|meta| meta.id)
378 }
379
380 pub fn vector_index_for_property(
385 &self,
386 label: &str,
387 property: &str,
388 ) -> Option<&VectorIndexConfig> {
389 self.indexes.iter().find_map(|idx| {
390 if let IndexDefinition::Vector(config) = idx
391 && config.label == label
392 && config.property == property
393 && config.metadata.status == IndexStatus::Online
394 {
395 return Some(config);
396 }
397 None
398 })
399 }
400
401 pub fn fulltext_index_for_property(
406 &self,
407 label: &str,
408 property: &str,
409 ) -> Option<&FullTextIndexConfig> {
410 self.indexes.iter().find_map(|idx| {
411 if let IndexDefinition::FullText(config) = idx
412 && config.label == label
413 && config.properties.iter().any(|p| p == property)
414 && config.metadata.status == IndexStatus::Online
415 {
416 return Some(config);
417 }
418 None
419 })
420 }
421
422 pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
427 self.labels
428 .iter()
429 .find(|(k, _)| k.eq_ignore_ascii_case(name))
430 .map(|(_, v)| v)
431 }
432
433 pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
435 self.get_label_case_insensitive(label_name)
436 .map(|meta| meta.id)
437 }
438
439 pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
444 self.edge_types
445 .iter()
446 .find(|(k, _)| k.eq_ignore_ascii_case(name))
447 .map(|(_, v)| v)
448 }
449
450 pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
452 self.get_edge_type_case_insensitive(type_name)
453 .map(|meta| meta.id)
454 }
455
456 pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
459 self.edge_type_id_by_name_case_insensitive(type_name)
460 .or_else(|| {
461 self.schemaless_registry
462 .id_by_name_case_insensitive(type_name)
463 })
464 }
465
466 pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
472 if let Some(id) = self.edge_type_id_by_name(type_name) {
473 return id;
474 }
475 self.schemaless_registry.get_or_assign_id(type_name)
476 }
477
478 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
482 if is_schemaless_edge_type(type_id) {
483 self.schemaless_registry
484 .type_name_by_id(type_id)
485 .map(str::to_owned)
486 } else {
487 self.edge_type_name_by_id(type_id).map(str::to_owned)
488 }
489 }
490
491 pub fn all_edge_type_ids(&self) -> Vec<u32> {
494 let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
495 ids.extend(self.schemaless_registry.all_type_ids());
496 ids.sort_unstable();
497 ids
498 }
499}
500
/// Build status of an index, stored in `IndexMetadata::status`.
///
/// Only `Online` indexes are returned by `Schema::vector_index_for_property`
/// and `Schema::fulltext_index_for_property`.
// NOTE(review): the meaning of the non-default variants is inferred from
// their names; the code that sets them is not in this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
    /// Default status; also used when the field is missing from serialized data.
    #[default]
    Online,
    /// Presumably: the index is being built.
    Building,
    /// Presumably: the index exists but is out of date.
    Stale,
    /// Presumably: the last build failed.
    Failed,
}
514
/// Bookkeeping attached to every index definition. All fields fall back to
/// their defaults when missing from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
    /// Current status; defaults to `Online`.
    #[serde(default)]
    pub status: IndexStatus,
    /// Time of the last build, if recorded.
    #[serde(default)]
    pub last_built_at: Option<DateTime<Utc>>,
    /// Row count recorded at the last build, if recorded.
    #[serde(default)]
    pub row_count_at_build: Option<u64>,
}
528
/// An index definition of any supported kind.
///
/// Serialized with an internal `"type"` tag whose value is the variant name
/// (for example `"type": "Scalar"`), alongside the fields of the config struct.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    /// Vector index on a single property.
    Vector(VectorIndexConfig),
    /// Full-text index over one or more properties.
    FullText(FullTextIndexConfig),
    /// Scalar index over one or more properties.
    Scalar(ScalarIndexConfig),
    /// Inverted index on a single property.
    Inverted(InvertedIndexConfig),
    /// Full-text index over a JSON column.
    JsonFullText(JsonFtsIndexConfig),
}
539
540impl IndexDefinition {
541 pub fn name(&self) -> &str {
543 match self {
544 IndexDefinition::Vector(c) => &c.name,
545 IndexDefinition::FullText(c) => &c.name,
546 IndexDefinition::Scalar(c) => &c.name,
547 IndexDefinition::Inverted(c) => &c.name,
548 IndexDefinition::JsonFullText(c) => &c.name,
549 }
550 }
551
552 pub fn label(&self) -> &str {
554 match self {
555 IndexDefinition::Vector(c) => &c.label,
556 IndexDefinition::FullText(c) => &c.label,
557 IndexDefinition::Scalar(c) => &c.label,
558 IndexDefinition::Inverted(c) => &c.label,
559 IndexDefinition::JsonFullText(c) => &c.label,
560 }
561 }
562
563 pub fn metadata(&self) -> &IndexMetadata {
565 match self {
566 IndexDefinition::Vector(c) => &c.metadata,
567 IndexDefinition::FullText(c) => &c.metadata,
568 IndexDefinition::Scalar(c) => &c.metadata,
569 IndexDefinition::Inverted(c) => &c.metadata,
570 IndexDefinition::JsonFullText(c) => &c.metadata,
571 }
572 }
573
574 pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
576 match self {
577 IndexDefinition::Vector(c) => &mut c.metadata,
578 IndexDefinition::FullText(c) => &mut c.metadata,
579 IndexDefinition::Scalar(c) => &mut c.metadata,
580 IndexDefinition::Inverted(c) => &mut c.metadata,
581 IndexDefinition::JsonFullText(c) => &mut c.metadata,
582 }
583 }
584}
585
/// Configuration of an inverted index on a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Term normalization flag; defaults to `true` when missing. What
    /// normalization does is decided by the index implementation (not in this file).
    #[serde(default = "default_normalize")]
    pub normalize: bool,
    /// Upper bound on terms per document; defaults to 10,000 when missing.
    #[serde(default = "default_max_terms_per_doc")]
    pub max_terms_per_doc: usize,
    /// Build bookkeeping; defaults to `IndexMetadata::default()` when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
598
/// Serde default for `InvertedIndexConfig::normalize`: normalization is on
/// unless the serialized config says otherwise.
fn default_normalize() -> bool {
    const NORMALIZE_BY_DEFAULT: bool = true;
    NORMALIZE_BY_DEFAULT
}
602
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`.
fn default_max_terms_per_doc() -> usize {
    const DEFAULT_MAX_TERMS_PER_DOC: usize = 10_000;
    DEFAULT_MAX_TERMS_PER_DOC
}
606
/// Configuration of a vector index on a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Index structure and its tuning parameters.
    pub index_type: VectorIndexType,
    /// Distance metric associated with the index.
    pub metric: DistanceMetric,
    /// Optional embedding settings; `None` when no embedding is configured.
    pub embedding_config: Option<EmbeddingConfig>,
    /// Build bookkeeping; defaults to `IndexMetadata::default()` when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
618
/// Embedding settings attached to a vector index.
// NOTE(review): field meanings are inferred from their names; the code that
// consumes this config is not in this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
    /// Alias identifying the embedding model/provider — TODO confirm.
    pub alias: String,
    /// Properties whose values are fed to the embedder — TODO confirm.
    pub source_properties: Vec<String>,
    /// Number of items embedded per batch — TODO confirm.
    pub batch_size: usize,
}
626
/// Vector index structure together with its tuning parameters.
///
/// Fields marked `#[serde(default)]` are optional in the serialized form and
/// deserialize to `None` when missing.
// NOTE(review): variant names follow the usual IVF / HNSW naming with
// Flat / PQ / SQ / RQ storage; how each parameter is used is decided by the
// index builder, which is not in this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
    /// No tuning parameters.
    Flat,
    IvfFlat {
        num_partitions: u32,
    },
    IvfPq {
        num_partitions: u32,
        num_sub_vectors: u32,
        bits_per_subvector: u8,
    },
    IvfSq {
        num_partitions: u32,
    },
    IvfRq {
        num_partitions: u32,
        /// Optional; `None` when missing from the serialized form.
        #[serde(default)]
        num_bits: Option<u8>,
    },
    HnswFlat {
        m: u32,
        ef_construction: u32,
        /// Optional; `None` when missing from the serialized form.
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    HnswSq {
        m: u32,
        ef_construction: u32,
        /// Optional; `None` when missing from the serialized form.
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    HnswPq {
        m: u32,
        ef_construction: u32,
        num_sub_vectors: u32,
        /// Optional; `None` when missing from the serialized form.
        #[serde(default)]
        num_partitions: Option<u32>,
    },
}
667
/// Distance metric for vector comparison; see `DistanceMetric::compute_distance`
/// for the exact formula behind each variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
    /// One minus the cosine similarity.
    Cosine,
    /// Squared Euclidean distance (no square root is taken).
    L2,
    /// Negated dot product.
    Dot,
}
675
676impl DistanceMetric {
677 pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
689 assert_eq!(a.len(), b.len(), "vector dimension mismatch");
690 match self {
691 DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
692 DistanceMetric::Cosine => {
693 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
694 let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
695 let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
696 let denom = norm_a * norm_b;
697 if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
698 }
699 DistanceMetric::Dot => {
700 let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
701 -dot
702 }
703 }
704 }
705}
706
/// Configuration of a full-text index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Properties covered by the index.
    pub properties: Vec<String>,
    /// Tokenizer used for the indexed text.
    pub tokenizer: TokenizerConfig,
    /// Position flag — presumably whether token positions are stored (TODO confirm).
    pub with_positions: bool,
    /// Build bookkeeping; defaults to `IndexMetadata::default()` when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
717
/// Tokenizer selection for a full-text index.
// NOTE(review): tokenizer behaviour is implemented outside this file; the
// notes below are inferred from the variant names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
    /// The standard tokenizer.
    Standard,
    /// Presumably splits on whitespace.
    Whitespace,
    /// N-gram tokenizer with `min` and `max` bounds (presumably gram lengths).
    Ngram { min: u8, max: u8 },
    /// A tokenizer referred to by name.
    Custom { name: String },
}
726
/// Configuration of a full-text index over a JSON column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed column.
    pub column: String,
    /// JSON paths; empty when missing (presumably meaning "no restriction" — TODO confirm).
    #[serde(default)]
    pub paths: Vec<String>,
    /// Position flag; `false` when missing.
    #[serde(default)]
    pub with_positions: bool,
    /// Build bookkeeping; defaults to `IndexMetadata::default()` when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
739
/// Configuration of a scalar index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Properties covered by the index.
    pub properties: Vec<String>,
    /// Index structure.
    pub index_type: ScalarIndexType,
    /// Optional filter expression stored as text (presumably making the
    /// index partial — TODO confirm).
    pub where_clause: Option<String>,
    /// Build bookkeeping; defaults to `IndexMetadata::default()` when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
750
/// Structure used by a scalar index. The implementations behind these
/// variants live outside this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
    BTree,
    Hash,
    Bitmap,
    LabelList,
}
759
/// Owns the in-memory schema and persists it as JSON in an object store.
pub struct SchemaManager {
    /// Object store the schema document is read from and written to.
    store: Arc<dyn ObjectStore>,
    /// Location of the schema document inside `store`.
    path: ObjectStorePath,
    /// Current schema. Readers receive a clone of the `Arc`; writers mutate
    /// through `Arc::make_mut` while holding the write lock.
    schema: RwLock<Arc<Schema>>,
}
765
766impl SchemaManager {
767 pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
768 let path = path.as_ref();
769 let parent = path
770 .parent()
771 .ok_or_else(|| anyhow!("Invalid schema path"))?;
772 let filename = path
773 .file_name()
774 .ok_or_else(|| anyhow!("Invalid schema filename"))?
775 .to_str()
776 .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
777
778 let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
779 let obj_path = ObjectStorePath::from(filename);
780
781 Self::load_from_store(store, &obj_path).await
782 }
783
784 pub async fn load_from_store(
785 store: Arc<dyn ObjectStore>,
786 path: &ObjectStorePath,
787 ) -> Result<Self> {
788 match store.get(path).await {
789 Ok(result) => {
790 let bytes = result.bytes().await?;
791 let content = String::from_utf8(bytes.to_vec())?;
792 let schema: Schema = serde_json::from_str(&content)?;
793 Ok(Self {
794 store,
795 path: path.clone(),
796 schema: RwLock::new(Arc::new(schema)),
797 })
798 }
799 Err(object_store::Error::NotFound { .. }) => Ok(Self {
800 store,
801 path: path.clone(),
802 schema: RwLock::new(Arc::new(Schema::default())),
803 }),
804 Err(e) => Err(anyhow::Error::from(e)),
805 }
806 }
807
808 pub async fn save(&self) -> Result<()> {
809 let content = {
810 let schema_guard = acquire_read(&self.schema, "schema")?;
811 serde_json::to_string_pretty(&**schema_guard)?
812 };
813 self.store
814 .put(&self.path, content.into())
815 .await
816 .map_err(anyhow::Error::from)?;
817 Ok(())
818 }
819
/// Returns the object-store path the schema was loaded from and is saved to.
pub fn path(&self) -> &ObjectStorePath {
    &self.path
}
823
824 pub fn schema(&self) -> Arc<Schema> {
825 self.schema
826 .read()
827 .expect("Schema lock poisoned - a thread panicked while holding it")
828 .clone()
829 }
830
/// Upper-cases every identifier that is immediately followed by `(`, so
/// that function names in `expr` compare case-insensitively.
///
/// An identifier starts at an alphabetic character and continues over
/// alphanumerics and underscores. Identifiers not directly followed by `(`
/// and all other characters are copied unchanged.
fn normalize_function_names(expr: &str) -> String {
    let mut out = String::with_capacity(expr.len());
    let mut rest = expr.chars().peekable();

    while let Some(first) = rest.next() {
        if !first.is_alphabetic() {
            out.push(first);
            continue;
        }

        // Gather the whole identifier before deciding how to emit it.
        let mut word = String::from(first);
        while let Some(c) = rest.next_if(|c| c.is_alphanumeric() || *c == '_') {
            word.push(c);
        }

        // Only an identifier directly followed by '(' is a function name.
        if rest.peek() == Some(&'(') {
            out.push_str(&word.to_uppercase());
        } else {
            out.push_str(&word);
        }
    }

    out
}
864
865 pub fn generated_column_name(expr: &str) -> String {
873 let normalized = Self::normalize_function_names(expr);
875
876 let sanitized = normalized
877 .replace(|c: char| !c.is_alphanumeric(), "_")
878 .trim_matches('_')
879 .to_string();
880
881 const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
883 const FNV_PRIME: u64 = 1099511628211;
884
885 let mut hash = FNV_OFFSET_BASIS;
886 for byte in normalized.as_bytes() {
887 hash ^= *byte as u64;
888 hash = hash.wrapping_mul(FNV_PRIME);
889 }
890
891 format!("_gen_{}_{:x}", sanitized, hash)
892 }
893
894 pub fn replace_schema(&self, new_schema: Schema) {
895 let mut schema = self
896 .schema
897 .write()
898 .expect("Schema lock poisoned - a thread panicked while holding it");
899 *schema = Arc::new(new_schema);
900 }
901
902 pub fn next_label_id(&self) -> u16 {
903 self.schema()
904 .labels
905 .values()
906 .map(|l| l.id)
907 .max()
908 .unwrap_or(0)
909 + 1
910 }
911
912 pub fn next_type_id(&self) -> u32 {
913 let max_schema_id = self
914 .schema()
915 .edge_types
916 .values()
917 .map(|t| t.id)
918 .max()
919 .unwrap_or(0);
920
921 if max_schema_id >= MAX_SCHEMA_TYPE_ID {
923 panic!("Schema edge type ID exhaustion");
924 }
925
926 max_schema_id + 1
927 }
928
929 pub fn add_label(&self, name: &str) -> Result<u16> {
930 let mut guard = acquire_write(&self.schema, "schema")?;
931 let schema = Arc::make_mut(&mut *guard);
932 if schema.labels.contains_key(name) {
933 return Err(anyhow!("Label '{}' already exists", name));
934 }
935
936 let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
937 schema.labels.insert(
938 name.to_string(),
939 LabelMeta {
940 id,
941 created_at: Utc::now(),
942 state: SchemaElementState::Active,
943 },
944 );
945 Ok(id)
946 }
947
948 pub fn add_edge_type(
949 &self,
950 name: &str,
951 src_labels: Vec<String>,
952 dst_labels: Vec<String>,
953 ) -> Result<u32> {
954 let mut guard = acquire_write(&self.schema, "schema")?;
955 let schema = Arc::make_mut(&mut *guard);
956 if schema.edge_types.contains_key(name) {
957 return Err(anyhow!("Edge type '{}' already exists", name));
958 }
959
960 let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
961
962 if id >= MAX_SCHEMA_TYPE_ID {
964 return Err(anyhow!("Schema edge type ID exhaustion"));
965 }
966
967 schema.edge_types.insert(
968 name.to_string(),
969 EdgeTypeMeta {
970 id,
971 src_labels,
972 dst_labels,
973 state: SchemaElementState::Active,
974 },
975 );
976 Ok(id)
977 }
978
979 pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
981 let mut guard = acquire_write(&self.schema, "schema")
982 .expect("Schema lock poisoned - a thread panicked while holding it");
983 let schema = Arc::make_mut(&mut *guard);
984 schema.get_or_assign_edge_type_id(type_name)
985 }
986
987 pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
989 let schema = acquire_read(&self.schema, "schema")
990 .expect("Schema lock poisoned - a thread panicked while holding it");
991 schema.edge_type_name_by_id_unified(type_id)
992 }
993
994 pub fn add_property(
995 &self,
996 label_or_type: &str,
997 prop_name: &str,
998 data_type: DataType,
999 nullable: bool,
1000 ) -> Result<()> {
1001 let mut guard = acquire_write(&self.schema, "schema")?;
1002 let schema = Arc::make_mut(&mut *guard);
1003 let version = schema.schema_version;
1004 let props = schema
1005 .properties
1006 .entry(label_or_type.to_string())
1007 .or_default();
1008
1009 if props.contains_key(prop_name) {
1010 return Err(anyhow!(
1011 "Property '{}' already exists for '{}'",
1012 prop_name,
1013 label_or_type
1014 ));
1015 }
1016
1017 props.insert(
1018 prop_name.to_string(),
1019 PropertyMeta {
1020 r#type: data_type,
1021 nullable,
1022 added_in: version,
1023 state: SchemaElementState::Active,
1024 generation_expression: None,
1025 },
1026 );
1027 Ok(())
1028 }
1029
1030 pub fn add_generated_property(
1031 &self,
1032 label_or_type: &str,
1033 prop_name: &str,
1034 data_type: DataType,
1035 expr: String,
1036 ) -> Result<()> {
1037 let mut guard = acquire_write(&self.schema, "schema")?;
1038 let schema = Arc::make_mut(&mut *guard);
1039 let version = schema.schema_version;
1040 let props = schema
1041 .properties
1042 .entry(label_or_type.to_string())
1043 .or_default();
1044
1045 if props.contains_key(prop_name) {
1046 return Err(anyhow!("Property '{}' already exists", prop_name));
1047 }
1048
1049 props.insert(
1050 prop_name.to_string(),
1051 PropertyMeta {
1052 r#type: data_type,
1053 nullable: true,
1054 added_in: version,
1055 state: SchemaElementState::Active,
1056 generation_expression: Some(expr),
1057 },
1058 );
1059 Ok(())
1060 }
1061
1062 pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1063 let mut guard = acquire_write(&self.schema, "schema")?;
1064 let schema = Arc::make_mut(&mut *guard);
1065 schema.indexes.push(index_def);
1066 Ok(())
1067 }
1068
1069 pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1070 let schema = self.schema.read().expect("Schema lock poisoned");
1071 schema.indexes.iter().find(|i| i.name() == name).cloned()
1072 }
1073
1074 pub fn update_index_metadata(
1079 &self,
1080 index_name: &str,
1081 f: impl FnOnce(&mut IndexMetadata),
1082 ) -> Result<()> {
1083 let mut guard = acquire_write(&self.schema, "schema")?;
1084 let schema = Arc::make_mut(&mut *guard);
1085 let idx = schema
1086 .indexes
1087 .iter_mut()
1088 .find(|i| i.name() == index_name)
1089 .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1090 f(idx.metadata_mut());
1091 Ok(())
1092 }
1093
1094 pub fn remove_index(&self, name: &str) -> Result<()> {
1095 let mut guard = acquire_write(&self.schema, "schema")?;
1096 let schema = Arc::make_mut(&mut *guard);
1097 if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1098 schema.indexes.remove(pos);
1099 Ok(())
1100 } else {
1101 Err(anyhow!("Index '{}' not found", name))
1102 }
1103 }
1104
1105 pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1106 let mut guard = acquire_write(&self.schema, "schema")?;
1107 let schema = Arc::make_mut(&mut *guard);
1108 if schema.constraints.iter().any(|c| c.name == constraint.name) {
1109 return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1110 }
1111 schema.constraints.push(constraint);
1112 Ok(())
1113 }
1114
1115 pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1116 let mut guard = acquire_write(&self.schema, "schema")?;
1117 let schema = Arc::make_mut(&mut *guard);
1118 if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1119 schema.constraints.remove(pos);
1120 Ok(())
1121 } else if if_exists {
1122 Ok(())
1123 } else {
1124 Err(anyhow!("Constraint '{}' not found", name))
1125 }
1126 }
1127
1128 pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1129 let mut guard = acquire_write(&self.schema, "schema")?;
1130 let schema = Arc::make_mut(&mut *guard);
1131 if let Some(props) = schema.properties.get_mut(label_or_type) {
1132 if props.remove(prop_name).is_some() {
1133 Ok(())
1134 } else {
1135 Err(anyhow!(
1136 "Property '{}' not found for '{}'",
1137 prop_name,
1138 label_or_type
1139 ))
1140 }
1141 } else {
1142 Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1143 }
1144 }
1145
1146 pub fn rename_property(
1147 &self,
1148 label_or_type: &str,
1149 old_name: &str,
1150 new_name: &str,
1151 ) -> Result<()> {
1152 let mut guard = acquire_write(&self.schema, "schema")?;
1153 let schema = Arc::make_mut(&mut *guard);
1154 if let Some(props) = schema.properties.get_mut(label_or_type) {
1155 if let Some(meta) = props.remove(old_name) {
1156 if props.contains_key(new_name) {
1157 props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
1160 }
1161 props.insert(new_name.to_string(), meta);
1162 Ok(())
1163 } else {
1164 Err(anyhow!(
1165 "Property '{}' not found for '{}'",
1166 old_name,
1167 label_or_type
1168 ))
1169 }
1170 } else {
1171 Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1172 }
1173 }
1174
1175 pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1176 let mut guard = acquire_write(&self.schema, "schema")?;
1177 let schema = Arc::make_mut(&mut *guard);
1178 if let Some(label_meta) = schema.labels.get_mut(name) {
1179 label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1180 Ok(())
1182 } else if if_exists {
1183 Ok(())
1184 } else {
1185 Err(anyhow!("Label '{}' not found", name))
1186 }
1187 }
1188
1189 pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1190 let mut guard = acquire_write(&self.schema, "schema")?;
1191 let schema = Arc::make_mut(&mut *guard);
1192 if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1193 edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1194 Ok(())
1196 } else if if_exists {
1197 Ok(())
1198 } else {
1199 Err(anyhow!("Edge Type '{}' not found", name))
1200 }
1201 }
1202}
1203
1204pub fn validate_identifier(name: &str) -> Result<()> {
1206 if name.is_empty() || name.len() > 64 {
1208 return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1209 }
1210
1211 let first = name.chars().next().unwrap();
1213 if !first.is_alphabetic() && first != '_' {
1214 return Err(anyhow!(
1215 "Identifier '{}' must start with letter or underscore",
1216 name
1217 ));
1218 }
1219
1220 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1222 return Err(anyhow!(
1223 "Identifier '{}' must contain only alphanumeric and underscore",
1224 name
1225 ));
1226 }
1227
1228 const RESERVED: &[&str] = &[
1230 "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1231 "UNION", "ORDER", "LIMIT",
1232 ];
1233 if RESERVED.contains(&name.to_uppercase().as_str()) {
1234 return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1235 }
1236
1237 Ok(())
1238}
1239
1240#[cfg(test)]
1241mod tests {
1242 use super::*;
1243 use object_store::local::LocalFileSystem;
1244 use tempfile::tempdir;
1245
/// End-to-end check of `SchemaManager`: add a label, a property and an edge
/// type, save to a temporary local store, then reload and verify the data.
#[tokio::test]
async fn test_schema_management() -> Result<()> {
    // A fresh temp directory has no schema.json, so loading starts from the default schema.
    let dir = tempdir()?;
    let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
    let path = ObjectStorePath::from("schema.json");
    let manager = SchemaManager::load_from_store(store.clone(), &path).await?;

    // The first label gets id 1; adding the same name again is rejected.
    let lid = manager.add_label("Person")?;
    assert_eq!(lid, 1);
    assert!(manager.add_label("Person").is_err());

    // Adding the same property twice is rejected.
    manager.add_property("Person", "name", DataType::String, false)?;
    assert!(
        manager
            .add_property("Person", "name", DataType::String, false)
            .is_err()
    );

    // The first edge type gets id 1.
    let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
    assert_eq!(tid, 1);

    // Saving creates the object in the store.
    manager.save().await?;
    assert!(store.get(&path).await.is_ok());

    // A second manager loading the same path sees the saved label and property.
    let manager2 = SchemaManager::load_from_store(store, &path).await?;
    assert!(manager2.schema().labels.contains_key("Person"));
    assert!(
        manager2
            .schema()
            .properties
            .get("Person")
            .unwrap()
            .contains_key("name")
    );

    Ok(())
}
1287
/// Function names are upper-cased whatever their input case, including
/// nested calls; arguments are left untouched.
#[test]
fn test_normalize_function_names() {
    // Lower-case input.
    assert_eq!(
        SchemaManager::normalize_function_names("lower(email)"),
        "LOWER(email)"
    );
    // Already upper-case input is unchanged.
    assert_eq!(
        SchemaManager::normalize_function_names("LOWER(email)"),
        "LOWER(email)"
    );
    // Mixed-case input.
    assert_eq!(
        SchemaManager::normalize_function_names("Lower(email)"),
        "LOWER(email)"
    );
    // Nested calls: both function names are normalized.
    assert_eq!(
        SchemaManager::normalize_function_names("trim(lower(email))"),
        "TRIM(LOWER(email))"
    );
}
1307
1308 #[test]
1309 fn test_generated_column_name_case_insensitive() {
1310 let col1 = SchemaManager::generated_column_name("lower(email)");
1311 let col2 = SchemaManager::generated_column_name("LOWER(email)");
1312 let col3 = SchemaManager::generated_column_name("Lower(email)");
1313 assert_eq!(col1, col2);
1314 assert_eq!(col2, col3);
1315 assert!(col1.starts_with("_gen_LOWER_email_"));
1316 }
1317
1318 #[test]
1319 fn test_index_metadata_serde_backward_compat() {
1320 let json = r#"{
1322 "type": "Scalar",
1323 "name": "idx_person_name",
1324 "label": "Person",
1325 "properties": ["name"],
1326 "index_type": "BTree",
1327 "where_clause": null
1328 }"#;
1329 let def: IndexDefinition = serde_json::from_str(json).unwrap();
1330 let meta = def.metadata();
1331 assert_eq!(meta.status, IndexStatus::Online);
1332 assert!(meta.last_built_at.is_none());
1333 assert!(meta.row_count_at_build.is_none());
1334 }
1335
1336 #[test]
1337 fn test_index_metadata_serde_roundtrip() {
1338 let now = Utc::now();
1339 let def = IndexDefinition::Scalar(ScalarIndexConfig {
1340 name: "idx_test".to_string(),
1341 label: "Test".to_string(),
1342 properties: vec!["prop".to_string()],
1343 index_type: ScalarIndexType::BTree,
1344 where_clause: None,
1345 metadata: IndexMetadata {
1346 status: IndexStatus::Building,
1347 last_built_at: Some(now),
1348 row_count_at_build: Some(42),
1349 },
1350 });
1351
1352 let json = serde_json::to_string(&def).unwrap();
1353 let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
1354 assert_eq!(parsed.metadata().status, IndexStatus::Building);
1355 assert_eq!(parsed.metadata().row_count_at_build, Some(42));
1356 assert!(parsed.metadata().last_built_at.is_some());
1357 }
1358
1359 #[tokio::test]
1360 async fn test_update_index_metadata() -> Result<()> {
1361 let dir = tempdir()?;
1362 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1363 let path = ObjectStorePath::from("schema.json");
1364 let manager = SchemaManager::load_from_store(store, &path).await?;
1365
1366 manager.add_label("Person")?;
1367 let idx = IndexDefinition::Scalar(ScalarIndexConfig {
1368 name: "idx_test".to_string(),
1369 label: "Person".to_string(),
1370 properties: vec!["name".to_string()],
1371 index_type: ScalarIndexType::BTree,
1372 where_clause: None,
1373 metadata: Default::default(),
1374 });
1375 manager.add_index(idx)?;
1376
1377 let initial = manager.get_index("idx_test").unwrap();
1379 assert_eq!(initial.metadata().status, IndexStatus::Online);
1380
1381 manager.update_index_metadata("idx_test", |m| {
1383 m.status = IndexStatus::Building;
1384 m.row_count_at_build = Some(100);
1385 })?;
1386
1387 let updated = manager.get_index("idx_test").unwrap();
1388 assert_eq!(updated.metadata().status, IndexStatus::Building);
1389 assert_eq!(updated.metadata().row_count_at_build, Some(100));
1390
1391 assert!(manager.update_index_metadata("nope", |_| {}).is_err());
1393
1394 Ok(())
1395 }
1396
1397 #[test]
1398 fn test_vector_index_for_property_skips_non_online() {
1399 let mut schema = Schema::default();
1400 schema.labels.insert(
1401 "Document".to_string(),
1402 LabelMeta {
1403 id: 1,
1404 created_at: chrono::Utc::now(),
1405 state: SchemaElementState::Active,
1406 },
1407 );
1408
1409 schema
1411 .indexes
1412 .push(IndexDefinition::Vector(VectorIndexConfig {
1413 name: "vec_doc_embedding".to_string(),
1414 label: "Document".to_string(),
1415 property: "embedding".to_string(),
1416 index_type: VectorIndexType::Flat,
1417 metric: DistanceMetric::Cosine,
1418 embedding_config: None,
1419 metadata: IndexMetadata {
1420 status: IndexStatus::Stale,
1421 ..Default::default()
1422 },
1423 }));
1424
1425 assert!(
1427 schema
1428 .vector_index_for_property("Document", "embedding")
1429 .is_none()
1430 );
1431
1432 if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
1434 cfg.metadata.status = IndexStatus::Online;
1435 }
1436 let result = schema.vector_index_for_property("Document", "embedding");
1437 assert!(result.is_some());
1438 assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
1439 }
1440}