Skip to main content

uni_common/core/
schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::core::edge_type::{
5    MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6    is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
20#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
21#[non_exhaustive]
22pub enum SchemaElementState {
23    Active,
24    Hidden {
25        since: DateTime<Utc>,
26        last_active_snapshot: String, // SnapshotId
27    },
28    Tombstone {
29        since: DateTime<Utc>,
30    },
31}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35/// Returns the canonical struct field definitions for DateTime encoding in Arrow.
36///
37/// DateTime is encoded as a 3-field struct to preserve timezone information:
38/// - `nanos_since_epoch`: i64 nanoseconds since Unix epoch (UTC)
39/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
40/// - `timezone_name`: Optional IANA timezone name (e.g., "America/New_York")
41pub fn datetime_struct_fields() -> Fields {
42    Fields::from(vec![
43        Field::new(
44            "nanos_since_epoch",
45            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46            true,
47        ),
48        Field::new("offset_seconds", ArrowDataType::Int32, true),
49        Field::new("timezone_name", ArrowDataType::Utf8, true),
50    ])
51}
52
53/// Returns the canonical struct field definitions for Time encoding in Arrow.
54///
55/// Time is encoded as a 2-field struct to preserve timezone offset:
56/// - `nanos_since_midnight`: i64 nanoseconds since midnight (0-86,399,999,999,999)
57/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
58pub fn time_struct_fields() -> Fields {
59    Fields::from(vec![
60        Field::new(
61            "nanos_since_midnight",
62            ArrowDataType::Time64(TimeUnit::Nanosecond),
63            true,
64        ),
65        Field::new("offset_seconds", ArrowDataType::Int32, true),
66    ])
67}
68
69/// Detects if an Arrow DataType is the canonical DateTime struct.
70pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74/// Detects if an Arrow DataType is the canonical Time struct.
75pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
79#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
80#[non_exhaustive]
81pub enum CrdtType {
82    GCounter,
83    GSet,
84    ORSet,
85    LWWRegister,
86    LWWMap,
87    Rga,
88    VectorClock,
89    VCRegister,
90}
91
92impl CrdtType {
93    /// Returns the canonical variant name for this CRDT type.
94    ///
95    /// The returned strings must stay in sync with `uni_crdt::Crdt::type_name`,
96    /// so a written CRDT value can be validated against its schema-declared
97    /// variant (see uni-store's write-time CRDT enforcement).
98    ///
99    /// # Examples
100    /// ```
101    /// use uni_common::core::schema::CrdtType;
102    /// assert_eq!(CrdtType::GCounter.type_name(), "GCounter");
103    /// ```
104    #[must_use]
105    pub fn type_name(&self) -> &'static str {
106        match self {
107            CrdtType::GCounter => "GCounter",
108            CrdtType::GSet => "GSet",
109            CrdtType::ORSet => "ORSet",
110            CrdtType::LWWRegister => "LWWRegister",
111            CrdtType::LWWMap => "LWWMap",
112            CrdtType::Rga => "Rga",
113            CrdtType::VectorClock => "VectorClock",
114            CrdtType::VCRegister => "VCRegister",
115        }
116    }
117}
118
119#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
120pub enum PointType {
121    Geographic,  // WGS84
122    Cartesian2D, // x, y
123    Cartesian3D, // x, y, z
124}
125
126#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
127#[non_exhaustive]
128pub enum DataType {
129    String,
130    Int32,
131    Int64,
132    Float32,
133    Float64,
134    Bool,
135    Timestamp,
136    Date,
137    Time,
138    DateTime,
139    Duration,
140    CypherValue,
141    Bytes,
142    Point(PointType),
143    Vector { dimensions: usize },
144    Btic,
145    Crdt(CrdtType),
146    List(Box<DataType>),
147    Map(Box<DataType>, Box<DataType>),
148}
149
150impl DataType {
151    // Alias for compatibility/convenience if needed, but preferable to use exact types.
152    #[allow(non_upper_case_globals)]
153    pub const Float: DataType = DataType::Float64;
154    #[allow(non_upper_case_globals)]
155    pub const Int: DataType = DataType::Int64;
156
157    pub fn to_arrow(&self) -> ArrowDataType {
158        match self {
159            DataType::String => ArrowDataType::Utf8,
160            DataType::Int32 => ArrowDataType::Int32,
161            DataType::Int64 => ArrowDataType::Int64,
162            DataType::Float32 => ArrowDataType::Float32,
163            DataType::Float64 => ArrowDataType::Float64,
164            DataType::Bool => ArrowDataType::Boolean,
165            DataType::Timestamp => {
166                ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
167            }
168            DataType::Date => ArrowDataType::Date32,
169            DataType::Time => ArrowDataType::Struct(time_struct_fields()),
170            DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
171            DataType::Duration => ArrowDataType::LargeBinary, // Lance doesn't support Interval(MonthDayNano); use CypherValue codec
172            DataType::CypherValue => ArrowDataType::LargeBinary, // MessagePack-tagged binary encoding
173            DataType::Bytes => ArrowDataType::LargeBinary, // raw byte buffer (no codec wrapping)
174            DataType::Point(pt) => match pt {
175                PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
176                    Field::new("latitude", ArrowDataType::Float64, false),
177                    Field::new("longitude", ArrowDataType::Float64, false),
178                    Field::new("crs", ArrowDataType::Utf8, false),
179                ])),
180                PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
181                    Field::new("x", ArrowDataType::Float64, false),
182                    Field::new("y", ArrowDataType::Float64, false),
183                    Field::new("crs", ArrowDataType::Utf8, false),
184                ])),
185                PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
186                    Field::new("x", ArrowDataType::Float64, false),
187                    Field::new("y", ArrowDataType::Float64, false),
188                    Field::new("z", ArrowDataType::Float64, false),
189                    Field::new("crs", ArrowDataType::Utf8, false),
190                ])),
191            },
192            DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
193                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
194                *dimensions as i32,
195            ),
196            DataType::Btic => ArrowDataType::FixedSizeBinary(24),
197            DataType::Crdt(_) => ArrowDataType::Binary, // Store CRDT as binary MessagePack
198            DataType::List(inner) => {
199                ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
200            }
201            DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
202                "item",
203                ArrowDataType::Struct(Fields::from(vec![
204                    Field::new("key", key.to_arrow(), false),
205                    Field::new("value", value.to_arrow(), true),
206                ])),
207                true,
208            ))),
209        }
210    }
211
212    /// Returns `true` if `value` is directly storable in this column type without loss.
213    ///
214    /// This is the schema-level type guard used by the write path. `Value::Null` is
215    /// always accepted — column nullability is enforced separately by the `nullable`
216    /// flag, not here. `CypherValue`, `Crdt`, and `Point` columns accept any value.
217    /// For every other declared type, only the `Value` variants that the storage layer
218    /// persists *without silently nulling* are accepted (see the per-type converters in
219    /// `uni-store`'s `arrow_convert`), plus the intentional lossless widenings
220    /// `Int`→`Float`, `Int`→`Int32`, and `Temporal`→`Timestamp`.
221    ///
222    /// A `Value::String` destined for a `Date`/`Time`/`DateTime`/`Duration` column is
223    /// intentionally *not* accepted here: the write path first coerces such strings into
224    /// the proper `Temporal` value (matching the Cypher temporal constructors), then the
225    /// coerced value passes this check. This keeps `accepts` a pure, allocation-free
226    /// predicate.
227    ///
228    /// # Examples
229    /// ```
230    /// use uni_common::core::schema::DataType;
231    /// use uni_common::Value;
232    ///
233    /// assert!(DataType::Float64.accepts(&Value::Int(3))); // Int widens to Float
234    /// assert!(DataType::Bool.accepts(&Value::Null)); // Null always accepted
235    /// assert!(!DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())));
236    /// ```
237    pub fn accepts(&self, value: &crate::value::Value) -> bool {
238        use crate::value::{TemporalValue, Value};
239
240        // Null is universally accepted; nullability is a separate concern.
241        if matches!(value, Value::Null) {
242            return true;
243        }
244
245        match self {
246            // Opaque / dynamically-typed columns accept any value.
247            DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
248
249            DataType::String => matches!(value, Value::String(_)),
250            DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
251            // Int widens to Float losslessly for the ranges we care about.
252            DataType::Float32 | DataType::Float64 => {
253                matches!(value, Value::Int(_) | Value::Float(_))
254            }
255            DataType::Bool => matches!(value, Value::Bool(_)),
256
257            // Non-struct timestamp column: storage parses strings and accepts ints,
258            // so both are lossless here (unlike the DateTime struct column below).
259            DataType::Timestamp => matches!(
260                value,
261                Value::String(_)
262                    | Value::Int(_)
263                    | Value::Temporal(
264                        TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
265                    )
266            ),
267            DataType::DateTime => matches!(
268                value,
269                Value::Temporal(
270                    TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
271                )
272            ),
273            DataType::Date => {
274                matches!(
275                    value,
276                    Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
277                )
278            }
279            DataType::Time => matches!(
280                value,
281                Value::Int(_)
282                    | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
283            ),
284            DataType::Duration => {
285                matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
286            }
287            DataType::Bytes => matches!(value, Value::Bytes(_)),
288            // FixedSizeBinary(24) converter accepts the Btic temporal, raw strings, and lists.
289            DataType::Btic => matches!(
290                value,
291                Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
292            ),
293            DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
294            DataType::List(_) => matches!(value, Value::List(_)),
295            DataType::Map(_, _) => matches!(value, Value::Map(_)),
296        }
297    }
298}
299
300fn default_created_at() -> DateTime<Utc> {
301    Utc::now()
302}
303
304fn default_state() -> SchemaElementState {
305    SchemaElementState::Active
306}
307
308fn default_version_1() -> u32 {
309    1
310}
311
312#[derive(Clone, Debug, Serialize, Deserialize)]
313pub struct PropertyMeta {
314    pub r#type: DataType,
315    pub nullable: bool,
316    #[serde(default = "default_version_1")]
317    pub added_in: u32, // SchemaVersion
318    #[serde(default = "default_state")]
319    pub state: SchemaElementState,
320    #[serde(default)]
321    pub generation_expression: Option<String>,
322    #[serde(default, skip_serializing_if = "Option::is_none")]
323    pub description: Option<String>,
324}
325
326#[derive(Clone, Debug, Serialize, Deserialize)]
327pub struct LabelMeta {
328    pub id: u16, // LabelId
329    #[serde(default = "default_created_at")]
330    pub created_at: DateTime<Utc>,
331    #[serde(default = "default_state")]
332    pub state: SchemaElementState,
333    #[serde(default, skip_serializing_if = "Option::is_none")]
334    pub description: Option<String>,
335}
336
337#[derive(Clone, Debug, Serialize, Deserialize)]
338pub struct EdgeTypeMeta {
339    /// See [`crate::core::edge_type::EdgeTypeId`] for bit-layout details.
340    pub id: u32,
341    pub src_labels: Vec<String>,
342    pub dst_labels: Vec<String>,
343    #[serde(default = "default_state")]
344    pub state: SchemaElementState,
345    #[serde(default, skip_serializing_if = "Option::is_none")]
346    pub description: Option<String>,
347}
348
349#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
350#[non_exhaustive]
351pub enum ConstraintType {
352    Unique { properties: Vec<String> },
353    Exists { property: String },
354    Check { expression: String },
355}
356
357#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
358#[non_exhaustive]
359pub enum ConstraintTarget {
360    Label(String),
361    EdgeType(String),
362}
363
364#[derive(Clone, Debug, Serialize, Deserialize)]
365pub struct Constraint {
366    pub name: String,
367    pub constraint_type: ConstraintType,
368    pub target: ConstraintTarget,
369    pub enabled: bool,
370}
371
372/// Bidirectional registry for dynamically-assigned schemaless edge type IDs.
373///
374/// Edge types not defined in the schema are assigned IDs at runtime with
375/// bit 31 set (see [`crate::core::edge_type`]). This registry maintains
376/// the name-to-ID and ID-to-name mappings for those types.
377#[derive(Clone, Debug, Serialize, Deserialize)]
378pub struct SchemalessEdgeTypeRegistry {
379    name_to_id: HashMap<String, u32>,
380    id_to_name: HashMap<u32, String>,
381    /// Next local ID to assign (0 is reserved for invalid).
382    next_local_id: u32,
383}
384
385impl SchemalessEdgeTypeRegistry {
386    pub fn new() -> Self {
387        Self {
388            name_to_id: HashMap::new(),
389            id_to_name: HashMap::new(),
390            next_local_id: 1,
391        }
392    }
393
394    /// Returns the schemaless ID for `type_name`, assigning a new one if needed.
395    pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
396        if let Some(&id) = self.name_to_id.get(type_name) {
397            return id;
398        }
399
400        let id = make_schemaless_id(self.next_local_id);
401        self.next_local_id += 1;
402
403        self.name_to_id.insert(type_name.to_string(), id);
404        self.id_to_name.insert(id, type_name.to_string());
405
406        id
407    }
408
409    /// Looks up the edge type name for a schemaless ID.
410    pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
411        self.id_to_name.get(&type_id).map(String::as_str)
412    }
413
414    /// Returns `true` if `type_name` has already been assigned a schemaless ID.
415    pub fn contains(&self, type_name: &str) -> bool {
416        self.name_to_id.contains_key(type_name)
417    }
418
419    /// Looks up the edge type ID for `type_name` with case-insensitive matching.
420    pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
421        self.name_to_id
422            .iter()
423            .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
424            .map(|(_, &id)| id)
425    }
426
427    /// Returns all registered schemaless type IDs.
428    pub fn all_type_ids(&self) -> Vec<u32> {
429        self.id_to_name.keys().copied().collect()
430    }
431
432    /// Returns true if the registry has any schemaless types.
433    pub fn is_empty(&self) -> bool {
434        self.name_to_id.is_empty()
435    }
436}
437
438impl Default for SchemalessEdgeTypeRegistry {
439    fn default() -> Self {
440        Self::new()
441    }
442}
443
444/// First virtual (catalog-resolved) label ID. Label IDs in
445/// `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL` are owned by
446/// plugin-registered `CatalogProvider`s and allocated lazily by the
447/// planner via `PluginRegistry::register_virtual_label`. Native label
448/// allocation (`SchemaManager::add_label`) refuses IDs in this range.
449pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
450/// Sentinel "no label" marker, kept distinct from any allocatable ID.
451pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;
452
453/// Returns `true` if `id` is in the virtual (catalog-resolved) range.
454#[inline]
455pub fn is_virtual_label_id(id: u16) -> bool {
456    (VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL).contains(&id)
457}
458
459#[derive(Clone, Debug, Serialize, Deserialize)]
460pub struct Schema {
461    pub schema_version: u32,
462    pub labels: HashMap<String, LabelMeta>,
463    pub edge_types: HashMap<String, EdgeTypeMeta>,
464    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
465    #[serde(default)]
466    pub indexes: Vec<IndexDefinition>,
467    #[serde(default)]
468    pub constraints: Vec<Constraint>,
469    /// Registry for schemaless edge types (dynamically assigned IDs)
470    #[serde(default)]
471    pub schemaless_registry: SchemalessEdgeTypeRegistry,
472}
473
474impl Default for Schema {
475    fn default() -> Self {
476        Self {
477            schema_version: 1,
478            labels: HashMap::new(),
479            edge_types: HashMap::new(),
480            properties: HashMap::new(),
481            indexes: Vec::new(),
482            constraints: Vec::new(),
483            schemaless_registry: SchemalessEdgeTypeRegistry::new(),
484        }
485    }
486}
487
488impl Schema {
489    /// Returns the label name for a given label ID.
490    ///
491    /// Performs a linear scan over all labels. This is efficient because
492    /// the number of labels in a schema is typically small.
493    pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
494        self.labels
495            .iter()
496            .find(|(_, meta)| meta.id == label_id)
497            .map(|(name, _)| name.as_str())
498    }
499
500    /// Returns the label ID for a given label name.
501    pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
502        self.labels.get(label_name).map(|meta| meta.id)
503    }
504
505    /// Returns the edge type name for a given type ID.
506    ///
507    /// Performs a linear scan over all edge types. This is efficient because
508    /// the number of edge types in a schema is typically small.
509    pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
510        self.edge_types
511            .iter()
512            .find(|(_, meta)| meta.id == type_id)
513            .map(|(name, _)| name.as_str())
514    }
515
516    /// Returns the edge type ID for a given type name.
517    pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
518        self.edge_types.get(type_name).map(|meta| meta.id)
519    }
520
521    /// Returns the vector index configuration for a given label and property.
522    ///
523    /// Performs a linear scan over all indexes. This is efficient because
524    /// the number of indexes in a schema is typically small.
525    pub fn vector_index_for_property(
526        &self,
527        label: &str,
528        property: &str,
529    ) -> Option<&VectorIndexConfig> {
530        self.indexes.iter().find_map(|idx| {
531            if let IndexDefinition::Vector(config) = idx
532                && config.label == label
533                && config.property == property
534                && config.metadata.status == IndexStatus::Online
535            {
536                return Some(config);
537            }
538            None
539        })
540    }
541
542    /// Returns the full-text index configuration for a given label and property.
543    ///
544    /// A full-text index covers one or more properties. This returns the config
545    /// if the specified property is among the indexed properties.
546    pub fn fulltext_index_for_property(
547        &self,
548        label: &str,
549        property: &str,
550    ) -> Option<&FullTextIndexConfig> {
551        self.indexes.iter().find_map(|idx| {
552            if let IndexDefinition::FullText(config) = idx
553                && config.label == label
554                && config.properties.iter().any(|p| p == property)
555                && config.metadata.status == IndexStatus::Online
556            {
557                return Some(config);
558            }
559            None
560        })
561    }
562
563    /// Get label metadata with case-insensitive lookup.
564    ///
565    /// This allows queries to match labels regardless of case, providing
566    /// better user experience when label names vary in casing.
567    pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
568        self.labels
569            .iter()
570            .find(|(k, _)| k.eq_ignore_ascii_case(name))
571            .map(|(_, v)| v)
572    }
573
574    /// Get label ID with case-insensitive lookup.
575    pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
576        self.get_label_case_insensitive(label_name)
577            .map(|meta| meta.id)
578    }
579
580    /// Get edge type metadata with case-insensitive lookup.
581    ///
582    /// This allows queries to match edge types regardless of case, providing
583    /// better user experience when type names vary in casing.
584    pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
585        self.edge_types
586            .iter()
587            .find(|(k, _)| k.eq_ignore_ascii_case(name))
588            .map(|(_, v)| v)
589    }
590
591    /// Get edge type ID with case-insensitive lookup (schema-defined types only).
592    pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
593        self.get_edge_type_case_insensitive(type_name)
594            .map(|meta| meta.id)
595    }
596
597    /// Get edge type ID with case-insensitive lookup, checking both
598    /// schema-defined and schemaless registries.
599    pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
600        self.edge_type_id_by_name_case_insensitive(type_name)
601            .or_else(|| {
602                self.schemaless_registry
603                    .id_by_name_case_insensitive(type_name)
604            })
605    }
606
607    /// Returns the edge type ID for `type_name`, checking the schema first
608    /// and falling back to the schemaless registry (assigning a new ID if needed).
609    ///
610    /// Requires `&mut self` because it may assign a new schemaless ID.
611    /// Use [`edge_type_id_by_name`](Self::edge_type_id_by_name) for read-only schema lookups.
612    pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
613        if let Some(id) = self.edge_type_id_by_name(type_name) {
614            return id;
615        }
616        self.schemaless_registry.get_or_assign_id(type_name)
617    }
618
619    /// Returns the edge type name for `type_id`, checking both the schema
620    /// and schemaless registries. Returns an owned `String` because the
621    /// name may come from either registry.
622    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
623        if is_schemaless_edge_type(type_id) {
624            self.schemaless_registry
625                .type_name_by_id(type_id)
626                .map(str::to_owned)
627        } else {
628            self.edge_type_name_by_id(type_id).map(str::to_owned)
629        }
630    }
631
632    /// Returns all edge type IDs, including both schema-defined and schemaless types.
633    /// Used when MATCH queries don't specify an edge type and need to scan all edges.
634    pub fn all_edge_type_ids(&self) -> Vec<u32> {
635        let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
636        ids.extend(self.schemaless_registry.all_type_ids());
637        ids.sort_unstable();
638        ids
639    }
640}
641
642/// Lifecycle status of an index.
643#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
644pub enum IndexStatus {
645    /// Index is up-to-date and available for queries.
646    #[default]
647    Online,
648    /// Index is currently being rebuilt.
649    Building,
650    /// Index is outdated and scheduled for rebuild.
651    Stale,
652    /// Index rebuild failed after exhausting retries.
653    Failed,
654}
655
656/// Metadata tracking the lifecycle state of an index.
657#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
658pub struct IndexMetadata {
659    /// Current lifecycle status.
660    #[serde(default)]
661    pub status: IndexStatus,
662    /// When the index was last successfully built.
663    #[serde(default)]
664    pub last_built_at: Option<DateTime<Utc>>,
665    /// Row count of the dataset when the index was last built.
666    #[serde(default)]
667    pub row_count_at_build: Option<u64>,
668}
669
670#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
671#[serde(tag = "type")]
672#[non_exhaustive]
673pub enum IndexDefinition {
674    Vector(VectorIndexConfig),
675    FullText(FullTextIndexConfig),
676    Scalar(ScalarIndexConfig),
677    Inverted(InvertedIndexConfig),
678    JsonFullText(JsonFtsIndexConfig),
679}
680
681impl IndexDefinition {
682    /// Returns the index name for any variant.
683    pub fn name(&self) -> &str {
684        match self {
685            IndexDefinition::Vector(c) => &c.name,
686            IndexDefinition::FullText(c) => &c.name,
687            IndexDefinition::Scalar(c) => &c.name,
688            IndexDefinition::Inverted(c) => &c.name,
689            IndexDefinition::JsonFullText(c) => &c.name,
690        }
691    }
692
693    /// Returns the label this index is defined on.
694    pub fn label(&self) -> &str {
695        match self {
696            IndexDefinition::Vector(c) => &c.label,
697            IndexDefinition::FullText(c) => &c.label,
698            IndexDefinition::Scalar(c) => &c.label,
699            IndexDefinition::Inverted(c) => &c.label,
700            IndexDefinition::JsonFullText(c) => &c.label,
701        }
702    }
703
704    /// Returns a reference to the index lifecycle metadata.
705    pub fn metadata(&self) -> &IndexMetadata {
706        match self {
707            IndexDefinition::Vector(c) => &c.metadata,
708            IndexDefinition::FullText(c) => &c.metadata,
709            IndexDefinition::Scalar(c) => &c.metadata,
710            IndexDefinition::Inverted(c) => &c.metadata,
711            IndexDefinition::JsonFullText(c) => &c.metadata,
712        }
713    }
714
715    /// Returns a mutable reference to the index lifecycle metadata.
716    pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
717        match self {
718            IndexDefinition::Vector(c) => &mut c.metadata,
719            IndexDefinition::FullText(c) => &mut c.metadata,
720            IndexDefinition::Scalar(c) => &mut c.metadata,
721            IndexDefinition::Inverted(c) => &mut c.metadata,
722            IndexDefinition::JsonFullText(c) => &mut c.metadata,
723        }
724    }
725}
726
727#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
728pub struct InvertedIndexConfig {
729    pub name: String,
730    pub label: String,
731    pub property: String,
732    #[serde(default = "default_normalize")]
733    pub normalize: bool,
734    #[serde(default = "default_max_terms_per_doc")]
735    pub max_terms_per_doc: usize,
736    #[serde(default)]
737    pub metadata: IndexMetadata,
738}
739
740fn default_normalize() -> bool {
741    true
742}
743
744fn default_max_terms_per_doc() -> usize {
745    10_000
746}
747
748#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
749pub struct VectorIndexConfig {
750    pub name: String,
751    pub label: String,
752    pub property: String,
753    pub index_type: VectorIndexType,
754    pub metric: DistanceMetric,
755    pub embedding_config: Option<EmbeddingConfig>,
756    #[serde(default)]
757    pub metadata: IndexMetadata,
758}
759
760#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
761pub struct EmbeddingConfig {
762    /// Model alias in the Uni-Xervo catalog (for example: "embed/default").
763    pub alias: String,
764    pub source_properties: Vec<String>,
765    pub batch_size: usize,
766    /// Prefix prepended to text before embedding during auto-embed (document side).
767    /// Example: `"search_document: "` for Nomic models. Include any trailing space.
768    #[serde(default)]
769    pub document_prefix: Option<String>,
770    /// Prefix prepended to text before embedding during query-time embed calls.
771    /// Example: `"search_query: "` for Nomic models. Include any trailing space.
772    #[serde(default)]
773    pub query_prefix: Option<String>,
774}
775
776#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
777#[non_exhaustive]
778pub enum VectorIndexType {
779    Flat,
780    IvfFlat {
781        num_partitions: u32,
782    },
783    IvfPq {
784        num_partitions: u32,
785        num_sub_vectors: u32,
786        bits_per_subvector: u8,
787    },
788    IvfSq {
789        num_partitions: u32,
790    },
791    IvfRq {
792        num_partitions: u32,
793        #[serde(default)]
794        num_bits: Option<u8>,
795    },
796    HnswFlat {
797        m: u32,
798        ef_construction: u32,
799        #[serde(default)]
800        num_partitions: Option<u32>,
801    },
802    HnswSq {
803        m: u32,
804        ef_construction: u32,
805        #[serde(default)]
806        num_partitions: Option<u32>,
807    },
808    HnswPq {
809        m: u32,
810        ef_construction: u32,
811        num_sub_vectors: u32,
812        #[serde(default)]
813        num_partitions: Option<u32>,
814    },
815}
816
817#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
818#[non_exhaustive]
819pub enum DistanceMetric {
820    Cosine,
821    L2,
822    Dot,
823}
824
825impl DistanceMetric {
826    /// Computes the distance between two vectors using this metric.
827    ///
828    /// All metrics follow LanceDB conventions so that lower values indicate
829    /// greater similarity:
830    /// - **L2**: squared Euclidean distance.
831    /// - **Cosine**: `1.0 - cosine_similarity` (range \[0, 2\]).
832    /// - **Dot**: negative dot product.
833    ///
834    /// # Panics
835    ///
836    /// Panics if `a` and `b` have different lengths.
837    pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
838        assert_eq!(a.len(), b.len(), "vector dimension mismatch");
839        match self {
840            DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
841            DistanceMetric::Cosine => {
842                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
843                let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
844                let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
845                let denom = norm_a * norm_b;
846                if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
847            }
848            DistanceMetric::Dot => {
849                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
850                -dot
851            }
852        }
853    }
854}
855
856#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
857pub struct FullTextIndexConfig {
858    pub name: String,
859    pub label: String,
860    pub properties: Vec<String>,
861    pub tokenizer: TokenizerConfig,
862    pub with_positions: bool,
863    #[serde(default)]
864    pub metadata: IndexMetadata,
865}
866
867#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
868#[non_exhaustive]
869pub enum TokenizerConfig {
870    Standard,
871    Whitespace,
872    Ngram { min: u8, max: u8 },
873    Custom { name: String },
874}
875
876#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
877pub struct JsonFtsIndexConfig {
878    pub name: String,
879    pub label: String,
880    pub column: String,
881    #[serde(default)]
882    pub paths: Vec<String>,
883    #[serde(default)]
884    pub with_positions: bool,
885    #[serde(default)]
886    pub metadata: IndexMetadata,
887}
888
889#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
890pub struct ScalarIndexConfig {
891    pub name: String,
892    pub label: String,
893    pub properties: Vec<String>,
894    pub index_type: ScalarIndexType,
895    pub where_clause: Option<String>,
896    #[serde(default)]
897    pub metadata: IndexMetadata,
898}
899
900#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
901#[non_exhaustive]
902pub enum ScalarIndexType {
903    BTree,
904    Hash,
905    Bitmap,
906    LabelList,
907}
908
/// Owns the in-memory [`Schema`] together with its persisted JSON copy in an
/// object store.
///
/// Readers receive `Arc<Schema>` snapshots. Mutating methods take the write
/// lock and go through `Arc::make_mut`, so snapshots already handed out are
/// never modified in place. Mutations stay in memory until
/// [`SchemaManager::save`] writes them back.
pub struct SchemaManager {
    /// Object store holding the schema document.
    store: Arc<dyn ObjectStore>,
    /// Key of the schema document within `store`.
    path: ObjectStorePath,
    /// Current schema, replaced or copied-on-write by mutators.
    schema: RwLock<Arc<Schema>>,
}
914
915impl SchemaManager {
916    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
917        let path = path.as_ref();
918        let parent = path
919            .parent()
920            .ok_or_else(|| anyhow!("Invalid schema path"))?;
921        let filename = path
922            .file_name()
923            .ok_or_else(|| anyhow!("Invalid schema filename"))?
924            .to_str()
925            .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
926
927        let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
928        let obj_path = ObjectStorePath::from(filename);
929
930        Self::load_from_store(store, &obj_path).await
931    }
932
933    pub async fn load_from_store(
934        store: Arc<dyn ObjectStore>,
935        path: &ObjectStorePath,
936    ) -> Result<Self> {
937        match store.get(path).await {
938            Ok(result) => {
939                let bytes = result.bytes().await?;
940                let content = String::from_utf8(bytes.to_vec())?;
941                let mut schema: Schema = serde_json::from_str(&content)?;
942                // Self-heal catalogs that grew super-linearly under the
943                // pre-fix `add_index` (issue rustic-ai/uni-db#63). Collapse
944                // duplicate index entries by name, keeping the *last*
945                // occurrence — matches the upsert semantics in `add_index`
946                // and preserves whatever metadata the most recent rebuild
947                // wrote. The dedup persists on the next mutation that
948                // calls `save()`.
949                let original_len = schema.indexes.len();
950                if original_len > 0 {
951                    let mut seen: std::collections::HashSet<String> =
952                        std::collections::HashSet::with_capacity(original_len);
953                    let mut dedup: Vec<IndexDefinition> = schema
954                        .indexes
955                        .iter()
956                        .rev()
957                        .filter(|idx| seen.insert(idx.name().to_string()))
958                        .cloned()
959                        .collect();
960                    dedup.reverse();
961                    if dedup.len() != original_len {
962                        tracing::warn!(
963                            collapsed = original_len - dedup.len(),
964                            kept = dedup.len(),
965                            "schema.indexes: collapsed duplicate entries on load (issue #63)"
966                        );
967                        schema.indexes = dedup;
968                    }
969                }
970                Ok(Self {
971                    store,
972                    path: path.clone(),
973                    schema: RwLock::new(Arc::new(schema)),
974                })
975            }
976            Err(object_store::Error::NotFound { .. }) => Ok(Self {
977                store,
978                path: path.clone(),
979                schema: RwLock::new(Arc::new(Schema::default())),
980            }),
981            Err(e) => Err(anyhow::Error::from(e)),
982        }
983    }
984
985    pub async fn save(&self) -> Result<()> {
986        let content = {
987            let schema_guard = acquire_read(&self.schema, "schema")?;
988            serde_json::to_string_pretty(&**schema_guard)?
989        };
990        self.store
991            .put(&self.path, content.into())
992            .await
993            .map_err(anyhow::Error::from)?;
994        Ok(())
995    }
996
997    pub fn path(&self) -> &ObjectStorePath {
998        &self.path
999    }
1000
1001    pub fn schema(&self) -> Arc<Schema> {
1002        self.schema
1003            .read()
1004            .expect("Schema lock poisoned - a thread panicked while holding it")
1005            .clone()
1006    }
1007
1008    /// Normalize function names in an expression to uppercase for case-insensitive matching.
1009    /// Examples: "lower(email)" -> "LOWER(email)", "trim(name)" -> "TRIM(name)"
1010    fn normalize_function_names(expr: &str) -> String {
1011        let mut result = String::with_capacity(expr.len());
1012        let mut chars = expr.chars().peekable();
1013
1014        while let Some(ch) = chars.next() {
1015            if ch.is_alphabetic() {
1016                // Collect identifier
1017                let mut ident = String::new();
1018                ident.push(ch);
1019
1020                while let Some(&next) = chars.peek() {
1021                    if next.is_alphanumeric() || next == '_' {
1022                        ident.push(chars.next().unwrap());
1023                    } else {
1024                        break;
1025                    }
1026                }
1027
1028                // If followed by '(', it's a function call - uppercase it
1029                if chars.peek() == Some(&'(') {
1030                    result.push_str(&ident.to_uppercase());
1031                } else {
1032                    result.push_str(&ident); // Keep property names as-is
1033                }
1034            } else {
1035                result.push(ch);
1036            }
1037        }
1038
1039        result
1040    }
1041
1042    /// Generate a consistent internal column name for an expression index.
1043    /// Uses a hash suffix to ensure uniqueness for different expressions that
1044    /// might sanitize to the same string (e.g., "a+b" and "a-b" both become "a_b").
1045    ///
1046    /// IMPORTANT: Uses FNV-1a hash which is stable across Rust versions and platforms.
1047    /// DefaultHasher is not guaranteed to be stable and could break persistent data
1048    /// if the hash changes after a compiler upgrade.
1049    pub fn generated_column_name(expr: &str) -> String {
1050        // Normalize function names to uppercase for case-insensitive matching
1051        let normalized = Self::normalize_function_names(expr);
1052
1053        let sanitized = normalized
1054            .replace(|c: char| !c.is_alphanumeric(), "_")
1055            .trim_matches('_')
1056            .to_string();
1057
1058        // FNV-1a 64-bit hash - stable across Rust versions and platforms
1059        const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1060        const FNV_PRIME: u64 = 1099511628211;
1061
1062        let mut hash = FNV_OFFSET_BASIS;
1063        for byte in normalized.as_bytes() {
1064            hash ^= *byte as u64;
1065            hash = hash.wrapping_mul(FNV_PRIME);
1066        }
1067
1068        format!("_gen_{}_{:x}", sanitized, hash)
1069    }
1070
1071    pub fn replace_schema(&self, new_schema: Schema) {
1072        let mut schema = self
1073            .schema
1074            .write()
1075            .expect("Schema lock poisoned - a thread panicked while holding it");
1076        *schema = Arc::new(new_schema);
1077    }
1078
1079    /// Build a fork-scoped manager whose schema is `primary ⊕ overlay`.
1080    ///
1081    /// Used by `UniInner::at_fork` to give a forked session a schema view
1082    /// that includes any labels/edge-types/properties the fork has
1083    /// introduced on top of primary. The returned manager owns its own
1084    /// in-memory `Arc<Schema>` — mutations to it never reach primary's
1085    /// schema file. The returned manager is *not* intended for `.save()`;
1086    /// fork-overlay persistence is owned by the registry layer
1087    /// (`catalog/fork_schemas/{fork_id}.json`).
1088    ///
1089    /// In Phase 1 the delta is always empty, so the merge is a clone.
1090    /// Phase 2 starts populating it when on-the-fly label creation lands.
1091    #[must_use]
1092    pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1093        let primary = self.schema();
1094        let merged = if overlay.is_empty() {
1095            (*primary).clone()
1096        } else {
1097            let mut merged = (*primary).clone();
1098            for (name, label) in &overlay.added_labels {
1099                merged.labels.insert(name.clone(), label.clone());
1100            }
1101            for (name, edge_type) in &overlay.added_edge_types {
1102                merged.edge_types.insert(name.clone(), edge_type.clone());
1103            }
1104            for addition in &overlay.added_properties {
1105                let props = merged.properties.entry(addition.owner.clone()).or_default();
1106                props.insert(
1107                    addition.property.clone(),
1108                    PropertyMeta {
1109                        r#type: addition.data_type.clone(),
1110                        nullable: addition.nullable,
1111                        added_in: merged.schema_version,
1112                        state: SchemaElementState::Active,
1113                        generation_expression: None,
1114                        description: None,
1115                    },
1116                );
1117            }
1118            merged
1119        };
1120
1121        Arc::new(Self {
1122            store: self.store.clone(),
1123            path: self.path.clone(),
1124            schema: RwLock::new(Arc::new(merged)),
1125        })
1126    }
1127
1128    pub fn next_label_id(&self) -> u16 {
1129        self.schema()
1130            .labels
1131            .values()
1132            .map(|l| l.id)
1133            .max()
1134            .unwrap_or(0)
1135            + 1
1136    }
1137
1138    pub fn next_type_id(&self) -> u32 {
1139        let max_schema_id = self
1140            .schema()
1141            .edge_types
1142            .values()
1143            .map(|t| t.id)
1144            .max()
1145            .unwrap_or(0);
1146
1147        // Ensure we stay in schema'd ID space (bit 31 = 0)
1148        if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1149            panic!("Schema edge type ID exhaustion");
1150        }
1151
1152        max_schema_id + 1
1153    }
1154
1155    pub fn add_label(&self, name: &str) -> Result<u16> {
1156        self.add_label_with_desc(name, None)
1157    }
1158
1159    pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1160        let mut guard = acquire_write(&self.schema, "schema")?;
1161        let schema = Arc::make_mut(&mut *guard);
1162        if schema.labels.contains_key(name) {
1163            return Err(anyhow!("Label '{}' already exists", name));
1164        }
1165
1166        let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1167        if id >= VIRTUAL_LABEL_ID_START {
1168            return Err(anyhow!(
1169                "Native label space exhausted (next id {id:#x} would enter the \
1170                 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1171                 reserved for catalog-resolved labels)"
1172            ));
1173        }
1174        schema.labels.insert(
1175            name.to_string(),
1176            LabelMeta {
1177                id,
1178                created_at: Utc::now(),
1179                state: SchemaElementState::Active,
1180                description,
1181            },
1182        );
1183        Ok(id)
1184    }
1185
1186    pub fn add_edge_type(
1187        &self,
1188        name: &str,
1189        src_labels: Vec<String>,
1190        dst_labels: Vec<String>,
1191    ) -> Result<u32> {
1192        self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1193    }
1194
1195    pub fn add_edge_type_with_desc(
1196        &self,
1197        name: &str,
1198        src_labels: Vec<String>,
1199        dst_labels: Vec<String>,
1200        description: Option<String>,
1201    ) -> Result<u32> {
1202        let mut guard = acquire_write(&self.schema, "schema")?;
1203        let schema = Arc::make_mut(&mut *guard);
1204        if schema.edge_types.contains_key(name) {
1205            return Err(anyhow!("Edge type '{}' already exists", name));
1206        }
1207
1208        let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1209
1210        // Stay in the schema-defined sub-range (bit 31 = 0, and below the
1211        // virtual reservation `VIRTUAL_EDGE_TYPE_ID_START`) — same bound as
1212        // `add_edge_type`, so the two entry points cannot disagree on the
1213        // legal ceiling.
1214        if id >= VIRTUAL_EDGE_TYPE_ID_START {
1215            return Err(anyhow!(
1216                "Native edge type space exhausted (next id {id:#x} would enter the \
1217                 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1218                 reserved for catalog-resolved edge types)"
1219            ));
1220        }
1221
1222        schema.edge_types.insert(
1223            name.to_string(),
1224            EdgeTypeMeta {
1225                id,
1226                src_labels,
1227                dst_labels,
1228                state: SchemaElementState::Active,
1229                description,
1230            },
1231        );
1232        Ok(id)
1233    }
1234
1235    /// Delegates to [`Schema::get_or_assign_edge_type_id`].
1236    pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1237        let mut guard = acquire_write(&self.schema, "schema")
1238            .expect("Schema lock poisoned - a thread panicked while holding it");
1239        let schema = Arc::make_mut(&mut *guard);
1240        schema.get_or_assign_edge_type_id(type_name)
1241    }
1242
1243    /// Delegates to [`Schema::edge_type_name_by_id_unified`].
1244    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1245        let schema = acquire_read(&self.schema, "schema")
1246            .expect("Schema lock poisoned - a thread panicked while holding it");
1247        schema.edge_type_name_by_id_unified(type_id)
1248    }
1249
1250    pub fn add_property(
1251        &self,
1252        label_or_type: &str,
1253        prop_name: &str,
1254        data_type: DataType,
1255        nullable: bool,
1256    ) -> Result<()> {
1257        self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1258    }
1259
1260    pub fn add_property_with_desc(
1261        &self,
1262        label_or_type: &str,
1263        prop_name: &str,
1264        data_type: DataType,
1265        nullable: bool,
1266        description: Option<String>,
1267    ) -> Result<()> {
1268        validate_property_name(prop_name)?;
1269        let mut guard = acquire_write(&self.schema, "schema")?;
1270        let schema = Arc::make_mut(&mut *guard);
1271        let version = schema.schema_version;
1272        let props = schema
1273            .properties
1274            .entry(label_or_type.to_string())
1275            .or_default();
1276
1277        if props.contains_key(prop_name) {
1278            return Err(anyhow!(
1279                "Property '{}' already exists for '{}'",
1280                prop_name,
1281                label_or_type
1282            ));
1283        }
1284
1285        props.insert(
1286            prop_name.to_string(),
1287            PropertyMeta {
1288                r#type: data_type,
1289                nullable,
1290                added_in: version,
1291                state: SchemaElementState::Active,
1292                generation_expression: None,
1293                description,
1294            },
1295        );
1296        Ok(())
1297    }
1298
1299    pub fn add_generated_property(
1300        &self,
1301        label_or_type: &str,
1302        prop_name: &str,
1303        data_type: DataType,
1304        expr: String,
1305    ) -> Result<()> {
1306        // System-generated `_gen_*` columns bypass the underscore-prefix rule
1307        // but must still avoid storage-layer column-name collisions.
1308        validate_reserved_property_name(prop_name)?;
1309        let mut guard = acquire_write(&self.schema, "schema")?;
1310        let schema = Arc::make_mut(&mut *guard);
1311        let version = schema.schema_version;
1312        let props = schema
1313            .properties
1314            .entry(label_or_type.to_string())
1315            .or_default();
1316
1317        if props.contains_key(prop_name) {
1318            return Err(anyhow!("Property '{}' already exists", prop_name));
1319        }
1320
1321        props.insert(
1322            prop_name.to_string(),
1323            PropertyMeta {
1324                r#type: data_type,
1325                nullable: true,
1326                added_in: version,
1327                state: SchemaElementState::Active,
1328                generation_expression: Some(expr),
1329                description: None,
1330            },
1331        );
1332        Ok(())
1333    }
1334
1335    pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1336        let mut guard = acquire_write(&self.schema, "schema")?;
1337        let schema = Arc::make_mut(&mut *guard);
1338        let meta = schema
1339            .labels
1340            .get_mut(name)
1341            .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1342        meta.description = description;
1343        Ok(())
1344    }
1345
1346    pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1347        let mut guard = acquire_write(&self.schema, "schema")?;
1348        let schema = Arc::make_mut(&mut *guard);
1349        let meta = schema
1350            .edge_types
1351            .get_mut(name)
1352            .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1353        meta.description = description;
1354        Ok(())
1355    }
1356
1357    pub fn set_property_description(
1358        &self,
1359        entity: &str,
1360        prop_name: &str,
1361        description: Option<String>,
1362    ) -> Result<()> {
1363        let mut guard = acquire_write(&self.schema, "schema")?;
1364        let schema = Arc::make_mut(&mut *guard);
1365        let props = schema
1366            .properties
1367            .get_mut(entity)
1368            .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1369        let meta = props
1370            .get_mut(prop_name)
1371            .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
1372        meta.description = description;
1373        Ok(())
1374    }
1375
1376    /// Register an index definition on the schema, **upsert by name**.
1377    ///
1378    /// If an index with the same `IndexDefinition::name()` already exists, it
1379    /// is replaced in place; otherwise the def is appended. Idempotent under
1380    /// repeat invocation, which makes `SchemaBuilder::apply()` re-applicable
1381    /// without bloating `schema.indexes` and lets the rebuild epilogue inside
1382    /// every `IndexManager::create_*_index` re-record metadata updates without
1383    /// duplicating entries (issue rustic-ai/uni-db#63).
1384    pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1385        let mut guard = acquire_write(&self.schema, "schema")?;
1386        let schema = Arc::make_mut(&mut *guard);
1387        if let Some(existing) = schema
1388            .indexes
1389            .iter_mut()
1390            .find(|i| i.name() == index_def.name())
1391        {
1392            *existing = index_def;
1393        } else {
1394            schema.indexes.push(index_def);
1395        }
1396        Ok(())
1397    }
1398
1399    pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1400        let schema = self.schema.read().expect("Schema lock poisoned");
1401        schema.indexes.iter().find(|i| i.name() == name).cloned()
1402    }
1403
1404    /// Updates the lifecycle metadata for an index by name.
1405    ///
1406    /// The closure receives a mutable reference to the index's `IndexMetadata`,
1407    /// allowing callers to update status, timestamps, etc.
1408    pub fn update_index_metadata(
1409        &self,
1410        index_name: &str,
1411        f: impl FnOnce(&mut IndexMetadata),
1412    ) -> Result<()> {
1413        let mut guard = acquire_write(&self.schema, "schema")?;
1414        let schema = Arc::make_mut(&mut *guard);
1415        let idx = schema
1416            .indexes
1417            .iter_mut()
1418            .find(|i| i.name() == index_name)
1419            .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1420        f(idx.metadata_mut());
1421        Ok(())
1422    }
1423
1424    pub fn remove_index(&self, name: &str) -> Result<()> {
1425        let mut guard = acquire_write(&self.schema, "schema")?;
1426        let schema = Arc::make_mut(&mut *guard);
1427        if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1428            schema.indexes.remove(pos);
1429            Ok(())
1430        } else {
1431            Err(anyhow!("Index '{}' not found", name))
1432        }
1433    }
1434
1435    pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1436        let mut guard = acquire_write(&self.schema, "schema")?;
1437        let schema = Arc::make_mut(&mut *guard);
1438        if schema.constraints.iter().any(|c| c.name == constraint.name) {
1439            return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1440        }
1441        schema.constraints.push(constraint);
1442        Ok(())
1443    }
1444
1445    pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1446        let mut guard = acquire_write(&self.schema, "schema")?;
1447        let schema = Arc::make_mut(&mut *guard);
1448        if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1449            schema.constraints.remove(pos);
1450            Ok(())
1451        } else if if_exists {
1452            Ok(())
1453        } else {
1454            Err(anyhow!("Constraint '{}' not found", name))
1455        }
1456    }
1457
1458    pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1459        let mut guard = acquire_write(&self.schema, "schema")?;
1460        let schema = Arc::make_mut(&mut *guard);
1461        if let Some(props) = schema.properties.get_mut(label_or_type) {
1462            if props.remove(prop_name).is_some() {
1463                Ok(())
1464            } else {
1465                Err(anyhow!(
1466                    "Property '{}' not found for '{}'",
1467                    prop_name,
1468                    label_or_type
1469                ))
1470            }
1471        } else {
1472            Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1473        }
1474    }
1475
1476    pub fn rename_property(
1477        &self,
1478        label_or_type: &str,
1479        old_name: &str,
1480        new_name: &str,
1481    ) -> Result<()> {
1482        let mut guard = acquire_write(&self.schema, "schema")?;
1483        let schema = Arc::make_mut(&mut *guard);
1484        if let Some(props) = schema.properties.get_mut(label_or_type) {
1485            if let Some(meta) = props.remove(old_name) {
1486                if props.contains_key(new_name) {
1487                    // Rollback removal? Or just error.
1488                    props.insert(old_name.to_string(), meta); // Restore
1489                    return Err(anyhow!("Property '{}' already exists", new_name));
1490                }
1491                props.insert(new_name.to_string(), meta);
1492                Ok(())
1493            } else {
1494                Err(anyhow!(
1495                    "Property '{}' not found for '{}'",
1496                    old_name,
1497                    label_or_type
1498                ))
1499            }
1500        } else {
1501            Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
1502        }
1503    }
1504
1505    pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1506        let mut guard = acquire_write(&self.schema, "schema")?;
1507        let schema = Arc::make_mut(&mut *guard);
1508        if let Some(label_meta) = schema.labels.get_mut(name) {
1509            label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1510            // Do not remove properties; they are implicitly tombstoned by the label
1511            Ok(())
1512        } else if if_exists {
1513            Ok(())
1514        } else {
1515            Err(anyhow!("Label '{}' not found", name))
1516        }
1517    }
1518
1519    pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1520        let mut guard = acquire_write(&self.schema, "schema")?;
1521        let schema = Arc::make_mut(&mut *guard);
1522        if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1523            edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1524            // Do not remove properties; they are implicitly tombstoned by the edge type
1525            Ok(())
1526        } else if if_exists {
1527            Ok(())
1528        } else {
1529            Err(anyhow!("Edge Type '{}' not found", name))
1530        }
1531    }
1532}
1533
1534/// Validate identifier names to prevent injection and ensure compatibility.
1535pub fn validate_identifier(name: &str) -> Result<()> {
1536    // Length check
1537    if name.is_empty() || name.len() > 64 {
1538        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1539    }
1540
1541    // First character must be letter or underscore
1542    let first = name.chars().next().unwrap();
1543    if !first.is_alphabetic() && first != '_' {
1544        return Err(anyhow!(
1545            "Identifier '{}' must start with letter or underscore",
1546            name
1547        ));
1548    }
1549
1550    // Remaining characters: alphanumeric or underscore
1551    if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1552        return Err(anyhow!(
1553            "Identifier '{}' must contain only alphanumeric and underscore",
1554            name
1555        ));
1556    }
1557
1558    // Reserved words
1559    const RESERVED: &[&str] = &[
1560        "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1561        "UNION", "ORDER", "LIMIT",
1562    ];
1563    if RESERVED.contains(&name.to_uppercase().as_str()) {
1564        return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1565    }
1566
1567    Ok(())
1568}
1569
1570/// Reject user-declared property names that collide with internal Arrow column
1571/// names used by the storage layer.
1572///
1573/// Without this, declaring a property named e.g. `ext_id` produces an Arrow
1574/// schema with two `ext_id` fields at flush time, which Lance rejects with
1575/// "Duplicate field name" — silently losing all in-session writes on shutdown.
1576pub fn validate_property_name(name: &str) -> Result<()> {
1577    if name.starts_with('_') {
1578        return Err(anyhow!(
1579            "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
1580            name
1581        ));
1582    }
1583    validate_reserved_property_name(name)
1584}
1585
1586/// Reject names that collide with storage-layer Arrow column names.
1587///
1588/// Used both by `validate_property_name` (user-facing path) and directly by
1589/// `add_generated_property` (system-generated `_gen_*` path) — the latter
1590/// needs to bypass the underscore-prefix rule but must still reject the
1591/// fixed-name collisions below.
1592fn validate_reserved_property_name(name: &str) -> Result<()> {
1593    // Unprefixed names that get appended alongside user properties in the
1594    // per-label vertex (`storage/vertex.rs`), per-edge-type edge
1595    // (`storage/edge.rs`), or per-edge-type delta (`storage/delta.rs`)
1596    // Arrow schemas — declaring one of these as a user property produces a
1597    // duplicate Arrow field and a Lance "Duplicate field name" error at
1598    // flush time. Fixed-schema-only columns (`type`, `props_json`,
1599    // `labels` in the main tables) are NOT listed: those tables don't
1600    // append user properties, so no collision can occur.
1601    const RESERVED_PROPS: &[&str] = &[
1602        "ext_id",
1603        "overflow_json",
1604        "eid",
1605        "src_vid",
1606        "dst_vid",
1607        "op",
1608        // Internal planner sentinel: a column-name marker used by
1609        // `mark_set_item_variables` (uni-query::query::planner) to request
1610        // narrow structural projection without full-schema expansion.
1611        // Reserved here defensively so an internal `add_generated_property`
1612        // path can't accidentally create a colliding user-facing column.
1613        // The user-facing `validate_property_name` already rejects this
1614        // via the underscore-prefix rule, so this is belt-and-suspenders.
1615        "__set_struct__",
1616    ];
1617    if RESERVED_PROPS.contains(&name) {
1618        return Err(anyhow!(
1619            "Property name '{}' is reserved by the storage layer; please choose a different name",
1620            name
1621        ));
1622    }
1623    Ok(())
1624}
1625
#[cfg(test)]
mod tests {
    // Unit tests for the schema module: `DataType::accepts` coercion rules,
    // `SchemaManager` persistence and mutation (labels, properties, edge
    // types, indexes), reserved-name validation, index-metadata serde, and
    // fork overlays.
    use super::*;
    use crate::value::{TemporalValue, Value};
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    #[test]
    fn test_datatype_accepts_matrix() {
        let dt = || TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: None,
        };

