Skip to main content

uni_common/core/
schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::core::edge_type::{MAX_SCHEMA_TYPE_ID, is_schemaless_edge_type, make_schemaless_id};
5use crate::sync::{acquire_read, acquire_write};
6use anyhow::{Result, anyhow};
7use chrono::{DateTime, Utc};
8use object_store::ObjectStore;
9use object_store::local::LocalFileSystem;
10use object_store::path::Path as ObjectStorePath;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::Path;
14use std::sync::{Arc, RwLock};
15
/// Lifecycle state of a schema element.
///
/// [`LabelMeta`], [`EdgeTypeMeta`], and [`PropertyMeta`] each carry a `state`
/// field of this type. When that field is missing from serialized schema JSON
/// it deserializes as [`SchemaElementState::Active`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
    /// Default state for elements (also used when `state` is absent on load).
    Active,
    /// Hidden element. NOTE(review): visibility semantics are enforced
    /// elsewhere — confirm against the query layer.
    Hidden {
        /// Timestamp associated with entering this state (presumably when it
        /// was hidden — confirm).
        since: DateTime<Utc>,
        /// Identifier of the last snapshot in which the element was active.
        last_active_snapshot: String, // SnapshotId
    },
    /// Tombstoned element.
    Tombstone {
        /// Timestamp associated with entering this state (presumably when it
        /// was tombstoned — confirm).
        since: DateTime<Utc>,
    },
}
28
29use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
30
31/// Returns the canonical struct field definitions for DateTime encoding in Arrow.
32///
33/// DateTime is encoded as a 3-field struct to preserve timezone information:
34/// - `nanos_since_epoch`: i64 nanoseconds since Unix epoch (UTC)
35/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
36/// - `timezone_name`: Optional IANA timezone name (e.g., "America/New_York")
37pub fn datetime_struct_fields() -> Fields {
38    Fields::from(vec![
39        Field::new(
40            "nanos_since_epoch",
41            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
42            true,
43        ),
44        Field::new("offset_seconds", ArrowDataType::Int32, true),
45        Field::new("timezone_name", ArrowDataType::Utf8, true),
46    ])
47}
48
49/// Returns the canonical struct field definitions for Time encoding in Arrow.
50///
51/// Time is encoded as a 2-field struct to preserve timezone offset:
52/// - `nanos_since_midnight`: i64 nanoseconds since midnight (0-86,399,999,999,999)
53/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
54pub fn time_struct_fields() -> Fields {
55    Fields::from(vec![
56        Field::new(
57            "nanos_since_midnight",
58            ArrowDataType::Time64(TimeUnit::Nanosecond),
59            true,
60        ),
61        Field::new("offset_seconds", ArrowDataType::Int32, true),
62    ])
63}
64
65/// Detects if an Arrow DataType is the canonical DateTime struct.
66pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
67    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
68}
69
70/// Detects if an Arrow DataType is the canonical Time struct.
71pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
72    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
73}
74
/// Kind of CRDT held by a [`DataType::Crdt`] property.
///
/// [`DataType::to_arrow`] maps every variant to the same Arrow `Binary`
/// column (MessagePack-encoded, per the comment there); the variant only
/// selects how the bytes are interpreted.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    GCounter,
    GSet,
    ORSet,
    LWWRegister,
    LWWMap,
    Rga,
    VectorClock,
    VCRegister,
}
87
/// Coordinate system of a [`DataType::Point`].
///
/// Selects the Arrow struct layout produced by [`DataType::to_arrow`]: each
/// variant gets non-nullable `Float64` coordinate fields plus a `crs` string.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
    /// Encoded as `latitude`, `longitude`, `crs`.
    Geographic,  // WGS84
    /// Encoded as `x`, `y`, `crs`.
    Cartesian2D, // x, y
    /// Encoded as `x`, `y`, `z`, `crs`.
    Cartesian3D, // x, y, z
}
94
/// Logical type of a schema property.
///
/// The physical Arrow encoding of every variant is defined by
/// [`DataType::to_arrow`]; the per-variant notes below summarize it.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
    /// Arrow `Utf8`.
    String,
    /// Arrow `Int32`.
    Int32,
    /// Arrow `Int64`.
    Int64,
    /// Arrow `Float32`.
    Float32,
    /// Arrow `Float64`.
    Float64,
    /// Arrow `Boolean`.
    Bool,
    /// Arrow nanosecond `Timestamp` tagged with the `"UTC"` timezone.
    Timestamp,
    /// Arrow `Date32`.
    Date,
    /// Struct laid out by [`time_struct_fields`].
    Time,
    /// Struct laid out by [`datetime_struct_fields`].
    DateTime,
    /// Arrow `LargeBinary` (Lance lacks `Interval(MonthDayNano)`).
    Duration,
    /// Arrow `LargeBinary` holding a MessagePack-tagged value.
    CypherValue,
    /// Struct of coordinates plus `crs`; layout chosen by [`PointType`].
    Point(PointType),
    /// Arrow `FixedSizeList` of `dimensions` nullable `Float32` items.
    Vector { dimensions: usize },
    /// Arrow `Binary` holding a serialized CRDT.
    Crdt(CrdtType),
    /// Arrow `List` of the inner type.
    List(Box<DataType>),
    /// Key type and value type; encoded as an Arrow `List` of `key`/`value` structs.
    Map(Box<DataType>, Box<DataType>),
}
116
117impl DataType {
118    // Alias for compatibility/convenience if needed, but preferable to use exact types.
119    #[allow(non_upper_case_globals)]
120    pub const Float: DataType = DataType::Float64;
121    #[allow(non_upper_case_globals)]
122    pub const Int: DataType = DataType::Int64;
123
124    pub fn to_arrow(&self) -> ArrowDataType {
125        match self {
126            DataType::String => ArrowDataType::Utf8,
127            DataType::Int32 => ArrowDataType::Int32,
128            DataType::Int64 => ArrowDataType::Int64,
129            DataType::Float32 => ArrowDataType::Float32,
130            DataType::Float64 => ArrowDataType::Float64,
131            DataType::Bool => ArrowDataType::Boolean,
132            DataType::Timestamp => {
133                ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
134            }
135            DataType::Date => ArrowDataType::Date32,
136            DataType::Time => ArrowDataType::Struct(time_struct_fields()),
137            DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
138            DataType::Duration => ArrowDataType::LargeBinary, // Lance doesn't support Interval(MonthDayNano); use CypherValue codec
139            DataType::CypherValue => ArrowDataType::LargeBinary, // MessagePack-tagged binary encoding
140            DataType::Point(pt) => match pt {
141                PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
142                    Field::new("latitude", ArrowDataType::Float64, false),
143                    Field::new("longitude", ArrowDataType::Float64, false),
144                    Field::new("crs", ArrowDataType::Utf8, false),
145                ])),
146                PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
147                    Field::new("x", ArrowDataType::Float64, false),
148                    Field::new("y", ArrowDataType::Float64, false),
149                    Field::new("crs", ArrowDataType::Utf8, false),
150                ])),
151                PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
152                    Field::new("x", ArrowDataType::Float64, false),
153                    Field::new("y", ArrowDataType::Float64, false),
154                    Field::new("z", ArrowDataType::Float64, false),
155                    Field::new("crs", ArrowDataType::Utf8, false),
156                ])),
157            },
158            DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
159                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
160                *dimensions as i32,
161            ),
162            DataType::Crdt(_) => ArrowDataType::Binary, // Store CRDT as binary MessagePack
163            DataType::List(inner) => {
164                ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
165            }
166            DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
167                "item",
168                ArrowDataType::Struct(Fields::from(vec![
169                    Field::new("key", key.to_arrow(), false),
170                    Field::new("value", value.to_arrow(), true),
171                ])),
172                true,
173            ))),
174        }
175    }
176}
177
178fn default_created_at() -> DateTime<Utc> {
179    Utc::now()
180}
181
182fn default_state() -> SchemaElementState {
183    SchemaElementState::Active
184}
185
/// Serde default for [`PropertyMeta::added_in`]: schema version 1.
fn default_version_1() -> u32 {
    1_u32
}
189
/// Metadata describing a single property in [`Schema::properties`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
    /// Declared logical type of the property.
    pub r#type: DataType,
    /// Whether the property may be null.
    pub nullable: bool,
    /// Schema version in which the property was added; defaults to 1 when
    /// absent from serialized data.
    #[serde(default = "default_version_1")]
    pub added_in: u32, // SchemaVersion
    /// Lifecycle state; defaults to `Active` when absent from serialized data.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional generation expression; defaults to `None` when absent.
    /// NOTE(review): presumably the expression a generated column is computed
    /// from (cf. `SchemaManager::generated_column_name`) — confirm.
    #[serde(default)]
    pub generation_expression: Option<String>,
}
201
/// Metadata for a node label stored in [`Schema::labels`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
    /// Numeric label ID. `SchemaManager::add_label` assigns one more than the
    /// current maximum ID.
    pub id: u16, // LabelId
    /// Creation time. When absent from serialized data it defaults to
    /// `Utc::now()` at deserialization time, not the original creation time.
    #[serde(default = "default_created_at")]
    pub created_at: DateTime<Utc>,
    /// Lifecycle state; defaults to `Active` when absent from serialized data.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
}
210
/// Metadata for a schema-defined edge type stored in [`Schema::edge_types`].
///
/// Schemaless edge types are tracked separately in
/// [`SchemalessEdgeTypeRegistry`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
    /// See [`crate::core::edge_type::EdgeTypeId`] for bit-layout details.
    /// `SchemaManager::add_edge_type` keeps these IDs below
    /// `MAX_SCHEMA_TYPE_ID` (schema'd ID space, bit 31 clear).
    pub id: u32,
    /// Label names declared for the source endpoint.
    pub src_labels: Vec<String>,
    /// Label names declared for the destination endpoint.
    pub dst_labels: Vec<String>,
    /// Lifecycle state; defaults to `Active` when absent from serialized data.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
}
220
/// Kind of a schema [`Constraint`] together with its parameters.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
    /// Uniqueness over the listed properties.
    Unique { properties: Vec<String> },
    /// Existence of a single property.
    Exists { property: String },
    /// A check expression, stored as source text.
    Check { expression: String },
}
228
/// Schema element a [`Constraint`] applies to, identified by name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
    /// A node label name.
    Label(String),
    /// An edge type name.
    EdgeType(String),
}
235
/// A named constraint stored in [`Schema::constraints`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
    /// Constraint name.
    pub name: String,
    /// What the constraint enforces.
    pub constraint_type: ConstraintType,
    /// Which label or edge type it applies to.
    pub target: ConstraintTarget,
    /// Whether the constraint is currently enabled.
    pub enabled: bool,
}
243
/// Bidirectional registry for dynamically-assigned schemaless edge type IDs.
///
/// Edge types not defined in the schema are assigned IDs at runtime with
/// bit 31 set (see [`crate::core::edge_type`]). This registry maintains
/// the name-to-ID and ID-to-name mappings for those types.
///
/// It is serialized as part of [`Schema`], so assigned IDs persist with the
/// schema file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    /// Exact (case-sensitive) type name to assigned ID.
    name_to_id: HashMap<String, u32>,
    /// Reverse mapping of `name_to_id`.
    id_to_name: HashMap<u32, String>,
    /// Next local ID to assign (0 is reserved for invalid).
    next_local_id: u32,
}
256
257impl SchemalessEdgeTypeRegistry {
258    pub fn new() -> Self {
259        Self {
260            name_to_id: HashMap::new(),
261            id_to_name: HashMap::new(),
262            next_local_id: 1,
263        }
264    }
265
266    /// Returns the schemaless ID for `type_name`, assigning a new one if needed.
267    pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
268        if let Some(&id) = self.name_to_id.get(type_name) {
269            return id;
270        }
271
272        let id = make_schemaless_id(self.next_local_id);
273        self.next_local_id += 1;
274
275        self.name_to_id.insert(type_name.to_string(), id);
276        self.id_to_name.insert(id, type_name.to_string());
277
278        id
279    }
280
281    /// Looks up the edge type name for a schemaless ID.
282    pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
283        self.id_to_name.get(&type_id).map(String::as_str)
284    }
285
286    /// Returns `true` if `type_name` has already been assigned a schemaless ID.
287    pub fn contains(&self, type_name: &str) -> bool {
288        self.name_to_id.contains_key(type_name)
289    }
290
291    /// Looks up the edge type ID for `type_name` with case-insensitive matching.
292    pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
293        self.name_to_id
294            .iter()
295            .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
296            .map(|(_, &id)| id)
297    }
298
299    /// Returns all registered schemaless type IDs.
300    pub fn all_type_ids(&self) -> Vec<u32> {
301        self.id_to_name.keys().copied().collect()
302    }
303
304    /// Returns true if the registry has any schemaless types.
305    pub fn is_empty(&self) -> bool {
306        self.name_to_id.is_empty()
307    }
308}
309
310impl Default for SchemalessEdgeTypeRegistry {
311    fn default() -> Self {
312        Self::new()
313    }
314}
315
/// Complete graph schema: labels, edge types, properties, indexes,
/// constraints, and the schemaless edge-type registry.
///
/// `SchemaManager` persists it as pretty-printed JSON; the `#[serde(default)]`
/// fields may be absent from that JSON.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
    /// Schema version number; [`Schema::default`] starts at 1.
    pub schema_version: u32,
    /// Node labels keyed by name.
    pub labels: HashMap<String, LabelMeta>,
    /// Schema-defined edge types keyed by name.
    pub edge_types: HashMap<String, EdgeTypeMeta>,
    /// Property definitions keyed by owner name, then by property name.
    /// NOTE(review): the outer key is presumably a label or edge type name — confirm.
    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
    /// All index definitions; empty when absent from serialized data.
    #[serde(default)]
    pub indexes: Vec<IndexDefinition>,
    /// All constraints; empty when absent from serialized data.
    #[serde(default)]
    pub constraints: Vec<Constraint>,
    /// Registry for schemaless edge types (dynamically assigned IDs)
    #[serde(default)]
    pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
