Skip to main content

uni_common/core/
schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::core::edge_type::{
5    MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6    is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
20#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
21#[non_exhaustive]
22pub enum SchemaElementState {
23    Active,
24    Hidden {
25        since: DateTime<Utc>,
26        last_active_snapshot: String, // SnapshotId
27    },
28    Tombstone {
29        since: DateTime<Utc>,
30    },
31}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35/// Returns the canonical struct field definitions for DateTime encoding in Arrow.
36///
37/// DateTime is encoded as a 3-field struct to preserve timezone information:
38/// - `nanos_since_epoch`: i64 nanoseconds since Unix epoch (UTC)
39/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
40/// - `timezone_name`: Optional IANA timezone name (e.g., "America/New_York")
41pub fn datetime_struct_fields() -> Fields {
42    Fields::from(vec![
43        Field::new(
44            "nanos_since_epoch",
45            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46            true,
47        ),
48        Field::new("offset_seconds", ArrowDataType::Int32, true),
49        Field::new("timezone_name", ArrowDataType::Utf8, true),
50    ])
51}
52
53/// Returns the canonical struct field definitions for Time encoding in Arrow.
54///
55/// Time is encoded as a 2-field struct to preserve timezone offset:
56/// - `nanos_since_midnight`: i64 nanoseconds since midnight (0-86,399,999,999,999)
57/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
58pub fn time_struct_fields() -> Fields {
59    Fields::from(vec![
60        Field::new(
61            "nanos_since_midnight",
62            ArrowDataType::Time64(TimeUnit::Nanosecond),
63            true,
64        ),
65        Field::new("offset_seconds", ArrowDataType::Int32, true),
66    ])
67}
68
69/// Detects if an Arrow DataType is the canonical DateTime struct.
70pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74/// Detects if an Arrow DataType is the canonical Time struct.
75pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
79#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
80#[non_exhaustive]
81pub enum CrdtType {
82    GCounter,
83    GSet,
84    ORSet,
85    LWWRegister,
86    LWWMap,
87    Rga,
88    VectorClock,
89    VCRegister,
90}
91
92impl CrdtType {
93    /// Returns the canonical variant name for this CRDT type.
94    ///
95    /// The returned strings must stay in sync with `uni_crdt::Crdt::type_name`,
96    /// so a written CRDT value can be validated against its schema-declared
97    /// variant (see uni-store's write-time CRDT enforcement).
98    ///
99    /// # Examples
100    /// ```
101    /// use uni_common::core::schema::CrdtType;
102    /// assert_eq!(CrdtType::GCounter.type_name(), "GCounter");
103    /// ```
104    #[must_use]
105    pub fn type_name(&self) -> &'static str {
106        match self {
107            CrdtType::GCounter => "GCounter",
108            CrdtType::GSet => "GSet",
109            CrdtType::ORSet => "ORSet",
110            CrdtType::LWWRegister => "LWWRegister",
111            CrdtType::LWWMap => "LWWMap",
112            CrdtType::Rga => "Rga",
113            CrdtType::VectorClock => "VectorClock",
114            CrdtType::VCRegister => "VCRegister",
115        }
116    }
117}
118
119#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
120pub enum PointType {
121    Geographic,  // WGS84
122    Cartesian2D, // x, y
123    Cartesian3D, // x, y, z
124}
125
126#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
127#[non_exhaustive]
128pub enum DataType {
129    String,
130    Int32,
131    Int64,
132    Float32,
133    Float64,
134    Bool,
135    Timestamp,
136    Date,
137    Time,
138    DateTime,
139    Duration,
140    CypherValue,
141    Bytes,
142    Point(PointType),
143    Vector { dimensions: usize },
144    Btic,
145    Crdt(CrdtType),
146    List(Box<DataType>),
147    Map(Box<DataType>, Box<DataType>),
148}
149
150impl DataType {
151    // Alias for compatibility/convenience if needed, but preferable to use exact types.
152    #[allow(non_upper_case_globals)]
153    pub const Float: DataType = DataType::Float64;
154    #[allow(non_upper_case_globals)]
155    pub const Int: DataType = DataType::Int64;
156
157    pub fn to_arrow(&self) -> ArrowDataType {
158        match self {
159            DataType::String => ArrowDataType::Utf8,
160            DataType::Int32 => ArrowDataType::Int32,
161            DataType::Int64 => ArrowDataType::Int64,
162            DataType::Float32 => ArrowDataType::Float32,
163            DataType::Float64 => ArrowDataType::Float64,
164            DataType::Bool => ArrowDataType::Boolean,
165            DataType::Timestamp => {
166                ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
167            }
168            DataType::Date => ArrowDataType::Date32,
169            DataType::Time => ArrowDataType::Struct(time_struct_fields()),
170            DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
171            DataType::Duration => ArrowDataType::LargeBinary, // Lance doesn't support Interval(MonthDayNano); use CypherValue codec
172            DataType::CypherValue => ArrowDataType::LargeBinary, // MessagePack-tagged binary encoding
173            DataType::Bytes => ArrowDataType::LargeBinary, // raw byte buffer (no codec wrapping)
174            DataType::Point(pt) => match pt {
175                PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
176                    Field::new("latitude", ArrowDataType::Float64, false),
177                    Field::new("longitude", ArrowDataType::Float64, false),
178                    Field::new("crs", ArrowDataType::Utf8, false),
179                ])),
180                PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
181                    Field::new("x", ArrowDataType::Float64, false),
182                    Field::new("y", ArrowDataType::Float64, false),
183                    Field::new("crs", ArrowDataType::Utf8, false),
184                ])),
185                PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
186                    Field::new("x", ArrowDataType::Float64, false),
187                    Field::new("y", ArrowDataType::Float64, false),
188                    Field::new("z", ArrowDataType::Float64, false),
189                    Field::new("crs", ArrowDataType::Utf8, false),
190                ])),
191            },
192            DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
193                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
194                *dimensions as i32,
195            ),
196            DataType::Btic => ArrowDataType::FixedSizeBinary(24),
197            DataType::Crdt(_) => ArrowDataType::Binary, // Store CRDT as binary MessagePack
198            DataType::List(inner) => {
199                ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
200            }
201            DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
202                "item",
203                ArrowDataType::Struct(Fields::from(vec![
204                    Field::new("key", key.to_arrow(), false),
205                    Field::new("value", value.to_arrow(), true),
206                ])),
207                true,
208            ))),
209        }
210    }
211
212    /// Returns `true` if `value` is directly storable in this column type without loss.
213    ///
214    /// This is the schema-level type guard used by the write path. `Value::Null` is
215    /// always accepted — column nullability is enforced separately by the `nullable`
216    /// flag, not here. `CypherValue`, `Crdt`, and `Point` columns accept any value.
217    /// For every other declared type, only the `Value` variants that the storage layer
218    /// persists *without silently nulling* are accepted (see the per-type converters in
219    /// `uni-store`'s `arrow_convert`), plus the intentional lossless widenings
220    /// `Int`→`Float`, `Int`→`Int32`, and `Temporal`→`Timestamp`.
221    ///
222    /// A `Value::String` destined for a `Date`/`Time`/`DateTime`/`Duration` column is
223    /// intentionally *not* accepted here: the write path first coerces such strings into
224    /// the proper `Temporal` value (matching the Cypher temporal constructors), then the
225    /// coerced value passes this check. This keeps `accepts` a pure, allocation-free
226    /// predicate.
227    ///
228    /// # Examples
229    /// ```
230    /// use uni_common::core::schema::DataType;
231    /// use uni_common::Value;
232    ///
233    /// assert!(DataType::Float64.accepts(&Value::Int(3))); // Int widens to Float
234    /// assert!(DataType::Bool.accepts(&Value::Null)); // Null always accepted
235    /// assert!(!DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())));
236    /// ```
237    pub fn accepts(&self, value: &crate::value::Value) -> bool {
238        use crate::value::{TemporalValue, Value};
239
240        // Null is universally accepted; nullability is a separate concern.
241        if matches!(value, Value::Null) {
242            return true;
243        }
244
245        match self {
246            // Opaque / dynamically-typed columns accept any value.
247            DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
248
249            DataType::String => matches!(value, Value::String(_)),
250            DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
251            // Int widens to Float losslessly for the ranges we care about.
252            DataType::Float32 | DataType::Float64 => {
253                matches!(value, Value::Int(_) | Value::Float(_))
254            }
255            DataType::Bool => matches!(value, Value::Bool(_)),
256
257            // Non-struct timestamp column: storage parses strings and accepts ints,
258            // so both are lossless here (unlike the DateTime struct column below).
259            DataType::Timestamp => matches!(
260                value,
261                Value::String(_)
262                    | Value::Int(_)
263                    | Value::Temporal(
264                        TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
265                    )
266            ),
267            DataType::DateTime => matches!(
268                value,
269                Value::Temporal(
270                    TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
271                )
272            ),
273            DataType::Date => {
274                matches!(
275                    value,
276                    Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
277                )
278            }
279            DataType::Time => matches!(
280                value,
281                Value::Int(_)
282                    | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
283            ),
284            DataType::Duration => {
285                matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
286            }
287            DataType::Bytes => matches!(value, Value::Bytes(_)),
288            // FixedSizeBinary(24) converter accepts the Btic temporal, raw strings, and lists.
289            DataType::Btic => matches!(
290                value,
291                Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
292            ),
293            DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
294            DataType::List(_) => matches!(value, Value::List(_)),
295            DataType::Map(_, _) => matches!(value, Value::Map(_)),
296        }
297    }
298}
299
300fn default_created_at() -> DateTime<Utc> {
301    Utc::now()
302}
303
304fn default_state() -> SchemaElementState {
305    SchemaElementState::Active
306}
307
/// Serde default for `PropertyMeta::added_in`: the initial schema version, 1.
fn default_version_1() -> u32 {
    const INITIAL_SCHEMA_VERSION: u32 = 1;
    INITIAL_SCHEMA_VERSION
}
311
312#[derive(Clone, Debug, Serialize, Deserialize)]
313pub struct PropertyMeta {
314    pub r#type: DataType,
315    pub nullable: bool,
316    #[serde(default = "default_version_1")]
317    pub added_in: u32, // SchemaVersion
318    #[serde(default = "default_state")]
319    pub state: SchemaElementState,
320    #[serde(default)]
321    pub generation_expression: Option<String>,
322    #[serde(default, skip_serializing_if = "Option::is_none")]
323    pub description: Option<String>,
324}
325
326#[derive(Clone, Debug, Serialize, Deserialize)]
327pub struct LabelMeta {
328    pub id: u16, // LabelId
329    #[serde(default = "default_created_at")]
330    pub created_at: DateTime<Utc>,
331    #[serde(default = "default_state")]
332    pub state: SchemaElementState,
333    #[serde(default, skip_serializing_if = "Option::is_none")]
334    pub description: Option<String>,
335}
336
337#[derive(Clone, Debug, Serialize, Deserialize)]
338pub struct EdgeTypeMeta {
339    /// See [`crate::core::edge_type::EdgeTypeId`] for bit-layout details.
340    pub id: u32,
341    pub src_labels: Vec<String>,
342    pub dst_labels: Vec<String>,
343    #[serde(default = "default_state")]
344    pub state: SchemaElementState,
345    #[serde(default, skip_serializing_if = "Option::is_none")]
346    pub description: Option<String>,
347}
348
349#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
350#[non_exhaustive]
351pub enum ConstraintType {
352    Unique { properties: Vec<String> },
353    Exists { property: String },
354    Check { expression: String },
355}
356
357#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
358#[non_exhaustive]
359pub enum ConstraintTarget {
360    Label(String),
361    EdgeType(String),
362}
363
364#[derive(Clone, Debug, Serialize, Deserialize)]
365pub struct Constraint {
366    pub name: String,
367    pub constraint_type: ConstraintType,
368    pub target: ConstraintTarget,
369    pub enabled: bool,
370}
371
372/// Bidirectional registry for dynamically-assigned schemaless edge type IDs.
373///
374/// Edge types not defined in the schema are assigned IDs at runtime with
375/// bit 31 set (see [`crate::core::edge_type`]). This registry maintains
376/// the name-to-ID and ID-to-name mappings for those types.
377#[derive(Clone, Debug, Serialize, Deserialize)]
378pub struct SchemalessEdgeTypeRegistry {
379    name_to_id: HashMap<String, u32>,
380    id_to_name: HashMap<u32, String>,
381    /// Next local ID to assign (0 is reserved for invalid).
382    next_local_id: u32,
383}
384
385impl SchemalessEdgeTypeRegistry {
386    pub fn new() -> Self {
387        Self {
388            name_to_id: HashMap::new(),
389            id_to_name: HashMap::new(),
390            next_local_id: 1,
391        }
392    }
393
394    /// Returns the schemaless ID for `type_name`, assigning a new one if needed.
395    pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
396        if let Some(&id) = self.name_to_id.get(type_name) {
397            return id;
398        }
399
400        let id = make_schemaless_id(self.next_local_id);
401        self.next_local_id += 1;
402
403        self.name_to_id.insert(type_name.to_string(), id);
404        self.id_to_name.insert(id, type_name.to_string());
405
406        id
407    }
408
409    /// Looks up the edge type name for a schemaless ID.
410    pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
411        self.id_to_name.get(&type_id).map(String::as_str)
412    }
413
414    /// Returns `true` if `type_name` has already been assigned a schemaless ID.
415    pub fn contains(&self, type_name: &str) -> bool {
416        self.name_to_id.contains_key(type_name)
417    }
418
419    /// Looks up the schemaless ID for `type_name` (exact match, read-only).
420    pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
421        self.name_to_id.get(type_name).copied()
422    }
423
424    /// Looks up the edge type ID for `type_name` with case-insensitive matching.
425    pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
426        self.name_to_id
427            .iter()
428            .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
429            .map(|(_, &id)| id)
430    }
431
432    /// Returns all registered schemaless type IDs.
433    pub fn all_type_ids(&self) -> Vec<u32> {
434        self.id_to_name.keys().copied().collect()
435    }
436
437    /// Returns true if the registry has any schemaless types.
438    pub fn is_empty(&self) -> bool {
439        self.name_to_id.is_empty()
440    }
441}
442
443impl Default for SchemalessEdgeTypeRegistry {
444    fn default() -> Self {
445        Self::new()
446    }
447}
448
/// First virtual (catalog-resolved) label ID.
///
/// Label IDs in `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL` are owned by
/// plugin-registered `CatalogProvider`s. The planner allocates them lazily via
/// `PluginRegistry::register_virtual_label`. Native label allocation
/// (`SchemaManager::add_label`) refuses IDs in this range.
pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
/// Sentinel "no label" marker, kept distinct from any allocatable ID.
pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;

