Skip to main content

uni_common/core/
schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::core::edge_type::{MAX_SCHEMA_TYPE_ID, is_schemaless_edge_type, make_schemaless_id};
5use crate::sync::{acquire_read, acquire_write};
6use anyhow::{Result, anyhow};
7use chrono::{DateTime, Utc};
8use object_store::ObjectStore;
9use object_store::local::LocalFileSystem;
10use object_store::path::Path as ObjectStorePath;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::Path;
14use std::sync::{Arc, RwLock};
15
16#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
17#[non_exhaustive]
18pub enum SchemaElementState {
19    Active,
20    Hidden {
21        since: DateTime<Utc>,
22        last_active_snapshot: String, // SnapshotId
23    },
24    Tombstone {
25        since: DateTime<Utc>,
26    },
27}
28
29use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
30
31/// Returns the canonical struct field definitions for DateTime encoding in Arrow.
32///
33/// DateTime is encoded as a 3-field struct to preserve timezone information:
34/// - `nanos_since_epoch`: i64 nanoseconds since Unix epoch (UTC)
35/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
36/// - `timezone_name`: Optional IANA timezone name (e.g., "America/New_York")
37pub fn datetime_struct_fields() -> Fields {
38    Fields::from(vec![
39        Field::new(
40            "nanos_since_epoch",
41            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
42            true,
43        ),
44        Field::new("offset_seconds", ArrowDataType::Int32, true),
45        Field::new("timezone_name", ArrowDataType::Utf8, true),
46    ])
47}
48
49/// Returns the canonical struct field definitions for Time encoding in Arrow.
50///
51/// Time is encoded as a 2-field struct to preserve timezone offset:
52/// - `nanos_since_midnight`: i64 nanoseconds since midnight (0-86,399,999,999,999)
53/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
54pub fn time_struct_fields() -> Fields {
55    Fields::from(vec![
56        Field::new(
57            "nanos_since_midnight",
58            ArrowDataType::Time64(TimeUnit::Nanosecond),
59            true,
60        ),
61        Field::new("offset_seconds", ArrowDataType::Int32, true),
62    ])
63}
64
65/// Detects if an Arrow DataType is the canonical DateTime struct.
66pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
67    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
68}
69
70/// Detects if an Arrow DataType is the canonical Time struct.
71pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
72    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
73}
74
75#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
76#[non_exhaustive]
77pub enum CrdtType {
78    GCounter,
79    GSet,
80    ORSet,
81    LWWRegister,
82    LWWMap,
83    Rga,
84    VectorClock,
85    VCRegister,
86}
87
88#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
89pub enum PointType {
90    Geographic,  // WGS84
91    Cartesian2D, // x, y
92    Cartesian3D, // x, y, z
93}
94
95#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
96#[non_exhaustive]
97pub enum DataType {
98    String,
99    Int32,
100    Int64,
101    Float32,
102    Float64,
103    Bool,
104    Timestamp,
105    Date,
106    Time,
107    DateTime,
108    Duration,
109    CypherValue,
110    Point(PointType),
111    Vector { dimensions: usize },
112    Btic,
113    Crdt(CrdtType),
114    List(Box<DataType>),
115    Map(Box<DataType>, Box<DataType>),
116}
117
118impl DataType {
119    // Alias for compatibility/convenience if needed, but preferable to use exact types.
120    #[allow(non_upper_case_globals)]
121    pub const Float: DataType = DataType::Float64;
122    #[allow(non_upper_case_globals)]
123    pub const Int: DataType = DataType::Int64;
124
125    pub fn to_arrow(&self) -> ArrowDataType {
126        match self {
127            DataType::String => ArrowDataType::Utf8,
128            DataType::Int32 => ArrowDataType::Int32,
129            DataType::Int64 => ArrowDataType::Int64,
130            DataType::Float32 => ArrowDataType::Float32,
131            DataType::Float64 => ArrowDataType::Float64,
132            DataType::Bool => ArrowDataType::Boolean,
133            DataType::Timestamp => {
134                ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
135            }
136            DataType::Date => ArrowDataType::Date32,
137            DataType::Time => ArrowDataType::Struct(time_struct_fields()),
138            DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
139            DataType::Duration => ArrowDataType::LargeBinary, // Lance doesn't support Interval(MonthDayNano); use CypherValue codec
140            DataType::CypherValue => ArrowDataType::LargeBinary, // MessagePack-tagged binary encoding
141            DataType::Point(pt) => match pt {
142                PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
143                    Field::new("latitude", ArrowDataType::Float64, false),
144                    Field::new("longitude", ArrowDataType::Float64, false),
145                    Field::new("crs", ArrowDataType::Utf8, false),
146                ])),
147                PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
148                    Field::new("x", ArrowDataType::Float64, false),
149                    Field::new("y", ArrowDataType::Float64, false),
150                    Field::new("crs", ArrowDataType::Utf8, false),
151                ])),
152                PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
153                    Field::new("x", ArrowDataType::Float64, false),
154                    Field::new("y", ArrowDataType::Float64, false),
155                    Field::new("z", ArrowDataType::Float64, false),
156                    Field::new("crs", ArrowDataType::Utf8, false),
157                ])),
158            },
159            DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
160                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
161                *dimensions as i32,
162            ),
163            DataType::Btic => ArrowDataType::FixedSizeBinary(24),
164            DataType::Crdt(_) => ArrowDataType::Binary, // Store CRDT as binary MessagePack
165            DataType::List(inner) => {
166                ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
167            }
168            DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
169                "item",
170                ArrowDataType::Struct(Fields::from(vec![
171                    Field::new("key", key.to_arrow(), false),
172                    Field::new("value", value.to_arrow(), true),
173                ])),
174                true,
175            ))),
176        }
177    }
178}
179
180fn default_created_at() -> DateTime<Utc> {
181    Utc::now()
182}
183
184fn default_state() -> SchemaElementState {
185    SchemaElementState::Active
186}
187
/// Serde default for `PropertyMeta::added_in`: schema version 1.
fn default_version_1() -> u32 {
    const FIRST_SCHEMA_VERSION: u32 = 1;
    FIRST_SCHEMA_VERSION
}
191
192#[derive(Clone, Debug, Serialize, Deserialize)]
193pub struct PropertyMeta {
194    pub r#type: DataType,
195    pub nullable: bool,
196    #[serde(default = "default_version_1")]
197    pub added_in: u32, // SchemaVersion
198    #[serde(default = "default_state")]
199    pub state: SchemaElementState,
200    #[serde(default)]
201    pub generation_expression: Option<String>,
202}
203
204#[derive(Clone, Debug, Serialize, Deserialize)]
205pub struct LabelMeta {
206    pub id: u16, // LabelId
207    #[serde(default = "default_created_at")]
208    pub created_at: DateTime<Utc>,
209    #[serde(default = "default_state")]
210    pub state: SchemaElementState,
211}
212
213#[derive(Clone, Debug, Serialize, Deserialize)]
214pub struct EdgeTypeMeta {
215    /// See [`crate::core::edge_type::EdgeTypeId`] for bit-layout details.
216    pub id: u32,
217    pub src_labels: Vec<String>,
218    pub dst_labels: Vec<String>,
219    #[serde(default = "default_state")]
220    pub state: SchemaElementState,
221}
222
223#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
224#[non_exhaustive]
225pub enum ConstraintType {
226    Unique { properties: Vec<String> },
227    Exists { property: String },
228    Check { expression: String },
229}
230
231#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
232#[non_exhaustive]
233pub enum ConstraintTarget {
234    Label(String),
235    EdgeType(String),
236}
237
238#[derive(Clone, Debug, Serialize, Deserialize)]
239pub struct Constraint {
240    pub name: String,
241    pub constraint_type: ConstraintType,
242    pub target: ConstraintTarget,
243    pub enabled: bool,
244}
245
246/// Bidirectional registry for dynamically-assigned schemaless edge type IDs.
247///
248/// Edge types not defined in the schema are assigned IDs at runtime with
249/// bit 31 set (see [`crate::core::edge_type`]). This registry maintains
250/// the name-to-ID and ID-to-name mappings for those types.
251#[derive(Clone, Debug, Serialize, Deserialize)]
252pub struct SchemalessEdgeTypeRegistry {
253    name_to_id: HashMap<String, u32>,
254    id_to_name: HashMap<u32, String>,
255    /// Next local ID to assign (0 is reserved for invalid).
256    next_local_id: u32,
257}
258
259impl SchemalessEdgeTypeRegistry {
260    pub fn new() -> Self {
261        Self {
262            name_to_id: HashMap::new(),
263            id_to_name: HashMap::new(),
264            next_local_id: 1,
265        }
266    }
267
268    /// Returns the schemaless ID for `type_name`, assigning a new one if needed.
269    pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
270        if let Some(&id) = self.name_to_id.get(type_name) {
271            return id;
272        }
273
274        let id = make_schemaless_id(self.next_local_id);
275        self.next_local_id += 1;
276
277        self.name_to_id.insert(type_name.to_string(), id);
278        self.id_to_name.insert(id, type_name.to_string());
279
280        id
281    }
282
283    /// Looks up the edge type name for a schemaless ID.
284    pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
285        self.id_to_name.get(&type_id).map(String::as_str)
286    }
287
288    /// Returns `true` if `type_name` has already been assigned a schemaless ID.
289    pub fn contains(&self, type_name: &str) -> bool {
290        self.name_to_id.contains_key(type_name)
291    }
292
293    /// Looks up the edge type ID for `type_name` with case-insensitive matching.
294    pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
295        self.name_to_id
296            .iter()
297            .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
298            .map(|(_, &id)| id)
299    }
300
301    /// Returns all registered schemaless type IDs.
302    pub fn all_type_ids(&self) -> Vec<u32> {
303        self.id_to_name.keys().copied().collect()
304    }
305
306    /// Returns true if the registry has any schemaless types.
307    pub fn is_empty(&self) -> bool {
308        self.name_to_id.is_empty()
309    }
310}
311
312impl Default for SchemalessEdgeTypeRegistry {
313    fn default() -> Self {
314        Self::new()
315    }
316}
317
318#[derive(Clone, Debug, Serialize, Deserialize)]
319pub struct Schema {
320    pub schema_version: u32,
321    pub labels: HashMap<String, LabelMeta>,
322    pub edge_types: HashMap<String, EdgeTypeMeta>,
323    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
324    #[serde(default)]
325    pub indexes: Vec<IndexDefinition>,
326    #[serde(default)]
327    pub constraints: Vec<Constraint>,
328    /// Registry for schemaless edge types (dynamically assigned IDs)
329    #[serde(default)]
330    pub schemaless_registry: SchemalessEdgeTypeRegistry,
331}
332
333impl Default for Schema {
334    fn default() -> Self {
335        Self {
336            schema_version: 1,
337            labels: HashMap::new(),
338            edge_types: HashMap::new(),
339            properties: HashMap::new(),
340            indexes: Vec::new(),
341            constraints: Vec::new(),
342            schemaless_registry: SchemalessEdgeTypeRegistry::new(),
343        }
344    }
345}
346
347impl Schema {
348    /// Returns the label name for a given label ID.
349    ///
350    /// Performs a linear scan over all labels. This is efficient because
351    /// the number of labels in a schema is typically small.
352    pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
353        self.labels
354            .iter()
355            .find(|(_, meta)| meta.id == label_id)
356            .map(|(name, _)| name.as_str())
357    }
358
359    /// Returns the label ID for a given label name.
360    pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
361        self.labels.get(label_name).map(|meta| meta.id)
362    }
363
364    /// Returns the edge type name for a given type ID.
365    ///
366    /// Performs a linear scan over all edge types. This is efficient because
367    /// the number of edge types in a schema is typically small.
368    pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
369        self.edge_types
370            .iter()
371            .find(|(_, meta)| meta.id == type_id)
372            .map(|(name, _)| name.as_str())
373    }
374
375    /// Returns the edge type ID for a given type name.
376    pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
377        self.edge_types.get(type_name).map(|meta| meta.id)
378    }
379
380    /// Returns the vector index configuration for a given label and property.
381    ///
382    /// Performs a linear scan over all indexes. This is efficient because
383    /// the number of indexes in a schema is typically small.
384    pub fn vector_index_for_property(
385        &self,
386        label: &str,
387        property: &str,
388    ) -> Option<&VectorIndexConfig> {
389        self.indexes.iter().find_map(|idx| {
390            if let IndexDefinition::Vector(config) = idx
391                && config.label == label
392                && config.property == property
393                && config.metadata.status == IndexStatus::Online
394            {
395                return Some(config);
396            }
397            None
398        })
399    }
400
401    /// Returns the full-text index configuration for a given label and property.
402    ///
403    /// A full-text index covers one or more properties. This returns the config
404    /// if the specified property is among the indexed properties.
405    pub fn fulltext_index_for_property(
406        &self,
407        label: &str,
408        property: &str,
409    ) -> Option<&FullTextIndexConfig> {
410        self.indexes.iter().find_map(|idx| {
411            if let IndexDefinition::FullText(config) = idx
412                && config.label == label
413                && config.properties.iter().any(|p| p == property)
414                && config.metadata.status == IndexStatus::Online
415            {
416                return Some(config);
417            }
418            None
419        })
420    }
421
422    /// Get label metadata with case-insensitive lookup.
423    ///
424    /// This allows queries to match labels regardless of case, providing
425    /// better user experience when label names vary in casing.
426    pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
427        self.labels
428            .iter()
429            .find(|(k, _)| k.eq_ignore_ascii_case(name))
430            .map(|(_, v)| v)
431    }
432
433    /// Get label ID with case-insensitive lookup.
434    pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
435        self.get_label_case_insensitive(label_name)
436            .map(|meta| meta.id)
437    }
438
439    /// Get edge type metadata with case-insensitive lookup.
440    ///
441    /// This allows queries to match edge types regardless of case, providing
442    /// better user experience when type names vary in casing.
443    pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
444        self.edge_types
445            .iter()
446            .find(|(k, _)| k.eq_ignore_ascii_case(name))
447            .map(|(_, v)| v)
448    }
449
450    /// Get edge type ID with case-insensitive lookup (schema-defined types only).
451    pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
452        self.get_edge_type_case_insensitive(type_name)
453            .map(|meta| meta.id)
454    }
455
456    /// Get edge type ID with case-insensitive lookup, checking both
457    /// schema-defined and schemaless registries.
458    pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
459        self.edge_type_id_by_name_case_insensitive(type_name)
460            .or_else(|| {
461                self.schemaless_registry
462                    .id_by_name_case_insensitive(type_name)
463            })
464    }
465
466    /// Returns the edge type ID for `type_name`, checking the schema first
467    /// and falling back to the schemaless registry (assigning a new ID if needed).
468    ///
469    /// Requires `&mut self` because it may assign a new schemaless ID.
470    /// Use [`edge_type_id_by_name`](Self::edge_type_id_by_name) for read-only schema lookups.
471    pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
472        if let Some(id) = self.edge_type_id_by_name(type_name) {
473            return id;
474        }
475        self.schemaless_registry.get_or_assign_id(type_name)
476    }
477
478    /// Returns the edge type name for `type_id`, checking both the schema
479    /// and schemaless registries. Returns an owned `String` because the
480    /// name may come from either registry.
481    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
482        if is_schemaless_edge_type(type_id) {
483            self.schemaless_registry
484                .type_name_by_id(type_id)
485                .map(str::to_owned)
486        } else {
487            self.edge_type_name_by_id(type_id).map(str::to_owned)
488        }
489    }
490
491    /// Returns all edge type IDs, including both schema-defined and schemaless types.
492    /// Used when MATCH queries don't specify an edge type and need to scan all edges.
493    pub fn all_edge_type_ids(&self) -> Vec<u32> {
494        let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
495        ids.extend(self.schemaless_registry.all_type_ids());
496        ids.sort_unstable();
497        ids
498    }
499}
500
501/// Lifecycle status of an index.
502#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
503pub enum IndexStatus {
504    /// Index is up-to-date and available for queries.
505    #[default]
506    Online,
507    /// Index is currently being rebuilt.
508    Building,
509    /// Index is outdated and scheduled for rebuild.
510    Stale,
511    /// Index rebuild failed after exhausting retries.
512    Failed,
513}
514
515/// Metadata tracking the lifecycle state of an index.
516#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
517pub struct IndexMetadata {
518    /// Current lifecycle status.
519    #[serde(default)]
520    pub status: IndexStatus,
521    /// When the index was last successfully built.
522    #[serde(default)]
523    pub last_built_at: Option<DateTime<Utc>>,
524    /// Row count of the dataset when the index was last built.
525    #[serde(default)]
526    pub row_count_at_build: Option<u64>,
527}
528
529#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
530#[serde(tag = "type")]
531#[non_exhaustive]
532pub enum IndexDefinition {
533    Vector(VectorIndexConfig),
534    FullText(FullTextIndexConfig),
535    Scalar(ScalarIndexConfig),
536    Inverted(InvertedIndexConfig),
537    JsonFullText(JsonFtsIndexConfig),
538}
539
540impl IndexDefinition {
541    /// Returns the index name for any variant.
542    pub fn name(&self) -> &str {
543        match self {
544            IndexDefinition::Vector(c) => &c.name,
545            IndexDefinition::FullText(c) => &c.name,
546            IndexDefinition::Scalar(c) => &c.name,
547            IndexDefinition::Inverted(c) => &c.name,
548            IndexDefinition::JsonFullText(c) => &c.name,
549        }
550    }
551
552    /// Returns the label this index is defined on.
553    pub fn label(&self) -> &str {
554        match self {
555            IndexDefinition::Vector(c) => &c.label,
556            IndexDefinition::FullText(c) => &c.label,
557            IndexDefinition::Scalar(c) => &c.label,
558            IndexDefinition::Inverted(c) => &c.label,
559            IndexDefinition::JsonFullText(c) => &c.label,
560        }
561    }
562
563    /// Returns a reference to the index lifecycle metadata.
564    pub fn metadata(&self) -> &IndexMetadata {
565        match self {
566            IndexDefinition::Vector(c) => &c.metadata,
567            IndexDefinition::FullText(c) => &c.metadata,
568            IndexDefinition::Scalar(c) => &c.metadata,
569            IndexDefinition::Inverted(c) => &c.metadata,
570            IndexDefinition::JsonFullText(c) => &c.metadata,
571        }
572    }
573
574    /// Returns a mutable reference to the index lifecycle metadata.
575    pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
576        match self {
577            IndexDefinition::Vector(c) => &mut c.metadata,
578            IndexDefinition::FullText(c) => &mut c.metadata,
579            IndexDefinition::Scalar(c) => &mut c.metadata,
580            IndexDefinition::Inverted(c) => &mut c.metadata,
581            IndexDefinition::JsonFullText(c) => &mut c.metadata,
582        }
583    }
584}
585
586#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
587pub struct InvertedIndexConfig {
588    pub name: String,
589    pub label: String,
590    pub property: String,
591    #[serde(default = "default_normalize")]
592    pub normalize: bool,
593    #[serde(default = "default_max_terms_per_doc")]
594    pub max_terms_per_doc: usize,
595    #[serde(default)]
596    pub metadata: IndexMetadata,
597}
598
/// Serde default for `InvertedIndexConfig::normalize`: enabled.
fn default_normalize() -> bool {
    const NORMALIZE_BY_DEFAULT: bool = true;
    NORMALIZE_BY_DEFAULT
}
602
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`: ten thousand terms.
fn default_max_terms_per_doc() -> usize {
    const DEFAULT_MAX_TERMS_PER_DOC: usize = 10_000;
    DEFAULT_MAX_TERMS_PER_DOC
}
606
607#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
608pub struct VectorIndexConfig {
609    pub name: String,
610    pub label: String,
611    pub property: String,
612    pub index_type: VectorIndexType,
613    pub metric: DistanceMetric,
614    pub embedding_config: Option<EmbeddingConfig>,
615    #[serde(default)]
616    pub metadata: IndexMetadata,
617}
618
619#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
620pub struct EmbeddingConfig {
621    /// Model alias in the Uni-Xervo catalog (for example: "embed/default").
622    pub alias: String,
623    pub source_properties: Vec<String>,
624    pub batch_size: usize,
625}
626
627#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
628#[non_exhaustive]
629pub enum VectorIndexType {
630    IvfPq {
631        num_partitions: u32,
632        num_sub_vectors: u32,
633        bits_per_subvector: u8,
634    },
635    Hnsw {
636        m: u32,
637        ef_construction: u32,
638        ef_search: u32,
639    },
640    Flat,
641}
642
643#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
644#[non_exhaustive]
645pub enum DistanceMetric {
646    Cosine,
647    L2,
648    Dot,
649}
650
651impl DistanceMetric {
652    /// Computes the distance between two vectors using this metric.
653    ///
654    /// All metrics follow LanceDB conventions so that lower values indicate
655    /// greater similarity:
656    /// - **L2**: squared Euclidean distance.
657    /// - **Cosine**: `1.0 - cosine_similarity` (range \[0, 2\]).
658    /// - **Dot**: negative dot product.
659    ///
660    /// # Panics
661    ///
662    /// Panics if `a` and `b` have different lengths.
663    pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
664        assert_eq!(a.len(), b.len(), "vector dimension mismatch");
665        match self {
666            DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
667            DistanceMetric::Cosine => {
668                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
669                let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
670                let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
671                let denom = norm_a * norm_b;
672                if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
673            }
674            DistanceMetric::Dot => {
675                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
676                -dot
677            }
678        }
679    }
680}
681
682#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
683pub struct FullTextIndexConfig {
684    pub name: String,
685    pub label: String,
686    pub properties: Vec<String>,
687    pub tokenizer: TokenizerConfig,
688    pub with_positions: bool,
689    #[serde(default)]
690    pub metadata: IndexMetadata,
691}
692
693#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
694#[non_exhaustive]
695pub enum TokenizerConfig {
696    Standard,
697    Whitespace,
698    Ngram { min: u8, max: u8 },
699    Custom { name: String },
700}
701
702#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
703pub struct JsonFtsIndexConfig {
704    pub name: String,
705    pub label: String,
706    pub column: String,
707    #[serde(default)]
708    pub paths: Vec<String>,
709    #[serde(default)]
710    pub with_positions: bool,
711    #[serde(default)]
712    pub metadata: IndexMetadata,
713}
714
715#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
716pub struct ScalarIndexConfig {
717    pub name: String,
718    pub label: String,
719    pub properties: Vec<String>,
720    pub index_type: ScalarIndexType,
721    pub where_clause: Option<String>,
722    #[serde(default)]
723    pub metadata: IndexMetadata,
724}
725
726#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
727#[non_exhaustive]
728pub enum ScalarIndexType {
729    BTree,
730    Hash,
731    Bitmap,
732}
733
/// Owns the current [`Schema`] and persists it as JSON in an object store.
///
/// The schema is held as an `Arc` behind an `RwLock`, so readers receive a
/// shared snapshot while writers swap or modify the stored value.
pub struct SchemaManager {
    /// Object store the schema document is read from and written to.
    store: Arc<dyn ObjectStore>,
    /// Location of the schema document inside `store`.
    path: ObjectStorePath,
    /// Current schema snapshot.
    schema: RwLock<Arc<Schema>>,
}
739
740impl SchemaManager {
741    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
742        let path = path.as_ref();
743        let parent = path
744            .parent()
745            .ok_or_else(|| anyhow!("Invalid schema path"))?;
746        let filename = path
747            .file_name()
748            .ok_or_else(|| anyhow!("Invalid schema filename"))?
749            .to_str()
750            .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
751
752        let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
753        let obj_path = ObjectStorePath::from(filename);
754
755        Self::load_from_store(store, &obj_path).await
756    }
757
758    pub async fn load_from_store(
759        store: Arc<dyn ObjectStore>,
760        path: &ObjectStorePath,
761    ) -> Result<Self> {
762        match store.get(path).await {
763            Ok(result) => {
764                let bytes = result.bytes().await?;
765                let content = String::from_utf8(bytes.to_vec())?;
766                let schema: Schema = serde_json::from_str(&content)?;
767                Ok(Self {
768                    store,
769                    path: path.clone(),
770                    schema: RwLock::new(Arc::new(schema)),
771                })
772            }
773            Err(object_store::Error::NotFound { .. }) => Ok(Self {
774                store,
775                path: path.clone(),
776                schema: RwLock::new(Arc::new(Schema::default())),
777            }),
778            Err(e) => Err(anyhow::Error::from(e)),
779        }
780    }
781
782    pub async fn save(&self) -> Result<()> {
783        let content = {
784            let schema_guard = acquire_read(&self.schema, "schema")?;
785            serde_json::to_string_pretty(&**schema_guard)?
786        };
787        self.store
788            .put(&self.path, content.into())
789            .await
790            .map_err(anyhow::Error::from)?;
791        Ok(())
792    }
793
794    pub fn path(&self) -> &ObjectStorePath {
795        &self.path
796    }
797
798    pub fn schema(&self) -> Arc<Schema> {
799        self.schema
800            .read()
801            .expect("Schema lock poisoned - a thread panicked while holding it")
802            .clone()
803    }
804
805    /// Normalize function names in an expression to uppercase for case-insensitive matching.
806    /// Examples: "lower(email)" -> "LOWER(email)", "trim(name)" -> "TRIM(name)"
807    fn normalize_function_names(expr: &str) -> String {
808        let mut result = String::with_capacity(expr.len());
809        let mut chars = expr.chars().peekable();
810
811        while let Some(ch) = chars.next() {
812            if ch.is_alphabetic() {
813                // Collect identifier
814                let mut ident = String::new();
815                ident.push(ch);
816
817                while let Some(&next) = chars.peek() {
818                    if next.is_alphanumeric() || next == '_' {
819                        ident.push(chars.next().unwrap());
820                    } else {
821                        break;
822                    }
823                }
824
825                // If followed by '(', it's a function call - uppercase it
826                if chars.peek() == Some(&'(') {
827                    result.push_str(&ident.to_uppercase());
828                } else {
829                    result.push_str(&ident); // Keep property names as-is
830                }
831            } else {
832                result.push(ch);
833            }
834        }
835
836        result
837    }
838
839    /// Generate a consistent internal column name for an expression index.
840    /// Uses a hash suffix to ensure uniqueness for different expressions that
841    /// might sanitize to the same string (e.g., "a+b" and "a-b" both become "a_b").
842    ///
843    /// IMPORTANT: Uses FNV-1a hash which is stable across Rust versions and platforms.
844    /// DefaultHasher is not guaranteed to be stable and could break persistent data
845    /// if the hash changes after a compiler upgrade.
846    pub fn generated_column_name(expr: &str) -> String {
847        // Normalize function names to uppercase for case-insensitive matching
848        let normalized = Self::normalize_function_names(expr);
849
850        let sanitized = normalized
851            .replace(|c: char| !c.is_alphanumeric(), "_")
852            .trim_matches('_')
853            .to_string();
854
855        // FNV-1a 64-bit hash - stable across Rust versions and platforms
856        const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
857        const FNV_PRIME: u64 = 1099511628211;
858
859        let mut hash = FNV_OFFSET_BASIS;
860        for byte in normalized.as_bytes() {
861            hash ^= *byte as u64;
862            hash = hash.wrapping_mul(FNV_PRIME);
863        }
864
865        format!("_gen_{}_{:x}", sanitized, hash)
866    }
867
868    pub fn replace_schema(&self, new_schema: Schema) {
869        let mut schema = self
870            .schema
871            .write()
872            .expect("Schema lock poisoned - a thread panicked while holding it");
873        *schema = Arc::new(new_schema);
874    }
875
876    pub fn next_label_id(&self) -> u16 {
877        self.schema()
878            .labels
879            .values()
880            .map(|l| l.id)
881            .max()
882            .unwrap_or(0)
883            + 1
884    }
885
886    pub fn next_type_id(&self) -> u32 {
887        let max_schema_id = self
888            .schema()
889            .edge_types
890            .values()
891            .map(|t| t.id)
892            .max()
893            .unwrap_or(0);
894
895        // Ensure we stay in schema'd ID space (bit 31 = 0)
896        if max_schema_id >= MAX_SCHEMA_TYPE_ID {
897            panic!("Schema edge type ID exhaustion");
898        }
899
900        max_schema_id + 1
901    }
902
903    pub fn add_label(&self, name: &str) -> Result<u16> {
904        let mut guard = acquire_write(&self.schema, "schema")?;
905        let schema = Arc::make_mut(&mut *guard);
906        if schema.labels.contains_key(name) {
907            return Err(anyhow!("Label '{}' already exists", name));
908        }
909
910        let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
911        schema.labels.insert(
912            name.to_string(),
913            LabelMeta {
914                id,
915                created_at: Utc::now(),
916                state: SchemaElementState::Active,
917            },
918        );
919        Ok(id)
920    }
921
922    pub fn add_edge_type(
923        &self,
924        name: &str,
925        src_labels: Vec<String>,
926        dst_labels: Vec<String>,
927    ) -> Result<u32> {
928        let mut guard = acquire_write(&self.schema, "schema")?;
929        let schema = Arc::make_mut(&mut *guard);
930        if schema.edge_types.contains_key(name) {
931            return Err(anyhow!("Edge type '{}' already exists", name));
932        }
933
934        let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
935
936        // Ensure we stay in schema'd ID space (bit 31 = 0)
937        if id >= MAX_SCHEMA_TYPE_ID {
938            return Err(anyhow!("Schema edge type ID exhaustion"));
939        }
940
941        schema.edge_types.insert(
942            name.to_string(),
943            EdgeTypeMeta {
944                id,
945                src_labels,
946                dst_labels,
947                state: SchemaElementState::Active,
948            },
949        );
950        Ok(id)
951    }
952
953    /// Delegates to [`Schema::get_or_assign_edge_type_id`].
954    pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
955        let mut guard = acquire_write(&self.schema, "schema")
956            .expect("Schema lock poisoned - a thread panicked while holding it");
957        let schema = Arc::make_mut(&mut *guard);
958        schema.get_or_assign_edge_type_id(type_name)
959    }
960
961    /// Delegates to [`Schema::edge_type_name_by_id_unified`].
962    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
963        let schema = acquire_read(&self.schema, "schema")
964            .expect("Schema lock poisoned - a thread panicked while holding it");
965        schema.edge_type_name_by_id_unified(type_id)
966    }
967
968    pub fn add_property(
969        &self,
970        label_or_type: &str,
971        prop_name: &str,
972        data_type: DataType,
973        nullable: bool,
974    ) -> Result<()> {
975        let mut guard = acquire_write(&self.schema, "schema")?;
976        let schema = Arc::make_mut(&mut *guard);
977        let version = schema.schema_version;
978        let props = schema
979            .properties
980            .entry(label_or_type.to_string())
981            .or_default();
982
983        if props.contains_key(prop_name) {
984            return Err(anyhow!(
985                "Property '{}' already exists for '{}'",
986                prop_name,
987                label_or_type
988            ));
989        }
990
991        props.insert(
992            prop_name.to_string(),
993            PropertyMeta {
994                r#type: data_type,
995                nullable,
996                added_in: version,
997                state: SchemaElementState::Active,
998                generation_expression: None,
999            },
1000        );
1001        Ok(())
1002    }
1003
1004    pub fn add_generated_property(
1005        &self,
1006        label_or_type: &str,
1007        prop_name: &str,
1008        data_type: DataType,
1009        expr: String,
1010    ) -> Result<()> {
1011        let mut guard = acquire_write(&self.schema, "schema")?;
1012        let schema = Arc::make_mut(&mut *guard);
1013        let version = schema.schema_version;
1014        let props = schema
1015            .properties
1016            .entry(label_or_type.to_string())
1017            .or_default();
1018
1019        if props.contains_key(prop_name) {
1020            return Err(anyhow!("Property '{}' already exists", prop_name));
1021        }
1022
1023        props.insert(
1024            prop_name.to_string(),
1025            PropertyMeta {
1026                r#type: data_type,
1027                nullable: true,
1028                added_in: version,
1029                state: SchemaElementState::Active,
1030                generation_expression: Some(expr),
1031            },
1032        );
1033        Ok(())
1034    }
1035
1036    pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1037        let mut guard = acquire_write(&self.schema, "schema")?;
1038        let schema = Arc::make_mut(&mut *guard);
1039        schema.indexes.push(index_def);
1040        Ok(())
1041    }
1042
1043    pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1044        let schema = self.schema.read().expect("Schema lock poisoned");
1045        schema.indexes.iter().find(|i| i.name() == name).cloned()
1046    }
1047
1048    /// Updates the lifecycle metadata for an index by name.
1049    ///
1050    /// The closure receives a mutable reference to the index's `IndexMetadata`,
1051    /// allowing callers to update status, timestamps, etc.
1052    pub fn update_index_metadata(
1053        &self,
1054        index_name: &str,
1055        f: impl FnOnce(&mut IndexMetadata),
1056    ) -> Result<()> {
1057        let mut guard = acquire_write(&self.schema, "schema")?;
1058        let schema = Arc::make_mut(&mut *guard);
1059        let idx = schema
1060            .indexes
1061            .iter_mut()
1062            .find(|i| i.name() == index_name)
1063            .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1064        f(idx.metadata_mut());
1065        Ok(())
1066    }
1067
1068    pub fn remove_index(&self, name: &str) -> Result<()> {
1069        let mut guard = acquire_write(&self.schema, "schema")?;
1070        let schema = Arc::make_mut(&mut *guard);
1071        if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1072            schema.indexes.remove(pos);
1073            Ok(())
1074        } else {
1075            Err(anyhow!("Index '{}' not found", name))
1076        }
1077    }
1078
1079    pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1080        let mut guard = acquire_write(&self.schema, "schema")?;
1081        let schema = Arc::make_mut(&mut *guard);
1082        if schema.constraints.iter().any(|c| c.name == constraint.name) {
1083            return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1084        }
1085        schema.constraints.push(constraint);
1086        Ok(())
1087    }
1088
1089    pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1090        let mut guard = acquire_write(&self.schema, "schema")?;
1091        let schema = Arc::make_mut(&mut *guard);
1092        if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1093            schema.constraints.remove(pos);
1094            Ok(())
1095        } else if if_exists {
1096            Ok(())
1097        } else {
1098            Err(anyhow!("Constraint '{}' not found", name))
1099        }
1100    }
1101
1102    pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1103        let mut guard = acquire_write(&self.schema, "schema")?;
1104        let schema = Arc::make_mut(&mut *guard);
1105        if let Some(props) = schema.properties.get_mut(label_or_type) {
1106            if props.remove(prop_name).is_some() {
1107                Ok(())
1108            } else {
1109                Err(anyhow!(
1110                    "Property '{}' not found for '{}'",
1111                    prop_name,
1112                    label_or_type
1113                ))
1114            }
1115        } else {
1116            Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1117        }
1118    }
1119
1120    pub fn rename_property(
1121        &self,
1122        label_or_type: &str,
1123        old_name: &str,
1124        new_name: &str,
1125    ) -> Result<()> {
1126        let mut guard = acquire_write(&self.schema, "schema")?;
1127        let schema = Arc::make_mut(&mut *guard);
1128        if let Some(props) = schema.properties.get_mut(label_or_type) {
1129            if let Some(meta) = props.remove(old_name) {
1130                if props.contains_key(new_name) {
1131                    // Rollback removal? Or just error.
1132                    props.insert(old_name.to_string(), meta); // Restore
1133                    return Err(anyhow!("Property '{}' already exists", new_name));
1134                }
1135                props.insert(new_name.to_string(), meta);
1136                Ok(())
1137            } else {
1138                Err(anyhow!(
1139                    "Property '{}' not found for '{}'",
1140                    old_name,
1141                    label_or_type
1142                ))
1143            }
1144        } else {
1145            Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1146        }
1147    }
1148
1149    pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1150        let mut guard = acquire_write(&self.schema, "schema")?;
1151        let schema = Arc::make_mut(&mut *guard);
1152        if let Some(label_meta) = schema.labels.get_mut(name) {
1153            label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1154            // Do not remove properties; they are implicitly tombstoned by the label
1155            Ok(())
1156        } else if if_exists {
1157            Ok(())
1158        } else {
1159            Err(anyhow!("Label '{}' not found", name))
1160        }
1161    }
1162
1163    pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1164        let mut guard = acquire_write(&self.schema, "schema")?;
1165        let schema = Arc::make_mut(&mut *guard);
1166        if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1167            edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1168            // Do not remove properties; they are implicitly tombstoned by the edge type
1169            Ok(())
1170        } else if if_exists {
1171            Ok(())
1172        } else {
1173            Err(anyhow!("Edge Type '{}' not found", name))
1174        }
1175    }
1176}
1177
1178/// Validate identifier names to prevent injection and ensure compatibility.
1179pub fn validate_identifier(name: &str) -> Result<()> {
1180    // Length check
1181    if name.is_empty() || name.len() > 64 {
1182        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1183    }
1184
1185    // First character must be letter or underscore
1186    let first = name.chars().next().unwrap();
1187    if !first.is_alphabetic() && first != '_' {
1188        return Err(anyhow!(
1189            "Identifier '{}' must start with letter or underscore",
1190            name
1191        ));
1192    }
1193
1194    // Remaining characters: alphanumeric or underscore
1195    if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1196        return Err(anyhow!(
1197            "Identifier '{}' must contain only alphanumeric and underscore",
1198            name
1199        ));
1200    }
1201
1202    // Reserved words
1203    const RESERVED: &[&str] = &[
1204        "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1205        "UNION", "ORDER", "LIMIT",
1206    ];
1207    if RESERVED.contains(&name.to_uppercase().as_str()) {
1208        return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1209    }
1210
1211    Ok(())
1212}
1213
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Adds a label, a property and an edge type, saves the schema to a local
    /// object store, and checks that a freshly loaded manager sees them.
    #[tokio::test]
    async fn test_schema_management() -> Result<()> {
        let tmp = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(tmp.path())?);
        let schema_path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store.clone(), &schema_path).await?;

        // Labels: the first ID is 1 and duplicates are rejected.
        let person_id = manager.add_label("Person")?;
        assert_eq!(person_id, 1);
        let duplicate_label = manager.add_label("Person");
        assert!(duplicate_label.is_err());

        // Properties: declaring the same property twice fails.
        manager.add_property("Person", "name", DataType::String, false)?;
        let duplicate_prop = manager.add_property("Person", "name", DataType::String, false);
        assert!(duplicate_prop.is_err());

        // Edge types: the first ID is 1.
        let knows_id =
            manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
        assert_eq!(knows_id, 1);

        // Persist and make sure the object now exists in the store.
        manager.save().await?;
        assert!(store.get(&schema_path).await.is_ok());

        // Reload and verify that label and property survived the round trip.
        let reloaded = SchemaManager::load_from_store(store, &schema_path).await?;
        assert!(reloaded.schema().labels.contains_key("Person"));
        let has_name = reloaded
            .schema()
            .properties
            .get("Person")
            .unwrap()
            .contains_key("name");
        assert!(has_name);

        Ok(())
    }

    /// Function names are uppercased regardless of input casing; property
    /// names keep their case.
    #[test]
    fn test_normalize_function_names() {
        let cases = [
            ("lower(email)", "LOWER(email)"),
            ("LOWER(email)", "LOWER(email)"),
            ("Lower(email)", "LOWER(email)"),
            ("trim(lower(email))", "TRIM(LOWER(email))"),
        ];
        for (input, expected) in cases {
            assert_eq!(SchemaManager::normalize_function_names(input), expected);
        }
    }

    /// Differently-cased spellings of the same function yield the same
    /// generated column name.
    #[test]
    fn test_generated_column_name_case_insensitive() {
        let lower = SchemaManager::generated_column_name("lower(email)");
        let upper = SchemaManager::generated_column_name("LOWER(email)");
        let mixed = SchemaManager::generated_column_name("Lower(email)");

        assert_eq!(lower, upper);
        assert_eq!(upper, mixed);
        assert!(lower.starts_with("_gen_LOWER_email_"));
    }

    /// Index definitions serialized before the `metadata` field existed must
    /// still deserialize, with default metadata.
    #[test]
    fn test_index_metadata_serde_backward_compat() {
        // Simulate old JSON without metadata field
        let legacy_json = r#"{
            "type": "Scalar",
            "name": "idx_person_name",
            "label": "Person",
            "properties": ["name"],
            "index_type": "BTree",
            "where_clause": null
        }"#;

        let parsed: IndexDefinition = serde_json::from_str(legacy_json).unwrap();
        let defaults = parsed.metadata();
        assert_eq!(defaults.status, IndexStatus::Online);
        assert!(defaults.last_built_at.is_none());
        assert!(defaults.row_count_at_build.is_none());
    }

    /// Non-default metadata survives a JSON serialize/deserialize round trip.
    #[test]
    fn test_index_metadata_serde_roundtrip() {
        let built_at = Utc::now();
        let metadata = IndexMetadata {
            status: IndexStatus::Building,
            last_built_at: Some(built_at),
            row_count_at_build: Some(42),
        };
        let original = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Test".to_string(),
            properties: vec!["prop".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata,
        });

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: IndexDefinition = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.metadata().status, IndexStatus::Building);
        assert_eq!(decoded.metadata().row_count_at_build, Some(42));
        assert!(decoded.metadata().last_built_at.is_some());
    }

    /// `update_index_metadata` mutates the named index in place and errors
    /// for an unknown index name.
    #[tokio::test]
    async fn test_update_index_metadata() -> Result<()> {
        let tmp = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(tmp.path())?);
        let schema_path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &schema_path).await?;

        manager.add_label("Person")?;
        manager.add_index(IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: Default::default(),
        }))?;

        // Verify initial status is Online
        let before = manager.get_index("idx_test").unwrap();
        assert_eq!(before.metadata().status, IndexStatus::Online);

        // Update to Building
        manager.update_index_metadata("idx_test", |meta| {
            meta.status = IndexStatus::Building;
            meta.row_count_at_build = Some(100);
        })?;

        let after = manager.get_index("idx_test").unwrap();
        assert_eq!(after.metadata().status, IndexStatus::Building);
        assert_eq!(after.metadata().row_count_at_build, Some(100));

        // Non-existent index should error
        let missing = manager.update_index_metadata("nope", |_| {});
        assert!(missing.is_err());

        Ok(())
    }

    /// `vector_index_for_property` ignores a vector index until its status
    /// is `Online`.
    #[test]
    fn test_vector_index_for_property_skips_non_online() {
        let mut schema = Schema::default();
        let document_label = LabelMeta {
            id: 1,
            created_at: chrono::Utc::now(),
            state: SchemaElementState::Active,
        };
        schema.labels.insert("Document".to_string(), document_label);

        // Add a vector index with Stale status
        let stale_index = IndexDefinition::Vector(VectorIndexConfig {
            name: "vec_doc_embedding".to_string(),
            label: "Document".to_string(),
            property: "embedding".to_string(),
            index_type: VectorIndexType::Flat,
            metric: DistanceMetric::Cosine,
            embedding_config: None,
            metadata: IndexMetadata {
                status: IndexStatus::Stale,
                ..Default::default()
            },
        });
        schema.indexes.push(stale_index);

        // Stale index should NOT be returned
        let while_stale = schema.vector_index_for_property("Document", "embedding");
        assert!(while_stale.is_none());

        // Set to Online — should now be returned
        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
            cfg.metadata.status = IndexStatus::Online;
        }
        let result = schema.vector_index_for_property("Document", "embedding");
        assert!(result.is_some());
        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
    }
}