        // Null is accepted by every type (nullability checked separately).
        for ty in [
            DataType::String,
            DataType::Int64,
            DataType::Bool,
            DataType::DateTime,
            DataType::Float64,
        ] {
            assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
        }

        // Exact-type matches.
        assert!(DataType::String.accepts(&Value::String("x".into())));
        assert!(DataType::Int64.accepts(&Value::Int(1)));
        assert!(DataType::Bool.accepts(&Value::Bool(true)));
        assert!(DataType::DateTime.accepts(&Value::Temporal(dt())));

        // Intentional lossless widenings remain allowed.
        assert!(
            DataType::Float64.accepts(&Value::Int(3)),
            "Int widens to Float"
        );
        assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
        assert!(DataType::Timestamp.accepts(&Value::Temporal(dt())));
        assert!(
            DataType::Timestamp.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
            "storage parses strings for non-struct Timestamp columns"
        );

        // The #68 data-loss cases must be rejected (coercion handles strings separately).
        assert!(
            !DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
            "String into a DateTime struct column nulls silently — reject here"
        );
        assert!(!DataType::Bool.accepts(&Value::Int(1)));
        assert!(!DataType::Int64.accepts(&Value::Bool(true)));
        assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
        assert!(
            !DataType::String.accepts(&Value::Int(10)),
            "no implicit stringification"
        );
        assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));

        // Opaque columns accept anything.
        assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
    }

    #[tokio::test]
    async fn test_schema_management() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;

        // Labels
        let lid = manager.add_label("Person")?;
        assert_eq!(lid, 1);
        assert!(manager.add_label("Person").is_err());

        // Properties
        manager.add_property("Person", "name", DataType::String, false)?;
        assert!(
            manager
                .add_property("Person", "name", DataType::String, false)
                .is_err()
        );

        // Edge types
        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
        assert_eq!(tid, 1);

        manager.save().await?;
        // Check file exists
        assert!(store.get(&path).await.is_ok());

        let manager2 = SchemaManager::load_from_store(store, &path).await?;
        assert!(manager2.schema().labels.contains_key("Person"));
        assert!(
            manager2
                .schema()
                .properties
                .get("Person")
                .unwrap()
                .contains_key("name")
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_reserved_property_names_rejected() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Tiny")?;

        // Unprefixed reserved names — these collide with internal Arrow
        // columns in storage tables and previously caused Lance
        // "Duplicate field name" errors at flush time.
        for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
            // `let ... else` keeps the panic message lazy: it is only
            // formatted when the call unexpectedly succeeds.
            let Err(err) = manager.add_property("Tiny", reserved, DataType::String, true) else {
                panic!("expected '{reserved}' to be rejected");
            };
            assert!(
                err.to_string().contains("reserved"),
                "error for '{reserved}' should mention 'reserved', got: {err}"
            );
        }

        // Planner sentinel — reserved in RESERVED_PROPS (belt-and-suspenders
        // alongside the underscore-prefix rule). Confirms an internal
        // `add_generated_property` path cannot accidentally create a column
        // that collides with the SET-target structural-projection marker.
        let err = manager
            .add_property("Tiny", "__set_struct__", DataType::String, true)
            .expect_err("expected '__set_struct__' to be rejected");
        assert!(
            err.to_string().contains("reserved"),
            "__set_struct__ rejection should mention 'reserved', got: {err}"
        );

        // Leading-underscore pattern rule.
        for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
            assert!(
                manager
                    .add_property("Tiny", reserved, DataType::String, true)
                    .is_err(),
                "expected '{reserved}' to be rejected"
            );
        }

        // Names that merely contain a reserved substring should still be
        // accepted.
        manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
        manager.add_property("Tiny", "user_op", DataType::String, true)?;
        manager.add_property("Tiny", "type_name", DataType::String, true)?;

        // Same check applies to edge-type properties (single dispatch).
        manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
        assert!(
            manager
                .add_property("knows", "src_vid", DataType::Int64, true)
                .is_err()
        );

        // And to generated properties.
        // NOTE(review): the expression references `name`, which is not a
        // declared property of `Tiny`. If `add_generated_property` validates
        // the expression before the property name, this assertion could pass
        // for the wrong reason — consider asserting the error mentions
        // "reserved" once the validation order is confirmed.
        assert!(
            manager
                .add_generated_property(
                    "Tiny",
                    "ext_id",
                    DataType::String,
                    "concat('x', name)".into()
                )
                .is_err()
        );

        Ok(())
    }

    #[test]
    fn test_normalize_function_names() {
        assert_eq!(
            SchemaManager::normalize_function_names("lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("LOWER(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("Lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("trim(lower(email))"),
            "TRIM(LOWER(email))"
        );
    }

    #[test]
    fn test_generated_column_name_case_insensitive() {
        let col1 = SchemaManager::generated_column_name("lower(email)");
        let col2 = SchemaManager::generated_column_name("LOWER(email)");
        let col3 = SchemaManager::generated_column_name("Lower(email)");
        assert_eq!(col1, col2);
        assert_eq!(col2, col3);
        assert!(col1.starts_with("_gen_LOWER_email_"));
    }

    #[test]
    fn test_index_metadata_serde_backward_compat() {
        // Simulate old JSON without metadata field
        let json = r#"{
            "type": "Scalar",
            "name": "idx_person_name",
            "label": "Person",
            "properties": ["name"],
            "index_type": "BTree",
            "where_clause": null
        }"#;
        let def: IndexDefinition = serde_json::from_str(json).unwrap();
        let meta = def.metadata();
        assert_eq!(meta.status, IndexStatus::Online);
        assert!(meta.last_built_at.is_none());
        assert!(meta.row_count_at_build.is_none());
    }

    #[test]
    fn test_index_metadata_serde_roundtrip() {
        let now = Utc::now();
        let def = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Test".to_string(),
            properties: vec!["prop".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                last_built_at: Some(now),
                row_count_at_build: Some(42),
            },
        });

        let json = serde_json::to_string(&def).unwrap();
        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.metadata().status, IndexStatus::Building);
        assert_eq!(parsed.metadata().row_count_at_build, Some(42));
        assert!(parsed.metadata().last_built_at.is_some());
    }

    #[tokio::test]
    async fn test_update_index_metadata() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Person")?;
        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: Default::default(),
        });
        manager.add_index(idx)?;

        // Verify initial status is Online
        let initial = manager.get_index("idx_test").unwrap();
        assert_eq!(initial.metadata().status, IndexStatus::Online);

        // Update to Building
        manager.update_index_metadata("idx_test", |m| {
            m.status = IndexStatus::Building;
            m.row_count_at_build = Some(100);
        })?;

        let updated = manager.get_index("idx_test").unwrap();
        assert_eq!(updated.metadata().status, IndexStatus::Building);
        assert_eq!(updated.metadata().row_count_at_build, Some(100));

        // Non-existent index should error
        assert!(manager.update_index_metadata("nope", |_| {}).is_err());

        Ok(())
    }

    /// `add_index` is upsert-by-name (issue rustic-ai/uni-db#63). Repeat
    /// invocations with the same `IndexDefinition::name()` must replace
    /// the entry in place rather than appending. Subsequent `add_index`
    /// calls also reflect metadata updates from the new definition.
    #[tokio::test]
    async fn test_add_index_is_upsert_by_name() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;
        manager.add_label("Person")?;

        let initial = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                ..Default::default()
            },
        });
        manager.add_index(initial.clone())?;
        assert_eq!(manager.schema().indexes.len(), 1);

        // Re-add the identical def — must remain a single entry.
        manager.add_index(initial.clone())?;
        assert_eq!(
            manager.schema().indexes.len(),
            1,
            "duplicate add_index by name must not append"
        );

        // Re-add with updated metadata — must replace in place, len unchanged.
        let IndexDefinition::Scalar(mut updated_cfg) = initial else {
            unreachable!("`initial` was constructed as a Scalar index");
        };
        updated_cfg.metadata.status = IndexStatus::Online;
        updated_cfg.metadata.row_count_at_build = Some(42);
        manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
        assert_eq!(manager.schema().indexes.len(), 1);
        let stored = manager.get_index("idx_test").unwrap();
        assert_eq!(stored.metadata().status, IndexStatus::Online);
        assert_eq!(stored.metadata().row_count_at_build, Some(42));

        // A *different* name appends as a new entry.
        let other = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_other".to_string(),
            label: "Person".to_string(),
            properties: vec!["age".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata::default(),
        });
        manager.add_index(other)?;
        assert_eq!(manager.schema().indexes.len(), 2);

        Ok(())
    }

    /// `load_from_store` self-heals catalogs that were bloated by the
    /// pre-fix `add_index`, which appended a new entry on every call instead
    /// of upserting by name. The load-time dedup keeps the *last* definition
    /// for each name.
    #[tokio::test]
    async fn test_load_dedups_bloated_indexes() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");

        // Seed disk with a hand-crafted bloated schema: 50 entries, all
        // sharing the same name. The last entry has distinct metadata so
        // we can assert "last writer wins" semantics.
        let mut schema = Schema::default();
        schema.labels.insert(
            "Person".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
                description: None,
            },
        );
        let make = |status: IndexStatus, count: Option<u64>| {
            IndexDefinition::Scalar(ScalarIndexConfig {
                name: "idx_dup".to_string(),
                label: "Person".to_string(),
                properties: vec!["name".to_string()],
                index_type: ScalarIndexType::BTree,
                where_clause: None,
                metadata: IndexMetadata {
                    status,
                    row_count_at_build: count,
                    ..Default::default()
                },
            })
        };
        for _ in 0..49 {
            schema.indexes.push(make(IndexStatus::Building, None));
        }
        schema.indexes.push(make(IndexStatus::Online, Some(123)));
        let json = serde_json::to_string_pretty(&schema)?;
        store.put(&path, json.into()).await?;

        let manager = SchemaManager::load_from_store(store, &path).await?;
        let schema = manager.schema();
        assert_eq!(
            schema.indexes.len(),
            1,
            "load() must collapse 50 duplicates by name to 1"
        );
        // Last-writer-wins: the kept entry is the final push (Online, 123).
        assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
        assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));

        Ok(())
    }

    #[test]
    fn test_vector_index_for_property_skips_non_online() {
        let mut schema = Schema::default();
        schema.labels.insert(
            "Document".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
                description: None,
            },
        );

        // Add a vector index with Stale status
        schema
            .indexes
            .push(IndexDefinition::Vector(VectorIndexConfig {
                name: "vec_doc_embedding".to_string(),
                label: "Document".to_string(),
                property: "embedding".to_string(),
                index_type: VectorIndexType::Flat,
                metric: DistanceMetric::Cosine,
                embedding_config: None,
                metadata: IndexMetadata {
                    status: IndexStatus::Stale,
                    ..Default::default()
                },
            }));

        // Stale index should NOT be returned
        assert!(
            schema
                .vector_index_for_property("Document", "embedding")
                .is_none()
        );

        // Set to Online — should now be returned
        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
            cfg.metadata.status = IndexStatus::Online;
        }
        let result = schema.vector_index_for_property("Document", "embedding");
        assert!(result.is_some());
        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
    }

    #[tokio::test]
    async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
        use crate::core::fork::SchemaDelta;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let primary = SchemaManager::load_from_store(store, &path).await?;
        primary.add_label("Person")?;

        let overlay = primary.with_overlay(&SchemaDelta::empty());
        assert_eq!(overlay.schema().labels.len(), 1);

        // Phase 1 invariant: mutating the overlay manager must not bleed
        // into primary's schema.
        overlay.add_label("Forked")?;
        assert!(overlay.schema().labels.contains_key("Forked"));
        assert!(!primary.schema().labels.contains_key("Forked"));

        Ok(())
    }

    #[tokio::test]
    async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
        use crate::core::fork::SchemaDelta;
        use chrono::Utc;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let primary = SchemaManager::load_from_store(store, &path).await?;
        primary.add_label("Existing")?;

        let label_meta = LabelMeta {
            id: 99,
            created_at: Utc::now(),
            state: SchemaElementState::Active,
            description: None,
        };
        let edge_meta = EdgeTypeMeta {
            id: 99,
            src_labels: vec!["NewLabel".into()],
            dst_labels: vec!["NewLabel".into()],
            state: SchemaElementState::Active,
            description: None,
        };
        let delta = SchemaDelta {
            added_labels: vec![("NewLabel".to_string(), label_meta)],
            added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
            added_properties: vec![],
        };

        let overlay = primary.with_overlay(&delta);
        let merged = overlay.schema();
        assert!(merged.labels.contains_key("Existing"));
        assert!(merged.labels.contains_key("NewLabel"));
        assert!(merged.edge_types.contains_key("NewEdge"));

        // Primary unchanged.
        assert!(!primary.schema().labels.contains_key("NewLabel"));
        Ok(())
    }
}