Skip to main content

uni_common/core/
schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::core::edge_type::{MAX_SCHEMA_TYPE_ID, is_schemaless_edge_type, make_schemaless_id};
5use crate::sync::{acquire_read, acquire_write};
6use anyhow::{Result, anyhow};
7use chrono::{DateTime, Utc};
8use object_store::ObjectStore;
9use object_store::local::LocalFileSystem;
10use object_store::path::Path as ObjectStorePath;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::Path;
14use std::sync::{Arc, RwLock};
15
16#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
17#[non_exhaustive]
18pub enum SchemaElementState {
19    Active,
20    Hidden {
21        since: DateTime<Utc>,
22        last_active_snapshot: String, // SnapshotId
23    },
24    Tombstone {
25        since: DateTime<Utc>,
26    },
27}
28
29use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
30
31/// Returns the canonical struct field definitions for DateTime encoding in Arrow.
32///
33/// DateTime is encoded as a 3-field struct to preserve timezone information:
34/// - `nanos_since_epoch`: i64 nanoseconds since Unix epoch (UTC)
35/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
36/// - `timezone_name`: Optional IANA timezone name (e.g., "America/New_York")
37pub fn datetime_struct_fields() -> Fields {
38    Fields::from(vec![
39        Field::new(
40            "nanos_since_epoch",
41            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
42            true,
43        ),
44        Field::new("offset_seconds", ArrowDataType::Int32, true),
45        Field::new("timezone_name", ArrowDataType::Utf8, true),
46    ])
47}
48
49/// Returns the canonical struct field definitions for Time encoding in Arrow.
50///
51/// Time is encoded as a 2-field struct to preserve timezone offset:
52/// - `nanos_since_midnight`: i64 nanoseconds since midnight (0-86,399,999,999,999)
53/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
54pub fn time_struct_fields() -> Fields {
55    Fields::from(vec![
56        Field::new(
57            "nanos_since_midnight",
58            ArrowDataType::Time64(TimeUnit::Nanosecond),
59            true,
60        ),
61        Field::new("offset_seconds", ArrowDataType::Int32, true),
62    ])
63}
64
65/// Detects if an Arrow DataType is the canonical DateTime struct.
66pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
67    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
68}
69
70/// Detects if an Arrow DataType is the canonical Time struct.
71pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
72    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
73}
74
75#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
76#[non_exhaustive]
77pub enum CrdtType {
78    GCounter,
79    GSet,
80    ORSet,
81    LWWRegister,
82    LWWMap,
83    Rga,
84    VectorClock,
85    VCRegister,
86}
87
88#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
89pub enum PointType {
90    Geographic,  // WGS84
91    Cartesian2D, // x, y
92    Cartesian3D, // x, y, z
93}
94
95#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
96#[non_exhaustive]
97pub enum DataType {
98    String,
99    Int32,
100    Int64,
101    Float32,
102    Float64,
103    Bool,
104    Timestamp,
105    Date,
106    Time,
107    DateTime,
108    Duration,
109    CypherValue,
110    Point(PointType),
111    Vector { dimensions: usize },
112    Crdt(CrdtType),
113    List(Box<DataType>),
114    Map(Box<DataType>, Box<DataType>),
115}
116
117impl DataType {
118    // Alias for compatibility/convenience if needed, but preferable to use exact types.
119    #[allow(non_upper_case_globals)]
120    pub const Float: DataType = DataType::Float64;
121    #[allow(non_upper_case_globals)]
122    pub const Int: DataType = DataType::Int64;
123
124    pub fn to_arrow(&self) -> ArrowDataType {
125        match self {
126            DataType::String => ArrowDataType::Utf8,
127            DataType::Int32 => ArrowDataType::Int32,
128            DataType::Int64 => ArrowDataType::Int64,
129            DataType::Float32 => ArrowDataType::Float32,
130            DataType::Float64 => ArrowDataType::Float64,
131            DataType::Bool => ArrowDataType::Boolean,
132            DataType::Timestamp => {
133                ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
134            }
135            DataType::Date => ArrowDataType::Date32,
136            DataType::Time => ArrowDataType::Struct(time_struct_fields()),
137            DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
138            DataType::Duration => ArrowDataType::LargeBinary, // Lance doesn't support Interval(MonthDayNano); use CypherValue codec
139            DataType::CypherValue => ArrowDataType::LargeBinary, // MessagePack-tagged binary encoding
140            DataType::Point(pt) => match pt {
141                PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
142                    Field::new("latitude", ArrowDataType::Float64, false),
143                    Field::new("longitude", ArrowDataType::Float64, false),
144                    Field::new("crs", ArrowDataType::Utf8, false),
145                ])),
146                PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
147                    Field::new("x", ArrowDataType::Float64, false),
148                    Field::new("y", ArrowDataType::Float64, false),
149                    Field::new("crs", ArrowDataType::Utf8, false),
150                ])),
151                PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
152                    Field::new("x", ArrowDataType::Float64, false),
153                    Field::new("y", ArrowDataType::Float64, false),
154                    Field::new("z", ArrowDataType::Float64, false),
155                    Field::new("crs", ArrowDataType::Utf8, false),
156                ])),
157            },
158            DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
159                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
160                *dimensions as i32,
161            ),
162            DataType::Crdt(_) => ArrowDataType::Binary, // Store CRDT as binary MessagePack
163            DataType::List(inner) => {
164                ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
165            }
166            DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
167                "item",
168                ArrowDataType::Struct(Fields::from(vec![
169                    Field::new("key", key.to_arrow(), false),
170                    Field::new("value", value.to_arrow(), true),
171                ])),
172                true,
173            ))),
174        }
175    }
176}
177
178fn default_created_at() -> DateTime<Utc> {
179    Utc::now()
180}
181
182fn default_state() -> SchemaElementState {
183    SchemaElementState::Active
184}
185
/// Serde default for `PropertyMeta::added_in`: schema version 1.
fn default_version_1() -> u32 {
    1u32
}
189
190#[derive(Clone, Debug, Serialize, Deserialize)]
191pub struct PropertyMeta {
192    pub r#type: DataType,
193    pub nullable: bool,
194    #[serde(default = "default_version_1")]
195    pub added_in: u32, // SchemaVersion
196    #[serde(default = "default_state")]
197    pub state: SchemaElementState,
198    #[serde(default)]
199    pub generation_expression: Option<String>,
200}
201
202#[derive(Clone, Debug, Serialize, Deserialize)]
203pub struct LabelMeta {
204    pub id: u16, // LabelId
205    #[serde(default = "default_created_at")]
206    pub created_at: DateTime<Utc>,
207    #[serde(default = "default_state")]
208    pub state: SchemaElementState,
209}
210
211#[derive(Clone, Debug, Serialize, Deserialize)]
212pub struct EdgeTypeMeta {
213    /// See [`crate::core::edge_type::EdgeTypeId`] for bit-layout details.
214    pub id: u32,
215    pub src_labels: Vec<String>,
216    pub dst_labels: Vec<String>,
217    #[serde(default = "default_state")]
218    pub state: SchemaElementState,
219}
220
221#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
222#[non_exhaustive]
223pub enum ConstraintType {
224    Unique { properties: Vec<String> },
225    Exists { property: String },
226    Check { expression: String },
227}
228
229#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
230#[non_exhaustive]
231pub enum ConstraintTarget {
232    Label(String),
233    EdgeType(String),
234}
235
236#[derive(Clone, Debug, Serialize, Deserialize)]
237pub struct Constraint {
238    pub name: String,
239    pub constraint_type: ConstraintType,
240    pub target: ConstraintTarget,
241    pub enabled: bool,
242}
243
244/// Bidirectional registry for dynamically-assigned schemaless edge type IDs.
245///
246/// Edge types not defined in the schema are assigned IDs at runtime with
247/// bit 31 set (see [`crate::core::edge_type`]). This registry maintains
248/// the name-to-ID and ID-to-name mappings for those types.
249#[derive(Clone, Debug, Serialize, Deserialize)]
250pub struct SchemalessEdgeTypeRegistry {
251    name_to_id: HashMap<String, u32>,
252    id_to_name: HashMap<u32, String>,
253    /// Next local ID to assign (0 is reserved for invalid).
254    next_local_id: u32,
255}
256
257impl SchemalessEdgeTypeRegistry {
258    pub fn new() -> Self {
259        Self {
260            name_to_id: HashMap::new(),
261            id_to_name: HashMap::new(),
262            next_local_id: 1,
263        }
264    }
265
266    /// Returns the schemaless ID for `type_name`, assigning a new one if needed.
267    pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
268        if let Some(&id) = self.name_to_id.get(type_name) {
269            return id;
270        }
271
272        let id = make_schemaless_id(self.next_local_id);
273        self.next_local_id += 1;
274
275        self.name_to_id.insert(type_name.to_string(), id);
276        self.id_to_name.insert(id, type_name.to_string());
277
278        id
279    }
280
281    /// Looks up the edge type name for a schemaless ID.
282    pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
283        self.id_to_name.get(&type_id).map(String::as_str)
284    }
285
286    /// Returns `true` if `type_name` has already been assigned a schemaless ID.
287    pub fn contains(&self, type_name: &str) -> bool {
288        self.name_to_id.contains_key(type_name)
289    }
290
291    /// Looks up the edge type ID for `type_name` with case-insensitive matching.
292    pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
293        self.name_to_id
294            .iter()
295            .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
296            .map(|(_, &id)| id)
297    }
298
299    /// Returns all registered schemaless type IDs.
300    pub fn all_type_ids(&self) -> Vec<u32> {
301        self.id_to_name.keys().copied().collect()
302    }
303
304    /// Returns true if the registry has any schemaless types.
305    pub fn is_empty(&self) -> bool {
306        self.name_to_id.is_empty()
307    }
308}
309
310impl Default for SchemalessEdgeTypeRegistry {
311    fn default() -> Self {
312        Self::new()
313    }
314}
315
316#[derive(Clone, Debug, Serialize, Deserialize)]
317pub struct Schema {
318    pub schema_version: u32,
319    pub labels: HashMap<String, LabelMeta>,
320    pub edge_types: HashMap<String, EdgeTypeMeta>,
321    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
322    #[serde(default)]
323    pub indexes: Vec<IndexDefinition>,
324    #[serde(default)]
325    pub constraints: Vec<Constraint>,
326    /// Registry for schemaless edge types (dynamically assigned IDs)
327    #[serde(default)]
328    pub schemaless_registry: SchemalessEdgeTypeRegistry,
329}
330
331impl Default for Schema {
332    fn default() -> Self {
333        Self {
334            schema_version: 1,
335            labels: HashMap::new(),
336            edge_types: HashMap::new(),
337            properties: HashMap::new(),
338            indexes: Vec::new(),
339            constraints: Vec::new(),
340            schemaless_registry: SchemalessEdgeTypeRegistry::new(),
341        }
342    }
343}
344
345impl Schema {
346    /// Returns the label name for a given label ID.
347    ///
348    /// Performs a linear scan over all labels. This is efficient because
349    /// the number of labels in a schema is typically small.
350    pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
351        self.labels
352            .iter()
353            .find(|(_, meta)| meta.id == label_id)
354            .map(|(name, _)| name.as_str())
355    }
356
357    /// Returns the label ID for a given label name.
358    pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
359        self.labels.get(label_name).map(|meta| meta.id)
360    }
361
362    /// Returns the edge type name for a given type ID.
363    ///
364    /// Performs a linear scan over all edge types. This is efficient because
365    /// the number of edge types in a schema is typically small.
366    pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
367        self.edge_types
368            .iter()
369            .find(|(_, meta)| meta.id == type_id)
370            .map(|(name, _)| name.as_str())
371    }
372
373    /// Returns the edge type ID for a given type name.
374    pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
375        self.edge_types.get(type_name).map(|meta| meta.id)
376    }
377
378    /// Returns the vector index configuration for a given label and property.
379    ///
380    /// Performs a linear scan over all indexes. This is efficient because
381    /// the number of indexes in a schema is typically small.
382    pub fn vector_index_for_property(
383        &self,
384        label: &str,
385        property: &str,
386    ) -> Option<&VectorIndexConfig> {
387        self.indexes.iter().find_map(|idx| {
388            if let IndexDefinition::Vector(config) = idx
389                && config.label == label
390                && config.property == property
391            {
392                return Some(config);
393            }
394            None
395        })
396    }
397
398    /// Get label metadata with case-insensitive lookup.
399    ///
400    /// This allows queries to match labels regardless of case, providing
401    /// better user experience when label names vary in casing.
402    pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
403        self.labels
404            .iter()
405            .find(|(k, _)| k.eq_ignore_ascii_case(name))
406            .map(|(_, v)| v)
407    }
408
409    /// Get label ID with case-insensitive lookup.
410    pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
411        self.get_label_case_insensitive(label_name)
412            .map(|meta| meta.id)
413    }
414
415    /// Get edge type metadata with case-insensitive lookup.
416    ///
417    /// This allows queries to match edge types regardless of case, providing
418    /// better user experience when type names vary in casing.
419    pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
420        self.edge_types
421            .iter()
422            .find(|(k, _)| k.eq_ignore_ascii_case(name))
423            .map(|(_, v)| v)
424    }
425
426    /// Get edge type ID with case-insensitive lookup (schema-defined types only).
427    pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
428        self.get_edge_type_case_insensitive(type_name)
429            .map(|meta| meta.id)
430    }
431
432    /// Get edge type ID with case-insensitive lookup, checking both
433    /// schema-defined and schemaless registries.
434    pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
435        self.edge_type_id_by_name_case_insensitive(type_name)
436            .or_else(|| {
437                self.schemaless_registry
438                    .id_by_name_case_insensitive(type_name)
439            })
440    }
441
442    /// Returns the edge type ID for `type_name`, checking the schema first
443    /// and falling back to the schemaless registry (assigning a new ID if needed).
444    ///
445    /// Requires `&mut self` because it may assign a new schemaless ID.
446    /// Use [`edge_type_id_by_name`](Self::edge_type_id_by_name) for read-only schema lookups.
447    pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
448        if let Some(id) = self.edge_type_id_by_name(type_name) {
449            return id;
450        }
451        self.schemaless_registry.get_or_assign_id(type_name)
452    }
453
454    /// Returns the edge type name for `type_id`, checking both the schema
455    /// and schemaless registries. Returns an owned `String` because the
456    /// name may come from either registry.
457    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
458        if is_schemaless_edge_type(type_id) {
459            self.schemaless_registry
460                .type_name_by_id(type_id)
461                .map(str::to_owned)
462        } else {
463            self.edge_type_name_by_id(type_id).map(str::to_owned)
464        }
465    }
466
467    /// Returns all edge type IDs, including both schema-defined and schemaless types.
468    /// Used when MATCH queries don't specify an edge type and need to scan all edges.
469    pub fn all_edge_type_ids(&self) -> Vec<u32> {
470        let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
471        ids.extend(self.schemaless_registry.all_type_ids());
472        ids.sort_unstable();
473        ids
474    }
475}
476
477#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
478#[serde(tag = "type")]
479#[non_exhaustive]
480pub enum IndexDefinition {
481    Vector(VectorIndexConfig),
482    FullText(FullTextIndexConfig),
483    Scalar(ScalarIndexConfig),
484    Inverted(InvertedIndexConfig),
485    JsonFullText(JsonFtsIndexConfig),
486}
487
488impl IndexDefinition {
489    /// Returns the index name for any variant.
490    pub fn name(&self) -> &str {
491        match self {
492            IndexDefinition::Vector(c) => &c.name,
493            IndexDefinition::FullText(c) => &c.name,
494            IndexDefinition::Scalar(c) => &c.name,
495            IndexDefinition::Inverted(c) => &c.name,
496            IndexDefinition::JsonFullText(c) => &c.name,
497        }
498    }
499
500    /// Returns the label this index is defined on.
501    pub fn label(&self) -> &str {
502        match self {
503            IndexDefinition::Vector(c) => &c.label,
504            IndexDefinition::FullText(c) => &c.label,
505            IndexDefinition::Scalar(c) => &c.label,
506            IndexDefinition::Inverted(c) => &c.label,
507            IndexDefinition::JsonFullText(c) => &c.label,
508        }
509    }
510}
511
512#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
513pub struct InvertedIndexConfig {
514    pub name: String,
515    pub label: String,
516    pub property: String,
517    #[serde(default = "default_normalize")]
518    pub normalize: bool,
519    #[serde(default = "default_max_terms_per_doc")]
520    pub max_terms_per_doc: usize,
521}
522
/// Serde default for `InvertedIndexConfig::normalize`: enabled.
fn default_normalize() -> bool {
    !false
}
526
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`: ten thousand.
fn default_max_terms_per_doc() -> usize {
    const DEFAULT_MAX_TERMS: usize = 10_000;
    DEFAULT_MAX_TERMS
}
530
531#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
532pub struct VectorIndexConfig {
533    pub name: String,
534    pub label: String,
535    pub property: String,
536    pub index_type: VectorIndexType,
537    pub metric: DistanceMetric,
538    pub embedding_config: Option<EmbeddingConfig>,
539}
540
541#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
542pub struct EmbeddingConfig {
543    /// Model alias in the Uni-Xervo catalog (for example: "embed/default").
544    pub alias: String,
545    pub source_properties: Vec<String>,
546    pub batch_size: usize,
547}
548
549#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
550#[non_exhaustive]
551pub enum VectorIndexType {
552    IvfPq {
553        num_partitions: u32,
554        num_sub_vectors: u32,
555        bits_per_subvector: u8,
556    },
557    Hnsw {
558        m: u32,
559        ef_construction: u32,
560        ef_search: u32,
561    },
562    Flat,
563}
564
565#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
566#[non_exhaustive]
567pub enum DistanceMetric {
568    Cosine,
569    L2,
570    Dot,
571}
572
573impl DistanceMetric {
574    /// Computes the distance between two vectors using this metric.
575    ///
576    /// All metrics follow LanceDB conventions so that lower values indicate
577    /// greater similarity:
578    /// - **L2**: squared Euclidean distance.
579    /// - **Cosine**: `1.0 - cosine_similarity` (range \[0, 2\]).
580    /// - **Dot**: negative dot product.
581    ///
582    /// # Panics
583    ///
584    /// Panics if `a` and `b` have different lengths.
585    pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
586        assert_eq!(a.len(), b.len(), "vector dimension mismatch");
587        match self {
588            DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
589            DistanceMetric::Cosine => {
590                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
591                let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
592                let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
593                let denom = norm_a * norm_b;
594                if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
595            }
596            DistanceMetric::Dot => {
597                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
598                -dot
599            }
600        }
601    }
602}
603
604#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
605pub struct FullTextIndexConfig {
606    pub name: String,
607    pub label: String,
608    pub properties: Vec<String>,
609    pub tokenizer: TokenizerConfig,
610    pub with_positions: bool,
611}
612
613#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
614#[non_exhaustive]
615pub enum TokenizerConfig {
616    Standard,
617    Whitespace,
618    Ngram { min: u8, max: u8 },
619    Custom { name: String },
620}
621
622#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
623pub struct JsonFtsIndexConfig {
624    pub name: String,
625    pub label: String,
626    pub column: String,
627    #[serde(default)]
628    pub paths: Vec<String>,
629    #[serde(default)]
630    pub with_positions: bool,
631}
632
633#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
634pub struct ScalarIndexConfig {
635    pub name: String,
636    pub label: String,
637    pub properties: Vec<String>,
638    pub index_type: ScalarIndexType,
639    pub where_clause: Option<String>,
640}
641
642#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
643#[non_exhaustive]
644pub enum ScalarIndexType {
645    BTree,
646    Hash,
647    Bitmap,
648}
649
650pub struct SchemaManager {
651    store: Arc<dyn ObjectStore>,
652    path: ObjectStorePath,
653    schema: RwLock<Arc<Schema>>,
654}
655
656impl SchemaManager {
657    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
658        let path = path.as_ref();
659        let parent = path
660            .parent()
661            .ok_or_else(|| anyhow!("Invalid schema path"))?;
662        let filename = path
663            .file_name()
664            .ok_or_else(|| anyhow!("Invalid schema filename"))?
665            .to_str()
666            .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
667
668        let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
669        let obj_path = ObjectStorePath::from(filename);
670
671        Self::load_from_store(store, &obj_path).await
672    }
673
674    pub async fn load_from_store(
675        store: Arc<dyn ObjectStore>,
676        path: &ObjectStorePath,
677    ) -> Result<Self> {
678        match store.get(path).await {
679            Ok(result) => {
680                let bytes = result.bytes().await?;
681                let content = String::from_utf8(bytes.to_vec())?;
682                let schema: Schema = serde_json::from_str(&content)?;
683                Ok(Self {
684                    store,
685                    path: path.clone(),
686                    schema: RwLock::new(Arc::new(schema)),
687                })
688            }
689            Err(object_store::Error::NotFound { .. }) => Ok(Self {
690                store,
691                path: path.clone(),
692                schema: RwLock::new(Arc::new(Schema::default())),
693            }),
694            Err(e) => Err(anyhow::Error::from(e)),
695        }
696    }
697
698    pub async fn save(&self) -> Result<()> {
699        let content = {
700            let schema_guard = acquire_read(&self.schema, "schema")?;
701            serde_json::to_string_pretty(&**schema_guard)?
702        };
703        self.store
704            .put(&self.path, content.into())
705            .await
706            .map_err(anyhow::Error::from)?;
707        Ok(())
708    }
709
710    pub fn path(&self) -> &ObjectStorePath {
711        &self.path
712    }
713
714    pub fn schema(&self) -> Arc<Schema> {
715        self.schema
716            .read()
717            .expect("Schema lock poisoned - a thread panicked while holding it")
718            .clone()
719    }
720
721    /// Normalize function names in an expression to uppercase for case-insensitive matching.
722    /// Examples: "lower(email)" -> "LOWER(email)", "trim(name)" -> "TRIM(name)"
723    fn normalize_function_names(expr: &str) -> String {
724        let mut result = String::with_capacity(expr.len());
725        let mut chars = expr.chars().peekable();
726
727        while let Some(ch) = chars.next() {
728            if ch.is_alphabetic() {
729                // Collect identifier
730                let mut ident = String::new();
731                ident.push(ch);
732
733                while let Some(&next) = chars.peek() {
734                    if next.is_alphanumeric() || next == '_' {
735                        ident.push(chars.next().unwrap());
736                    } else {
737                        break;
738                    }
739                }
740
741                // If followed by '(', it's a function call - uppercase it
742                if chars.peek() == Some(&'(') {
743                    result.push_str(&ident.to_uppercase());
744                } else {
745                    result.push_str(&ident); // Keep property names as-is
746                }
747            } else {
748                result.push(ch);
749            }
750        }
751
752        result
753    }
754
755    /// Generate a consistent internal column name for an expression index.
756    /// Uses a hash suffix to ensure uniqueness for different expressions that
757    /// might sanitize to the same string (e.g., "a+b" and "a-b" both become "a_b").
758    ///
759    /// IMPORTANT: Uses FNV-1a hash which is stable across Rust versions and platforms.
760    /// DefaultHasher is not guaranteed to be stable and could break persistent data
761    /// if the hash changes after a compiler upgrade.
762    pub fn generated_column_name(expr: &str) -> String {
763        // Normalize function names to uppercase for case-insensitive matching
764        let normalized = Self::normalize_function_names(expr);
765
766        let sanitized = normalized
767            .replace(|c: char| !c.is_alphanumeric(), "_")
768            .trim_matches('_')
769            .to_string();
770
771        // FNV-1a 64-bit hash - stable across Rust versions and platforms
772        const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
773        const FNV_PRIME: u64 = 1099511628211;
774
775        let mut hash = FNV_OFFSET_BASIS;
776        for byte in normalized.as_bytes() {
777            hash ^= *byte as u64;
778            hash = hash.wrapping_mul(FNV_PRIME);
779        }
780
781        format!("_gen_{}_{:x}", sanitized, hash)
782    }
783
784    pub fn replace_schema(&self, new_schema: Schema) {
785        let mut schema = self
786            .schema
787            .write()
788            .expect("Schema lock poisoned - a thread panicked while holding it");
789        *schema = Arc::new(new_schema);
790    }
791
792    pub fn next_label_id(&self) -> u16 {
793        self.schema()
794            .labels
795            .values()
796            .map(|l| l.id)
797            .max()
798            .unwrap_or(0)
799            + 1
800    }
801
802    pub fn next_type_id(&self) -> u32 {
803        let max_schema_id = self
804            .schema()
805            .edge_types
806            .values()
807            .map(|t| t.id)
808            .max()
809            .unwrap_or(0);
810
811        // Ensure we stay in schema'd ID space (bit 31 = 0)
812        if max_schema_id >= MAX_SCHEMA_TYPE_ID {
813            panic!("Schema edge type ID exhaustion");
814        }
815
816        max_schema_id + 1
817    }
818
819    pub fn add_label(&self, name: &str) -> Result<u16> {
820        let mut guard = acquire_write(&self.schema, "schema")?;
821        let schema = Arc::make_mut(&mut *guard);
822        if schema.labels.contains_key(name) {
823            return Err(anyhow!("Label '{}' already exists", name));
824        }
825
826        let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
827        schema.labels.insert(
828            name.to_string(),
829            LabelMeta {
830                id,
831                created_at: Utc::now(),
832                state: SchemaElementState::Active,
833            },
834        );
835        Ok(id)
836    }
837
838    pub fn add_edge_type(
839        &self,
840        name: &str,
841        src_labels: Vec<String>,
842        dst_labels: Vec<String>,
843    ) -> Result<u32> {
844        let mut guard = acquire_write(&self.schema, "schema")?;
845        let schema = Arc::make_mut(&mut *guard);
846        if schema.edge_types.contains_key(name) {
847            return Err(anyhow!("Edge type '{}' already exists", name));
848        }
849
850        let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
851
852        // Ensure we stay in schema'd ID space (bit 31 = 0)
853        if id >= MAX_SCHEMA_TYPE_ID {
854            return Err(anyhow!("Schema edge type ID exhaustion"));
855        }
856
857        schema.edge_types.insert(
858            name.to_string(),
859            EdgeTypeMeta {
860                id,
861                src_labels,
862                dst_labels,
863                state: SchemaElementState::Active,
864            },
865        );
866        Ok(id)
867    }
868
869    /// Delegates to [`Schema::get_or_assign_edge_type_id`].
870    pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
871        let mut guard = acquire_write(&self.schema, "schema")
872            .expect("Schema lock poisoned - a thread panicked while holding it");
873        let schema = Arc::make_mut(&mut *guard);
874        schema.get_or_assign_edge_type_id(type_name)
875    }
876
877    /// Delegates to [`Schema::edge_type_name_by_id_unified`].
878    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
879        let schema = acquire_read(&self.schema, "schema")
880            .expect("Schema lock poisoned - a thread panicked while holding it");
881        schema.edge_type_name_by_id_unified(type_id)
882    }
883
884    pub fn add_property(
885        &self,
886        label_or_type: &str,
887        prop_name: &str,
888        data_type: DataType,
889        nullable: bool,
890    ) -> Result<()> {
891        let mut guard = acquire_write(&self.schema, "schema")?;
892        let schema = Arc::make_mut(&mut *guard);
893        let version = schema.schema_version;
894        let props = schema
895            .properties
896            .entry(label_or_type.to_string())
897            .or_default();
898
899        if props.contains_key(prop_name) {
900            return Err(anyhow!(
901                "Property '{}' already exists for '{}'",
902                prop_name,
903                label_or_type
904            ));
905        }
906
907        props.insert(
908            prop_name.to_string(),
909            PropertyMeta {
910                r#type: data_type,
911                nullable,
912                added_in: version,
913                state: SchemaElementState::Active,
914                generation_expression: None,
915            },
916        );
917        Ok(())
918    }
919
920    pub fn add_generated_property(
921        &self,
922        label_or_type: &str,
923        prop_name: &str,
924        data_type: DataType,
925        expr: String,
926    ) -> Result<()> {
927        let mut guard = acquire_write(&self.schema, "schema")?;
928        let schema = Arc::make_mut(&mut *guard);
929        let version = schema.schema_version;
930        let props = schema
931            .properties
932            .entry(label_or_type.to_string())
933            .or_default();
934
935        if props.contains_key(prop_name) {
936            return Err(anyhow!("Property '{}' already exists", prop_name));
937        }
938
939        props.insert(
940            prop_name.to_string(),
941            PropertyMeta {
942                r#type: data_type,
943                nullable: true,
944                added_in: version,
945                state: SchemaElementState::Active,
946                generation_expression: Some(expr),
947            },
948        );
949        Ok(())
950    }
951
952    pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
953        let mut guard = acquire_write(&self.schema, "schema")?;
954        let schema = Arc::make_mut(&mut *guard);
955        schema.indexes.push(index_def);
956        Ok(())
957    }
958
959    pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
960        let schema = self.schema.read().expect("Schema lock poisoned");
961        schema.indexes.iter().find(|i| i.name() == name).cloned()
962    }
963
964    pub fn remove_index(&self, name: &str) -> Result<()> {
965        let mut guard = acquire_write(&self.schema, "schema")?;
966        let schema = Arc::make_mut(&mut *guard);
967        if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
968            schema.indexes.remove(pos);
969            Ok(())
970        } else {
971            Err(anyhow!("Index '{}' not found", name))
972        }
973    }
974
975    pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
976        let mut guard = acquire_write(&self.schema, "schema")?;
977        let schema = Arc::make_mut(&mut *guard);
978        if schema.constraints.iter().any(|c| c.name == constraint.name) {
979            return Err(anyhow!("Constraint '{}' already exists", constraint.name));
980        }
981        schema.constraints.push(constraint);
982        Ok(())
983    }
984
985    pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
986        let mut guard = acquire_write(&self.schema, "schema")?;
987        let schema = Arc::make_mut(&mut *guard);
988        if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
989            schema.constraints.remove(pos);
990            Ok(())
991        } else if if_exists {
992            Ok(())
993        } else {
994            Err(anyhow!("Constraint '{}' not found", name))
995        }
996    }
997
998    pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
999        let mut guard = acquire_write(&self.schema, "schema")?;
1000        let schema = Arc::make_mut(&mut *guard);
1001        if let Some(props) = schema.properties.get_mut(label_or_type) {
1002            if props.remove(prop_name).is_some() {
1003                Ok(())
1004            } else {
1005                Err(anyhow!(
1006                    "Property '{}' not found for '{}'",
1007                    prop_name,
1008                    label_or_type
1009                ))
1010            }
1011        } else {
1012            Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1013        }
1014    }
1015
1016    pub fn rename_property(
1017        &self,
1018        label_or_type: &str,
1019        old_name: &str,
1020        new_name: &str,
1021    ) -> Result<()> {
1022        let mut guard = acquire_write(&self.schema, "schema")?;
1023        let schema = Arc::make_mut(&mut *guard);
1024        if let Some(props) = schema.properties.get_mut(label_or_type) {
1025            if let Some(meta) = props.remove(old_name) {
1026                if props.contains_key(new_name) {
1027                    // Rollback removal? Or just error.
1028                    props.insert(old_name.to_string(), meta); // Restore
1029                    return Err(anyhow!("Property '{}' already exists", new_name));
1030                }
1031                props.insert(new_name.to_string(), meta);
1032                Ok(())
1033            } else {
1034                Err(anyhow!(
1035                    "Property '{}' not found for '{}'",
1036                    old_name,
1037                    label_or_type
1038                ))
1039            }
1040        } else {
1041            Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1042        }
1043    }
1044
1045    pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1046        let mut guard = acquire_write(&self.schema, "schema")?;
1047        let schema = Arc::make_mut(&mut *guard);
1048        if let Some(label_meta) = schema.labels.get_mut(name) {
1049            label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1050            // Do not remove properties; they are implicitly tombstoned by the label
1051            Ok(())
1052        } else if if_exists {
1053            Ok(())
1054        } else {
1055            Err(anyhow!("Label '{}' not found", name))
1056        }
1057    }
1058
1059    pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1060        let mut guard = acquire_write(&self.schema, "schema")?;
1061        let schema = Arc::make_mut(&mut *guard);
1062        if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1063            edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1064            // Do not remove properties; they are implicitly tombstoned by the edge type
1065            Ok(())
1066        } else if if_exists {
1067            Ok(())
1068        } else {
1069            Err(anyhow!("Edge Type '{}' not found", name))
1070        }
1071    }
1072}
1073
1074/// Validate identifier names to prevent injection and ensure compatibility.
1075pub fn validate_identifier(name: &str) -> Result<()> {
1076    // Length check
1077    if name.is_empty() || name.len() > 64 {
1078        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1079    }
1080
1081    // First character must be letter or underscore
1082    let first = name.chars().next().unwrap();
1083    if !first.is_alphabetic() && first != '_' {
1084        return Err(anyhow!(
1085            "Identifier '{}' must start with letter or underscore",
1086            name
1087        ));
1088    }
1089
1090    // Remaining characters: alphanumeric or underscore
1091    if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1092        return Err(anyhow!(
1093            "Identifier '{}' must contain only alphanumeric and underscore",
1094            name
1095        ));
1096    }
1097
1098    // Reserved words
1099    const RESERVED: &[&str] = &[
1100        "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1101        "UNION", "ORDER", "LIMIT",
1102    ];
1103    if RESERVED.contains(&name.to_uppercase().as_str()) {
1104        return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1105    }
1106
1107    Ok(())
1108}
1109
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// End-to-end check of `SchemaManager`. It adds a label, a property and an edge type,
    /// saves the schema to a local object store, then reloads it and confirms the additions
    /// were persisted.
    #[tokio::test]
    async fn test_schema_management() -> Result<()> {
        // A fresh temp directory backs the store, so no schema file exists at `path` yet.
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;

        // Labels: the first label gets ID 1; adding the same name again is rejected.
        let lid = manager.add_label("Person")?;
        assert_eq!(lid, 1);
        assert!(manager.add_label("Person").is_err());

        // Properties: a duplicate property name on the same label is rejected.
        manager.add_property("Person", "name", DataType::String, false)?;
        assert!(
            manager
                .add_property("Person", "name", DataType::String, false)
                .is_err()
        );

        // Edge types: the first edge type also gets ID 1, so edge-type IDs are numbered
        // separately from label IDs.
        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
        assert_eq!(tid, 1);

        manager.save().await?;
        // Check file exists: `save` must have written the schema object to `path`.
        assert!(store.get(&path).await.is_ok());

        // Reload from the same store and confirm the label and its property survived.
        let manager2 = SchemaManager::load_from_store(store, &path).await?;
        assert!(manager2.schema().labels.contains_key("Person"));
        assert!(
            manager2
                .schema()
                .properties
                .get("Person")
                .unwrap()
                .contains_key("name")
        );

        Ok(())
    }

    /// Function names are upper-cased whatever their input case, including nested calls.
    /// Arguments (`email`) are left unchanged.
    #[test]
    fn test_normalize_function_names() {
        assert_eq!(
            SchemaManager::normalize_function_names("lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("LOWER(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("Lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("trim(lower(email))"),
            "TRIM(LOWER(email))"
        );
    }

    /// Differently-cased spellings of the same expression must map to the same generated
    /// column name, which is built from the upper-cased function name.
    #[test]
    fn test_generated_column_name_case_insensitive() {
        let col1 = SchemaManager::generated_column_name("lower(email)");
        let col2 = SchemaManager::generated_column_name("LOWER(email)");
        let col3 = SchemaManager::generated_column_name("Lower(email)");
        assert_eq!(col1, col2);
        assert_eq!(col2, col3);
        assert!(col1.starts_with("_gen_LOWER_email_"));
    }
}