/// Returns `true` if `id` lies in the half-open virtual range
/// `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL`.
///
/// The sentinel itself is not a virtual label ID.
#[inline]
pub fn is_virtual_label_id(id: u16) -> bool {
    matches!(id, VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL)
}
463
464#[derive(Clone, Debug, Serialize, Deserialize)]
465pub struct Schema {
466    pub schema_version: u32,
467    pub labels: HashMap<String, LabelMeta>,
468    pub edge_types: HashMap<String, EdgeTypeMeta>,
469    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
470    #[serde(default)]
471    pub indexes: Vec<IndexDefinition>,
472    #[serde(default)]
473    pub constraints: Vec<Constraint>,
474    /// Registry for schemaless edge types (dynamically assigned IDs)
475    #[serde(default)]
476    pub schemaless_registry: SchemalessEdgeTypeRegistry,
477}
478
479impl Default for Schema {
480    fn default() -> Self {
481        Self {
482            schema_version: 1,
483            labels: HashMap::new(),
484            edge_types: HashMap::new(),
485            properties: HashMap::new(),
486            indexes: Vec::new(),
487            constraints: Vec::new(),
488            schemaless_registry: SchemalessEdgeTypeRegistry::new(),
489        }
490    }
491}
492
493impl Schema {
494    /// Bumps `schema_version` to invalidate cached query plans.
495    ///
496    /// Called at the end of every DDL mutation that changes the schema's
497    /// shape (labels, edge types, properties, indexes, constraints). The
498    /// plan-cache eviction guard keys on `schema_version`, so a stale plan
499    /// built against an older shape is discarded once this advances. Uses
500    /// wrapping arithmetic: the value is a coarse change token, not a count,
501    /// so wraparound only risks a missed eviction after 2^32 DDL operations.
502    fn bump_version(&mut self) {
503        self.schema_version = self.schema_version.wrapping_add(1);
504    }
505
506    /// Returns the label name for a given label ID.
507    ///
508    /// Performs a linear scan over all labels. This is efficient because
509    /// the number of labels in a schema is typically small.
510    pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
511        self.labels
512            .iter()
513            .find(|(_, meta)| meta.id == label_id)
514            .map(|(name, _)| name.as_str())
515    }
516
517    /// Returns the label ID for a given label name.
518    pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
519        self.labels.get(label_name).map(|meta| meta.id)
520    }
521
522    /// Returns the edge type name for a given type ID.
523    ///
524    /// Performs a linear scan over all edge types. This is efficient because
525    /// the number of edge types in a schema is typically small.
526    pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
527        self.edge_types
528            .iter()
529            .find(|(_, meta)| meta.id == type_id)
530            .map(|(name, _)| name.as_str())
531    }
532
533    /// Returns the edge type ID for a given type name.
534    pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
535        self.edge_types.get(type_name).map(|meta| meta.id)
536    }
537
538    /// Returns the vector index configuration for a given label and property.
539    ///
540    /// Performs a linear scan over all indexes. This is efficient because
541    /// the number of indexes in a schema is typically small.
542    pub fn vector_index_for_property(
543        &self,
544        label: &str,
545        property: &str,
546    ) -> Option<&VectorIndexConfig> {
547        self.indexes.iter().find_map(|idx| {
548            if let IndexDefinition::Vector(config) = idx
549                && config.label == label
550                && config.property == property
551                && config.metadata.status == IndexStatus::Online
552            {
553                return Some(config);
554            }
555            None
556        })
557    }
558
559    /// Returns the full-text index configuration for a given label and property.
560    ///
561    /// A full-text index covers one or more properties. This returns the config
562    /// if the specified property is among the indexed properties.
563    pub fn fulltext_index_for_property(
564        &self,
565        label: &str,
566        property: &str,
567    ) -> Option<&FullTextIndexConfig> {
568        self.indexes.iter().find_map(|idx| {
569            if let IndexDefinition::FullText(config) = idx
570                && config.label == label
571                && config.properties.iter().any(|p| p == property)
572                && config.metadata.status == IndexStatus::Online
573            {
574                return Some(config);
575            }
576            None
577        })
578    }
579
580    /// Get label metadata with case-insensitive lookup.
581    ///
582    /// This allows queries to match labels regardless of case, providing
583    /// better user experience when label names vary in casing.
584    pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
585        self.labels
586            .iter()
587            .find(|(k, _)| k.eq_ignore_ascii_case(name))
588            .map(|(_, v)| v)
589    }
590
591    /// Get the schema-canonical spelling of a label, matched case-insensitively.
592    ///
593    /// Returns the stored label name whose spelling differs only in case from
594    /// `name`, or `None` if no such label is registered. Callers use this to
595    /// normalize a user-supplied label to the canonical form the storage tier
596    /// keys on, so case variants resolve to the same vertex table.
597    pub fn canonical_label_name(&self, name: &str) -> Option<String> {
598        self.labels
599            .iter()
600            .find(|(k, _)| k.eq_ignore_ascii_case(name))
601            .map(|(k, _)| k.clone())
602    }
603
604    /// Get label ID with case-insensitive lookup.
605    pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
606        self.get_label_case_insensitive(label_name)
607            .map(|meta| meta.id)
608    }
609
610    /// Get edge type metadata with case-insensitive lookup.
611    ///
612    /// This allows queries to match edge types regardless of case, providing
613    /// better user experience when type names vary in casing.
614    pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
615        self.edge_types
616            .iter()
617            .find(|(k, _)| k.eq_ignore_ascii_case(name))
618            .map(|(_, v)| v)
619    }
620
621    /// Get edge type ID with case-insensitive lookup (schema-defined types only).
622    pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
623        self.get_edge_type_case_insensitive(type_name)
624            .map(|meta| meta.id)
625    }
626
627    /// Get edge type ID with case-insensitive lookup, checking both
628    /// schema-defined and schemaless registries.
629    pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
630        self.edge_type_id_by_name_case_insensitive(type_name)
631            .or_else(|| {
632                self.schemaless_registry
633                    .id_by_name_case_insensitive(type_name)
634            })
635    }
636
637    /// Returns the edge type ID for `type_name`, checking the schema first
638    /// and falling back to the schemaless registry (assigning a new ID if needed).
639    ///
640    /// Requires `&mut self` because it may assign a new schemaless ID.
641    /// Use [`edge_type_id_by_name`](Self::edge_type_id_by_name) for read-only schema lookups.
642    pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
643        if let Some(id) = self.edge_type_id_unified(type_name) {
644            return id;
645        }
646        self.schemaless_registry.get_or_assign_id(type_name)
647    }
648
649    /// Read-only unified exact lookup: schema-defined edge type id, falling
650    /// back to an already-assigned schemaless id.
651    ///
652    /// Mirrors exactly the checks [`Self::get_or_assign_edge_type_id`]
653    /// performs before assigning, so a `Some` here means the assigning path
654    /// would be a no-op — the basis for `SchemaManager`'s read-lock fast path.
655    pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
656        self.edge_type_id_by_name(type_name)
657            .or_else(|| self.schemaless_registry.id_by_name(type_name))
658    }
659
660    /// Returns the edge type name for `type_id`, checking both the schema
661    /// and schemaless registries. Returns an owned `String` because the
662    /// name may come from either registry.
663    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
664        if is_schemaless_edge_type(type_id) {
665            self.schemaless_registry
666                .type_name_by_id(type_id)
667                .map(str::to_owned)
668        } else {
669            self.edge_type_name_by_id(type_id).map(str::to_owned)
670        }
671    }
672
673    /// Returns all edge type IDs, including both schema-defined and schemaless types.
674    /// Used when MATCH queries don't specify an edge type and need to scan all edges.
675    pub fn all_edge_type_ids(&self) -> Vec<u32> {
676        let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
677        ids.extend(self.schemaless_registry.all_type_ids());
678        ids.sort_unstable();
679        ids
680    }
681}
682
683/// Lifecycle status of an index.
684#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
685pub enum IndexStatus {
686    /// Index is up-to-date and available for queries.
687    #[default]
688    Online,
689    /// Index is currently being rebuilt.
690    Building,
691    /// Index is outdated and scheduled for rebuild.
692    Stale,
693    /// Index rebuild failed after exhausting retries.
694    Failed,
695}
696
697/// Metadata tracking the lifecycle state of an index.
698#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
699pub struct IndexMetadata {
700    /// Current lifecycle status.
701    #[serde(default)]
702    pub status: IndexStatus,
703    /// When the index was last successfully built.
704    #[serde(default)]
705    pub last_built_at: Option<DateTime<Utc>>,
706    /// Row count of the dataset when the index was last built.
707    #[serde(default)]
708    pub row_count_at_build: Option<u64>,
709}
710
711#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
712#[serde(tag = "type")]
713#[non_exhaustive]
714pub enum IndexDefinition {
715    Vector(VectorIndexConfig),
716    FullText(FullTextIndexConfig),
717    Scalar(ScalarIndexConfig),
718    Inverted(InvertedIndexConfig),
719    JsonFullText(JsonFtsIndexConfig),
720}
721
722impl IndexDefinition {
723    /// Returns the index name for any variant.
724    pub fn name(&self) -> &str {
725        match self {
726            IndexDefinition::Vector(c) => &c.name,
727            IndexDefinition::FullText(c) => &c.name,
728            IndexDefinition::Scalar(c) => &c.name,
729            IndexDefinition::Inverted(c) => &c.name,
730            IndexDefinition::JsonFullText(c) => &c.name,
731        }
732    }
733
734    /// Returns the label this index is defined on.
735    pub fn label(&self) -> &str {
736        match self {
737            IndexDefinition::Vector(c) => &c.label,
738            IndexDefinition::FullText(c) => &c.label,
739            IndexDefinition::Scalar(c) => &c.label,
740            IndexDefinition::Inverted(c) => &c.label,
741            IndexDefinition::JsonFullText(c) => &c.label,
742        }
743    }
744
745    /// Returns a reference to the index lifecycle metadata.
746    pub fn metadata(&self) -> &IndexMetadata {
747        match self {
748            IndexDefinition::Vector(c) => &c.metadata,
749            IndexDefinition::FullText(c) => &c.metadata,
750            IndexDefinition::Scalar(c) => &c.metadata,
751            IndexDefinition::Inverted(c) => &c.metadata,
752            IndexDefinition::JsonFullText(c) => &c.metadata,
753        }
754    }
755
756    /// Returns a mutable reference to the index lifecycle metadata.
757    pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
758        match self {
759            IndexDefinition::Vector(c) => &mut c.metadata,
760            IndexDefinition::FullText(c) => &mut c.metadata,
761            IndexDefinition::Scalar(c) => &mut c.metadata,
762            IndexDefinition::Inverted(c) => &mut c.metadata,
763            IndexDefinition::JsonFullText(c) => &mut c.metadata,
764        }
765    }
766}
767
768#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
769pub struct InvertedIndexConfig {
770    pub name: String,
771    pub label: String,
772    pub property: String,
773    #[serde(default = "default_normalize")]
774    pub normalize: bool,
775    #[serde(default = "default_max_terms_per_doc")]
776    pub max_terms_per_doc: usize,
777    #[serde(default)]
778    pub metadata: IndexMetadata,
779}
780
/// Serde default for `InvertedIndexConfig::normalize`: enabled.
fn default_normalize() -> bool {
    !false
}
784
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`: ten thousand terms.
fn default_max_terms_per_doc() -> usize {
    const DEFAULT_MAX_TERMS: usize = 10 * 1_000;
    DEFAULT_MAX_TERMS
}
788
789#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
790pub struct VectorIndexConfig {
791    pub name: String,
792    pub label: String,
793    pub property: String,
794    pub index_type: VectorIndexType,
795    pub metric: DistanceMetric,
796    pub embedding_config: Option<EmbeddingConfig>,
797    #[serde(default)]
798    pub metadata: IndexMetadata,
799}
800
/// Auto-embedding settings attached to a vector index
/// (`VectorIndexConfig::embedding_config`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
    /// Model alias in the Uni-Xervo catalog (for example: "embed/default").
    pub alias: String,
    /// Properties whose values feed the embedding input (how multiple
    /// properties are combined is decided elsewhere — TODO confirm).
    pub source_properties: Vec<String>,
    /// Batch size for embedding calls (presumably documents per model
    /// request — confirm).
    pub batch_size: usize,
    /// Prefix prepended to text before embedding during auto-embed (document side).
    /// Example: `"search_document: "` for Nomic models. Include any trailing space.
    #[serde(default)]
    pub document_prefix: Option<String>,
    /// Prefix prepended to text before embedding during query-time embed calls.
    /// Example: `"search_query: "` for Nomic models. Include any trailing space.
    #[serde(default)]
    pub query_prefix: Option<String>,
}
816
/// Vector index algorithm plus its build parameters.
///
/// Variant names appear to follow LanceDB index-type naming (IVF_FLAT, IVF_PQ,
/// HNSW_SQ, ...); the meaning of each parameter is defined by the index
/// builder — confirm there. Fields marked `#[serde(default)]` may be missing
/// from older persisted schemas and then deserialize as `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
    /// No build parameters.
    Flat,
    /// IVF partitioning; `num_partitions` partitions.
    IvfFlat {
        num_partitions: u32,
    },
    /// IVF partitioning with product quantization.
    IvfPq {
        num_partitions: u32,
        num_sub_vectors: u32,
        bits_per_subvector: u8,
    },
    /// IVF partitioning with scalar quantization.
    IvfSq {
        num_partitions: u32,
    },
    /// IVF partitioning with RaBitQ-style quantization (per the name — confirm).
    IvfRq {
        num_partitions: u32,
        /// Optional; `None` when absent from persisted JSON.
        #[serde(default)]
        num_bits: Option<u8>,
    },
    /// HNSW graph over unquantized vectors.
    HnswFlat {
        m: u32,
        ef_construction: u32,
        /// Optional; `None` when absent from persisted JSON.
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    /// HNSW graph with scalar quantization.
    HnswSq {
        m: u32,
        ef_construction: u32,
        /// Optional; `None` when absent from persisted JSON.
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    /// HNSW graph with product quantization.
    HnswPq {
        m: u32,
        ef_construction: u32,
        num_sub_vectors: u32,
        /// Optional; `None` when absent from persisted JSON.
        #[serde(default)]
        num_partitions: Option<u32>,
    },
}
857
/// Distance metric for vector indexes and similarity computations.
///
/// Every variant is oriented so that lower values mean more similar; see
/// [`DistanceMetric::compute_distance`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
    /// `1 - cosine_similarity`.
    Cosine,
    /// Squared Euclidean distance.
    L2,
    /// Negated dot product.
    Dot,
}
865
866impl DistanceMetric {
867    /// Computes the distance between two vectors using this metric.
868    ///
869    /// All metrics follow LanceDB conventions so that lower values indicate
870    /// greater similarity:
871    /// - **L2**: squared Euclidean distance.
872    /// - **Cosine**: `1.0 - cosine_similarity` (range \[0, 2\]).
873    /// - **Dot**: negative dot product.
874    ///
875    /// # Panics
876    ///
877    /// Panics if `a` and `b` have different lengths.
878    pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
879        assert_eq!(a.len(), b.len(), "vector dimension mismatch");
880        match self {
881            DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
882            DistanceMetric::Cosine => {
883                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
884                let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
885                let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
886                let denom = norm_a * norm_b;
887                if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
888            }
889            DistanceMetric::Dot => {
890                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
891                -dot
892            }
893        }
894    }
895}
896
/// Configuration for a full-text index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Properties whose text is indexed.
    pub properties: Vec<String>,
    /// Tokenizer used to split text into terms.
    pub tokenizer: TokenizerConfig,
    /// Whether token positions are recorded (presumably required for phrase
    /// queries — confirm against the FTS builder).
    pub with_positions: bool,
    /// Lifecycle metadata; `IndexMetadata::default()` when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