330
331impl Default for Schema {
332    fn default() -> Self {
333        Self {
334            schema_version: 1,
335            labels: HashMap::new(),
336            edge_types: HashMap::new(),
337            properties: HashMap::new(),
338            indexes: Vec::new(),
339            constraints: Vec::new(),
340            schemaless_registry: SchemalessEdgeTypeRegistry::new(),
341        }
342    }
343}
344
345impl Schema {
346    /// Returns the label name for a given label ID.
347    ///
348    /// Performs a linear scan over all labels. This is efficient because
349    /// the number of labels in a schema is typically small.
350    pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
351        self.labels
352            .iter()
353            .find(|(_, meta)| meta.id == label_id)
354            .map(|(name, _)| name.as_str())
355    }
356
357    /// Returns the label ID for a given label name.
358    pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
359        self.labels.get(label_name).map(|meta| meta.id)
360    }
361
362    /// Returns the edge type name for a given type ID.
363    ///
364    /// Performs a linear scan over all edge types. This is efficient because
365    /// the number of edge types in a schema is typically small.
366    pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
367        self.edge_types
368            .iter()
369            .find(|(_, meta)| meta.id == type_id)
370            .map(|(name, _)| name.as_str())
371    }
372
373    /// Returns the edge type ID for a given type name.
374    pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
375        self.edge_types.get(type_name).map(|meta| meta.id)
376    }
377
378    /// Returns the vector index configuration for a given label and property.
379    ///
380    /// Performs a linear scan over all indexes. This is efficient because
381    /// the number of indexes in a schema is typically small.
382    pub fn vector_index_for_property(
383        &self,
384        label: &str,
385        property: &str,
386    ) -> Option<&VectorIndexConfig> {
387        self.indexes.iter().find_map(|idx| {
388            if let IndexDefinition::Vector(config) = idx
389                && config.label == label
390                && config.property == property
391                && config.metadata.status == IndexStatus::Online
392            {
393                return Some(config);
394            }
395            None
396        })
397    }
398
399    /// Returns the full-text index configuration for a given label and property.
400    ///
401    /// A full-text index covers one or more properties. This returns the config
402    /// if the specified property is among the indexed properties.
403    pub fn fulltext_index_for_property(
404        &self,
405        label: &str,
406        property: &str,
407    ) -> Option<&FullTextIndexConfig> {
408        self.indexes.iter().find_map(|idx| {
409            if let IndexDefinition::FullText(config) = idx
410                && config.label == label
411                && config.properties.iter().any(|p| p == property)
412                && config.metadata.status == IndexStatus::Online
413            {
414                return Some(config);
415            }
416            None
417        })
418    }
419
420    /// Get label metadata with case-insensitive lookup.
421    ///
422    /// This allows queries to match labels regardless of case, providing
423    /// better user experience when label names vary in casing.
424    pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
425        self.labels
426            .iter()
427            .find(|(k, _)| k.eq_ignore_ascii_case(name))
428            .map(|(_, v)| v)
429    }
430
431    /// Get label ID with case-insensitive lookup.
432    pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
433        self.get_label_case_insensitive(label_name)
434            .map(|meta| meta.id)
435    }
436
437    /// Get edge type metadata with case-insensitive lookup.
438    ///
439    /// This allows queries to match edge types regardless of case, providing
440    /// better user experience when type names vary in casing.
441    pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
442        self.edge_types
443            .iter()
444            .find(|(k, _)| k.eq_ignore_ascii_case(name))
445            .map(|(_, v)| v)
446    }
447
448    /// Get edge type ID with case-insensitive lookup (schema-defined types only).
449    pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
450        self.get_edge_type_case_insensitive(type_name)
451            .map(|meta| meta.id)
452    }
453
454    /// Get edge type ID with case-insensitive lookup, checking both
455    /// schema-defined and schemaless registries.
456    pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
457        self.edge_type_id_by_name_case_insensitive(type_name)
458            .or_else(|| {
459                self.schemaless_registry
460                    .id_by_name_case_insensitive(type_name)
461            })
462    }
463
464    /// Returns the edge type ID for `type_name`, checking the schema first
465    /// and falling back to the schemaless registry (assigning a new ID if needed).
466    ///
467    /// Requires `&mut self` because it may assign a new schemaless ID.
468    /// Use [`edge_type_id_by_name`](Self::edge_type_id_by_name) for read-only schema lookups.
469    pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
470        if let Some(id) = self.edge_type_id_by_name(type_name) {
471            return id;
472        }
473        self.schemaless_registry.get_or_assign_id(type_name)
474    }
475
476    /// Returns the edge type name for `type_id`, checking both the schema
477    /// and schemaless registries. Returns an owned `String` because the
478    /// name may come from either registry.
479    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
480        if is_schemaless_edge_type(type_id) {
481            self.schemaless_registry
482                .type_name_by_id(type_id)
483                .map(str::to_owned)
484        } else {
485            self.edge_type_name_by_id(type_id).map(str::to_owned)
486        }
487    }
488
489    /// Returns all edge type IDs, including both schema-defined and schemaless types.
490    /// Used when MATCH queries don't specify an edge type and need to scan all edges.
491    pub fn all_edge_type_ids(&self) -> Vec<u32> {
492        let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
493        ids.extend(self.schemaless_registry.all_type_ids());
494        ids.sort_unstable();
495        ids
496    }
497}
498
/// Lifecycle status of an index.
///
/// Defaults to `Online`, including when the status is absent from a
/// serialized [`IndexMetadata`]. `Schema::vector_index_for_property` and
/// `Schema::fulltext_index_for_property` only return `Online` indexes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
    /// Index is up-to-date and available for queries.
    #[default]
    Online,
    /// Index is currently being rebuilt.
    Building,
    /// Index is outdated and scheduled for rebuild.
    Stale,
    /// Index rebuild failed after exhausting retries.
    Failed,
}
512
/// Metadata tracking the lifecycle state of an index.
///
/// Every field falls back to its default when absent from serialized data,
/// so older schema files without this metadata still load.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
    /// Current lifecycle status.
    #[serde(default)]
    pub status: IndexStatus,
    /// When the index was last successfully built.
    #[serde(default)]
    pub last_built_at: Option<DateTime<Utc>>,
    /// Row count of the dataset when the index was last built.
    #[serde(default)]
    pub row_count_at_build: Option<u64>,
}
526
/// Definition of an index stored in [`Schema::indexes`].
///
/// Serialized internally tagged: the variant name is written to a `"type"`
/// field alongside the variant's configuration fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    /// Vector similarity index on one property.
    Vector(VectorIndexConfig),
    /// Full-text index over one or more properties.
    FullText(FullTextIndexConfig),
    /// Scalar index over one or more properties.
    Scalar(ScalarIndexConfig),
    /// Inverted index on one property.
    Inverted(InvertedIndexConfig),
    /// Full-text index over a JSON column.
    JsonFullText(JsonFtsIndexConfig),
}
537
538impl IndexDefinition {
539    /// Returns the index name for any variant.
540    pub fn name(&self) -> &str {
541        match self {
542            IndexDefinition::Vector(c) => &c.name,
543            IndexDefinition::FullText(c) => &c.name,
544            IndexDefinition::Scalar(c) => &c.name,
545            IndexDefinition::Inverted(c) => &c.name,
546            IndexDefinition::JsonFullText(c) => &c.name,
547        }
548    }
549
550    /// Returns the label this index is defined on.
551    pub fn label(&self) -> &str {
552        match self {
553            IndexDefinition::Vector(c) => &c.label,
554            IndexDefinition::FullText(c) => &c.label,
555            IndexDefinition::Scalar(c) => &c.label,
556            IndexDefinition::Inverted(c) => &c.label,
557            IndexDefinition::JsonFullText(c) => &c.label,
558        }
559    }
560
561    /// Returns a reference to the index lifecycle metadata.
562    pub fn metadata(&self) -> &IndexMetadata {
563        match self {
564            IndexDefinition::Vector(c) => &c.metadata,
565            IndexDefinition::FullText(c) => &c.metadata,
566            IndexDefinition::Scalar(c) => &c.metadata,
567            IndexDefinition::Inverted(c) => &c.metadata,
568            IndexDefinition::JsonFullText(c) => &c.metadata,
569        }
570    }
571
572    /// Returns a mutable reference to the index lifecycle metadata.
573    pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
574        match self {
575            IndexDefinition::Vector(c) => &mut c.metadata,
576            IndexDefinition::FullText(c) => &mut c.metadata,
577            IndexDefinition::Scalar(c) => &mut c.metadata,
578            IndexDefinition::Inverted(c) => &mut c.metadata,
579            IndexDefinition::JsonFullText(c) => &mut c.metadata,
580        }
581    }
582}
583
/// Configuration of an inverted index on a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is declared on.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Normalization flag; defaults to `true` when absent from serialized data.
    #[serde(default = "default_normalize")]
    pub normalize: bool,
    /// Per-document term limit; defaults to 10,000 when absent from serialized data.
    #[serde(default = "default_max_terms_per_doc")]
    pub max_terms_per_doc: usize,
    /// Lifecycle metadata; defaults when absent.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
