Skip to main content

uni_common/core/
schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::core::edge_type::{MAX_SCHEMA_TYPE_ID, is_schemaless_edge_type, make_schemaless_id};
5use crate::sync::{acquire_read, acquire_write};
6use anyhow::{Result, anyhow};
7use chrono::{DateTime, Utc};
8use object_store::ObjectStore;
9use object_store::local::LocalFileSystem;
10use object_store::path::Path as ObjectStorePath;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::Path;
14use std::sync::{Arc, RwLock};
15
16#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
17#[non_exhaustive]
18pub enum SchemaElementState {
19    Active,
20    Hidden {
21        since: DateTime<Utc>,
22        last_active_snapshot: String, // SnapshotId
23    },
24    Tombstone {
25        since: DateTime<Utc>,
26    },
27}
28
29use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
30
31/// Returns the canonical struct field definitions for DateTime encoding in Arrow.
32///
33/// DateTime is encoded as a 3-field struct to preserve timezone information:
34/// - `nanos_since_epoch`: i64 nanoseconds since Unix epoch (UTC)
35/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
36/// - `timezone_name`: Optional IANA timezone name (e.g., "America/New_York")
37pub fn datetime_struct_fields() -> Fields {
38    Fields::from(vec![
39        Field::new(
40            "nanos_since_epoch",
41            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
42            true,
43        ),
44        Field::new("offset_seconds", ArrowDataType::Int32, true),
45        Field::new("timezone_name", ArrowDataType::Utf8, true),
46    ])
47}
48
49/// Returns the canonical struct field definitions for Time encoding in Arrow.
50///
51/// Time is encoded as a 2-field struct to preserve timezone offset:
52/// - `nanos_since_midnight`: i64 nanoseconds since midnight (0-86,399,999,999,999)
53/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
54pub fn time_struct_fields() -> Fields {
55    Fields::from(vec![
56        Field::new(
57            "nanos_since_midnight",
58            ArrowDataType::Time64(TimeUnit::Nanosecond),
59            true,
60        ),
61        Field::new("offset_seconds", ArrowDataType::Int32, true),
62    ])
63}
64
65/// Detects if an Arrow DataType is the canonical DateTime struct.
66pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
67    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
68}
69
70/// Detects if an Arrow DataType is the canonical Time struct.
71pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
72    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
73}
74
75#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
76#[non_exhaustive]
77pub enum CrdtType {
78    GCounter,
79    GSet,
80    ORSet,
81    LWWRegister,
82    LWWMap,
83    Rga,
84    VectorClock,
85    VCRegister,
86}
87
88#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
89pub enum PointType {
90    Geographic,  // WGS84
91    Cartesian2D, // x, y
92    Cartesian3D, // x, y, z
93}
94
95#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
96#[non_exhaustive]
97pub enum DataType {
98    String,
99    Int32,
100    Int64,
101    Float32,
102    Float64,
103    Bool,
104    Timestamp,
105    Date,
106    Time,
107    DateTime,
108    Duration,
109    CypherValue,
110    Point(PointType),
111    Vector { dimensions: usize },
112    Btic,
113    Crdt(CrdtType),
114    List(Box<DataType>),
115    Map(Box<DataType>, Box<DataType>),
116}
117
118impl DataType {
119    // Alias for compatibility/convenience if needed, but preferable to use exact types.
120    #[allow(non_upper_case_globals)]
121    pub const Float: DataType = DataType::Float64;
122    #[allow(non_upper_case_globals)]
123    pub const Int: DataType = DataType::Int64;
124
125    pub fn to_arrow(&self) -> ArrowDataType {
126        match self {
127            DataType::String => ArrowDataType::Utf8,
128            DataType::Int32 => ArrowDataType::Int32,
129            DataType::Int64 => ArrowDataType::Int64,
130            DataType::Float32 => ArrowDataType::Float32,
131            DataType::Float64 => ArrowDataType::Float64,
132            DataType::Bool => ArrowDataType::Boolean,
133            DataType::Timestamp => {
134                ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
135            }
136            DataType::Date => ArrowDataType::Date32,
137            DataType::Time => ArrowDataType::Struct(time_struct_fields()),
138            DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
139            DataType::Duration => ArrowDataType::LargeBinary, // Lance doesn't support Interval(MonthDayNano); use CypherValue codec
140            DataType::CypherValue => ArrowDataType::LargeBinary, // MessagePack-tagged binary encoding
141            DataType::Point(pt) => match pt {
142                PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
143                    Field::new("latitude", ArrowDataType::Float64, false),
144                    Field::new("longitude", ArrowDataType::Float64, false),
145                    Field::new("crs", ArrowDataType::Utf8, false),
146                ])),
147                PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
148                    Field::new("x", ArrowDataType::Float64, false),
149                    Field::new("y", ArrowDataType::Float64, false),
150                    Field::new("crs", ArrowDataType::Utf8, false),
151                ])),
152                PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
153                    Field::new("x", ArrowDataType::Float64, false),
154                    Field::new("y", ArrowDataType::Float64, false),
155                    Field::new("z", ArrowDataType::Float64, false),
156                    Field::new("crs", ArrowDataType::Utf8, false),
157                ])),
158            },
159            DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
160                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
161                *dimensions as i32,
162            ),
163            DataType::Btic => ArrowDataType::FixedSizeBinary(24),
164            DataType::Crdt(_) => ArrowDataType::Binary, // Store CRDT as binary MessagePack
165            DataType::List(inner) => {
166                ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
167            }
168            DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
169                "item",
170                ArrowDataType::Struct(Fields::from(vec![
171                    Field::new("key", key.to_arrow(), false),
172                    Field::new("value", value.to_arrow(), true),
173                ])),
174                true,
175            ))),
176        }
177    }
178}
179
180fn default_created_at() -> DateTime<Utc> {
181    Utc::now()
182}
183
184fn default_state() -> SchemaElementState {
185    SchemaElementState::Active
186}
187
188fn default_version_1() -> u32 {
189    1
190}
191
192#[derive(Clone, Debug, Serialize, Deserialize)]
193pub struct PropertyMeta {
194    pub r#type: DataType,
195    pub nullable: bool,
196    #[serde(default = "default_version_1")]
197    pub added_in: u32, // SchemaVersion
198    #[serde(default = "default_state")]
199    pub state: SchemaElementState,
200    #[serde(default)]
201    pub generation_expression: Option<String>,
202}
203
204#[derive(Clone, Debug, Serialize, Deserialize)]
205pub struct LabelMeta {
206    pub id: u16, // LabelId
207    #[serde(default = "default_created_at")]
208    pub created_at: DateTime<Utc>,
209    #[serde(default = "default_state")]
210    pub state: SchemaElementState,
211}
212
213#[derive(Clone, Debug, Serialize, Deserialize)]
214pub struct EdgeTypeMeta {
215    /// See [`crate::core::edge_type::EdgeTypeId`] for bit-layout details.
216    pub id: u32,
217    pub src_labels: Vec<String>,
218    pub dst_labels: Vec<String>,
219    #[serde(default = "default_state")]
220    pub state: SchemaElementState,
221}
222
223#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
224#[non_exhaustive]
225pub enum ConstraintType {
226    Unique { properties: Vec<String> },
227    Exists { property: String },
228    Check { expression: String },
229}
230
231#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
232#[non_exhaustive]
233pub enum ConstraintTarget {
234    Label(String),
235    EdgeType(String),
236}
237
238#[derive(Clone, Debug, Serialize, Deserialize)]
239pub struct Constraint {
240    pub name: String,
241    pub constraint_type: ConstraintType,
242    pub target: ConstraintTarget,
243    pub enabled: bool,
244}
245
246/// Bidirectional registry for dynamically-assigned schemaless edge type IDs.
247///
248/// Edge types not defined in the schema are assigned IDs at runtime with
249/// bit 31 set (see [`crate::core::edge_type`]). This registry maintains
250/// the name-to-ID and ID-to-name mappings for those types.
251#[derive(Clone, Debug, Serialize, Deserialize)]
252pub struct SchemalessEdgeTypeRegistry {
253    name_to_id: HashMap<String, u32>,
254    id_to_name: HashMap<u32, String>,
255    /// Next local ID to assign (0 is reserved for invalid).
256    next_local_id: u32,
257}
258
259impl SchemalessEdgeTypeRegistry {
260    pub fn new() -> Self {
261        Self {
262            name_to_id: HashMap::new(),
263            id_to_name: HashMap::new(),
264            next_local_id: 1,
265        }
266    }
267
268    /// Returns the schemaless ID for `type_name`, assigning a new one if needed.
269    pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
270        if let Some(&id) = self.name_to_id.get(type_name) {
271            return id;
272        }
273
274        let id = make_schemaless_id(self.next_local_id);
275        self.next_local_id += 1;
276
277        self.name_to_id.insert(type_name.to_string(), id);
278        self.id_to_name.insert(id, type_name.to_string());
279
280        id
281    }
282
283    /// Looks up the edge type name for a schemaless ID.
284    pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
285        self.id_to_name.get(&type_id).map(String::as_str)
286    }
287
288    /// Returns `true` if `type_name` has already been assigned a schemaless ID.
289    pub fn contains(&self, type_name: &str) -> bool {
290        self.name_to_id.contains_key(type_name)
291    }
292
293    /// Looks up the edge type ID for `type_name` with case-insensitive matching.
294    pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
295        self.name_to_id
296            .iter()
297            .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
298            .map(|(_, &id)| id)
299    }
300
301    /// Returns all registered schemaless type IDs.
302    pub fn all_type_ids(&self) -> Vec<u32> {
303        self.id_to_name.keys().copied().collect()
304    }
305
306    /// Returns true if the registry has any schemaless types.
307    pub fn is_empty(&self) -> bool {
308        self.name_to_id.is_empty()
309    }
310}
311
312impl Default for SchemalessEdgeTypeRegistry {
313    fn default() -> Self {
314        Self::new()
315    }
316}
317
318#[derive(Clone, Debug, Serialize, Deserialize)]
319pub struct Schema {
320    pub schema_version: u32,
321    pub labels: HashMap<String, LabelMeta>,
322    pub edge_types: HashMap<String, EdgeTypeMeta>,
323    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
324    #[serde(default)]
325    pub indexes: Vec<IndexDefinition>,
326    #[serde(default)]
327    pub constraints: Vec<Constraint>,
328    /// Registry for schemaless edge types (dynamically assigned IDs)
329    #[serde(default)]
330    pub schemaless_registry: SchemalessEdgeTypeRegistry,
331}
332
333impl Default for Schema {
334    fn default() -> Self {
335        Self {
336            schema_version: 1,
337            labels: HashMap::new(),
338            edge_types: HashMap::new(),
339            properties: HashMap::new(),
340            indexes: Vec::new(),
341            constraints: Vec::new(),
342            schemaless_registry: SchemalessEdgeTypeRegistry::new(),
343        }
344    }
345}
346
347impl Schema {
348    /// Returns the label name for a given label ID.
349    ///
350    /// Performs a linear scan over all labels. This is efficient because
351    /// the number of labels in a schema is typically small.
352    pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
353        self.labels
354            .iter()
355            .find(|(_, meta)| meta.id == label_id)
356            .map(|(name, _)| name.as_str())
357    }
358
359    /// Returns the label ID for a given label name.
360    pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
361        self.labels.get(label_name).map(|meta| meta.id)
362    }
363
364    /// Returns the edge type name for a given type ID.
365    ///
366    /// Performs a linear scan over all edge types. This is efficient because
367    /// the number of edge types in a schema is typically small.
368    pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
369        self.edge_types
370            .iter()
371            .find(|(_, meta)| meta.id == type_id)
372            .map(|(name, _)| name.as_str())
373    }
374
375    /// Returns the edge type ID for a given type name.
376    pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
377        self.edge_types.get(type_name).map(|meta| meta.id)
378    }
379
380    /// Returns the vector index configuration for a given label and property.
381    ///
382    /// Performs a linear scan over all indexes. This is efficient because
383    /// the number of indexes in a schema is typically small.
384    pub fn vector_index_for_property(
385        &self,
386        label: &str,
387        property: &str,
388    ) -> Option<&VectorIndexConfig> {
389        self.indexes.iter().find_map(|idx| {
390            if let IndexDefinition::Vector(config) = idx
391                && config.label == label
392                && config.property == property
393                && config.metadata.status == IndexStatus::Online
394            {
395                return Some(config);
396            }
397            None
398        })
399    }
400
401    /// Returns the full-text index configuration for a given label and property.
402    ///
403    /// A full-text index covers one or more properties. This returns the config
404    /// if the specified property is among the indexed properties.
405    pub fn fulltext_index_for_property(
406        &self,
407        label: &str,
408        property: &str,
409    ) -> Option<&FullTextIndexConfig> {
410        self.indexes.iter().find_map(|idx| {
411            if let IndexDefinition::FullText(config) = idx
412                && config.label == label
413                && config.properties.iter().any(|p| p == property)
414                && config.metadata.status == IndexStatus::Online
415            {
416                return Some(config);
417            }
418            None
419        })
420    }
421
422    /// Get label metadata with case-insensitive lookup.
423    ///
424    /// This allows queries to match labels regardless of case, providing
425    /// better user experience when label names vary in casing.
426    pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
427        self.labels
428            .iter()
429            .find(|(k, _)| k.eq_ignore_ascii_case(name))
430            .map(|(_, v)| v)
431    }
432
433    /// Get label ID with case-insensitive lookup.
434    pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
435        self.get_label_case_insensitive(label_name)
436            .map(|meta| meta.id)
437    }
438
439    /// Get edge type metadata with case-insensitive lookup.
440    ///
441    /// This allows queries to match edge types regardless of case, providing
442    /// better user experience when type names vary in casing.
443    pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
444        self.edge_types
445            .iter()
446            .find(|(k, _)| k.eq_ignore_ascii_case(name))
447            .map(|(_, v)| v)
448    }
449
450    /// Get edge type ID with case-insensitive lookup (schema-defined types only).
451    pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
452        self.get_edge_type_case_insensitive(type_name)
453            .map(|meta| meta.id)
454    }
455
456    /// Get edge type ID with case-insensitive lookup, checking both
457    /// schema-defined and schemaless registries.
458    pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
459        self.edge_type_id_by_name_case_insensitive(type_name)
460            .or_else(|| {
461                self.schemaless_registry
462                    .id_by_name_case_insensitive(type_name)
463            })
464    }
465
466    /// Returns the edge type ID for `type_name`, checking the schema first
467    /// and falling back to the schemaless registry (assigning a new ID if needed).
468    ///
469    /// Requires `&mut self` because it may assign a new schemaless ID.
470    /// Use [`edge_type_id_by_name`](Self::edge_type_id_by_name) for read-only schema lookups.
471    pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
472        if let Some(id) = self.edge_type_id_by_name(type_name) {
473            return id;
474        }
475        self.schemaless_registry.get_or_assign_id(type_name)
476    }
477
478    /// Returns the edge type name for `type_id`, checking both the schema
479    /// and schemaless registries. Returns an owned `String` because the
480    /// name may come from either registry.
481    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
482        if is_schemaless_edge_type(type_id) {
483            self.schemaless_registry
484                .type_name_by_id(type_id)
485                .map(str::to_owned)
486        } else {
487            self.edge_type_name_by_id(type_id).map(str::to_owned)
488        }
489    }
490
491    /// Returns all edge type IDs, including both schema-defined and schemaless types.
492    /// Used when MATCH queries don't specify an edge type and need to scan all edges.
493    pub fn all_edge_type_ids(&self) -> Vec<u32> {
494        let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
495        ids.extend(self.schemaless_registry.all_type_ids());
496        ids.sort_unstable();
497        ids
498    }
499}
500
501/// Lifecycle status of an index.
502#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
503pub enum IndexStatus {
504    /// Index is up-to-date and available for queries.
505    #[default]
506    Online,
507    /// Index is currently being rebuilt.
508    Building,
509    /// Index is outdated and scheduled for rebuild.
510    Stale,
511    /// Index rebuild failed after exhausting retries.
512    Failed,
513}
514
515/// Metadata tracking the lifecycle state of an index.
516#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
517pub struct IndexMetadata {
518    /// Current lifecycle status.
519    #[serde(default)]
520    pub status: IndexStatus,
521    /// When the index was last successfully built.
522    #[serde(default)]
523    pub last_built_at: Option<DateTime<Utc>>,
524    /// Row count of the dataset when the index was last built.
525    #[serde(default)]
526    pub row_count_at_build: Option<u64>,
527}
528
529#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
530#[serde(tag = "type")]
531#[non_exhaustive]
532pub enum IndexDefinition {
533    Vector(VectorIndexConfig),
534    FullText(FullTextIndexConfig),
535    Scalar(ScalarIndexConfig),
536    Inverted(InvertedIndexConfig),
537    JsonFullText(JsonFtsIndexConfig),
538}
539
540impl IndexDefinition {
541    /// Returns the index name for any variant.
542    pub fn name(&self) -> &str {
543        match self {
544            IndexDefinition::Vector(c) => &c.name,
545            IndexDefinition::FullText(c) => &c.name,
546            IndexDefinition::Scalar(c) => &c.name,
547            IndexDefinition::Inverted(c) => &c.name,
548            IndexDefinition::JsonFullText(c) => &c.name,
549        }
550    }
551
552    /// Returns the label this index is defined on.
553    pub fn label(&self) -> &str {
554        match self {
555            IndexDefinition::Vector(c) => &c.label,
556            IndexDefinition::FullText(c) => &c.label,
557            IndexDefinition::Scalar(c) => &c.label,
558            IndexDefinition::Inverted(c) => &c.label,
559            IndexDefinition::JsonFullText(c) => &c.label,
560        }
561    }
562
563    /// Returns a reference to the index lifecycle metadata.
564    pub fn metadata(&self) -> &IndexMetadata {
565        match self {
566            IndexDefinition::Vector(c) => &c.metadata,
567            IndexDefinition::FullText(c) => &c.metadata,
568            IndexDefinition::Scalar(c) => &c.metadata,
569            IndexDefinition::Inverted(c) => &c.metadata,
570            IndexDefinition::JsonFullText(c) => &c.metadata,
571        }
572    }
573
574    /// Returns a mutable reference to the index lifecycle metadata.
575    pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
576        match self {
577            IndexDefinition::Vector(c) => &mut c.metadata,
578            IndexDefinition::FullText(c) => &mut c.metadata,
579            IndexDefinition::Scalar(c) => &mut c.metadata,
580            IndexDefinition::Inverted(c) => &mut c.metadata,
581            IndexDefinition::JsonFullText(c) => &mut c.metadata,
582        }
583    }
584}
585
586#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
587pub struct InvertedIndexConfig {
588    pub name: String,
589    pub label: String,
590    pub property: String,
591    #[serde(default = "default_normalize")]
592    pub normalize: bool,
593    #[serde(default = "default_max_terms_per_doc")]
594    pub max_terms_per_doc: usize,
595    #[serde(default)]
596    pub metadata: IndexMetadata,
597}
598
599fn default_normalize() -> bool {
600    true
601}
602
603fn default_max_terms_per_doc() -> usize {
604    10_000
605}
606
607#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
608pub struct VectorIndexConfig {
609    pub name: String,
610    pub label: String,
611    pub property: String,
612    pub index_type: VectorIndexType,
613    pub metric: DistanceMetric,
614    pub embedding_config: Option<EmbeddingConfig>,
615    #[serde(default)]
616    pub metadata: IndexMetadata,
617}
618
619#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
620pub struct EmbeddingConfig {
621    /// Model alias in the Uni-Xervo catalog (for example: "embed/default").
622    pub alias: String,
623    pub source_properties: Vec<String>,
624    pub batch_size: usize,
625}
626
627#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
628#[non_exhaustive]
629pub enum VectorIndexType {
630    Flat,
631    IvfFlat {
632        num_partitions: u32,
633    },
634    IvfPq {
635        num_partitions: u32,
636        num_sub_vectors: u32,
637        bits_per_subvector: u8,
638    },
639    IvfSq {
640        num_partitions: u32,
641    },
642    IvfRq {
643        num_partitions: u32,
644        #[serde(default)]
645        num_bits: Option<u8>,
646    },
647    HnswFlat {
648        m: u32,
649        ef_construction: u32,
650        #[serde(default)]
651        num_partitions: Option<u32>,
652    },
653    HnswSq {
654        m: u32,
655        ef_construction: u32,
656        #[serde(default)]
657        num_partitions: Option<u32>,
658    },
659    HnswPq {
660        m: u32,
661        ef_construction: u32,
662        num_sub_vectors: u32,
663        #[serde(default)]
664        num_partitions: Option<u32>,
665    },
666}
667
668#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
669#[non_exhaustive]
670pub enum DistanceMetric {
671    Cosine,
672    L2,
673    Dot,
674}
675
676impl DistanceMetric {
677    /// Computes the distance between two vectors using this metric.
678    ///
679    /// All metrics follow LanceDB conventions so that lower values indicate
680    /// greater similarity:
681    /// - **L2**: squared Euclidean distance.
682    /// - **Cosine**: `1.0 - cosine_similarity` (range \[0, 2\]).
683    /// - **Dot**: negative dot product.
684    ///
685    /// # Panics
686    ///
687    /// Panics if `a` and `b` have different lengths.
688    pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
689        assert_eq!(a.len(), b.len(), "vector dimension mismatch");
690        match self {
691            DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
692            DistanceMetric::Cosine => {
693                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
694                let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
695                let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
696                let denom = norm_a * norm_b;
697                if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
698            }
699            DistanceMetric::Dot => {
700                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
701                -dot
702            }
703        }
704    }
705}
706
707#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
708pub struct FullTextIndexConfig {
709    pub name: String,
710    pub label: String,
711    pub properties: Vec<String>,
712    pub tokenizer: TokenizerConfig,
713    pub with_positions: bool,
714    #[serde(default)]
715    pub metadata: IndexMetadata,
716}
717
718#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
719#[non_exhaustive]
720pub enum TokenizerConfig {
721    Standard,
722    Whitespace,
723    Ngram { min: u8, max: u8 },
724    Custom { name: String },
725}
726
727#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
728pub struct JsonFtsIndexConfig {
729    pub name: String,
730    pub label: String,
731    pub column: String,
732    #[serde(default)]
733    pub paths: Vec<String>,
734    #[serde(default)]
735    pub with_positions: bool,
736    #[serde(default)]
737    pub metadata: IndexMetadata,
738}
739
740#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
741pub struct ScalarIndexConfig {
742    pub name: String,
743    pub label: String,
744    pub properties: Vec<String>,
745    pub index_type: ScalarIndexType,
746    pub where_clause: Option<String>,
747    #[serde(default)]
748    pub metadata: IndexMetadata,
749}
750
751#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
752#[non_exhaustive]
753pub enum ScalarIndexType {
754    BTree,
755    Hash,
756    Bitmap,
757    LabelList,
758}
759
760pub struct SchemaManager {
761    store: Arc<dyn ObjectStore>,
762    path: ObjectStorePath,
763    schema: RwLock<Arc<Schema>>,
764}
765
766impl SchemaManager {
767    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
768        let path = path.as_ref();
769        let parent = path
770            .parent()
771            .ok_or_else(|| anyhow!("Invalid schema path"))?;
772        let filename = path
773            .file_name()
774            .ok_or_else(|| anyhow!("Invalid schema filename"))?
775            .to_str()
776            .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
777
778        let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
779        let obj_path = ObjectStorePath::from(filename);
780
781        Self::load_from_store(store, &obj_path).await
782    }
783
784    pub async fn load_from_store(
785        store: Arc<dyn ObjectStore>,
786        path: &ObjectStorePath,
787    ) -> Result<Self> {
788        match store.get(path).await {
789            Ok(result) => {
790                let bytes = result.bytes().await?;
791                let content = String::from_utf8(bytes.to_vec())?;
792                let schema: Schema = serde_json::from_str(&content)?;
793                Ok(Self {
794                    store,
795                    path: path.clone(),
796                    schema: RwLock::new(Arc::new(schema)),
797                })
798            }
799            Err(object_store::Error::NotFound { .. }) => Ok(Self {
800                store,
801                path: path.clone(),
802                schema: RwLock::new(Arc::new(Schema::default())),
803            }),
804            Err(e) => Err(anyhow::Error::from(e)),
805        }
806    }
807
808    pub async fn save(&self) -> Result<()> {
809        let content = {
810            let schema_guard = acquire_read(&self.schema, "schema")?;
811            serde_json::to_string_pretty(&**schema_guard)?
812        };
813        self.store
814            .put(&self.path, content.into())
815            .await
816            .map_err(anyhow::Error::from)?;
817        Ok(())
818    }
819
820    pub fn path(&self) -> &ObjectStorePath {
821        &self.path
822    }
823
824    pub fn schema(&self) -> Arc<Schema> {
825        self.schema
826            .read()
827            .expect("Schema lock poisoned - a thread panicked while holding it")
828            .clone()
829    }
830
831    /// Normalize function names in an expression to uppercase for case-insensitive matching.
832    /// Examples: "lower(email)" -> "LOWER(email)", "trim(name)" -> "TRIM(name)"
833    fn normalize_function_names(expr: &str) -> String {
834        let mut result = String::with_capacity(expr.len());
835        let mut chars = expr.chars().peekable();
836
837        while let Some(ch) = chars.next() {
838            if ch.is_alphabetic() {
839                // Collect identifier
840                let mut ident = String::new();
841                ident.push(ch);
842
843                while let Some(&next) = chars.peek() {
844                    if next.is_alphanumeric() || next == '_' {
845                        ident.push(chars.next().unwrap());
846                    } else {
847                        break;
848                    }
849                }
850
851                // If followed by '(', it's a function call - uppercase it
852                if chars.peek() == Some(&'(') {
853                    result.push_str(&ident.to_uppercase());
854                } else {
855                    result.push_str(&ident); // Keep property names as-is
856                }
857            } else {
858                result.push(ch);
859            }
860        }
861
862        result
863    }
864
865    /// Generate a consistent internal column name for an expression index.
866    /// Uses a hash suffix to ensure uniqueness for different expressions that
867    /// might sanitize to the same string (e.g., "a+b" and "a-b" both become "a_b").
868    ///
869    /// IMPORTANT: Uses FNV-1a hash which is stable across Rust versions and platforms.
870    /// DefaultHasher is not guaranteed to be stable and could break persistent data
871    /// if the hash changes after a compiler upgrade.
872    pub fn generated_column_name(expr: &str) -> String {
873        // Normalize function names to uppercase for case-insensitive matching
874        let normalized = Self::normalize_function_names(expr);
875
876        let sanitized = normalized
877            .replace(|c: char| !c.is_alphanumeric(), "_")
878            .trim_matches('_')
879            .to_string();
880
881        // FNV-1a 64-bit hash - stable across Rust versions and platforms
882        const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
883        const FNV_PRIME: u64 = 1099511628211;
884
885        let mut hash = FNV_OFFSET_BASIS;
886        for byte in normalized.as_bytes() {
887            hash ^= *byte as u64;
888            hash = hash.wrapping_mul(FNV_PRIME);
889        }
890
891        format!("_gen_{}_{:x}", sanitized, hash)
892    }
893
894    pub fn replace_schema(&self, new_schema: Schema) {
895        let mut schema = self
896            .schema
897            .write()
898            .expect("Schema lock poisoned - a thread panicked while holding it");
899        *schema = Arc::new(new_schema);
900    }
901
902    pub fn next_label_id(&self) -> u16 {
903        self.schema()
904            .labels
905            .values()
906            .map(|l| l.id)
907            .max()
908            .unwrap_or(0)
909            + 1
910    }
911
912    pub fn next_type_id(&self) -> u32 {
913        let max_schema_id = self
914            .schema()
915            .edge_types
916            .values()
917            .map(|t| t.id)
918            .max()
919            .unwrap_or(0);
920
921        // Ensure we stay in schema'd ID space (bit 31 = 0)
922        if max_schema_id >= MAX_SCHEMA_TYPE_ID {
923            panic!("Schema edge type ID exhaustion");
924        }
925
926        max_schema_id + 1
927    }
928
929    pub fn add_label(&self, name: &str) -> Result<u16> {
930        let mut guard = acquire_write(&self.schema, "schema")?;
931        let schema = Arc::make_mut(&mut *guard);
932        if schema.labels.contains_key(name) {
933            return Err(anyhow!("Label '{}' already exists", name));
934        }
935
936        let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
937        schema.labels.insert(
938            name.to_string(),
939            LabelMeta {
940                id,
941                created_at: Utc::now(),
942                state: SchemaElementState::Active,
943            },
944        );
945        Ok(id)
946    }
947
948    pub fn add_edge_type(
949        &self,
950        name: &str,
951        src_labels: Vec<String>,
952        dst_labels: Vec<String>,
953    ) -> Result<u32> {
954        let mut guard = acquire_write(&self.schema, "schema")?;
955        let schema = Arc::make_mut(&mut *guard);
956        if schema.edge_types.contains_key(name) {
957            return Err(anyhow!("Edge type '{}' already exists", name));
958        }
959
960        let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
961
962        // Ensure we stay in schema'd ID space (bit 31 = 0)
963        if id >= MAX_SCHEMA_TYPE_ID {
964            return Err(anyhow!("Schema edge type ID exhaustion"));
965        }
966
967        schema.edge_types.insert(
968            name.to_string(),
969            EdgeTypeMeta {
970                id,
971                src_labels,
972                dst_labels,
973                state: SchemaElementState::Active,
974            },
975        );
976        Ok(id)
977    }
978
979    /// Delegates to [`Schema::get_or_assign_edge_type_id`].
980    pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
981        let mut guard = acquire_write(&self.schema, "schema")
982            .expect("Schema lock poisoned - a thread panicked while holding it");
983        let schema = Arc::make_mut(&mut *guard);
984        schema.get_or_assign_edge_type_id(type_name)
985    }
986
987    /// Delegates to [`Schema::edge_type_name_by_id_unified`].
988    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
989        let schema = acquire_read(&self.schema, "schema")
990            .expect("Schema lock poisoned - a thread panicked while holding it");
991        schema.edge_type_name_by_id_unified(type_id)
992    }
993
994    pub fn add_property(
995        &self,
996        label_or_type: &str,
997        prop_name: &str,
998        data_type: DataType,
999        nullable: bool,
1000    ) -> Result<()> {
1001        let mut guard = acquire_write(&self.schema, "schema")?;
1002        let schema = Arc::make_mut(&mut *guard);
1003        let version = schema.schema_version;
1004        let props = schema
1005            .properties
1006            .entry(label_or_type.to_string())
1007            .or_default();
1008
1009        if props.contains_key(prop_name) {
1010            return Err(anyhow!(
1011                "Property '{}' already exists for '{}'",
1012                prop_name,
1013                label_or_type
1014            ));
1015        }
1016
1017        props.insert(
1018            prop_name.to_string(),
1019            PropertyMeta {
1020                r#type: data_type,
1021                nullable,
1022                added_in: version,
1023                state: SchemaElementState::Active,
1024                generation_expression: None,
1025            },
1026        );
1027        Ok(())
1028    }
1029
1030    pub fn add_generated_property(
1031        &self,
1032        label_or_type: &str,
1033        prop_name: &str,
1034        data_type: DataType,
1035        expr: String,
1036    ) -> Result<()> {
1037        let mut guard = acquire_write(&self.schema, "schema")?;
1038        let schema = Arc::make_mut(&mut *guard);
1039        let version = schema.schema_version;
1040        let props = schema
1041            .properties
1042            .entry(label_or_type.to_string())
1043            .or_default();
1044
1045        if props.contains_key(prop_name) {
1046            return Err(anyhow!("Property '{}' already exists", prop_name));
1047        }
1048
1049        props.insert(
1050            prop_name.to_string(),
1051            PropertyMeta {
1052                r#type: data_type,
1053                nullable: true,
1054                added_in: version,
1055                state: SchemaElementState::Active,
1056                generation_expression: Some(expr),
1057            },
1058        );
1059        Ok(())
1060    }
1061
1062    pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1063        let mut guard = acquire_write(&self.schema, "schema")?;
1064        let schema = Arc::make_mut(&mut *guard);
1065        schema.indexes.push(index_def);
1066        Ok(())
1067    }
1068
1069    pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1070        let schema = self.schema.read().expect("Schema lock poisoned");
1071        schema.indexes.iter().find(|i| i.name() == name).cloned()
1072    }
1073
1074    /// Updates the lifecycle metadata for an index by name.
1075    ///
1076    /// The closure receives a mutable reference to the index's `IndexMetadata`,
1077    /// allowing callers to update status, timestamps, etc.
1078    pub fn update_index_metadata(
1079        &self,
1080        index_name: &str,
1081        f: impl FnOnce(&mut IndexMetadata),
1082    ) -> Result<()> {
1083        let mut guard = acquire_write(&self.schema, "schema")?;
1084        let schema = Arc::make_mut(&mut *guard);
1085        let idx = schema
1086            .indexes
1087            .iter_mut()
1088            .find(|i| i.name() == index_name)
1089            .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1090        f(idx.metadata_mut());
1091        Ok(())
1092    }
1093
1094    pub fn remove_index(&self, name: &str) -> Result<()> {
1095        let mut guard = acquire_write(&self.schema, "schema")?;
1096        let schema = Arc::make_mut(&mut *guard);
1097        if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1098            schema.indexes.remove(pos);
1099            Ok(())
1100        } else {
1101            Err(anyhow!("Index '{}' not found", name))
1102        }
1103    }
1104
1105    pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1106        let mut guard = acquire_write(&self.schema, "schema")?;
1107        let schema = Arc::make_mut(&mut *guard);
1108        if schema.constraints.iter().any(|c| c.name == constraint.name) {
1109            return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1110        }
1111        schema.constraints.push(constraint);
1112        Ok(())
1113    }
1114
1115    pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1116        let mut guard = acquire_write(&self.schema, "schema")?;
1117        let schema = Arc::make_mut(&mut *guard);
1118        if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1119            schema.constraints.remove(pos);
1120            Ok(())
1121        } else if if_exists {
1122            Ok(())
1123        } else {
1124            Err(anyhow!("Constraint '{}' not found", name))
1125        }
1126    }
1127
1128    pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1129        let mut guard = acquire_write(&self.schema, "schema")?;
1130        let schema = Arc::make_mut(&mut *guard);
1131        if let Some(props) = schema.properties.get_mut(label_or_type) {
1132            if props.remove(prop_name).is_some() {
1133                Ok(())
1134            } else {
1135                Err(anyhow!(
1136                    "Property '{}' not found for '{}'",
1137                    prop_name,
1138                    label_or_type
1139                ))
1140            }
1141        } else {
1142            Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1143        }
1144    }
1145
1146    pub fn rename_property(
1147        &self,
1148        label_or_type: &str,
1149        old_name: &str,
1150        new_name: &str,
1151    ) -> Result<()> {
1152        let mut guard = acquire_write(&self.schema, "schema")?;
1153        let schema = Arc::make_mut(&mut *guard);
1154        if let Some(props) = schema.properties.get_mut(label_or_type) {
1155            if let Some(meta) = props.remove(old_name) {
1156                if props.contains_key(new_name) {
1157                    // Rollback removal? Or just error.
1158                    props.insert(old_name.to_string(), meta); // Restore
1159                    return Err(anyhow!("Property '{}' already exists", new_name));
1160                }
1161                props.insert(new_name.to_string(), meta);
1162                Ok(())
1163            } else {
1164                Err(anyhow!(
1165                    "Property '{}' not found for '{}'",
1166                    old_name,
1167                    label_or_type
1168                ))
1169            }
1170        } else {
1171            Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1172        }
1173    }
1174
1175    pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1176        let mut guard = acquire_write(&self.schema, "schema")?;
1177        let schema = Arc::make_mut(&mut *guard);
1178        if let Some(label_meta) = schema.labels.get_mut(name) {
1179            label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1180            // Do not remove properties; they are implicitly tombstoned by the label
1181            Ok(())
1182        } else if if_exists {
1183            Ok(())
1184        } else {
1185            Err(anyhow!("Label '{}' not found", name))
1186        }
1187    }
1188
1189    pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1190        let mut guard = acquire_write(&self.schema, "schema")?;
1191        let schema = Arc::make_mut(&mut *guard);
1192        if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1193            edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1194            // Do not remove properties; they are implicitly tombstoned by the edge type
1195            Ok(())
1196        } else if if_exists {
1197            Ok(())
1198        } else {
1199            Err(anyhow!("Edge Type '{}' not found", name))
1200        }
1201    }
1202}
1203
1204/// Validate identifier names to prevent injection and ensure compatibility.
1205pub fn validate_identifier(name: &str) -> Result<()> {
1206    // Length check
1207    if name.is_empty() || name.len() > 64 {
1208        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1209    }
1210
1211    // First character must be letter or underscore
1212    let first = name.chars().next().unwrap();
1213    if !first.is_alphabetic() && first != '_' {
1214        return Err(anyhow!(
1215            "Identifier '{}' must start with letter or underscore",
1216            name
1217        ));
1218    }
1219
1220    // Remaining characters: alphanumeric or underscore
1221    if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1222        return Err(anyhow!(
1223            "Identifier '{}' must contain only alphanumeric and underscore",
1224            name
1225        ));
1226    }
1227
1228    // Reserved words
1229    const RESERVED: &[&str] = &[
1230        "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1231        "UNION", "ORDER", "LIMIT",
1232    ];
1233    if RESERVED.contains(&name.to_uppercase().as_str()) {
1234        return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1235    }
1236
1237    Ok(())
1238}
1239
1240#[cfg(test)]
1241mod tests {
1242    use super::*;
1243    use object_store::local::LocalFileSystem;
1244    use tempfile::tempdir;
1245
1246    #[tokio::test]
1247    async fn test_schema_management() -> Result<()> {
1248        let dir = tempdir()?;
1249        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1250        let path = ObjectStorePath::from("schema.json");
1251        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
1252
1253        // Labels
1254        let lid = manager.add_label("Person")?;
1255        assert_eq!(lid, 1);
1256        assert!(manager.add_label("Person").is_err());
1257
1258        // Properties
1259        manager.add_property("Person", "name", DataType::String, false)?;
1260        assert!(
1261            manager
1262                .add_property("Person", "name", DataType::String, false)
1263                .is_err()
1264        );
1265
1266        // Edge types
1267        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
1268        assert_eq!(tid, 1);
1269
1270        manager.save().await?;
1271        // Check file exists
1272        assert!(store.get(&path).await.is_ok());
1273
1274        let manager2 = SchemaManager::load_from_store(store, &path).await?;
1275        assert!(manager2.schema().labels.contains_key("Person"));
1276        assert!(
1277            manager2
1278                .schema()
1279                .properties
1280                .get("Person")
1281                .unwrap()
1282                .contains_key("name")
1283        );
1284
1285        Ok(())
1286    }
1287
1288    #[test]
1289    fn test_normalize_function_names() {
1290        assert_eq!(
1291            SchemaManager::normalize_function_names("lower(email)"),
1292            "LOWER(email)"
1293        );
1294        assert_eq!(
1295            SchemaManager::normalize_function_names("LOWER(email)"),
1296            "LOWER(email)"
1297        );
1298        assert_eq!(
1299            SchemaManager::normalize_function_names("Lower(email)"),
1300            "LOWER(email)"
1301        );
1302        assert_eq!(
1303            SchemaManager::normalize_function_names("trim(lower(email))"),
1304            "TRIM(LOWER(email))"
1305        );
1306    }
1307
1308    #[test]
1309    fn test_generated_column_name_case_insensitive() {
1310        let col1 = SchemaManager::generated_column_name("lower(email)");
1311        let col2 = SchemaManager::generated_column_name("LOWER(email)");
1312        let col3 = SchemaManager::generated_column_name("Lower(email)");
1313        assert_eq!(col1, col2);
1314        assert_eq!(col2, col3);
1315        assert!(col1.starts_with("_gen_LOWER_email_"));
1316    }
1317
1318    #[test]
1319    fn test_index_metadata_serde_backward_compat() {
1320        // Simulate old JSON without metadata field
1321        let json = r#"{
1322            "type": "Scalar",
1323            "name": "idx_person_name",
1324            "label": "Person",
1325            "properties": ["name"],
1326            "index_type": "BTree",
1327            "where_clause": null
1328        }"#;
1329        let def: IndexDefinition = serde_json::from_str(json).unwrap();
1330        let meta = def.metadata();
1331        assert_eq!(meta.status, IndexStatus::Online);
1332        assert!(meta.last_built_at.is_none());
1333        assert!(meta.row_count_at_build.is_none());
1334    }
1335
1336    #[test]
1337    fn test_index_metadata_serde_roundtrip() {
1338        let now = Utc::now();
1339        let def = IndexDefinition::Scalar(ScalarIndexConfig {
1340            name: "idx_test".to_string(),
1341            label: "Test".to_string(),
1342            properties: vec!["prop".to_string()],
1343            index_type: ScalarIndexType::BTree,
1344            where_clause: None,
1345            metadata: IndexMetadata {
1346                status: IndexStatus::Building,
1347                last_built_at: Some(now),
1348                row_count_at_build: Some(42),
1349            },
1350        });
1351
1352        let json = serde_json::to_string(&def).unwrap();
1353        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
1354        assert_eq!(parsed.metadata().status, IndexStatus::Building);
1355        assert_eq!(parsed.metadata().row_count_at_build, Some(42));
1356        assert!(parsed.metadata().last_built_at.is_some());
1357    }
1358
1359    #[tokio::test]
1360    async fn test_update_index_metadata() -> Result<()> {
1361        let dir = tempdir()?;
1362        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1363        let path = ObjectStorePath::from("schema.json");
1364        let manager = SchemaManager::load_from_store(store, &path).await?;
1365
1366        manager.add_label("Person")?;
1367        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
1368            name: "idx_test".to_string(),
1369            label: "Person".to_string(),
1370            properties: vec!["name".to_string()],
1371            index_type: ScalarIndexType::BTree,
1372            where_clause: None,
1373            metadata: Default::default(),
1374        });
1375        manager.add_index(idx)?;
1376
1377        // Verify initial status is Online
1378        let initial = manager.get_index("idx_test").unwrap();
1379        assert_eq!(initial.metadata().status, IndexStatus::Online);
1380
1381        // Update to Building
1382        manager.update_index_metadata("idx_test", |m| {
1383            m.status = IndexStatus::Building;
1384            m.row_count_at_build = Some(100);
1385        })?;
1386
1387        let updated = manager.get_index("idx_test").unwrap();
1388        assert_eq!(updated.metadata().status, IndexStatus::Building);
1389        assert_eq!(updated.metadata().row_count_at_build, Some(100));
1390
1391        // Non-existent index should error
1392        assert!(manager.update_index_metadata("nope", |_| {}).is_err());
1393
1394        Ok(())
1395    }
1396
1397    #[test]
1398    fn test_vector_index_for_property_skips_non_online() {
1399        let mut schema = Schema::default();
1400        schema.labels.insert(
1401            "Document".to_string(),
1402            LabelMeta {
1403                id: 1,
1404                created_at: chrono::Utc::now(),
1405                state: SchemaElementState::Active,
1406            },
1407        );
1408
1409        // Add a vector index with Stale status
1410        schema
1411            .indexes
1412            .push(IndexDefinition::Vector(VectorIndexConfig {
1413                name: "vec_doc_embedding".to_string(),
1414                label: "Document".to_string(),
1415                property: "embedding".to_string(),
1416                index_type: VectorIndexType::Flat,
1417                metric: DistanceMetric::Cosine,
1418                embedding_config: None,
1419                metadata: IndexMetadata {
1420                    status: IndexStatus::Stale,
1421                    ..Default::default()
1422                },
1423            }));
1424
1425        // Stale index should NOT be returned
1426        assert!(
1427            schema
1428                .vector_index_for_property("Document", "embedding")
1429                .is_none()
1430        );
1431
1432        // Set to Online — should now be returned
1433        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
1434            cfg.metadata.status = IndexStatus::Online;
1435        }
1436        let result = schema.vector_index_for_property("Document", "embedding");
1437        assert!(result.is_some());
1438        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
1439    }
1440}