907
/// Tokenizer selection for full-text indexes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
    /// The engine's default tokenizer.
    Standard,
    /// Split on whitespace.
    Whitespace,
    /// N-grams with lengths between `min` and `max` (assumed inclusive — confirm).
    Ngram { min: u8, max: u8 },
    /// A tokenizer referenced by `name`; resolution happens elsewhere.
    Custom { name: String },
}
916
/// Configuration for a full-text index over a JSON-valued column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Column holding the JSON documents.
    pub column: String,
    /// JSON paths to index; empty when missing from persisted JSON (what an
    /// empty list means — e.g. "index everything" — is decided by the index
    /// builder; TODO confirm).
    #[serde(default)]
    pub paths: Vec<String>,
    /// Whether token positions are recorded; `false` when missing.
    #[serde(default)]
    pub with_positions: bool,
    /// Lifecycle metadata; `IndexMetadata::default()` when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
929
/// Configuration for a scalar (non-vector, non-text) index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed properties (or, presumably, generated `_gen_*` expression
    /// columns — confirm against the index manager).
    pub properties: Vec<String>,
    /// Index structure.
    pub index_type: ScalarIndexType,
    /// Optional filter predicate (presumably restricting the index to matching
    /// rows — confirm).
    pub where_clause: Option<String>,
    /// Lifecycle metadata; `IndexMetadata::default()` when missing.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