596
/// Serde default for [`InvertedIndexConfig::normalize`]: enabled.
fn default_normalize() -> bool {
    true
}
600
/// Serde default for [`InvertedIndexConfig::max_terms_per_doc`]: ten thousand.
fn default_max_terms_per_doc() -> usize {
    10_000usize
}
604
/// Configuration of a vector index on a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is declared on.
    pub label: String,
    /// Indexed vector property.
    pub property: String,
    /// Index algorithm and its parameters.
    pub index_type: VectorIndexType,
    /// Distance metric used for similarity.
    pub metric: DistanceMetric,
    /// Optional embedding configuration for producing vectors.
    pub embedding_config: Option<EmbeddingConfig>,
    /// Lifecycle metadata; defaults when absent.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
616
/// Embedding settings attached to a [`VectorIndexConfig`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
    /// Model alias in the Uni-Xervo catalog (for example: "embed/default").
    pub alias: String,
    /// Properties used as embedding input (presumably concatenated or combined
    /// by the embedding pipeline — confirm).
    pub source_properties: Vec<String>,
    /// Batch size for embedding work (presumably rows per model call — confirm).
    pub batch_size: usize,
}
624
/// Vector index algorithm together with its tuning parameters.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
    /// IVF-PQ index parameters.
    IvfPq {
        num_partitions: u32,
        num_sub_vectors: u32,
        bits_per_subvector: u8,
    },
    /// HNSW graph index parameters.
    Hnsw {
        m: u32,
        ef_construction: u32,
        ef_search: u32,
    },
    /// No partitioning/graph parameters.
    Flat,
}
640
/// Distance metric for vector indexes.
///
/// `DistanceMetric::compute_distance` defines each metric so that lower
/// values mean greater similarity.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
    /// `1.0 - cosine_similarity`.
    Cosine,
    /// Squared Euclidean distance.
    L2,
    /// Negative dot product.
    Dot,
}
648
649impl DistanceMetric {
650    /// Computes the distance between two vectors using this metric.
651    ///
652    /// All metrics follow LanceDB conventions so that lower values indicate
653    /// greater similarity:
654    /// - **L2**: squared Euclidean distance.
655    /// - **Cosine**: `1.0 - cosine_similarity` (range \[0, 2\]).
656    /// - **Dot**: negative dot product.
657    ///
658    /// # Panics
659    ///
660    /// Panics if `a` and `b` have different lengths.
661    pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
662        assert_eq!(a.len(), b.len(), "vector dimension mismatch");
663        match self {
664            DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
665            DistanceMetric::Cosine => {
666                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
667                let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
668                let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
669                let denom = norm_a * norm_b;
670                if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
671            }
672            DistanceMetric::Dot => {
673                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
674                -dot
675            }
676        }
677    }
678}
679
/// Configuration of a full-text index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is declared on.
    pub label: String,
    /// Indexed properties; `Schema::fulltext_index_for_property` matches any of them.
    pub properties: Vec<String>,
    /// Tokenizer selection.
    pub tokenizer: TokenizerConfig,
    /// Whether token positions are recorded.
    pub with_positions: bool,
    /// Lifecycle metadata; defaults when absent.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