940
/// Physical structure of a scalar index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
    /// B-tree index.
    BTree,
    /// Hash index.
    Hash,
    /// Bitmap index.
    Bitmap,
    /// Label-list index (presumably over list-valued columns — confirm).
    LabelList,
}
949
/// Owns the in-memory [`Schema`] and persists it as pretty-printed JSON at a
/// single object-store path.
///
/// The schema lives behind `RwLock<Arc<Schema>>`: readers take a cheap `Arc`
/// snapshot via `schema()`, while mutators take the write lock and go through
/// `Arc::make_mut`, which clones the `Schema` whenever a snapshot is still held
/// elsewhere. Mutations stay in memory until `save()` is called.
pub struct SchemaManager {
    /// Object store containing the schema file.
    store: Arc<dyn ObjectStore>,
    /// Location of the schema JSON inside `store`.
    path: ObjectStorePath,
    /// Current schema snapshot.
    schema: RwLock<Arc<Schema>>,
}
955
956impl SchemaManager {
957    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
958        let path = path.as_ref();
959        let parent = path
960            .parent()
961            .ok_or_else(|| anyhow!("Invalid schema path"))?;
962        let filename = path
963            .file_name()
964            .ok_or_else(|| anyhow!("Invalid schema filename"))?
965            .to_str()
966            .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
967
968        let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
969        let obj_path = ObjectStorePath::from(filename);
970
971        Self::load_from_store(store, &obj_path).await
972    }
973
974    pub async fn load_from_store(
975        store: Arc<dyn ObjectStore>,
976        path: &ObjectStorePath,
977    ) -> Result<Self> {
978        match store.get(path).await {
979            Ok(result) => {
980                let bytes = result.bytes().await?;
981                let content = String::from_utf8(bytes.to_vec())?;
982                let mut schema: Schema = serde_json::from_str(&content)?;
983                // Self-heal catalogs that grew super-linearly under the
984                // pre-fix `add_index` (issue rustic-ai/uni-db#63). Collapse
985                // duplicate index entries by name, keeping the *last*
986                // occurrence — matches the upsert semantics in `add_index`
987                // and preserves whatever metadata the most recent rebuild
988                // wrote. The dedup persists on the next mutation that
989                // calls `save()`.
990                let original_len = schema.indexes.len();
991                if original_len > 0 {
992                    let mut seen: std::collections::HashSet<String> =
993                        std::collections::HashSet::with_capacity(original_len);
994                    let mut dedup: Vec<IndexDefinition> = schema
995                        .indexes
996                        .iter()
997                        .rev()
998                        .filter(|idx| seen.insert(idx.name().to_string()))
999                        .cloned()
1000                        .collect();
1001                    dedup.reverse();
1002                    if dedup.len() != original_len {
1003                        tracing::warn!(
1004                            collapsed = original_len - dedup.len(),
1005                            kept = dedup.len(),
1006                            "schema.indexes: collapsed duplicate entries on load (issue #63)"
1007                        );
1008                        schema.indexes = dedup;
1009                    }
1010                }
1011                Ok(Self {
1012                    store,
1013                    path: path.clone(),
1014                    schema: RwLock::new(Arc::new(schema)),
1015                })
1016            }
1017            Err(object_store::Error::NotFound { .. }) => Ok(Self {
1018                store,
1019                path: path.clone(),
1020                schema: RwLock::new(Arc::new(Schema::default())),
1021            }),
1022            Err(e) => Err(anyhow::Error::from(e)),
1023        }
1024    }
1025
1026    pub async fn save(&self) -> Result<()> {
1027        let content = {
1028            let schema_guard = acquire_read(&self.schema, "schema")?;
1029            serde_json::to_string_pretty(&**schema_guard)?
1030        };
1031        self.store
1032            .put(&self.path, content.into())
1033            .await
1034            .map_err(anyhow::Error::from)?;
1035        Ok(())
1036    }
1037
1038    pub fn path(&self) -> &ObjectStorePath {
1039        &self.path
1040    }
1041
1042    pub fn schema(&self) -> Arc<Schema> {
1043        self.schema
1044            .read()
1045            .expect("Schema lock poisoned - a thread panicked while holding it")
1046            .clone()
1047    }
1048
1049    /// Normalize function names in an expression to uppercase for case-insensitive matching.
1050    /// Examples: "lower(email)" -> "LOWER(email)", "trim(name)" -> "TRIM(name)"
1051    fn normalize_function_names(expr: &str) -> String {
1052        let mut result = String::with_capacity(expr.len());
1053        let mut chars = expr.chars().peekable();
1054
1055        while let Some(ch) = chars.next() {
1056            if ch.is_alphabetic() {
1057                // Collect identifier
1058                let mut ident = String::new();
1059                ident.push(ch);
1060
1061                while let Some(&next) = chars.peek() {
1062                    if next.is_alphanumeric() || next == '_' {
1063                        ident.push(chars.next().unwrap());
1064                    } else {
1065                        break;
1066                    }
1067                }
1068
1069                // If followed by '(', it's a function call - uppercase it
1070                if chars.peek() == Some(&'(') {
1071                    result.push_str(&ident.to_uppercase());
1072                } else {
1073                    result.push_str(&ident); // Keep property names as-is
1074                }
1075            } else {
1076                result.push(ch);
1077            }
1078        }
1079
1080        result
1081    }
1082
1083    /// Generate a consistent internal column name for an expression index.
1084    /// Uses a hash suffix to ensure uniqueness for different expressions that
1085    /// might sanitize to the same string (e.g., "a+b" and "a-b" both become "a_b").
1086    ///
1087    /// IMPORTANT: Uses FNV-1a hash which is stable across Rust versions and platforms.
1088    /// DefaultHasher is not guaranteed to be stable and could break persistent data
1089    /// if the hash changes after a compiler upgrade.
1090    pub fn generated_column_name(expr: &str) -> String {
1091        // Normalize function names to uppercase for case-insensitive matching
1092        let normalized = Self::normalize_function_names(expr);
1093
1094        let sanitized = normalized
1095            .replace(|c: char| !c.is_alphanumeric(), "_")
1096            .trim_matches('_')
1097            .to_string();
1098
1099        // FNV-1a 64-bit hash - stable across Rust versions and platforms
1100        const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1101        const FNV_PRIME: u64 = 1099511628211;
1102
1103        let mut hash = FNV_OFFSET_BASIS;
1104        for byte in normalized.as_bytes() {
1105            hash ^= *byte as u64;
1106            hash = hash.wrapping_mul(FNV_PRIME);
1107        }
1108
1109        format!("_gen_{}_{:x}", sanitized, hash)
1110    }
1111
1112    pub fn replace_schema(&self, new_schema: Schema) {
1113        let mut schema = self
1114            .schema
1115            .write()
1116            .expect("Schema lock poisoned - a thread panicked while holding it");
1117        *schema = Arc::new(new_schema);
1118    }
1119
1120    /// Build a fork-scoped manager whose schema is `primary ⊕ overlay`.
1121    ///
1122    /// Used by `UniInner::at_fork` to give a forked session a schema view
1123    /// that includes any labels/edge-types/properties the fork has
1124    /// introduced on top of primary. The returned manager owns its own
1125    /// in-memory `Arc<Schema>` — mutations to it never reach primary's
1126    /// schema file. The returned manager is *not* intended for `.save()`;
1127    /// fork-overlay persistence is owned by the registry layer
1128    /// (`catalog/fork_schemas/{fork_id}.json`).
1129    ///
1130    /// In Phase 1 the delta is always empty, so the merge is a clone.
1131    /// Phase 2 starts populating it when on-the-fly label creation lands.
1132    #[must_use]
1133    pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1134        let primary = self.schema();
1135        let merged = if overlay.is_empty() {
1136            (*primary).clone()
1137        } else {
1138            let mut merged = (*primary).clone();
1139            for (name, label) in &overlay.added_labels {
1140                merged.labels.insert(name.clone(), label.clone());
1141            }
1142            for (name, edge_type) in &overlay.added_edge_types {
1143                merged.edge_types.insert(name.clone(), edge_type.clone());
1144            }
1145            for addition in &overlay.added_properties {
1146                let props = merged.properties.entry(addition.owner.clone()).or_default();
1147                props.insert(
1148                    addition.property.clone(),
1149                    PropertyMeta {
1150                        r#type: addition.data_type.clone(),
1151                        nullable: addition.nullable,
1152                        added_in: merged.schema_version,
1153                        state: SchemaElementState::Active,
1154                        generation_expression: None,
1155                        description: None,
1156                    },
1157                );
1158            }
1159            merged
1160        };
1161
1162        Arc::new(Self {
1163            store: self.store.clone(),
1164            path: self.path.clone(),
1165            schema: RwLock::new(Arc::new(merged)),
1166        })
1167    }
1168
1169    pub fn next_label_id(&self) -> u16 {
1170        self.schema()
1171            .labels
1172            .values()
1173            .map(|l| l.id)
1174            .max()
1175            .unwrap_or(0)
1176            + 1
1177    }
1178
1179    pub fn next_type_id(&self) -> u32 {
1180        let max_schema_id = self
1181            .schema()
1182            .edge_types
1183            .values()
1184            .map(|t| t.id)
1185            .max()
1186            .unwrap_or(0);
1187
1188        // Ensure we stay in schema'd ID space (bit 31 = 0)
1189        if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1190            panic!("Schema edge type ID exhaustion");
1191        }
1192
1193        max_schema_id + 1
1194    }
1195
1196    pub fn add_label(&self, name: &str) -> Result<u16> {
1197        self.add_label_with_desc(name, None)
1198    }
1199
1200    pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1201        let mut guard = acquire_write(&self.schema, "schema")?;
1202        let schema = Arc::make_mut(&mut *guard);
1203        if schema.labels.contains_key(name) {
1204            return Err(anyhow!("Label '{}' already exists", name));
1205        }
1206
1207        let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1208        if id >= VIRTUAL_LABEL_ID_START {
1209            return Err(anyhow!(
1210                "Native label space exhausted (next id {id:#x} would enter the \
1211                 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1212                 reserved for catalog-resolved labels)"
1213            ));
1214        }
1215        schema.labels.insert(
1216            name.to_string(),
1217            LabelMeta {
1218                id,
1219                created_at: Utc::now(),
1220                state: SchemaElementState::Active,
1221                description,
1222            },
1223        );
1224        schema.bump_version();
1225        Ok(id)
1226    }
1227
1228    pub fn add_edge_type(
1229        &self,
1230        name: &str,
1231        src_labels: Vec<String>,
1232        dst_labels: Vec<String>,
1233    ) -> Result<u32> {
1234        self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1235    }
1236
1237    pub fn add_edge_type_with_desc(
1238        &self,
1239        name: &str,
1240        src_labels: Vec<String>,
1241        dst_labels: Vec<String>,
1242        description: Option<String>,
1243    ) -> Result<u32> {
1244        let mut guard = acquire_write(&self.schema, "schema")?;
1245        let schema = Arc::make_mut(&mut *guard);
1246        if schema.edge_types.contains_key(name) {
1247            return Err(anyhow!("Edge type '{}' already exists", name));
1248        }
1249
1250        let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1251
1252        // Stay in the schema-defined sub-range (bit 31 = 0, and below the
1253        // virtual reservation `VIRTUAL_EDGE_TYPE_ID_START`) — same bound as
1254        // `add_edge_type`, so the two entry points cannot disagree on the
1255        // legal ceiling.
1256        if id >= VIRTUAL_EDGE_TYPE_ID_START {
1257            return Err(anyhow!(
1258                "Native edge type space exhausted (next id {id:#x} would enter the \
1259                 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1260                 reserved for catalog-resolved edge types)"
1261            ));
1262        }
1263
1264        schema.edge_types.insert(
1265            name.to_string(),
1266            EdgeTypeMeta {
1267                id,
1268                src_labels,
1269                dst_labels,
1270                state: SchemaElementState::Active,
1271                description,
1272            },
1273        );
1274        schema.bump_version();
1275        Ok(id)
1276    }
1277
1278    /// Delegates to [`Schema::get_or_assign_edge_type_id`].
1279    ///
1280    /// Read-lock fast path: the type name is almost always already known
1281    /// (it is constant per statement but resolved per row by the CREATE
1282    /// executor), and the slow path's write lock + `Arc::make_mut` deep-clones
1283    /// the whole `Schema` whenever the Arc is shared — which under SSI it
1284    /// always is. Double-checked: on a miss, `Schema::get_or_assign_edge_type_id`
1285    /// re-checks under the write lock, so two racing assigners converge on one id.
1286    pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1287        {
1288            let guard = acquire_read(&self.schema, "schema")
1289                .expect("Schema lock poisoned - a thread panicked while holding it");
1290            if let Some(id) = guard.edge_type_id_unified(type_name) {
1291                return id;
1292            }
1293        }
1294        let mut guard = acquire_write(&self.schema, "schema")
1295            .expect("Schema lock poisoned - a thread panicked while holding it");
1296        let schema = Arc::make_mut(&mut *guard);
1297        schema.get_or_assign_edge_type_id(type_name)
1298    }
1299
1300    /// Delegates to [`Schema::edge_type_name_by_id_unified`].
1301    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1302        let schema = acquire_read(&self.schema, "schema")
1303            .expect("Schema lock poisoned - a thread panicked while holding it");
1304        schema.edge_type_name_by_id_unified(type_id)
1305    }
1306
1307    pub fn add_property(
1308        &self,
1309        label_or_type: &str,
1310        prop_name: &str,
1311        data_type: DataType,
1312        nullable: bool,
1313    ) -> Result<()> {
1314        self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1315    }
1316
1317    pub fn add_property_with_desc(
1318        &self,
1319        label_or_type: &str,
1320        prop_name: &str,
1321        data_type: DataType,
1322        nullable: bool,
1323        description: Option<String>,
1324    ) -> Result<()> {
1325        validate_property_name(prop_name)?;
1326        let mut guard = acquire_write(&self.schema, "schema")?;
1327        let schema = Arc::make_mut(&mut *guard);
1328        let version = schema.schema_version;
1329        let props = schema
1330            .properties
1331            .entry(label_or_type.to_string())
1332            .or_default();
1333
1334        if props.contains_key(prop_name) {
1335            return Err(anyhow!(
1336                "Property '{}' already exists for '{}'",
1337                prop_name,
1338                label_or_type
1339            ));
1340        }
1341
1342        props.insert(
1343            prop_name.to_string(),
1344            PropertyMeta {
1345                r#type: data_type,
1346                nullable,
1347                added_in: version,
1348                state: SchemaElementState::Active,
1349                generation_expression: None,
1350                description,
1351            },
1352        );
1353        // Bump after stamping `added_in` with the pre-bump `version`.
1354        schema.bump_version();
1355        Ok(())
1356    }
1357
1358    pub fn add_generated_property(
1359        &self,
1360        label_or_type: &str,
1361        prop_name: &str,
1362        data_type: DataType,
1363        expr: String,
1364    ) -> Result<()> {
1365        // System-generated `_gen_*` columns bypass the underscore-prefix rule
1366        // but must still avoid storage-layer column-name collisions.
1367        validate_reserved_property_name(prop_name)?;
1368        let mut guard = acquire_write(&self.schema, "schema")?;
1369        let schema = Arc::make_mut(&mut *guard);
1370        let version = schema.schema_version;
1371        let props = schema
1372            .properties
1373            .entry(label_or_type.to_string())
1374            .or_default();
1375
1376        if props.contains_key(prop_name) {
1377            return Err(anyhow!("Property '{}' already exists", prop_name));
1378        }
1379
1380        props.insert(
1381            prop_name.to_string(),
1382            PropertyMeta {
1383                r#type: data_type,
1384                nullable: true,
1385                added_in: version,
1386                state: SchemaElementState::Active,
1387                generation_expression: Some(expr),
1388                description: None,
1389            },
1390        );
1391        // Bump after stamping `added_in` with the pre-bump `version`.
1392        schema.bump_version();
1393        Ok(())
1394    }
1395
1396    pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1397        let mut guard = acquire_write(&self.schema, "schema")?;
1398        let schema = Arc::make_mut(&mut *guard);
1399        let meta = schema
1400            .labels
1401            .get_mut(name)
1402            .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1403        meta.description = description;
1404        Ok(())
1405    }
1406
1407    pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1408        let mut guard = acquire_write(&self.schema, "schema")?;
1409        let schema = Arc::make_mut(&mut *guard);
1410        let meta = schema
1411            .edge_types
1412            .get_mut(name)
1413            .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1414        meta.description = description;
1415        Ok(())
1416    }
1417
1418    pub fn set_property_description(
1419        &self,
1420        entity: &str,
1421        prop_name: &str,
1422        description: Option<String>,
1423    ) -> Result<()> {
1424        let mut guard = acquire_write(&self.schema, "schema")?;
1425        let schema = Arc::make_mut(&mut *guard);
1426        let props = schema
1427            .properties
1428            .get_mut(entity)
1429            .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1430        let meta = props
1431            .get_mut(prop_name)
1432            .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
1433        meta.description = description;
1434        Ok(())
1435    }
1436
1437    /// Register an index definition on the schema, **upsert by name**.
1438    ///
1439    /// If an index with the same `IndexDefinition::name()` already exists, it
1440    /// is replaced in place; otherwise the def is appended. Idempotent under
1441    /// repeat invocation, which makes `SchemaBuilder::apply()` re-applicable
1442    /// without bloating `schema.indexes` and lets the rebuild epilogue inside
1443    /// every `IndexManager::create_*_index` re-record metadata updates without
1444    /// duplicating entries (issue rustic-ai/uni-db#63).
1445    pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1446        let mut guard = acquire_write(&self.schema, "schema")?;
1447        let schema = Arc::make_mut(&mut *guard);
1448        if let Some(existing) = schema
1449            .indexes
1450            .iter_mut()
1451            .find(|i| i.name() == index_def.name())
1452        {
1453            *existing = index_def;
1454        } else {
1455            schema.indexes.push(index_def);
1456        }
1457        schema.bump_version();
1458        Ok(())
1459    }
1460
1461    pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1462        let schema = self.schema.read().expect("Schema lock poisoned");
1463        schema.indexes.iter().find(|i| i.name() == name).cloned()
1464    }
1465
1466    /// Updates the lifecycle metadata for an index by name.
1467    ///
1468    /// The closure receives a mutable reference to the index's `IndexMetadata`,
1469    /// allowing callers to update status, timestamps, etc.
1470    pub fn update_index_metadata(
1471        &self,
1472        index_name: &str,
1473        f: impl FnOnce(&mut IndexMetadata),
1474    ) -> Result<()> {
1475        let mut guard = acquire_write(&self.schema, "schema")?;
1476        let schema = Arc::make_mut(&mut *guard);
1477        let idx = schema
1478            .indexes
1479            .iter_mut()
1480            .find(|i| i.name() == index_name)
1481            .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1482        f(idx.metadata_mut());
1483        Ok(())
1484    }
1485
1486    pub fn remove_index(&self, name: &str) -> Result<()> {
1487        let mut guard = acquire_write(&self.schema, "schema")?;
1488        let schema = Arc::make_mut(&mut *guard);
1489        if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1490            schema.indexes.remove(pos);
1491            schema.bump_version();
1492            Ok(())
1493        } else {
1494            Err(anyhow!("Index '{}' not found", name))
1495        }
1496    }
1497
1498    pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1499        let mut guard = acquire_write(&self.schema, "schema")?;
1500        let schema = Arc::make_mut(&mut *guard);
1501        if schema.constraints.iter().any(|c| c.name == constraint.name) {
1502            return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1503        }
1504        schema.constraints.push(constraint);
1505        schema.bump_version();
1506        Ok(())
1507    }
1508
1509    pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1510        let mut guard = acquire_write(&self.schema, "schema")?;
1511        let schema = Arc::make_mut(&mut *guard);
1512        if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1513            schema.constraints.remove(pos);
1514            schema.bump_version();
1515            Ok(())
1516        } else if if_exists {
1517            Ok(())
1518        } else {
1519            Err(anyhow!("Constraint '{}' not found", name))
1520        }
1521    }
1522
1523    pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1524        let mut guard = acquire_write(&self.schema, "schema")?;
1525        let schema = Arc::make_mut(&mut *guard);
1526        let Some(props) = schema.properties.get_mut(label_or_type) else {
1527            return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1528        };
1529        if props.remove(prop_name).is_none() {
1530            return Err(anyhow!(
1531                "Property '{}' not found for '{}'",
1532                prop_name,
1533                label_or_type
1534            ));
1535        }
1536        schema.bump_version();
1537        Ok(())
1538    }
1539
1540    pub fn rename_property(
1541        &self,
1542        label_or_type: &str,
1543        old_name: &str,
1544        new_name: &str,
1545    ) -> Result<()> {
1546        let mut guard = acquire_write(&self.schema, "schema")?;
1547        let schema = Arc::make_mut(&mut *guard);
1548        let Some(props) = schema.properties.get_mut(label_or_type) else {
1549            return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1550        };
1551        let Some(meta) = props.remove(old_name) else {
1552            return Err(anyhow!(
1553                "Property '{}' not found for '{}'",
1554                old_name,
1555                label_or_type
1556            ));
1557        };
1558        if props.contains_key(new_name) {
1559            // Rollback removal? Or just error.
1560            props.insert(old_name.to_string(), meta); // Restore
1561            return Err(anyhow!("Property '{}' already exists", new_name));
1562        }
1563        props.insert(new_name.to_string(), meta);
1564        schema.bump_version();
1565        Ok(())
1566    }
1567
1568    pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1569        let mut guard = acquire_write(&self.schema, "schema")?;
1570        let schema = Arc::make_mut(&mut *guard);
1571        if let Some(label_meta) = schema.labels.get_mut(name) {
1572            label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1573            // Do not remove properties; they are implicitly tombstoned by the label
1574            schema.bump_version();
1575            Ok(())
1576        } else if if_exists {
1577            Ok(())
1578        } else {
1579            Err(anyhow!("Label '{}' not found", name))
1580        }
1581    }
1582
1583    pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1584        let mut guard = acquire_write(&self.schema, "schema")?;
1585        let schema = Arc::make_mut(&mut *guard);
1586        if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1587            edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1588            // Do not remove properties; they are implicitly tombstoned by the edge type
1589            schema.bump_version();
1590            Ok(())
1591        } else if if_exists {
1592            Ok(())
1593        } else {
1594            Err(anyhow!("Edge Type '{}' not found", name))
1595        }
1596    }
1597}
1598
1599/// Validate identifier names to prevent injection and ensure compatibility.
1600pub fn validate_identifier(name: &str) -> Result<()> {
1601    // Length check
1602    if name.is_empty() || name.len() > 64 {
1603        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1604    }
1605
1606    // First character must be letter or underscore
1607    let first = name.chars().next().unwrap();
1608    if !first.is_alphabetic() && first != '_' {
1609        return Err(anyhow!(
1610            "Identifier '{}' must start with letter or underscore",
1611            name
1612        ));
1613    }
1614
1615    // Remaining characters: alphanumeric or underscore
1616    if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1617        return Err(anyhow!(
1618            "Identifier '{}' must contain only alphanumeric and underscore",
1619            name
1620        ));
1621    }
1622
1623    // Reserved words
1624    const RESERVED: &[&str] = &[
1625        "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1626        "UNION", "ORDER", "LIMIT",
1627    ];
1628    if RESERVED.contains(&name.to_uppercase().as_str()) {
1629        return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1630    }
1631
1632    Ok(())
1633}
1634
1635/// Reject user-declared property names that collide with internal Arrow column
1636/// names used by the storage layer.
1637///
1638/// Without this, declaring a property named e.g. `ext_id` produces an Arrow
1639/// schema with two `ext_id` fields at flush time, which Lance rejects with
1640/// "Duplicate field name" — silently losing all in-session writes on shutdown.
1641pub fn validate_property_name(name: &str) -> Result<()> {
1642    if name.starts_with('_') {
1643        return Err(anyhow!(
1644            "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
1645            name
1646        ));
1647    }
1648    validate_reserved_property_name(name)
1649}
1650
1651/// Reject names that collide with storage-layer Arrow column names.
1652///
1653/// Used both by `validate_property_name` (user-facing path) and directly by
1654/// `add_generated_property` (system-generated `_gen_*` path) — the latter
1655/// needs to bypass the underscore-prefix rule but must still reject the
1656/// fixed-name collisions below.
1657fn validate_reserved_property_name(name: &str) -> Result<()> {
1658    // Unprefixed names that get appended alongside user properties in the
1659    // per-label vertex (`storage/vertex.rs`), per-edge-type edge
1660    // (`storage/edge.rs`), or per-edge-type delta (`storage/delta.rs`)
1661    // Arrow schemas — declaring one of these as a user property produces a
1662    // duplicate Arrow field and a Lance "Duplicate field name" error at
1663    // flush time. Fixed-schema-only columns (`type`, `props_json`,
1664    // `labels` in the main tables) are NOT listed: those tables don't
1665    // append user properties, so no collision can occur.
1666    const RESERVED_PROPS: &[&str] = &[
1667        "ext_id",
1668        "overflow_json",
1669        "eid",
1670        "src_vid",
1671        "dst_vid",
1672        "op",
1673        // Internal planner sentinel: a column-name marker used by
1674        // `mark_set_item_variables` (uni-query::query::planner) to request
1675        // narrow structural projection without full-schema expansion.
1676        // Reserved here defensively so an internal `add_generated_property`
1677        // path can't accidentally create a colliding user-facing column.
1678        // The user-facing `validate_property_name` already rejects this
1679        // via the underscore-prefix rule, so this is belt-and-suspenders.
1680        "__set_struct__",
1681    ];
1682    if RESERVED_PROPS.contains(&name) {
1683        return Err(anyhow!(
1684            "Property name '{}' is reserved by the storage layer; please choose a different name",
1685            name
1686        ));
1687    }
1688    Ok(())
1689}
1690
// Unit tests for this module: `DataType::accepts`, schema persistence,
// property-name validation, function-name normalization, index metadata
// (serde, updates, upsert, load-time dedup), vector-index lookup, schema
// overlays, and concurrent edge-type id assignment.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::{TemporalValue, Value};
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Pins the `DataType::accepts` matrix. `Null` is accepted by every type
    /// listed, and exact matches plus the listed lossless widenings are
    /// accepted. The #68 data-loss mismatches are rejected, and
    /// `CypherValue` accepts anything.
    #[test]
    fn test_datatype_accepts_matrix() {
        // Factory for a UTC-epoch DateTime temporal value used in several asserts.
        let dt = || TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: None,
        };

        // Null is accepted by every type (nullability checked separately).
        for ty in [
            DataType::String,
            DataType::Int64,
            DataType::Bool,
            DataType::DateTime,
            DataType::Float64,
        ] {
            assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
        }

        // Exact-type matches.
        assert!(DataType::String.accepts(&Value::String("x".into())));
        assert!(DataType::Int64.accepts(&Value::Int(1)));
        assert!(DataType::Bool.accepts(&Value::Bool(true)));
        assert!(DataType::DateTime.accepts(&Value::Temporal(dt())));

        // Intentional lossless widenings remain allowed.
        assert!(
            DataType::Float64.accepts(&Value::Int(3)),
            "Int widens to Float"
        );
        assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
        assert!(DataType::Timestamp.accepts(&Value::Temporal(dt())));
        assert!(
            DataType::Timestamp.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
            "storage parses strings for non-struct Timestamp columns"
        );

        // The #68 data-loss cases must be rejected (coercion handles strings separately).
        assert!(
            !DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
            "String into a DateTime struct column nulls silently — reject here"
        );
        assert!(!DataType::Bool.accepts(&Value::Int(1)));
        assert!(!DataType::Int64.accepts(&Value::Bool(true)));
        assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
        assert!(
            !DataType::String.accepts(&Value::Int(10)),
            "no implicit stringification"
        );
        assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));

        // Opaque columns accept anything.
        assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
    }

    /// Labels, properties and edge types survive `save` followed by a fresh
    /// `load_from_store`. Duplicate labels and duplicate properties are
    /// rejected.
    #[tokio::test]
    async fn test_schema_management() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;

        // Labels (the first label added to a fresh schema gets id 1)
        let lid = manager.add_label("Person")?;
        assert_eq!(lid, 1);
        assert!(manager.add_label("Person").is_err());

        // Properties
        manager.add_property("Person", "name", DataType::String, false)?;
        assert!(
            manager
                .add_property("Person", "name", DataType::String, false)
                .is_err()
        );

        // Edge types
        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
        assert_eq!(tid, 1);

        manager.save().await?;
        // Check file exists
        assert!(store.get(&path).await.is_ok());

        // Reload from the same store to prove the schema was persisted.
        let manager2 = SchemaManager::load_from_store(store, &path).await?;
        assert!(manager2.schema().labels.contains_key("Person"));
        assert!(
            manager2
                .schema()
                .properties
                .get("Person")
                .unwrap()
                .contains_key("name")
        );

        Ok(())
    }

    /// Property names that collide with storage columns are rejected on
    /// labels, edge types and generated properties. Names that merely contain
    /// a reserved substring are accepted.
    #[tokio::test]
    async fn test_reserved_property_names_rejected() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Tiny")?;

        // Unprefixed reserved names — these collide with internal Arrow
        // columns in storage tables and previously caused Lance
        // "Duplicate field name" errors at flush time.
        for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
            let err = manager
                .add_property("Tiny", reserved, DataType::String, true)
                .expect_err(&format!("expected '{reserved}' to be rejected"));
            assert!(
                err.to_string().contains("reserved"),
                "error for '{reserved}' should mention 'reserved', got: {err}"
            );
        }

        // Planner sentinel — reserved in RESERVED_PROPS (belt-and-suspenders
        // alongside the underscore-prefix rule). Confirms an internal
        // `add_generated_property` path cannot accidentally create a column
        // that collides with the SET-target structural-projection marker.
        let err = manager
            .add_property("Tiny", "__set_struct__", DataType::String, true)
            .expect_err("expected '__set_struct__' to be rejected");
        assert!(
            err.to_string().contains("reserved"),
            "__set_struct__ rejection should mention 'reserved', got: {err}"
        );

        // Leading-underscore pattern rule.
        for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
            assert!(
                manager
                    .add_property("Tiny", reserved, DataType::String, true)
                    .is_err(),
                "expected '{reserved}' to be rejected"
            );
        }

        // Names that merely contain a reserved substring should still be
        // accepted.
        manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
        manager.add_property("Tiny", "user_op", DataType::String, true)?;
        manager.add_property("Tiny", "type_name", DataType::String, true)?;

        // Same check applies to edge-type properties (single dispatch).
        manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
        assert!(
            manager
                .add_property("knows", "src_vid", DataType::Int64, true)
                .is_err()
        );

        // And to generated properties.
        assert!(
            manager
                .add_generated_property(
                    "Tiny",
                    "ext_id",
                    DataType::String,
                    "concat('x', name)".into()
                )
                .is_err()
        );

        Ok(())
    }

    /// Function names are upper-cased whatever their input casing, including
    /// nested calls. Argument identifiers are left untouched.
    #[test]
    fn test_normalize_function_names() {
        assert_eq!(
            SchemaManager::normalize_function_names("lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("LOWER(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("Lower(email)"),
            "LOWER(email)"
        );
        assert_eq!(
            SchemaManager::normalize_function_names("trim(lower(email))"),
            "TRIM(LOWER(email))"
        );
    }

    /// Generated column names do not depend on the casing of function names
    /// in the expression. For `lower(email)` the name starts with
    /// `_gen_LOWER_email_`.
    #[test]
    fn test_generated_column_name_case_insensitive() {
        let col1 = SchemaManager::generated_column_name("lower(email)");
        let col2 = SchemaManager::generated_column_name("LOWER(email)");
        let col3 = SchemaManager::generated_column_name("Lower(email)");
        assert_eq!(col1, col2);
        assert_eq!(col2, col3);
        assert!(col1.starts_with("_gen_LOWER_email_"));
    }

    /// JSON without a `metadata` field still deserializes. It yields default
    /// metadata: status `Online`, no build timestamp, no row count.
    #[test]
    fn test_index_metadata_serde_backward_compat() {
        // Simulate old JSON without metadata field
        let json = r#"{
            "type": "Scalar",
            "name": "idx_person_name",
            "label": "Person",
            "properties": ["name"],
            "index_type": "BTree",
            "where_clause": null
        }"#;
        let def: IndexDefinition = serde_json::from_str(json).unwrap();
        let meta = def.metadata();
        assert_eq!(meta.status, IndexStatus::Online);
        assert!(meta.last_built_at.is_none());
        assert!(meta.row_count_at_build.is_none());
    }

    /// Non-default `IndexMetadata` survives a JSON serialize/deserialize round trip.
    #[test]
    fn test_index_metadata_serde_roundtrip() {
        let now = Utc::now();
        let def = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Test".to_string(),
            properties: vec!["prop".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                last_built_at: Some(now),
                row_count_at_build: Some(42),
            },
        });

        let json = serde_json::to_string(&def).unwrap();
        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.metadata().status, IndexStatus::Building);
        assert_eq!(parsed.metadata().row_count_at_build, Some(42));
        assert!(parsed.metadata().last_built_at.is_some());
    }

    /// `update_index_metadata` applies the closure to the stored index's
    /// metadata and returns an error for an unknown index name.
    #[tokio::test]
    async fn test_update_index_metadata() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;

        manager.add_label("Person")?;
        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: Default::default(),
        });
        manager.add_index(idx)?;

        // Verify initial status is Online
        let initial = manager.get_index("idx_test").unwrap();
        assert_eq!(initial.metadata().status, IndexStatus::Online);

        // Update to Building
        manager.update_index_metadata("idx_test", |m| {
            m.status = IndexStatus::Building;
            m.row_count_at_build = Some(100);
        })?;

        let updated = manager.get_index("idx_test").unwrap();
        assert_eq!(updated.metadata().status, IndexStatus::Building);
        assert_eq!(updated.metadata().row_count_at_build, Some(100));

        // Non-existent index should error
        assert!(manager.update_index_metadata("nope", |_| {}).is_err());

        Ok(())
    }

    /// `add_index` is upsert-by-name (issue rustic-ai/uni-db#63). Repeat
    /// invocations with the same `IndexDefinition::name()` must replace
    /// the entry in place rather than appending. Subsequent `add_index`
    /// calls also reflect metadata updates from the new definition.
    #[tokio::test]
    async fn test_add_index_is_upsert_by_name() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = SchemaManager::load_from_store(store, &path).await?;
        manager.add_label("Person")?;

        let initial = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_test".to_string(),
            label: "Person".to_string(),
            properties: vec!["name".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata {
                status: IndexStatus::Building,
                ..Default::default()
            },
        });
        manager.add_index(initial.clone())?;
        assert_eq!(manager.schema().indexes.len(), 1);

        // Re-add the identical def — must remain a single entry.
        manager.add_index(initial.clone())?;
        assert_eq!(
            manager.schema().indexes.len(),
            1,
            "duplicate add_index by name must not append"
        );

        // Re-add with updated metadata — must replace in place, len unchanged.
        let mut updated_cfg = match initial {
            IndexDefinition::Scalar(c) => c,
            _ => unreachable!(),
        };
        updated_cfg.metadata.status = IndexStatus::Online;
        updated_cfg.metadata.row_count_at_build = Some(42);
        manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
        assert_eq!(manager.schema().indexes.len(), 1);
        let stored = manager.get_index("idx_test").unwrap();
        assert_eq!(stored.metadata().status, IndexStatus::Online);
        assert_eq!(stored.metadata().row_count_at_build, Some(42));

        // A *different* name appends as a new entry.
        let other = IndexDefinition::Scalar(ScalarIndexConfig {
            name: "idx_other".to_string(),
            label: "Person".to_string(),
            properties: vec!["age".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata::default(),
        });
        manager.add_index(other)?;
        assert_eq!(manager.schema().indexes.len(), 2);

        Ok(())
    }

    /// `load_from_store` self-heals catalogs that were bloated by the
    /// pre-fix `add_index` (kept the *last* def per name).
    #[tokio::test]
    async fn test_load_dedups_bloated_indexes() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");

        // Seed disk with a hand-crafted bloated schema: 50 entries, all
        // sharing the same name. The last entry has distinct metadata so
        // we can assert "last writer wins" semantics.
        let mut schema = Schema::default();
        schema.labels.insert(
            "Person".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
                description: None,
            },
        );
        // Builds a scalar index named "idx_dup" with the given status/row count.
        let make = |status: IndexStatus, count: Option<u64>| {
            IndexDefinition::Scalar(ScalarIndexConfig {
                name: "idx_dup".to_string(),
                label: "Person".to_string(),
                properties: vec!["name".to_string()],
                index_type: ScalarIndexType::BTree,
                where_clause: None,
                metadata: IndexMetadata {
                    status,
                    row_count_at_build: count,
                    ..Default::default()
                },
            })
        };
        for _ in 0..49 {
            schema.indexes.push(make(IndexStatus::Building, None));
        }
        schema.indexes.push(make(IndexStatus::Online, Some(123)));
        // Write the bloated schema directly, bypassing SchemaManager.
        let json = serde_json::to_string_pretty(&schema)?;
        store.put(&path, json.into()).await?;

        let manager = SchemaManager::load_from_store(store, &path).await?;
        let schema = manager.schema();
        assert_eq!(
            schema.indexes.len(),
            1,
            "load() must collapse 50 duplicates by name to 1"
        );
        // Last-writer-wins: the kept entry is the final push (Online, 123).
        assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
        assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));

        Ok(())
    }

    /// `vector_index_for_property` does not return a `Stale` vector index, but
    /// returns the same index once its status is `Online`.
    #[test]
    fn test_vector_index_for_property_skips_non_online() {
        let mut schema = Schema::default();
        schema.labels.insert(
            "Document".to_string(),
            LabelMeta {
                id: 1,
                created_at: chrono::Utc::now(),
                state: SchemaElementState::Active,
                description: None,
            },
        );

        // Add a vector index with Stale status
        schema
            .indexes
            .push(IndexDefinition::Vector(VectorIndexConfig {
                name: "vec_doc_embedding".to_string(),
                label: "Document".to_string(),
                property: "embedding".to_string(),
                index_type: VectorIndexType::Flat,
                metric: DistanceMetric::Cosine,
                embedding_config: None,
                metadata: IndexMetadata {
                    status: IndexStatus::Stale,
                    ..Default::default()
                },
            }));

        // Stale index should NOT be returned
        assert!(
            schema
                .vector_index_for_property("Document", "embedding")
                .is_none()
        );

        // Set to Online — should now be returned
        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
            cfg.metadata.status = IndexStatus::Online;
        }
        let result = schema.vector_index_for_property("Document", "embedding");
        assert!(result.is_some());
        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
    }

    /// An overlay built from an empty delta starts with the primary's labels,
    /// and later mutations of the overlay are not visible in the primary.
    #[tokio::test]
    async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
        use crate::core::fork::SchemaDelta;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let primary = SchemaManager::load_from_store(store, &path).await?;
        primary.add_label("Person")?;

        let overlay = primary.with_overlay(&SchemaDelta::empty());
        assert_eq!(overlay.schema().labels.len(), 1);

        // Phase 1 invariant: mutating the overlay manager must not bleed
        // into primary's schema.
        overlay.add_label("Forked")?;
        assert!(overlay.schema().labels.contains_key("Forked"));
        assert!(!primary.schema().labels.contains_key("Forked"));

        Ok(())
    }

    /// An overlay shows the primary's labels plus the delta's added labels and
    /// edge types, while the primary itself stays unchanged.
    #[tokio::test]
    async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
        use crate::core::fork::SchemaDelta;
        use chrono::Utc;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let primary = SchemaManager::load_from_store(store, &path).await?;
        primary.add_label("Existing")?;

        // Delta entries are hand-built metadata; id 99 is just a test value.
        let label_meta = LabelMeta {
            id: 99,
            created_at: Utc::now(),
            state: SchemaElementState::Active,
            description: None,
        };
        let edge_meta = EdgeTypeMeta {
            id: 99,
            src_labels: vec!["NewLabel".into()],
            dst_labels: vec!["NewLabel".into()],
            state: SchemaElementState::Active,
            description: None,
        };
        let delta = SchemaDelta {
            added_labels: vec![("NewLabel".to_string(), label_meta)],
            added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
            added_properties: vec![],
        };

        let overlay = primary.with_overlay(&delta);
        let merged = overlay.schema();
        assert!(merged.labels.contains_key("Existing"));
        assert!(merged.labels.contains_key("NewLabel"));
        assert!(merged.edge_types.contains_key("NewEdge"));

        // Primary unchanged.
        assert!(!primary.schema().labels.contains_key("NewLabel"));
        Ok(())
    }

    /// N threads racing `get_or_assign_edge_type_id` for the same new name
    /// must converge on a single id (the read-lock fast path double-checks
    /// under the write lock); a schema-defined type must win over the
    /// schemaless registry.
    #[tokio::test]
    async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let path = ObjectStorePath::from("schema.json");
        let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);

        // 16 OS threads (std::thread, not tokio tasks) so the calls can overlap.
        let mut handles = Vec::new();
        for _ in 0..16 {
            let m = manager.clone();
            handles.push(std::thread::spawn(move || {
                m.get_or_assign_edge_type_id("RACED")
            }));
        }
        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        assert!(
            ids.iter().all(|&id| id == ids[0]),
            "all racers must observe one id, got {ids:?}"
        );
        // Fast path returns the same id afterwards.
        assert_eq!(manager.get_or_assign_edge_type_id("RACED"), ids[0]);

        // Schema-defined type wins over the schemaless registry.
        manager.add_label("A")?;
        let declared = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
        assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared);
        Ok(())
    }
}