690
/// Tokenizer used by a [`FullTextIndexConfig`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
    Standard,
    Whitespace,
    /// N-gram tokenizer with length bounds `min`/`max` (presumably inclusive — confirm).
    Ngram { min: u8, max: u8 },
    /// A tokenizer referenced by name.
    Custom { name: String },
}
699
/// Configuration of a full-text index over a JSON column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is declared on.
    pub label: String,
    /// JSON column being indexed.
    pub column: String,
    /// JSON paths to index; empty when absent from serialized data.
    #[serde(default)]
    pub paths: Vec<String>,
    /// Whether token positions are recorded; `false` when absent.
    #[serde(default)]
    pub with_positions: bool,
    /// Lifecycle metadata; defaults when absent.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
712
/// Configuration of a scalar index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is declared on.
    pub label: String,
    /// Indexed properties.
    pub properties: Vec<String>,
    /// Scalar index structure.
    pub index_type: ScalarIndexType,
    /// Optional filter expression (presumably restricting which rows are
    /// indexed — confirm).
    pub where_clause: Option<String>,
    /// Lifecycle metadata; defaults when absent.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
723
/// Structure of a [`ScalarIndexConfig`] index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
    BTree,
    Hash,
    Bitmap,
}
731
/// Owns the in-memory [`Schema`] and persists it as JSON in an [`ObjectStore`].
pub struct SchemaManager {
    /// Store that holds the schema file.
    store: Arc<dyn ObjectStore>,
    /// Location of the schema JSON within `store`.
    path: ObjectStorePath,
    /// Current schema. Readers clone the inner `Arc` (`schema()`); writers get
    /// a mutable copy via `Arc::make_mut`, which clones the schema only while
    /// an older snapshot is still shared.
    schema: RwLock<Arc<Schema>>,
}
737
738impl SchemaManager {
739    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
740        let path = path.as_ref();
741        let parent = path
742            .parent()
743            .ok_or_else(|| anyhow!("Invalid schema path"))?;
744        let filename = path
745            .file_name()
746            .ok_or_else(|| anyhow!("Invalid schema filename"))?
747            .to_str()
748            .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
749
750        let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
751        let obj_path = ObjectStorePath::from(filename);
752
753        Self::load_from_store(store, &obj_path).await
754    }
755
756    pub async fn load_from_store(
757        store: Arc<dyn ObjectStore>,
758        path: &ObjectStorePath,
759    ) -> Result<Self> {
760        match store.get(path).await {
761            Ok(result) => {
762                let bytes = result.bytes().await?;
763                let content = String::from_utf8(bytes.to_vec())?;
764                let schema: Schema = serde_json::from_str(&content)?;
765                Ok(Self {
766                    store,
767                    path: path.clone(),
768                    schema: RwLock::new(Arc::new(schema)),
769                })
770            }
771            Err(object_store::Error::NotFound { .. }) => Ok(Self {
772                store,
773                path: path.clone(),
774                schema: RwLock::new(Arc::new(Schema::default())),
775            }),
776            Err(e) => Err(anyhow::Error::from(e)),
777        }
778    }
779
780    pub async fn save(&self) -> Result<()> {
781        let content = {
782            let schema_guard = acquire_read(&self.schema, "schema")?;
783            serde_json::to_string_pretty(&**schema_guard)?
784        };
785        self.store
786            .put(&self.path, content.into())
787            .await
788            .map_err(anyhow::Error::from)?;
789        Ok(())
790    }
791
792    pub fn path(&self) -> &ObjectStorePath {
793        &self.path
794    }
795
796    pub fn schema(&self) -> Arc<Schema> {
797        self.schema
798            .read()
799            .expect("Schema lock poisoned - a thread panicked while holding it")
800            .clone()
801    }
802
803    /// Normalize function names in an expression to uppercase for case-insensitive matching.
804    /// Examples: "lower(email)" -> "LOWER(email)", "trim(name)" -> "TRIM(name)"
805    fn normalize_function_names(expr: &str) -> String {
806        let mut result = String::with_capacity(expr.len());
807        let mut chars = expr.chars().peekable();
808
809        while let Some(ch) = chars.next() {
810            if ch.is_alphabetic() {
811                // Collect identifier
812                let mut ident = String::new();
813                ident.push(ch);
814
815                while let Some(&next) = chars.peek() {
816                    if next.is_alphanumeric() || next == '_' {
817                        ident.push(chars.next().unwrap());
818                    } else {
819                        break;
820                    }
821                }
822
823                // If followed by '(', it's a function call - uppercase it
824                if chars.peek() == Some(&'(') {
825                    result.push_str(&ident.to_uppercase());
826                } else {
827                    result.push_str(&ident); // Keep property names as-is
828                }
829            } else {
830                result.push(ch);
831            }
832        }
833
834        result
835    }
836
837    /// Generate a consistent internal column name for an expression index.
838    /// Uses a hash suffix to ensure uniqueness for different expressions that
839    /// might sanitize to the same string (e.g., "a+b" and "a-b" both become "a_b").
840    ///
841    /// IMPORTANT: Uses FNV-1a hash which is stable across Rust versions and platforms.
842    /// DefaultHasher is not guaranteed to be stable and could break persistent data
843    /// if the hash changes after a compiler upgrade.
844    pub fn generated_column_name(expr: &str) -> String {
845        // Normalize function names to uppercase for case-insensitive matching
846        let normalized = Self::normalize_function_names(expr);
847
848        let sanitized = normalized
849            .replace(|c: char| !c.is_alphanumeric(), "_")
850            .trim_matches('_')
851            .to_string();
852
853        // FNV-1a 64-bit hash - stable across Rust versions and platforms
854        const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
855        const FNV_PRIME: u64 = 1099511628211;
856
857        let mut hash = FNV_OFFSET_BASIS;
858        for byte in normalized.as_bytes() {
859            hash ^= *byte as u64;
860            hash = hash.wrapping_mul(FNV_PRIME);
861        }
862
863        format!("_gen_{}_{:x}", sanitized, hash)
864    }
865
866    pub fn replace_schema(&self, new_schema: Schema) {
867        let mut schema = self
868            .schema
869            .write()
870            .expect("Schema lock poisoned - a thread panicked while holding it");
871        *schema = Arc::new(new_schema);
872    }
873
874    pub fn next_label_id(&self) -> u16 {
875        self.schema()
876            .labels
877            .values()
878            .map(|l| l.id)
879            .max()
880            .unwrap_or(0)
881            + 1
882    }
883
884    pub fn next_type_id(&self) -> u32 {
885        let max_schema_id = self
886            .schema()
887            .edge_types
888            .values()
889            .map(|t| t.id)
890            .max()
891            .unwrap_or(0);
892
893        // Ensure we stay in schema'd ID space (bit 31 = 0)
894        if max_schema_id >= MAX_SCHEMA_TYPE_ID {
895            panic!("Schema edge type ID exhaustion");
896        }
897
898        max_schema_id + 1
899    }
900
901    pub fn add_label(&self, name: &str) -> Result<u16> {
902        let mut guard = acquire_write(&self.schema, "schema")?;
903        let schema = Arc::make_mut(&mut *guard);
904        if schema.labels.contains_key(name) {
905            return Err(anyhow!("Label '{}' already exists", name));
906        }
907
908        let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
909        schema.labels.insert(
910            name.to_string(),
911            LabelMeta {
912                id,
913                created_at: Utc::now(),
914                state: SchemaElementState::Active,
915            },
916        );
917        Ok(id)
918    }
919
920    pub fn add_edge_type(
921        &self,
922        name: &str,
923        src_labels: Vec<String>,
924        dst_labels: Vec<String>,
925    ) -> Result<u32> {
926        let mut guard = acquire_write(&self.schema, "schema")?;
927        let schema = Arc::make_mut(&mut *guard);
928        if schema.edge_types.contains_key(name) {
929            return Err(anyhow!("Edge type '{}' already exists", name));
930        }
931
932        let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
933
934        // Ensure we stay in schema'd ID space (bit 31 = 0)
935        if id >= MAX_SCHEMA_TYPE_ID {
936            return Err(anyhow!("Schema edge type ID exhaustion"));
937        }
938
939        schema.edge_types.insert(
940            name.to_string(),
941            EdgeTypeMeta {
942                id,
943                src_labels,
944                dst_labels,
945                state: SchemaElementState::Active,
946            },
947        );
948        Ok(id)
949    }
950
951    /// Delegates to [`Schema::get_or_assign_edge_type_id`].
952    pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
953        let mut guard = acquire_write(&self.schema, "schema")
954            .expect("Schema lock poisoned - a thread panicked while holding it");
955        let schema = Arc::make_mut(&mut *guard);
956        schema.get_or_assign_edge_type_id(type_name)
957    }
958
959    /// Delegates to [`Schema::edge_type_name_by_id_unified`].
960    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
961        let schema = acquire_read(&self.schema, "schema")
962            .expect("Schema lock poisoned - a thread panicked while holding it");
963        schema.edge_type_name_by_id_unified(type_id)
964    }
965
966    pub fn add_property(
967        &self,
968        label_or_type: &str,
969        prop_name: &str,
970        data_type: DataType,
971        nullable: bool,
972    ) -> Result<()> {
973        let mut guard = acquire_write(&self.schema, "schema")?;
974        let schema = Arc::make_mut(&mut *guard);
975        let version = schema.schema_version;
976        let props = schema
977            .properties
978            .entry(label_or_type.to_string())
979            .or_default();
980
981        if props.contains_key(prop_name) {
982            return Err(anyhow!(
983                "Property '{}' already exists for '{}'",
984                prop_name,
985                label_or_type
986            ));
987        }
988
989        props.insert(
990            prop_name.to_string(),
991            PropertyMeta {
992                r#type: data_type,
993                nullable,
994                added_in: version,
995                state: SchemaElementState::Active,
996                generation_expression: None,
997            },
998        );
999        Ok(())
1000    }
1001
1002    pub fn add_generated_property(
1003        &self,
1004        label_or_type: &str,
1005        prop_name: &str,
1006        data_type: DataType,
1007        expr: String,
1008    ) -> Result<()> {
1009        let mut guard = acquire_write(&self.schema, "schema")?;
1010        let schema = Arc::make_mut(&mut *guard);
1011        let version = schema.schema_version;
1012        let props = schema
1013            .properties
1014            .entry(label_or_type.to_string())
1015            .or_default();
1016
1017        if props.contains_key(prop_name) {
1018            return Err(anyhow!("Property '{}' already exists", prop_name));
1019        }
1020
1021        props.insert(
1022            prop_name.to_string(),
1023            PropertyMeta {
1024                r#type: data_type,
1025                nullable: true,
1026                added_in: version,
1027                state: SchemaElementState::Active,
1028                generation_expression: Some(expr),
1029            },
1030        );
1031        Ok(())
1032    }
1033
1034    pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1035        let mut guard = acquire_write(&self.schema, "schema")?;
1036        let schema = Arc::make_mut(&mut *guard);
1037        schema.indexes.push(index_def);
1038        Ok(())
1039    }
1040
1041    pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1042        let schema = self.schema.read().expect("Schema lock poisoned");
1043        schema.indexes.iter().find(|i| i.name() == name).cloned()
1044    }
1045
1046    /// Updates the lifecycle metadata for an index by name.
1047    ///
1048    /// The closure receives a mutable reference to the index's `IndexMetadata`,
1049    /// allowing callers to update status, timestamps, etc.
1050    pub fn update_index_metadata(
1051        &self,
1052        index_name: &str,
1053        f: impl FnOnce(&mut IndexMetadata),
1054    ) -> Result<()> {
1055        let mut guard = acquire_write(&self.schema, "schema")?;
1056        let schema = Arc::make_mut(&mut *guard);
1057        let idx = schema
1058            .indexes
1059            .iter_mut()
1060            .find(|i| i.name() == index_name)
1061            .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1062        f(idx.metadata_mut());
1063        Ok(())
1064    }
1065
1066    pub fn remove_index(&self, name: &str) -> Result<()> {
1067        let mut guard = acquire_write(&self.schema, "schema")?;
1068        let schema = Arc::make_mut(&mut *guard);
1069        if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1070            schema.indexes.remove(pos);
1071            Ok(())
1072        } else {
1073            Err(anyhow!("Index '{}' not found", name))
1074        }
1075    }
1076
1077    pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1078        let mut guard = acquire_write(&self.schema, "schema")?;
1079        let schema = Arc::make_mut(&mut *guard);
1080        if schema.constraints.iter().any(|c| c.name == constraint.name) {
1081            return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1082        }
1083        schema.constraints.push(constraint);
1084        Ok(())
1085    }
1086
1087    pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1088        let mut guard = acquire_write(&self.schema, "schema")?;
1089        let schema = Arc::make_mut(&mut *guard);
1090        if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1091            schema.constraints.remove(pos);
1092            Ok(())
1093        } else if if_exists {
1094            Ok(())
1095        } else {
1096            Err(anyhow!("Constraint '{}' not found", name))
1097        }
1098    }
1099
1100    pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1101        let mut guard = acquire_write(&self.schema, "schema")?;
1102        let schema = Arc::make_mut(&mut *guard);
1103        if let Some(props) = schema.properties.get_mut(label_or_type) {
1104            if props.remove(prop_name).is_some() {
1105                Ok(())
1106            } else {
1107                Err(anyhow!(
1108                    "Property '{}' not found for '{}'",
1109                    prop_name,
1110                    label_or_type
1111                ))
1112            }
1113        } else {
1114            Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1115        }
1116    }
1117
1118    pub fn rename_property(
1119        &self,
1120        label_or_type: &str,
1121        old_name: &str,
1122        new_name: &str,
1123    ) -> Result<()> {
1124        let mut guard = acquire_write(&self.schema, "schema")?;
1125        let schema = Arc::make_mut(&mut *guard);
1126        if let Some(props) = schema.properties.get_mut(label_or_type) {
1127            if let Some(meta) = props.remove(old_name) {
1128                if props.contains_key(new_name) {
1129                    // Rollback removal? Or just error.
1130                    props.insert(old_name.to_string(), meta); // Restore
1131                    return Err(anyhow!("Property '{}' already exists", new_name));
1132                }
1133                props.insert(new_name.to_string(), meta);
1134                Ok(())
1135            } else {
1136                Err(anyhow!(
1137                    "Property '{}' not found for '{}'",
1138                    old_name,
1139                    label_or_type
1140                ))
1141            }
1142        } else {
1143            Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1144        }
1145    }
1146
1147    pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1148        let mut guard = acquire_write(&self.schema, "schema")?;
1149        let schema = Arc::make_mut(&mut *guard);
1150        if let Some(label_meta) = schema.labels.get_mut(name) {
1151            label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1152            // Do not remove properties; they are implicitly tombstoned by the label
1153            Ok(())
1154        } else if if_exists {
1155            Ok(())
1156        } else {
1157            Err(anyhow!("Label '{}' not found", name))
1158        }
1159    }
1160
1161    pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1162        let mut guard = acquire_write(&self.schema, "schema")?;
1163        let schema = Arc::make_mut(&mut *guard);
1164        if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1165            edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1166            // Do not remove properties; they are implicitly tombstoned by the edge type
1167            Ok(())
1168        } else if if_exists {
1169            Ok(())
1170        } else {
1171            Err(anyhow!("Edge Type '{}' not found", name))
1172        }
1173    }
1174}
1175
1176/// Validate identifier names to prevent injection and ensure compatibility.
1177pub fn validate_identifier(name: &str) -> Result<()> {
1178    // Length check
1179    if name.is_empty() || name.len() > 64 {
1180        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1181    }
1182
1183    // First character must be letter or underscore
1184    let first = name.chars().next().unwrap();
1185    if !first.is_alphabetic() && first != '_' {
1186        return Err(anyhow!(
1187            "Identifier '{}' must start with letter or underscore",
1188            name
1189        ));
1190    }
1191
1192    // Remaining characters: alphanumeric or underscore
1193    if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1194        return Err(anyhow!(
1195            "Identifier '{}' must contain only alphanumeric and underscore",
1196            name
1197        ));
1198    }
1199
1200    // Reserved words
1201    const RESERVED: &[&str] = &[
1202        "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1203        "UNION", "ORDER", "LIMIT",
1204    ];
1205    if RESERVED.contains(&name.to_uppercase().as_str()) {
1206        return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1207    }
1208
1209    Ok(())
1210}
1211
// Unit tests for schema management: the label/property/edge-type lifecycle with
// persistence, expression-index column naming, index metadata serde and updates,
// and vector-index lookup filtered by index status.
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    // End-to-end check: adding labels, properties and edge types assigns IDs
    // starting at 1 and rejects duplicates. `save()` writes schema.json to the
    // store, and a fresh manager loaded from the same store sees the saved entries.
    #[tokio::test]
    async fn test_schema_management() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;

        // Labels
        let lid = manager.add_label("Person")?;
        assert_eq!(lid, 1);
        assert!(manager.add_label("Person").is_err());

        // Properties
        manager.add_property("Person", "name", DataType::String, false)?;
        assert!(
            manager
                .add_property("Person", "name", DataType::String, false)
                .is_err()
        );

        // Edge types
        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
        assert_eq!(tid, 1);

        manager.save().await?;
        // Check file exists
        assert!(store.get(&path).await.is_ok());

        // Reload into a new manager to confirm the saved state round-trips.
        let manager2 = SchemaManager::load_from_store(store, &path).await?;
        assert!(manager2.schema().labels.contains_key("Person"));
        assert!(
            manager2
                .schema()
                .properties
                .get("Person")
                .unwrap()
                .contains_key("name")
        );

        Ok(())
    }

    // Function names are upper-cased regardless of input case, including nested
    // calls, while argument identifiers (`email`) are left as-is.
    #[test]
    fn test_normalize_function_names() {
        assert_eq!(
            SchemaManager::normalize_function_names("lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("LOWER(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("Lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("trim(lower(email))"),
            "TRIM(LOWER(email))"
        );
    }

    // Expressions that differ only in function-name case map to the same
    // generated column, with the `_gen_<sanitized>_` prefix.
    #[test]
    fn test_generated_column_name_case_insensitive() {
        let col1 = SchemaManager::generated_column_name("lower(email)");
        let col2 = SchemaManager::generated_column_name("LOWER(email)");
        let col3 = SchemaManager::generated_column_name("Lower(email)");
        assert_eq!(col1, col2);
        assert_eq!(col2, col3);
        assert!(col1.starts_with("_gen_LOWER_email_"));
    }

    // JSON written before the `metadata` field existed must still deserialize.
    // The missing metadata defaults to status Online with no build info.
    #[test]
    fn test_index_metadata_serde_backward_compat() {
        // Simulate old JSON without metadata field
        let json = r#"{
            "type": "Scalar",
            "name": "idx_person_name",
            "label": "Person",
            "properties": ["name"],
            "index_type": "BTree",
            "where_clause": null
        }"#;
        let def: IndexDefinition = serde_json::from_str(json).unwrap();
        let meta = def.metadata();
        assert_eq!(meta.status, IndexStatus::Online);
        assert!(meta.last_built_at.is_none());
        assert!(meta.row_count_at_build.is_none());
    }

    // Non-default index metadata survives a serialize/deserialize round trip.
    #[test]
    fn test_index_metadata_serde_roundtrip() {
        let now = Utc::now();
        let def = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Test".to_string(),
            properties: vec!["prop".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                last_built_at: Some(now),
                row_count_at_build: Some(42),
            },
        });

        let json = serde_json::to_string(&def).unwrap();
        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.metadata().status, IndexStatus::Building);
        assert_eq!(parsed.metadata().row_count_at_build, Some(42));
        assert!(parsed.metadata().last_built_at.is_some());
    }

    // `update_index_metadata` mutates the stored index in place (visible via
    // `get_index`) and errors for an unknown index name.
    #[tokio::test]
    async fn test_update_index_metadata() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Person")?;
        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: Default::default(),
        });
        manager.add_index(idx)?;

        // Verify initial status is Online
        let initial = manager.get_index("idx_test").unwrap();
        assert_eq!(initial.metadata().status, IndexStatus::Online);

        // Update to Building
        manager.update_index_metadata("idx_test", |m| {
            m.status = IndexStatus::Building;
            m.row_count_at_build = Some(100);
        })?;

        let updated = manager.get_index("idx_test").unwrap();
        assert_eq!(updated.metadata().status, IndexStatus::Building);
        assert_eq!(updated.metadata().row_count_at_build, Some(100));

        // Non-existent index should error
        assert!(manager.update_index_metadata("nope", |_| {}).is_err());

        Ok(())
    }

    // `vector_index_for_property` ignores a matching vector index whose status is
    // not Online, and returns it once the status is switched to Online.
    #[test]
    fn test_vector_index_for_property_skips_non_online() {
        let mut schema = Schema::default();
        schema.labels.insert(
            "Document".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
            },
        );

        // Add a vector index with Stale status
        schema
            .indexes
            .push(IndexDefinition::Vector(VectorIndexConfig {
                name: "vec_doc_embedding".to_string(),
                label: "Document".to_string(),
                property: "embedding".to_string(),
                index_type: VectorIndexType::Flat,
                metric: DistanceMetric::Cosine,
                embedding_config: None,
                metadata: IndexMetadata {
                    status: IndexStatus::Stale,
                    ..Default::default()
                },
            }));

        // Stale index should NOT be returned
        assert!(
            schema
                .vector_index_for_property("Document", "embedding")
                .is_none()
        );

        // Set to Online — should now be returned
        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
            cfg.metadata.status = IndexStatus::Online;
        }
        let result = schema.vector_index_for_property("Document", "embedding");
        assert!(result.is_some());
        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
    }
}