Skip to main content

uni_common/core/
schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::core::edge_type::{
5    MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6    is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
/// State recorded in the `state` field of [`PropertyMeta`], [`LabelMeta`] and
/// [`EdgeTypeMeta`].
///
/// Those fields fall back to [`SchemaElementState::Active`] when the key is
/// missing from the serialized form (see `default_state`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
    /// The element is in normal use; this is the serde default.
    Active,
    /// NOTE(review): presumably a soft-removed element that older snapshots can
    /// still see — confirm against the code that sets this state.
    Hidden {
        /// Instant at which the element entered this state.
        since: DateTime<Utc>,
        /// Snapshot identifier, carried as a plain string.
        last_active_snapshot: String, // SnapshotId
    },
    /// NOTE(review): presumably a fully removed element kept only as a marker —
    /// confirm against the code that sets this state.
    Tombstone {
        /// Instant at which the element entered this state.
        since: DateTime<Utc>,
    },
}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35/// Returns the canonical struct field definitions for DateTime encoding in Arrow.
36///
37/// DateTime is encoded as a 3-field struct to preserve timezone information:
38/// - `nanos_since_epoch`: i64 nanoseconds since Unix epoch (UTC)
39/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
40/// - `timezone_name`: Optional IANA timezone name (e.g., "America/New_York")
41pub fn datetime_struct_fields() -> Fields {
42    Fields::from(vec![
43        Field::new(
44            "nanos_since_epoch",
45            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46            true,
47        ),
48        Field::new("offset_seconds", ArrowDataType::Int32, true),
49        Field::new("timezone_name", ArrowDataType::Utf8, true),
50    ])
51}
52
53/// Returns the canonical struct field definitions for Time encoding in Arrow.
54///
55/// Time is encoded as a 2-field struct to preserve timezone offset:
56/// - `nanos_since_midnight`: i64 nanoseconds since midnight (0-86,399,999,999,999)
57/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
58pub fn time_struct_fields() -> Fields {
59    Fields::from(vec![
60        Field::new(
61            "nanos_since_midnight",
62            ArrowDataType::Time64(TimeUnit::Nanosecond),
63            true,
64        ),
65        Field::new("offset_seconds", ArrowDataType::Int32, true),
66    ])
67}
68
69/// Detects if an Arrow DataType is the canonical DateTime struct.
70pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74/// Detects if an Arrow DataType is the canonical Time struct.
75pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
79/// The canonical Arrow struct fields for a `DataType::SparseVector` column:
80/// `Struct { indices: List<UInt32>, values: List<Float32> }`. Two parallel
81/// variable-length lists in one struct. Both lists are non-null — an empty
82/// sparse vector stores as two empty lists, never null. The write and read
83/// sides both route through this one definition so they cannot drift (the same
84/// lockstep discipline as the temporal structs above).
85pub fn sparse_vector_struct_fields() -> Fields {
86    Fields::from(vec![
87        Field::new(
88            "indices",
89            ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::UInt32, true))),
90            false,
91        ),
92        Field::new(
93            "values",
94            ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Float32, true))),
95            false,
96        ),
97    ])
98}
99
100/// Detects if an Arrow DataType is the canonical SparseVector struct.
101pub fn is_sparse_vector_struct(arrow_dt: &ArrowDataType) -> bool {
102    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == sparse_vector_struct_fields())
103}
104
/// Builds the field metadata that tags an Arrow `LargeBinary` field as a raw
/// `DataType::Bytes` value instead of a tagged CypherValue/Duration blob.
///
/// The marker is the single entry `"uni_raw_bytes" -> "true"`. It is stamped on
/// the child field of `List(Bytes)` / `Map(_, Bytes)` containers so the read
/// path returns each element verbatim rather than running it through the
/// tagged codec (which would interpret `byte[0]` as a type tag). CV-encoded
/// containers carry no marker and keep the codec path; the read side that
/// honors it lives in `arrow_convert`.
pub fn raw_bytes_field_metadata() -> HashMap<String, String> {
    let mut marker = HashMap::with_capacity(1);
    marker.insert(String::from("uni_raw_bytes"), String::from("true"));
    marker
}
115
/// CRDT variant that a [`DataType::Crdt`] column declares.
///
/// [`CrdtType::type_name`] returns the canonical name of each variant.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    GCounter,
    GSet,
    ORSet,
    LWWRegister,
    LWWMap,
    Rga,
    VectorClock,
    VCRegister,
}
128
129impl CrdtType {
130    /// Returns the canonical variant name for this CRDT type.
131    ///
132    /// The returned strings must stay in sync with `uni_crdt::Crdt::type_name`,
133    /// so a written CRDT value can be validated against its schema-declared
134    /// variant (see uni-store's write-time CRDT enforcement).
135    ///
136    /// # Examples
137    /// ```
138    /// use uni_common::core::schema::CrdtType;
139    /// assert_eq!(CrdtType::GCounter.type_name(), "GCounter");
140    /// ```
141    #[must_use]
142    pub fn type_name(&self) -> &'static str {
143        match self {
144            CrdtType::GCounter => "GCounter",
145            CrdtType::GSet => "GSet",
146            CrdtType::ORSet => "ORSet",
147            CrdtType::LWWRegister => "LWWRegister",
148            CrdtType::LWWMap => "LWWMap",
149            CrdtType::Rga => "Rga",
150            CrdtType::VectorClock => "VectorClock",
151            CrdtType::VCRegister => "VCRegister",
152        }
153    }
154}
155
/// Coordinate system of a [`DataType::Point`] column.
///
/// The variant selects which coordinate fields `DataType::to_arrow` puts in
/// the stored struct; every layout also carries a `crs` string field.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
    /// Stored with `latitude` and `longitude` fields.
    Geographic,  // WGS84
    /// Stored with `x` and `y` fields.
    Cartesian2D, // x, y
    /// Stored with `x`, `y` and `z` fields.
    Cartesian3D, // x, y, z
}
162
/// Declared type of a schema property.
///
/// [`DataType::to_arrow`] gives the Arrow storage type of each variant and
/// [`DataType::accepts`] the `Value` shapes each variant admits on write.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
    String,
    Int32,
    Int64,
    Float32,
    Float64,
    Bool,
    /// Stored as a nanosecond Arrow timestamp tagged "UTC".
    Timestamp,
    Date,
    /// Stored as the struct built by [`time_struct_fields`].
    Time,
    /// Stored as the struct built by [`datetime_struct_fields`].
    DateTime,
    /// Stored as Arrow `LargeBinary`.
    Duration,
    /// Stored as Arrow `LargeBinary`; accepts any value.
    CypherValue,
    /// Raw byte buffer stored as Arrow `LargeBinary`.
    Bytes,
    Point(PointType),
    /// Dense vector stored as a fixed-size list of `Float32` with `dimensions` items.
    Vector {
        dimensions: usize,
    },
    /// Learned-sparse vector (SPLADE / BGE-M3). `dimensions` is the term-space
    /// cardinality (max term id + 1) used for validation and index config.
    SparseVector {
        dimensions: usize,
    },
    /// Stored as Arrow `FixedSizeBinary(24)`.
    Btic,
    /// Stored as Arrow `Binary`; accepts any value.
    Crdt(CrdtType),
    /// Homogeneous list of the boxed element type.
    List(Box<DataType>),
    /// Map from the first boxed type (key) to the second (value).
    Map(Box<DataType>, Box<DataType>),
}
193
194impl DataType {
    /// Convenience alias for [`DataType::Float64`]. Kept for compatibility;
    /// prefer naming the exact type.
    // Deliberately spelled like a variant, hence the lint exemption.
    #[allow(non_upper_case_globals)]
    pub const Float: DataType = DataType::Float64;
    /// Convenience alias for [`DataType::Int64`]. Kept for compatibility;
    /// prefer naming the exact type.
    // Deliberately spelled like a variant, hence the lint exemption.
    #[allow(non_upper_case_globals)]
    pub const Int: DataType = DataType::Int64;
200
201    pub fn to_arrow(&self) -> ArrowDataType {
202        match self {
203            DataType::String => ArrowDataType::Utf8,
204            DataType::Int32 => ArrowDataType::Int32,
205            DataType::Int64 => ArrowDataType::Int64,
206            DataType::Float32 => ArrowDataType::Float32,
207            DataType::Float64 => ArrowDataType::Float64,
208            DataType::Bool => ArrowDataType::Boolean,
209            DataType::Timestamp => {
210                ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
211            }
212            DataType::Date => ArrowDataType::Date32,
213            DataType::Time => ArrowDataType::Struct(time_struct_fields()),
214            DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
215            DataType::Duration => ArrowDataType::LargeBinary, // Lance doesn't support Interval(MonthDayNano); use CypherValue codec
216            DataType::CypherValue => ArrowDataType::LargeBinary, // MessagePack-tagged binary encoding
217            DataType::Bytes => ArrowDataType::LargeBinary, // raw byte buffer (no codec wrapping)
218            DataType::Point(pt) => match pt {
219                PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
220                    Field::new("latitude", ArrowDataType::Float64, false),
221                    Field::new("longitude", ArrowDataType::Float64, false),
222                    Field::new("crs", ArrowDataType::Utf8, false),
223                ])),
224                PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
225                    Field::new("x", ArrowDataType::Float64, false),
226                    Field::new("y", ArrowDataType::Float64, false),
227                    Field::new("crs", ArrowDataType::Utf8, false),
228                ])),
229                PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
230                    Field::new("x", ArrowDataType::Float64, false),
231                    Field::new("y", ArrowDataType::Float64, false),
232                    Field::new("z", ArrowDataType::Float64, false),
233                    Field::new("crs", ArrowDataType::Utf8, false),
234                ])),
235            },
236            DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
237                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
238                *dimensions as i32,
239            ),
240            DataType::SparseVector { .. } => ArrowDataType::Struct(sparse_vector_struct_fields()),
241            DataType::Btic => ArrowDataType::FixedSizeBinary(24),
242            DataType::Crdt(_) => ArrowDataType::Binary, // Store CRDT as binary MessagePack
243            DataType::List(inner) => {
244                // A raw `Bytes` element maps to Arrow `LargeBinary`, indistinguishable
245                // from a CV-encoded element by type alone; mark the child field so the
246                // read path decodes it verbatim rather than through the tagged codec.
247                let item = Field::new("item", inner.to_arrow(), true);
248                let item = if matches!(**inner, DataType::Bytes) {
249                    item.with_metadata(raw_bytes_field_metadata())
250                } else {
251                    item
252                };
253                ArrowDataType::List(Arc::new(item))
254            }
255            DataType::Map(key, value) => {
256                // The value child's Arrow storage MUST agree with `build_map_column` in
257                // uni-store: typed scalars use their own Arrow type; `Bytes` is a
258                // raw-bytes-marked `LargeBinary`; every other (nested/non-scalar) value type
259                // is CypherValue-encoded into an UNMARKED `LargeBinary` (decoded back through
260                // the tagged codec on read). Gated by `map_value_is_typed` so the two sites
261                // can't drift.
262                let value_field = if value.map_value_is_typed() {
263                    let f = Field::new("value", value.to_arrow(), true);
264                    if matches!(**value, DataType::Bytes) {
265                        f.with_metadata(raw_bytes_field_metadata())
266                    } else {
267                        f
268                    }
269                } else {
270                    Field::new("value", ArrowDataType::LargeBinary, true)
271                };
272                ArrowDataType::List(Arc::new(Field::new(
273                    "item",
274                    ArrowDataType::Struct(Fields::from(vec![
275                        Field::new("key", key.to_arrow(), false),
276                        value_field,
277                    ])),
278                    true,
279                )))
280            }
281        }
282    }
283
284    /// Whether a `Map(_, self)` VALUE is stored as a typed Arrow child (this set) versus a
285    /// CypherValue-encoded `LargeBinary` fallback child (everything else, e.g. `Vector`,
286    /// `List`, `Map`, temporal). This MUST stay in lockstep with the explicit value-type
287    /// arms of `build_map_column` in uni-store (the `_` arm there is the CV fallback).
288    pub fn map_value_is_typed(&self) -> bool {
289        matches!(
290            self,
291            DataType::String
292                | DataType::Int64
293                | DataType::Int32
294                | DataType::Float64
295                | DataType::Float32
296                | DataType::Bool
297                | DataType::Bytes
298        )
299    }
300
301    /// Returns `true` if `value` is directly storable in this column type without loss.
302    ///
303    /// This is the schema-level type guard used by the write path. `Value::Null` is
304    /// always accepted — column nullability is enforced separately by the `nullable`
305    /// flag, not here. `CypherValue`, `Crdt`, and `Point` columns accept any value.
306    /// For every other declared type, only the `Value` variants that the storage layer
307    /// persists *without silently nulling* are accepted (see the per-type converters in
308    /// `uni-store`'s `arrow_convert`), plus the intentional lossless widenings
309    /// `Int`→`Float`, `Int`→`Int32`, and `Temporal`→`Timestamp`.
310    ///
311    /// A `Value::String` destined for a `Date`/`Time`/`DateTime`/`Duration` column is
312    /// intentionally *not* accepted here: the write path first coerces such strings into
313    /// the proper `Temporal` value (matching the Cypher temporal constructors), then the
314    /// coerced value passes this check. This keeps `accepts` a pure, allocation-free
315    /// predicate.
316    ///
317    /// # Examples
318    /// ```
319    /// use uni_common::core::schema::DataType;
320    /// use uni_common::Value;
321    ///
322    /// assert!(DataType::Float64.accepts(&Value::Int(3))); // Int widens to Float
323    /// assert!(DataType::Bool.accepts(&Value::Null)); // Null always accepted
324    /// assert!(!DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())));
325    /// ```
326    pub fn accepts(&self, value: &crate::value::Value) -> bool {
327        use crate::value::{TemporalValue, Value};
328
329        // Null is universally accepted; nullability is a separate concern.
330        if matches!(value, Value::Null) {
331            return true;
332        }
333
334        match self {
335            // Opaque / dynamically-typed columns accept any value.
336            DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
337
338            DataType::String => matches!(value, Value::String(_)),
339            DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
340            // Int widens to Float losslessly for the ranges we care about.
341            DataType::Float32 | DataType::Float64 => {
342                matches!(value, Value::Int(_) | Value::Float(_))
343            }
344            DataType::Bool => matches!(value, Value::Bool(_)),
345
346            // Non-struct timestamp column: storage parses strings and accepts ints,
347            // so both are lossless here (unlike the DateTime struct column below).
348            DataType::Timestamp => matches!(
349                value,
350                Value::String(_)
351                    | Value::Int(_)
352                    | Value::Temporal(
353                        TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
354                    )
355            ),
356            DataType::DateTime => matches!(
357                value,
358                Value::Temporal(
359                    TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
360                )
361            ),
362            DataType::Date => {
363                matches!(
364                    value,
365                    Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
366                )
367            }
368            DataType::Time => matches!(
369                value,
370                Value::Int(_)
371                    | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
372            ),
373            DataType::Duration => {
374                matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
375            }
376            DataType::Bytes => matches!(value, Value::Bytes(_)),
377            // FixedSizeBinary(24) converter accepts the Btic temporal, raw strings, and lists.
378            DataType::Btic => matches!(
379                value,
380                Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
381            ),
382            // Shape-only: declared dimensions are enforced by `check_vector_dims`,
383            // which the write paths call alongside this predicate (issue #137).
384            DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
385            // `Value::Map` is the degraded form a `SparseVector` collapses into
386            // when round-tripped through `#[serde(untagged)]` persistence (e.g.
387            // the WAL's serde_json mutation log); accept it like `Vector` accepts
388            // `List`. The column builder re-extracts `{indices, values}`.
389            DataType::SparseVector { .. } => {
390                matches!(value, Value::SparseVector { .. } | Value::Map(_))
391            }
392            // Shape-only: for `List(Vector)` multi-vector columns, per-token
393            // dimensions are enforced by `check_vector_dims` (issue #137).
394            DataType::List(_) => matches!(value, Value::List(_)),
395            DataType::Map(_, _) => matches!(value, Value::Map(_)),
396        }
397    }
398
399    /// Checks a value's dimensions against a declared vector column type.
400    ///
401    /// The dimension-aware companion to [`DataType::accepts`] (which is shape-only):
402    /// for `Vector { dimensions }` columns the value must be a `Value::Vector` or an
403    /// all-numeric `Value::List` of exactly `dimensions` elements; for
404    /// `List(Vector { dimensions })` multi-vector columns every token must satisfy the
405    /// same rule (an empty token list is a legal empty multi-vector). `Value::Null` is
406    /// always accepted — nullability is enforced separately — and every non-vector
407    /// `DataType` returns `Ok(())`, so callers may invoke this unconditionally.
408    ///
409    /// Guards against the silent data loss of issue #137, where wrong-length vectors
410    /// were accepted at write time and nulled at flush by the Arrow converters.
411    ///
412    /// # Errors
413    /// Returns a [`VectorDimError`] describing the first offending value: a length
414    /// mismatch, a non-numeric list element, a non-vector value in a vector column,
415    /// or their per-token counterparts for multi-vector columns.
416    pub fn check_vector_dims(&self, value: &crate::value::Value) -> Result<(), VectorDimError> {
417        use crate::value::Value;
418
419        if matches!(value, Value::Null) {
420            return Ok(());
421        }
422
423        match self {
424            DataType::Vector { dimensions } => check_dense_vector_value(value, *dimensions),
425            DataType::List(inner) => {
426                let DataType::Vector { dimensions } = inner.as_ref() else {
427                    return Ok(());
428                };
429                let Value::List(tokens) = value else {
430                    return Err(VectorDimError::NotATokenList {
431                        actual: value_variant_name(value),
432                    });
433                };
434                for (token, token_value) in tokens.iter().enumerate() {
435                    check_dense_vector_value(token_value, *dimensions)
436                        .map_err(|e| e.for_token(token))?;
437                }
438                Ok(())
439            }
440            _ => Ok(()),
441        }
442    }
443}
444
/// Why a value cannot be stored in a declared `VECTOR(dim)` or multi-vector column.
///
/// Produced by [`DataType::check_vector_dims`] and [`check_dense_vector_value`].
/// Messages carry the declared and actual lengths so write-path errors are
/// actionable; callers prefix the property name and declared type.
///
/// The `Token*` variants mirror `WrongLength`, `NonNumericElement` and
/// `NotAVector` with the index of the offending multi-vector token added;
/// the private `for_token` helper performs that mapping.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VectorDimError {
    /// The vector has the wrong number of elements.
    WrongLength {
        /// Declared column dimensions.
        expected: usize,
        /// Actual element count of the offending value.
        actual: usize,
    },
    /// A list element is not `Int` or `Float`.
    NonNumericElement {
        /// Zero-based index of the offending element.
        index: usize,
    },
    /// The value is not a vector or list at all.
    NotAVector {
        /// Variant name of the offending value.
        actual: &'static str,
    },
    /// A multi-vector token has the wrong number of elements.
    TokenWrongLength {
        /// Zero-based token index within the multi-vector.
        token: usize,
        /// Declared per-token dimensions.
        expected: usize,
        /// Actual element count of the offending token.
        actual: usize,
    },
    /// A multi-vector token contains a non-numeric element.
    TokenNonNumericElement {
        /// Zero-based token index within the multi-vector.
        token: usize,
        /// Zero-based index of the offending element within the token.
        index: usize,
    },
    /// A multi-vector token is not a vector or list.
    TokenNotAVector {
        /// Zero-based token index within the multi-vector.
        token: usize,
        /// Variant name of the offending token.
        actual: &'static str,
    },
    /// The value for a multi-vector column is not a list of tokens.
    NotATokenList {
        /// Variant name of the offending value.
        actual: &'static str,
    },
}
498
499impl VectorDimError {
500    /// Maps a dense-kernel error to its per-token counterpart for multi-vector columns.
501    fn for_token(self, token: usize) -> Self {
502        match self {
503            Self::WrongLength { expected, actual } => Self::TokenWrongLength {
504                token,
505                expected,
506                actual,
507            },
508            Self::NonNumericElement { index } => Self::TokenNonNumericElement { token, index },
509            Self::NotAVector { actual } => Self::TokenNotAVector { token, actual },
510            other => other,
511        }
512    }
513}
514
515impl std::fmt::Display for VectorDimError {
516    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
517        match self {
518            Self::WrongLength { expected, actual } => write!(
519                f,
520                "got a vector of length {actual}, expected {expected} dimensions"
521            ),
522            Self::NonNumericElement { index } => {
523                write!(f, "element {index} is not numeric")
524            }
525            Self::NotAVector { actual } => {
526                write!(f, "got a non-vector value of type {actual}")
527            }
528            Self::TokenWrongLength {
529                token,
530                expected,
531                actual,
532            } => write!(
533                f,
534                "token {token} has {actual} dimensions, expected {expected}"
535            ),
536            Self::TokenNonNumericElement { token, index } => {
537                write!(f, "token {token} element {index} is not numeric")
538            }
539            Self::TokenNotAVector { token, actual } => {
540                write!(f, "token {token} is not a vector (got {actual})")
541            }
542            Self::NotATokenList { actual } => write!(
543                f,
544                "got a non-list value of type {actual} for a multi-vector column"
545            ),
546        }
547    }
548}
549
// No trait methods are overridden, so the provided defaults of
// `std::error::Error` apply.
impl std::error::Error for VectorDimError {}
551
552/// Checks one dense vector value against a declared dimension count.
553///
554/// The per-token kernel behind [`DataType::check_vector_dims`], exposed so the
555/// storage-layer converters can validate individual multi-vector tokens with the
556/// same semantics: `Value::Null` is accepted (a legal null row), a `Value::Vector`
557/// must have exactly `dimensions` elements, and a `Value::List` must additionally
558/// be all-numeric (`Int` or `Float`).
559///
560/// # Errors
561/// Returns [`VectorDimError::WrongLength`] on a length mismatch (an empty list
562/// reports `actual: 0`), [`VectorDimError::NonNumericElement`] for a `String`,
563/// `Null`, or other non-numeric list element, and [`VectorDimError::NotAVector`]
564/// for any other value variant.
565pub fn check_dense_vector_value(
566    value: &crate::value::Value,
567    dimensions: usize,
568) -> Result<(), VectorDimError> {
569    use crate::value::Value;
570
571    match value {
572        Value::Null => Ok(()),
573        Value::Vector(v) => {
574            if v.len() == dimensions {
575                Ok(())
576            } else {
577                Err(VectorDimError::WrongLength {
578                    expected: dimensions,
579                    actual: v.len(),
580                })
581            }
582        }
583        Value::List(items) => {
584            if items.len() != dimensions {
585                return Err(VectorDimError::WrongLength {
586                    expected: dimensions,
587                    actual: items.len(),
588                });
589            }
590            if let Some(index) = items.iter().position(|e| !e.is_number()) {
591                return Err(VectorDimError::NonNumericElement { index });
592            }
593            Ok(())
594        }
595        other => Err(VectorDimError::NotAVector {
596            actual: value_variant_name(other),
597        }),
598    }
599}
600
601/// Returns a short variant name for a `Value`, used in dimension-mismatch messages.
602fn value_variant_name(value: &crate::value::Value) -> &'static str {
603    use crate::value::Value;
604
605    match value {
606        Value::Null => "Null",
607        Value::Bool(_) => "Bool",
608        Value::Int(_) => "Int",
609        Value::Float(_) => "Float",
610        Value::String(_) => "String",
611        Value::Bytes(_) => "Bytes",
612        Value::List(_) => "List",
613        Value::Map(_) => "Map",
614        Value::Node(_) => "Node",
615        Value::Edge(_) => "Edge",
616        Value::Path(_) => "Path",
617        Value::Vector(_) => "Vector",
618        Value::SparseVector { .. } => "SparseVector",
619        Value::Temporal(_) => "Temporal",
620    }
621}
622
/// Serde default for `LabelMeta::created_at`: the current UTC time.
///
/// A label deserialized without a `created_at` key therefore gets the time of
/// deserialization, so the value can differ between loads of the same data.
fn default_created_at() -> DateTime<Utc> {
    Utc::now()
}
626
/// Serde default for the `state` field of `PropertyMeta`, `LabelMeta` and
/// `EdgeTypeMeta`: elements with no recorded state are `Active`.
fn default_state() -> SchemaElementState {
    SchemaElementState::Active
}
630
/// Serde default for `PropertyMeta::added_in`: properties with no recorded
/// version are attributed to schema version 1.
fn default_version_1() -> u32 {
    const FIRST_SCHEMA_VERSION: u32 = 1;
    FIRST_SCHEMA_VERSION
}
634
/// Schema metadata of a single property.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
    /// Declared type of the property.
    pub r#type: DataType,
    /// Nullability flag. `DataType::accepts` does not consult it; nulls are
    /// policed separately.
    pub nullable: bool,
    /// Schema version that introduced the property; 1 when absent from the
    /// serialized form.
    #[serde(default = "default_version_1")]
    pub added_in: u32, // SchemaVersion
    /// Lifecycle state; `Active` when absent from the serialized form.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional expression text; `None` when absent.
    /// NOTE(review): presumably the expression of a generated/computed
    /// property — confirm against its consumers.
    #[serde(default)]
    pub generation_expression: Option<String>,
    /// Optional free-text description; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
648
/// Schema metadata of a single label.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
    /// Numeric ID of the label.
    pub id: u16, // LabelId
    /// Creation time; filled with the current time when absent from the
    /// serialized form (see `default_created_at`).
    #[serde(default = "default_created_at")]
    pub created_at: DateTime<Utc>,
    /// Lifecycle state; `Active` when absent from the serialized form.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional free-text description; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
659
/// Schema metadata of a single edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
    /// See [`crate::core::edge_type::EdgeTypeId`] for bit-layout details.
    pub id: u32,
    /// NOTE(review): presumably the label names allowed on the source node —
    /// confirm against the code that validates edges.
    pub src_labels: Vec<String>,
    /// NOTE(review): presumably the label names allowed on the destination
    /// node — confirm against the code that validates edges.
    pub dst_labels: Vec<String>,
    /// Lifecycle state; `Active` when absent from the serialized form.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional free-text description; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
671
/// Kind of rule a [`Constraint`] carries, together with its parameters.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
    /// Names the properties covered by a uniqueness rule.
    Unique { properties: Vec<String> },
    /// Names the single property covered by an existence rule.
    Exists { property: String },
    /// Carries the text of a check expression.
    Check { expression: String },
}
679
/// Schema element a [`Constraint`] is attached to, identified by name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
    /// A label, by name.
    Label(String),
    /// An edge type, by name.
    EdgeType(String),
}
686
/// A named constraint stored in [`Schema::constraints`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
    /// Name of the constraint.
    pub name: String,
    /// Rule kind and its parameters.
    pub constraint_type: ConstraintType,
    /// Label or edge type the rule is attached to.
    pub target: ConstraintTarget,
    /// Enabled flag.
    /// NOTE(review): presumably disabled constraints are kept but not
    /// enforced — confirm against the enforcement code.
    pub enabled: bool,
}
694
/// Bidirectional registry for dynamically-assigned schemaless edge type IDs.
///
/// Edge types not defined in the schema are assigned IDs at runtime with
/// bit 31 set (see [`crate::core::edge_type`]). This registry maintains
/// the name-to-ID and ID-to-name mappings for those types.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    /// Edge type name → assigned ID. Keys are case-sensitive.
    name_to_id: HashMap<String, u32>,
    /// Assigned ID → edge type name; written together with `name_to_id`.
    id_to_name: HashMap<u32, String>,
    /// Next local ID to assign (0 is reserved for invalid).
    next_local_id: u32,
}
707
708impl SchemalessEdgeTypeRegistry {
709    pub fn new() -> Self {
710        Self {
711            name_to_id: HashMap::new(),
712            id_to_name: HashMap::new(),
713            next_local_id: 1,
714        }
715    }
716
717    /// Returns the schemaless ID for `type_name`, assigning a new one if needed.
718    pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
719        if let Some(&id) = self.name_to_id.get(type_name) {
720            return id;
721        }
722
723        let id = make_schemaless_id(self.next_local_id);
724        self.next_local_id += 1;
725
726        self.name_to_id.insert(type_name.to_string(), id);
727        self.id_to_name.insert(id, type_name.to_string());
728
729        id
730    }
731
732    /// Looks up the edge type name for a schemaless ID.
733    pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
734        self.id_to_name.get(&type_id).map(String::as_str)
735    }
736
737    /// Returns `true` if `type_name` has already been assigned a schemaless ID.
738    pub fn contains(&self, type_name: &str) -> bool {
739        self.name_to_id.contains_key(type_name)
740    }
741
742    /// Looks up the schemaless ID for `type_name` (exact match, read-only).
743    pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
744        self.name_to_id.get(type_name).copied()
745    }
746
747    /// Looks up the edge type ID for `type_name` with case-insensitive matching.
748    pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
749        self.name_to_id
750            .iter()
751            .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
752            .map(|(_, &id)| id)
753    }
754
755    /// Returns all registered schemaless type IDs.
756    pub fn all_type_ids(&self) -> Vec<u32> {
757        self.id_to_name.keys().copied().collect()
758    }
759
760    /// Returns true if the registry has any schemaless types.
761    pub fn is_empty(&self) -> bool {
762        self.name_to_id.is_empty()
763    }
764}
765
impl Default for SchemalessEdgeTypeRegistry {
    /// Delegates to [`SchemalessEdgeTypeRegistry::new`], so the default is an
    /// empty registry whose `next_local_id` starts at 1 rather than 0.
    fn default() -> Self {
        Self::new()
    }
}
771
/// Lower bound (inclusive) of the virtual, catalog-resolved label ID range.
///
/// IDs in `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL` belong to
/// plugin-registered `CatalogProvider`s; the planner allocates them lazily
/// through `PluginRegistry::register_virtual_label`. Native label allocation
/// (`SchemaManager::add_label`) refuses IDs in this range.
pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
/// "No label" sentinel; deliberately outside every allocatable ID range.
pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;

/// Upper bound, in bytes, on a label or edge-type name. (L6)
///
/// The limit is generous and exists for hygiene, not storage: names end up
/// in on-disk dataset paths and Lance branch names.
const MAX_SCHEMA_NAME_LEN: usize = 255;

/// Tells whether `id` falls in the virtual (catalog-resolved) label range.
///
/// The range is half-open: [`VIRTUAL_LABEL_ID_START`] is included,
/// [`VIRTUAL_LABEL_ID_SENTINEL`] is not.
#[inline]
pub fn is_virtual_label_id(id: u16) -> bool {
    let virtual_ids = VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL;
    virtual_ids.contains(&id)
}
792
/// In-memory catalog of a database: labels, edge types, their properties,
/// indexes, constraints, and the registry of schemaless edge types.
///
/// Serialized with serde; `indexes`, `constraints`, and `schemaless_registry`
/// fall back to their defaults when missing from the serialized form.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
    /// Coarse change token advanced by `bump_version`; `Default` starts it at 1.
    pub schema_version: u32,
    /// Label metadata keyed by label name.
    pub labels: HashMap<String, LabelMeta>,
    /// Edge type metadata keyed by edge type name.
    pub edge_types: HashMap<String, EdgeTypeMeta>,
    /// Property metadata keyed first by owner name (presumably a label or
    /// edge type name), then by property name.
    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
    /// Index definitions of every kind.
    #[serde(default)]
    pub indexes: Vec<IndexDefinition>,
    /// Declared constraints.
    #[serde(default)]
    pub constraints: Vec<Constraint>,
    /// Registry for schemaless edge types (dynamically assigned IDs)
    #[serde(default)]
    pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
807
808impl Default for Schema {
809    fn default() -> Self {
810        Self {
811            schema_version: 1,
812            labels: HashMap::new(),
813            edge_types: HashMap::new(),
814            properties: HashMap::new(),
815            indexes: Vec::new(),
816            constraints: Vec::new(),
817            schemaless_registry: SchemalessEdgeTypeRegistry::new(),
818        }
819    }
820}
821
822impl Schema {
823    /// Bumps `schema_version` to invalidate cached query plans.
824    ///
825    /// Called at the end of every DDL mutation that changes the schema's
826    /// shape (labels, edge types, properties, indexes, constraints). The
827    /// plan-cache eviction guard keys on `schema_version`, so a stale plan
828    /// built against an older shape is discarded once this advances. Uses
829    /// wrapping arithmetic: the value is a coarse change token, not a count,
830    /// so wraparound only risks a missed eviction after 2^32 DDL operations.
831    fn bump_version(&mut self) {
832        self.schema_version = self.schema_version.wrapping_add(1);
833    }
834
835    /// Returns the label name for a given label ID.
836    ///
837    /// Performs a linear scan over all labels. This is efficient because
838    /// the number of labels in a schema is typically small.
839    pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
840        self.labels
841            .iter()
842            .find(|(_, meta)| meta.id == label_id)
843            .map(|(name, _)| name.as_str())
844    }
845
846    /// Returns the label ID for a given label name.
847    pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
848        self.labels.get(label_name).map(|meta| meta.id)
849    }
850
851    /// Returns the edge type name for a given type ID.
852    ///
853    /// Performs a linear scan over all edge types. This is efficient because
854    /// the number of edge types in a schema is typically small.
855    pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
856        self.edge_types
857            .iter()
858            .find(|(_, meta)| meta.id == type_id)
859            .map(|(name, _)| name.as_str())
860    }
861
862    /// Returns the edge type ID for a given type name.
863    pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
864        self.edge_types.get(type_name).map(|meta| meta.id)
865    }
866
867    /// Returns the vector index configuration for a given label and property.
868    ///
869    /// Performs a linear scan over all indexes. This is efficient because
870    /// the number of indexes in a schema is typically small.
871    pub fn vector_index_for_property(
872        &self,
873        label: &str,
874        property: &str,
875    ) -> Option<&VectorIndexConfig> {
876        self.indexes.iter().find_map(|idx| {
877            if let IndexDefinition::Vector(config) = idx
878                && config.label == label
879                && config.property == property
880                && config.metadata.status == IndexStatus::Online
881            {
882                return Some(config);
883            }
884            None
885        })
886    }
887
888    /// Returns the scored sparse-vector index configuration for a label/property.
889    pub fn sparse_index_for_property(
890        &self,
891        label: &str,
892        property: &str,
893    ) -> Option<&SparseVectorIndexConfig> {
894        self.indexes.iter().find_map(|idx| {
895            if let IndexDefinition::Sparse(config) = idx
896                && config.label == label
897                && config.property == property
898                && config.metadata.status == IndexStatus::Online
899            {
900                return Some(config);
901            }
902            None
903        })
904    }
905
906    /// Returns the full-text index configuration for a given label and property.
907    ///
908    /// A full-text index covers one or more properties. This returns the config
909    /// if the specified property is among the indexed properties.
910    pub fn fulltext_index_for_property(
911        &self,
912        label: &str,
913        property: &str,
914    ) -> Option<&FullTextIndexConfig> {
915        self.indexes.iter().find_map(|idx| {
916            if let IndexDefinition::FullText(config) = idx
917                && config.label == label
918                && config.properties.iter().any(|p| p == property)
919                && config.metadata.status == IndexStatus::Online
920            {
921                return Some(config);
922            }
923            None
924        })
925    }
926
927    /// Get label metadata with case-insensitive lookup.
928    ///
929    /// This allows queries to match labels regardless of case, providing
930    /// better user experience when label names vary in casing.
931    pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
932        self.labels
933            .iter()
934            .find(|(k, _)| k.eq_ignore_ascii_case(name))
935            .map(|(_, v)| v)
936    }
937
938    /// Get the schema-canonical spelling of a label, matched case-insensitively.
939    ///
940    /// Returns the stored label name whose spelling differs only in case from
941    /// `name`, or `None` if no such label is registered. Callers use this to
942    /// normalize a user-supplied label to the canonical form the storage tier
943    /// keys on, so case variants resolve to the same vertex table.
944    pub fn canonical_label_name(&self, name: &str) -> Option<String> {
945        self.labels
946            .iter()
947            .find(|(k, _)| k.eq_ignore_ascii_case(name))
948            .map(|(k, _)| k.clone())
949    }
950
951    /// Get label ID with case-insensitive lookup.
952    pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
953        self.get_label_case_insensitive(label_name)
954            .map(|meta| meta.id)
955    }
956
957    /// Get edge type metadata with case-insensitive lookup.
958    ///
959    /// This allows queries to match edge types regardless of case, providing
960    /// better user experience when type names vary in casing.
961    pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
962        self.edge_types
963            .iter()
964            .find(|(k, _)| k.eq_ignore_ascii_case(name))
965            .map(|(_, v)| v)
966    }
967
968    /// Get edge type ID with case-insensitive lookup (schema-defined types only).
969    pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
970        self.get_edge_type_case_insensitive(type_name)
971            .map(|meta| meta.id)
972    }
973
974    /// Get edge type ID with case-insensitive lookup, checking both
975    /// schema-defined and schemaless registries.
976    pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
977        self.edge_type_id_by_name_case_insensitive(type_name)
978            .or_else(|| {
979                self.schemaless_registry
980                    .id_by_name_case_insensitive(type_name)
981            })
982    }
983
984    /// Returns the edge type ID for `type_name`, checking the schema first
985    /// and falling back to the schemaless registry (assigning a new ID if needed).
986    ///
987    /// Requires `&mut self` because it may assign a new schemaless ID.
988    /// Use [`edge_type_id_by_name`](Self::edge_type_id_by_name) for read-only schema lookups.
989    pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
990        if let Some(id) = self.edge_type_id_unified(type_name) {
991            return id;
992        }
993        // Reaching here means the type is brand-new to *both* the schema map and
994        // the schemaless registry (the early return above mirrors exactly what
995        // `edge_type_id_unified` checks). Minting a new schemaless edge type
996        // changes the result of `all_edge_type_ids()`, which untyped traversals
997        // bake into cached plans keyed on `schema_version`. Bump the version so
998        // those stale plans are evicted — otherwise a `MATCH ()-[r]->()` plan
999        // built before this type existed silently drops edges of the new type.
1000        let id = self.schemaless_registry.get_or_assign_id(type_name);
1001        self.bump_version();
1002        id
1003    }
1004
1005    /// Read-only unified exact lookup: schema-defined edge type id, falling
1006    /// back to an already-assigned schemaless id.
1007    ///
1008    /// Mirrors exactly the checks [`Self::get_or_assign_edge_type_id`]
1009    /// performs before assigning, so a `Some` here means the assigning path
1010    /// would be a no-op — the basis for `SchemaManager`'s read-lock fast path.
1011    pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
1012        self.edge_type_id_by_name(type_name)
1013            .or_else(|| self.schemaless_registry.id_by_name(type_name))
1014    }
1015
1016    /// Returns the edge type name for `type_id`, checking both the schema
1017    /// and schemaless registries. Returns an owned `String` because the
1018    /// name may come from either registry.
1019    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1020        if is_schemaless_edge_type(type_id) {
1021            self.schemaless_registry
1022                .type_name_by_id(type_id)
1023                .map(str::to_owned)
1024        } else {
1025            self.edge_type_name_by_id(type_id).map(str::to_owned)
1026        }
1027    }
1028
1029    /// Returns all edge type IDs, including both schema-defined and schemaless types.
1030    /// Used when MATCH queries don't specify an edge type and need to scan all edges.
1031    pub fn all_edge_type_ids(&self) -> Vec<u32> {
1032        let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
1033        ids.extend(self.schemaless_registry.all_type_ids());
1034        ids.sort_unstable();
1035        ids
1036    }
1037}
1038
/// Lifecycle status of an index.
///
/// `Online` is the `Default` variant, so index metadata that is built or
/// deserialized without an explicit status is treated as online.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
    /// Index is up-to-date and available for queries.
    #[default]
    Online,
    /// Index is currently being rebuilt.
    Building,
    /// Index is outdated and scheduled for rebuild.
    Stale,
    /// Index rebuild failed after exhausting retries.
    Failed,
}
1052
/// Metadata tracking the lifecycle state of an index.
///
/// Every field is `#[serde(default)]`, so serialized input that omits any of
/// them still deserializes (status `Online`, no build timestamp, no row count).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
    /// Current lifecycle status.
    #[serde(default)]
    pub status: IndexStatus,
    /// When the index was last successfully built.
    #[serde(default)]
    pub last_built_at: Option<DateTime<Utc>>,
    /// Row count of the dataset when the index was last built.
    #[serde(default)]
    pub row_count_at_build: Option<u64>,
}
1066
/// One index declared in the schema, tagged by kind.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `"type"` field alongside the variant's configuration fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    /// Vector index over a single property.
    Vector(VectorIndexConfig),
    /// Full-text index over one or more properties.
    FullText(FullTextIndexConfig),
    /// Scalar index over one or more properties.
    Scalar(ScalarIndexConfig),
    /// Inverted index over a single property.
    Inverted(InvertedIndexConfig),
    /// Full-text index over a JSON column.
    JsonFullText(JsonFtsIndexConfig),
    /// Scored sparse-vector (SPLADE / learned-sparse) inverted index.
    Sparse(SparseVectorIndexConfig),
}
1079
1080impl IndexDefinition {
1081    /// Returns the index name for any variant.
1082    pub fn name(&self) -> &str {
1083        match self {
1084            IndexDefinition::Vector(c) => &c.name,
1085            IndexDefinition::FullText(c) => &c.name,
1086            IndexDefinition::Scalar(c) => &c.name,
1087            IndexDefinition::Inverted(c) => &c.name,
1088            IndexDefinition::JsonFullText(c) => &c.name,
1089            IndexDefinition::Sparse(c) => &c.name,
1090        }
1091    }
1092
1093    /// Returns the label this index is defined on.
1094    pub fn label(&self) -> &str {
1095        match self {
1096            IndexDefinition::Vector(c) => &c.label,
1097            IndexDefinition::FullText(c) => &c.label,
1098            IndexDefinition::Scalar(c) => &c.label,
1099            IndexDefinition::Inverted(c) => &c.label,
1100            IndexDefinition::JsonFullText(c) => &c.label,
1101            IndexDefinition::Sparse(c) => &c.label,
1102        }
1103    }
1104
1105    /// Returns a reference to the index lifecycle metadata.
1106    pub fn metadata(&self) -> &IndexMetadata {
1107        match self {
1108            IndexDefinition::Vector(c) => &c.metadata,
1109            IndexDefinition::FullText(c) => &c.metadata,
1110            IndexDefinition::Scalar(c) => &c.metadata,
1111            IndexDefinition::Inverted(c) => &c.metadata,
1112            IndexDefinition::JsonFullText(c) => &c.metadata,
1113            IndexDefinition::Sparse(c) => &c.metadata,
1114        }
1115    }
1116
1117    /// Returns a mutable reference to the index lifecycle metadata.
1118    pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
1119        match self {
1120            IndexDefinition::Vector(c) => &mut c.metadata,
1121            IndexDefinition::FullText(c) => &mut c.metadata,
1122            IndexDefinition::Scalar(c) => &mut c.metadata,
1123            IndexDefinition::Inverted(c) => &mut c.metadata,
1124            IndexDefinition::JsonFullText(c) => &mut c.metadata,
1125            IndexDefinition::Sparse(c) => &mut c.metadata,
1126        }
1127    }
1128}
1129
/// Configuration for an inverted index over a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Property covered by the index.
    pub property: String,
    /// Whether terms are normalized; `true` when absent from serialized input.
    /// NOTE(review): the exact normalization applied is not visible here.
    #[serde(default = "default_normalize")]
    pub normalize: bool,
    /// Cap on terms per document; 10 000 when absent from serialized input.
    #[serde(default = "default_max_terms_per_doc")]
    pub max_terms_per_doc: usize,
    /// Lifecycle metadata; defaults when absent from serialized input.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1142
/// Serde default for `InvertedIndexConfig::normalize`: normalization is on.
fn default_normalize() -> bool {
    true
}
1146
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`: 10 000 terms.
fn default_max_terms_per_doc() -> usize {
    10_000
}
1150
/// Configuration for a scored sparse-vector (SPLADE / learned-sparse) index.
///
/// The index stores per-term postings `(term_id, vids, weights, max_impact)`
/// and scores by dot product. `quantize` controls 8-bit weight quantization at
/// the postings boundary (≈ lossless, ~4× smaller; default on). P2 block-max
/// pruning knobs are added in a later milestone.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SparseVectorIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Sparse-vector property covered by the index.
    pub property: String,
    /// Term-space cardinality (max term id + 1), for validation/config.
    pub dimensions: usize,
    /// Quantize stored weights to 8-bit (per-term scale). Default on.
    #[serde(default = "default_sparse_quantize")]
    pub quantize: bool,
    /// Auto-embedding source. When set, a declared text column is embedded into
    /// this sparse column via the xervo sparse model at write time (and a text
    /// query is embedded at query time) — mirrors `VectorIndexConfig`.
    #[serde(default)]
    pub embedding_config: Option<EmbeddingConfig>,
    /// Lifecycle metadata; defaults when absent from serialized input.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1175
/// Serde default for `SparseVectorIndexConfig::quantize`: quantization is on.
fn default_sparse_quantize() -> bool {
    true
}
1179
/// Configuration for a vector index over a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Vector property covered by the index.
    pub property: String,
    /// Index structure and its tuning parameters.
    pub index_type: VectorIndexType,
    /// Distance metric associated with the index.
    pub metric: DistanceMetric,
    /// Optional auto-embedding settings.
    /// NOTE(review): not `#[serde(default)]`, unlike the sparse config — confirm
    /// whether the field is meant to be required in serialized input.
    pub embedding_config: Option<EmbeddingConfig>,
    /// Lifecycle metadata; defaults when absent from serialized input.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1191
/// Settings for embedding text into an indexed column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
    /// Model alias in the Uni-Xervo catalog (for example: "embed/default").
    pub alias: String,
    /// Properties that supply the text to embed.
    /// NOTE(review): how multiple properties are combined is not visible here.
    pub source_properties: Vec<String>,
    /// Batch size for embedding work.
    /// NOTE(review): presumably items per model call — confirm.
    pub batch_size: usize,
    /// Prefix prepended to text before embedding during auto-embed (document side).
    /// Example: `"search_document: "` for Nomic models. Include any trailing space.
    #[serde(default)]
    pub document_prefix: Option<String>,
    /// Prefix prepended to text before embedding during query-time embed calls.
    /// Example: `"search_query: "` for Nomic models. Include any trailing space.
    #[serde(default)]
    pub query_prefix: Option<String>,
}
1207
/// Vector index structure together with its tuning parameters.
///
/// NOTE(review): variant names follow the usual ANN index families
/// (flat, IVF, HNSW; PQ/SQ/RQ quantization). The exact build semantics of
/// each parameter live in the index builder, which is not visible here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
    /// Flat index; takes no parameters.
    Flat,
    /// IVF index with flat storage.
    IvfFlat {
        /// Number of IVF partitions.
        num_partitions: u32,
    },
    /// IVF index with product quantization.
    IvfPq {
        /// Number of IVF partitions.
        num_partitions: u32,
        /// Number of PQ sub-vectors.
        num_sub_vectors: u32,
        /// Bits used per sub-vector code.
        bits_per_subvector: u8,
    },
    /// IVF index with scalar quantization.
    IvfSq {
        /// Number of IVF partitions.
        num_partitions: u32,
    },
    /// IVF index with RQ quantization.
    IvfRq {
        /// Number of IVF partitions.
        num_partitions: u32,
        /// Optional bit width; `None` when absent from serialized input.
        #[serde(default)]
        num_bits: Option<u8>,
    },
    /// HNSW index with flat storage.
    HnswFlat {
        /// HNSW `m` parameter.
        m: u32,
        /// HNSW `ef_construction` parameter.
        ef_construction: u32,
        /// Optional partition count; `None` when absent from serialized input.
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    /// HNSW index with scalar quantization.
    HnswSq {
        /// HNSW `m` parameter.
        m: u32,
        /// HNSW `ef_construction` parameter.
        ef_construction: u32,
        /// Optional partition count; `None` when absent from serialized input.
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    /// HNSW index with product quantization.
    HnswPq {
        /// HNSW `m` parameter.
        m: u32,
        /// HNSW `ef_construction` parameter.
        ef_construction: u32,
        /// Number of PQ sub-vectors.
        num_sub_vectors: u32,
        /// Optional partition count; `None` when absent from serialized input.
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    /// MUVERA (arXiv:2405.19504) Fixed-Dimensional Encoding for multi-vector
    /// (ColBERT/MaxSim) columns. The source multi-vector is encoded into a single
    /// derived `Vector<fde_dim>` column, and `inner` is the single-vector ANN index
    /// type built over that derived column (always with the `Dot` metric — the FDE
    /// inner product approximates MaxSim). The exact MaxSim re-rank still uses the
    /// `VectorIndexConfig.metric`. See `uni_query_functions::muvera`.
    Muvera {
        /// SimHash hyperplanes per repetition (`2^k_sim` buckets).
        k_sim: u32,
        /// Independent repetitions concatenated into the FDE.
        reps: u32,
        /// Inner-projection target dim (`0` = no projection, use the source dim).
        d_proj: u32,
        /// Master seed; persisted so query-time encoding matches doc-time encoding.
        seed: u64,
        /// The single-vector ANN index built over the derived FDE column.
        inner: Box<VectorIndexType>,
    },
}
1266
/// Distance metric used to compare vectors.
///
/// `compute_distance` maps every metric to a value where lower means more
/// similar.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
    /// Cosine distance: `1.0 - cosine_similarity`.
    Cosine,
    /// Squared Euclidean distance.
    L2,
    /// Negative dot product.
    Dot,
}
1274
1275impl DistanceMetric {
1276    /// Computes the distance between two vectors using this metric.
1277    ///
1278    /// All metrics follow LanceDB conventions so that lower values indicate
1279    /// greater similarity:
1280    /// - **L2**: squared Euclidean distance.
1281    /// - **Cosine**: `1.0 - cosine_similarity` (range \[0, 2\]).
1282    /// - **Dot**: negative dot product.
1283    ///
1284    /// # Panics
1285    ///
1286    /// Panics if `a` and `b` have different lengths.
1287    pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
1288        assert_eq!(a.len(), b.len(), "vector dimension mismatch");
1289        match self {
1290            DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
1291            DistanceMetric::Cosine => {
1292                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
1293                let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
1294                let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
1295                let denom = norm_a * norm_b;
1296                if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
1297            }
1298            DistanceMetric::Dot => {
1299                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
1300                -dot
1301            }
1302        }
1303    }
1304}
1305
/// Configuration for a full-text index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Properties covered by the index.
    pub properties: Vec<String>,
    /// Tokenizer applied to the indexed text.
    pub tokenizer: TokenizerConfig,
    /// Whether positions are stored.
    /// NOTE(review): presumably token positions for phrase queries — confirm.
    pub with_positions: bool,
    /// Lifecycle metadata; defaults when absent from serialized input.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1316
/// Tokenizer selection for full-text indexing.
///
/// NOTE(review): the behaviour behind each variant is implemented elsewhere;
/// the descriptions below only restate the variant names and fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
    /// The standard tokenizer.
    Standard,
    /// Whitespace tokenizer.
    Whitespace,
    /// N-gram tokenizer with `min`/`max` gram sizes (inclusivity not visible here).
    Ngram { min: u8, max: u8 },
    /// A custom tokenizer referenced by `name`.
    Custom { name: String },
}
1325
/// Configuration for a full-text index over a JSON column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// JSON column covered by the index.
    pub column: String,
    /// JSON paths to index; empty when absent from serialized input.
    /// NOTE(review): whether an empty list means "all paths" is not visible here.
    #[serde(default)]
    pub paths: Vec<String>,
    /// Whether positions are stored; `false` when absent from serialized input.
    #[serde(default)]
    pub with_positions: bool,
    /// Lifecycle metadata; defaults when absent from serialized input.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1338
/// Configuration for a scalar index over one or more properties.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Properties covered by the index.
    pub properties: Vec<String>,
    /// Kind of scalar index structure.
    pub index_type: ScalarIndexType,
    /// Optional filter expression.
    /// NOTE(review): presumably restricts the index to matching rows (partial index) — confirm.
    pub where_clause: Option<String>,
    /// Lifecycle metadata; defaults when absent from serialized input.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1349
/// Kind of scalar index structure.
///
/// NOTE(review): the variants name index structures; their build semantics
/// are implemented elsewhere and are not visible here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
    /// B-tree index.
    BTree,
    /// Hash index.
    Hash,
    /// Bitmap index.
    Bitmap,
    /// Label-list index.
    LabelList,
}
1358
/// Owns the current [`Schema`] and knows where it is persisted.
///
/// Readers obtain a cheap `Arc<Schema>` snapshot via `schema()`; the stored
/// snapshot can be swapped wholesale via `replace_schema`.
pub struct SchemaManager {
    /// Object store the schema is loaded from and saved to.
    store: Arc<dyn ObjectStore>,
    /// Location of the schema object inside `store`.
    path: ObjectStorePath,
    /// Current schema snapshot, guarded for concurrent access.
    schema: RwLock<Arc<Schema>>,
}
1364
1365impl SchemaManager {
1366    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
1367        let path = path.as_ref();
1368        let parent = path
1369            .parent()
1370            .ok_or_else(|| anyhow!("Invalid schema path"))?;
1371        let filename = path
1372            .file_name()
1373            .ok_or_else(|| anyhow!("Invalid schema filename"))?
1374            .to_str()
1375            .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
1376
1377        let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
1378        let obj_path = ObjectStorePath::from(filename);
1379
1380        Self::load_from_store(store, &obj_path).await
1381    }
1382
1383    pub async fn load_from_store(
1384        store: Arc<dyn ObjectStore>,
1385        path: &ObjectStorePath,
1386    ) -> Result<Self> {
1387        match store.get(path).await {
1388            Ok(result) => {
1389                let bytes = result.bytes().await?;
1390                let content = String::from_utf8(bytes.to_vec())?;
1391                let mut schema: Schema = serde_json::from_str(&content)?;
1392                // Self-heal catalogs that grew super-linearly under the
1393                // pre-fix `add_index` (issue rustic-ai/uni-db#63). Collapse
1394                // duplicate index entries by name, keeping the *last*
1395                // occurrence — matches the upsert semantics in `add_index`
1396                // and preserves whatever metadata the most recent rebuild
1397                // wrote. The dedup persists on the next mutation that
1398                // calls `save()`.
1399                let original_len = schema.indexes.len();
1400                if original_len > 0 {
1401                    let mut seen: std::collections::HashSet<String> =
1402                        std::collections::HashSet::with_capacity(original_len);
1403                    let mut dedup: Vec<IndexDefinition> = schema
1404                        .indexes
1405                        .iter()
1406                        .rev()
1407                        .filter(|idx| seen.insert(idx.name().to_string()))
1408                        .cloned()
1409                        .collect();
1410                    dedup.reverse();
1411                    if dedup.len() != original_len {
1412                        tracing::warn!(
1413                            collapsed = original_len - dedup.len(),
1414                            kept = dedup.len(),
1415                            "schema.indexes: collapsed duplicate entries on load (issue #63)"
1416                        );
1417                        schema.indexes = dedup;
1418                    }
1419                }
1420                Ok(Self {
1421                    store,
1422                    path: path.clone(),
1423                    schema: RwLock::new(Arc::new(schema)),
1424                })
1425            }
1426            Err(object_store::Error::NotFound { .. }) => Ok(Self {
1427                store,
1428                path: path.clone(),
1429                schema: RwLock::new(Arc::new(Schema::default())),
1430            }),
1431            Err(e) => Err(anyhow::Error::from(e)),
1432        }
1433    }
1434
1435    pub async fn save(&self) -> Result<()> {
1436        let content = {
1437            let schema_guard = acquire_read(&self.schema, "schema")?;
1438            serde_json::to_string_pretty(&**schema_guard)?
1439        };
1440        self.store
1441            .put(&self.path, content.into())
1442            .await
1443            .map_err(anyhow::Error::from)?;
1444        Ok(())
1445    }
1446
    /// Returns the object-store path the schema is loaded from and saved to.
    pub fn path(&self) -> &ObjectStorePath {
        &self.path
    }
1450
1451    pub fn schema(&self) -> Arc<Schema> {
1452        self.schema
1453            .read()
1454            .expect("Schema lock poisoned - a thread panicked while holding it")
1455            .clone()
1456    }
1457
1458    /// Normalize function names in an expression to uppercase for case-insensitive matching.
1459    /// Examples: "lower(email)" -> "LOWER(email)", "trim(name)" -> "TRIM(name)"
1460    fn normalize_function_names(expr: &str) -> String {
1461        let mut result = String::with_capacity(expr.len());
1462        let mut chars = expr.chars().peekable();
1463
1464        while let Some(ch) = chars.next() {
1465            if ch.is_alphabetic() {
1466                // Collect identifier
1467                let mut ident = String::new();
1468                ident.push(ch);
1469
1470                while let Some(&next) = chars.peek() {
1471                    if next.is_alphanumeric() || next == '_' {
1472                        ident.push(chars.next().unwrap());
1473                    } else {
1474                        break;
1475                    }
1476                }
1477
1478                // If followed by '(', it's a function call - uppercase it
1479                if chars.peek() == Some(&'(') {
1480                    result.push_str(&ident.to_uppercase());
1481                } else {
1482                    result.push_str(&ident); // Keep property names as-is
1483                }
1484            } else {
1485                result.push(ch);
1486            }
1487        }
1488
1489        result
1490    }
1491
1492    /// Generate a consistent internal column name for an expression index.
1493    /// Uses a hash suffix to ensure uniqueness for different expressions that
1494    /// might sanitize to the same string (e.g., "a+b" and "a-b" both become "a_b").
1495    ///
1496    /// IMPORTANT: Uses FNV-1a hash which is stable across Rust versions and platforms.
1497    /// DefaultHasher is not guaranteed to be stable and could break persistent data
1498    /// if the hash changes after a compiler upgrade.
1499    pub fn generated_column_name(expr: &str) -> String {
1500        // Normalize function names to uppercase for case-insensitive matching
1501        let normalized = Self::normalize_function_names(expr);
1502
1503        let sanitized = normalized
1504            .replace(|c: char| !c.is_alphanumeric(), "_")
1505            .trim_matches('_')
1506            .to_string();
1507
1508        // FNV-1a 64-bit hash - stable across Rust versions and platforms
1509        const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1510        const FNV_PRIME: u64 = 1099511628211;
1511
1512        let mut hash = FNV_OFFSET_BASIS;
1513        for byte in normalized.as_bytes() {
1514            hash ^= *byte as u64;
1515            hash = hash.wrapping_mul(FNV_PRIME);
1516        }
1517
1518        format!("_gen_{}_{:x}", sanitized, hash)
1519    }
1520
1521    pub fn replace_schema(&self, new_schema: Schema) {
1522        let mut schema = self
1523            .schema
1524            .write()
1525            .expect("Schema lock poisoned - a thread panicked while holding it");
1526        *schema = Arc::new(new_schema);
1527    }
1528
1529    /// Build a fork-scoped manager whose schema is `primary ⊕ overlay`.
1530    ///
1531    /// Used by `UniInner::at_fork` to give a forked session a schema view
1532    /// that includes any labels/edge-types/properties the fork has
1533    /// introduced on top of primary. The returned manager owns its own
1534    /// in-memory `Arc<Schema>` — mutations to it never reach primary's
1535    /// schema file. The returned manager is *not* intended for `.save()`;
1536    /// fork-overlay persistence is owned by the registry layer
1537    /// (`catalog/fork_schemas/{fork_id}.json`).
1538    ///
1539    /// In Phase 1 the delta is always empty, so the merge is a clone.
1540    /// Phase 2 starts populating it when on-the-fly label creation lands.
1541    #[must_use]
1542    pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1543        let primary = self.schema();
1544        let merged = if overlay.is_empty() {
1545            (*primary).clone()
1546        } else {
1547            let mut merged = (*primary).clone();
1548            for (name, label) in &overlay.added_labels {
1549                merged.labels.insert(name.clone(), label.clone());
1550            }
1551            for (name, edge_type) in &overlay.added_edge_types {
1552                merged.edge_types.insert(name.clone(), edge_type.clone());
1553            }
1554            for addition in &overlay.added_properties {
1555                let props = merged.properties.entry(addition.owner.clone()).or_default();
1556                props.insert(
1557                    addition.property.clone(),
1558                    PropertyMeta {
1559                        r#type: addition.data_type.clone(),
1560                        nullable: addition.nullable,
1561                        added_in: merged.schema_version,
1562                        state: SchemaElementState::Active,
1563                        generation_expression: None,
1564                        description: None,
1565                    },
1566                );
1567            }
1568            merged
1569        };
1570
1571        Arc::new(Self {
1572            store: self.store.clone(),
1573            path: self.path.clone(),
1574            schema: RwLock::new(Arc::new(merged)),
1575        })
1576    }
1577
1578    pub fn next_label_id(&self) -> u16 {
1579        self.schema()
1580            .labels
1581            .values()
1582            .map(|l| l.id)
1583            .max()
1584            .unwrap_or(0)
1585            + 1
1586    }
1587
1588    pub fn next_type_id(&self) -> u32 {
1589        let max_schema_id = self
1590            .schema()
1591            .edge_types
1592            .values()
1593            .map(|t| t.id)
1594            .max()
1595            .unwrap_or(0);
1596
1597        // Ensure we stay in schema'd ID space (bit 31 = 0)
1598        if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1599            panic!("Schema edge type ID exhaustion");
1600        }
1601
1602        max_schema_id + 1
1603    }
1604
1605    /// Validate a label or edge-type name at definition time. (L6)
1606    ///
1607    /// Names flow into on-disk dataset paths (`vertices_{name}.lance`) and
1608    /// Lance branch names (`fork_{id}_{…}`); a name with a path separator,
1609    /// whitespace, or a control character corrupts those paths and breaks
1610    /// fork creation. Such names were never actually usable, so they are
1611    /// rejected up front rather than failing later. `.` is allowed
1612    /// (path-safe and common in qualified names).
1613    ///
1614    /// Public so the fork-create path can apply the same rule as a backstop
1615    /// over names that entered the schema through an infallible interning
1616    /// path (e.g. schemaless `get_or_assign_edge_type_id`).
1617    ///
1618    /// # Errors
1619    /// Returns an error if `name` is empty/all-whitespace, exceeds
1620    /// `MAX_SCHEMA_NAME_LEN` bytes, or contains a control, whitespace,
1621    /// `/`, or `\` character.
1622    pub fn validate_schema_element_name(kind: &str, name: &str) -> Result<()> {
1623        if name.is_empty() || name.chars().all(char::is_whitespace) {
1624            return Err(anyhow!(
1625                "{kind} name must be non-empty and not all whitespace"
1626            ));
1627        }
1628        if name.len() > MAX_SCHEMA_NAME_LEN {
1629            return Err(anyhow!("{kind} name exceeds {MAX_SCHEMA_NAME_LEN} bytes"));
1630        }
1631        if let Some(c) = name
1632            .chars()
1633            .find(|c| c.is_control() || c.is_whitespace() || matches!(c, '/' | '\\'))
1634        {
1635            return Err(anyhow!(
1636                "{kind} name '{name}' contains an unsafe character ({c:?})"
1637            ));
1638        }
1639        Ok(())
1640    }
1641
1642    pub fn add_label(&self, name: &str) -> Result<u16> {
1643        self.add_label_with_desc(name, None)
1644    }
1645
1646    pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1647        Self::validate_schema_element_name("Label", name)?;
1648        let mut guard = acquire_write(&self.schema, "schema")?;
1649        let schema = Arc::make_mut(&mut *guard);
1650        if schema.labels.contains_key(name) {
1651            return Err(anyhow!("Label '{}' already exists", name));
1652        }
1653
1654        let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1655        if id >= VIRTUAL_LABEL_ID_START {
1656            return Err(anyhow!(
1657                "Native label space exhausted (next id {id:#x} would enter the \
1658                 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1659                 reserved for catalog-resolved labels)"
1660            ));
1661        }
1662        schema.labels.insert(
1663            name.to_string(),
1664            LabelMeta {
1665                id,
1666                created_at: Utc::now(),
1667                state: SchemaElementState::Active,
1668                description,
1669            },
1670        );
1671        schema.bump_version();
1672        Ok(id)
1673    }
1674
1675    pub fn add_edge_type(
1676        &self,
1677        name: &str,
1678        src_labels: Vec<String>,
1679        dst_labels: Vec<String>,
1680    ) -> Result<u32> {
1681        self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1682    }
1683
1684    pub fn add_edge_type_with_desc(
1685        &self,
1686        name: &str,
1687        src_labels: Vec<String>,
1688        dst_labels: Vec<String>,
1689        description: Option<String>,
1690    ) -> Result<u32> {
1691        Self::validate_schema_element_name("Edge type", name)?;
1692        let mut guard = acquire_write(&self.schema, "schema")?;
1693        let schema = Arc::make_mut(&mut *guard);
1694        if schema.edge_types.contains_key(name) {
1695            return Err(anyhow!("Edge type '{}' already exists", name));
1696        }
1697
1698        let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1699
1700        // Stay in the schema-defined sub-range (bit 31 = 0, and below the
1701        // virtual reservation `VIRTUAL_EDGE_TYPE_ID_START`) — same bound as
1702        // `add_edge_type`, so the two entry points cannot disagree on the
1703        // legal ceiling.
1704        if id >= VIRTUAL_EDGE_TYPE_ID_START {
1705            return Err(anyhow!(
1706                "Native edge type space exhausted (next id {id:#x} would enter the \
1707                 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1708                 reserved for catalog-resolved edge types)"
1709            ));
1710        }
1711
1712        schema.edge_types.insert(
1713            name.to_string(),
1714            EdgeTypeMeta {
1715                id,
1716                src_labels,
1717                dst_labels,
1718                state: SchemaElementState::Active,
1719                description,
1720            },
1721        );
1722        schema.bump_version();
1723        Ok(id)
1724    }
1725
1726    /// Delegates to [`Schema::get_or_assign_edge_type_id`].
1727    ///
1728    /// Read-lock fast path: the type name is almost always already known
1729    /// (it is constant per statement but resolved per row by the CREATE
1730    /// executor), and the slow path's write lock + `Arc::make_mut` deep-clones
1731    /// the whole `Schema` whenever the Arc is shared — which under SSI it
1732    /// always is. Double-checked: on a miss, `Schema::get_or_assign_edge_type_id`
1733    /// re-checks under the write lock, so two racing assigners converge on one id.
1734    pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1735        {
1736            let guard = acquire_read(&self.schema, "schema")
1737                .expect("Schema lock poisoned - a thread panicked while holding it");
1738            if let Some(id) = guard.edge_type_id_unified(type_name) {
1739                return id;
1740            }
1741        }
1742        let mut guard = acquire_write(&self.schema, "schema")
1743            .expect("Schema lock poisoned - a thread panicked while holding it");
1744        let schema = Arc::make_mut(&mut *guard);
1745        schema.get_or_assign_edge_type_id(type_name)
1746    }
1747
1748    /// Delegates to [`Schema::edge_type_name_by_id_unified`].
1749    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1750        let schema = acquire_read(&self.schema, "schema")
1751            .expect("Schema lock poisoned - a thread panicked while holding it");
1752        schema.edge_type_name_by_id_unified(type_id)
1753    }
1754
1755    pub fn add_property(
1756        &self,
1757        label_or_type: &str,
1758        prop_name: &str,
1759        data_type: DataType,
1760        nullable: bool,
1761    ) -> Result<()> {
1762        self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1763    }
1764
1765    pub fn add_property_with_desc(
1766        &self,
1767        label_or_type: &str,
1768        prop_name: &str,
1769        data_type: DataType,
1770        nullable: bool,
1771        description: Option<String>,
1772    ) -> Result<()> {
1773        validate_property_name(prop_name)?;
1774        let mut guard = acquire_write(&self.schema, "schema")?;
1775        let schema = Arc::make_mut(&mut *guard);
1776        let version = schema.schema_version;
1777        let props = schema
1778            .properties
1779            .entry(label_or_type.to_string())
1780            .or_default();
1781
1782        if props.contains_key(prop_name) {
1783            return Err(anyhow!(
1784                "Property '{}' already exists for '{}'",
1785                prop_name,
1786                label_or_type
1787            ));
1788        }
1789
1790        props.insert(
1791            prop_name.to_string(),
1792            PropertyMeta {
1793                r#type: data_type,
1794                nullable,
1795                added_in: version,
1796                state: SchemaElementState::Active,
1797                generation_expression: None,
1798                description,
1799            },
1800        );
1801        // Bump after stamping `added_in` with the pre-bump `version`.
1802        schema.bump_version();
1803        Ok(())
1804    }
1805
1806    /// Declares a property, idempotent when an identical declaration already exists.
1807    ///
1808    /// The schema-builder counterpart to [`Self::add_property_with_desc`] (which
1809    /// hard-errors on *any* re-add, as DDL `ALTER` semantics require). Re-applying a
1810    /// schema is common — every `apply()` re-declares — so an existing property with
1811    /// the same `data_type` and `nullable` is a no-op (`Ok(false)`; a differing
1812    /// `description` is ignored as docs-only). A differing type or nullability is a
1813    /// hard conflict: silently swallowing it let `VECTOR(4)` be "re-declared" as
1814    /// `VECTOR(8)` while the column stayed 4-dimensional (issue #137).
1815    ///
1816    /// Returns `true` if this call newly inserted the property.
1817    ///
1818    /// # Errors
1819    /// Returns an error when the property exists with a different type or nullability
1820    /// (the message deliberately does not contain "already exists", which callers
1821    /// historically string-matched to ignore benign re-adds), or when the property
1822    /// name is invalid.
1823    pub fn declare_property(
1824        &self,
1825        label_or_type: &str,
1826        prop_name: &str,
1827        data_type: DataType,
1828        nullable: bool,
1829        description: Option<String>,
1830    ) -> Result<bool> {
1831        validate_property_name(prop_name)?;
1832        let mut guard = acquire_write(&self.schema, "schema")?;
1833        let schema = Arc::make_mut(&mut *guard);
1834        let version = schema.schema_version;
1835        let props = schema
1836            .properties
1837            .entry(label_or_type.to_string())
1838            .or_default();
1839
1840        if let Some(existing) = props.get(prop_name) {
1841            if existing.r#type == data_type && existing.nullable == nullable {
1842                return Ok(false); // identical re-declaration (idempotent)
1843            }
1844            return Err(anyhow!(
1845                "Property '{}' on '{}' is declared as {:?} (nullable: {}); cannot re-declare \
1846                 as {:?} (nullable: {}). Property types are immutable — use a new property \
1847                 name or migrate the data",
1848                prop_name,
1849                label_or_type,
1850                existing.r#type,
1851                existing.nullable,
1852                data_type,
1853                nullable
1854            ));
1855        }
1856
1857        props.insert(
1858            prop_name.to_string(),
1859            PropertyMeta {
1860                r#type: data_type,
1861                nullable,
1862                added_in: version,
1863                state: SchemaElementState::Active,
1864                generation_expression: None,
1865                description,
1866            },
1867        );
1868        // Bump after stamping `added_in` with the pre-bump `version`.
1869        schema.bump_version();
1870        Ok(true)
1871    }
1872
1873    /// Register an INTERNAL property (underscore-prefixed name allowed) that is
1874    /// materialised by the storage layer, not written by the user — e.g. the MUVERA
1875    /// `__fde_*` derived column. Bypasses the user-facing underscore-prefix rule but
1876    /// still rejects storage-layer name collisions. Idempotent: a no-op if the property
1877    /// already exists with the same type (so re-creating an index is safe).
1878    ///
1879    /// Returns `true` if this call newly inserted the property, `false` if it already
1880    /// existed (idempotent). The check-and-insert is atomic under the schema write lock,
1881    /// so for concurrent callers exactly one observes `true` — letting callers gate
1882    /// expensive one-time work (e.g. the MUVERA backfill) on the winner.
1883    pub fn add_internal_property(
1884        &self,
1885        label_or_type: &str,
1886        prop_name: &str,
1887        data_type: DataType,
1888        nullable: bool,
1889    ) -> Result<bool> {
1890        validate_reserved_property_name(prop_name)?;
1891        let mut guard = acquire_write(&self.schema, "schema")?;
1892        let schema = Arc::make_mut(&mut *guard);
1893        let version = schema.schema_version;
1894        let props = schema
1895            .properties
1896            .entry(label_or_type.to_string())
1897            .or_default();
1898
1899        if let Some(existing) = props.get(prop_name) {
1900            if existing.r#type == data_type {
1901                return Ok(false); // already present (idempotent re-registration)
1902            }
1903            return Err(anyhow!(
1904                "Internal property '{}' already exists for '{}' with a different type",
1905                prop_name,
1906                label_or_type
1907            ));
1908        }
1909
1910        props.insert(
1911            prop_name.to_string(),
1912            PropertyMeta {
1913                r#type: data_type,
1914                nullable,
1915                added_in: version,
1916                state: SchemaElementState::Active,
1917                generation_expression: None,
1918                description: None,
1919            },
1920        );
1921        schema.bump_version();
1922        Ok(true)
1923    }
1924
1925    pub fn add_generated_property(
1926        &self,
1927        label_or_type: &str,
1928        prop_name: &str,
1929        data_type: DataType,
1930        expr: String,
1931    ) -> Result<()> {
1932        // System-generated `_gen_*` columns bypass the underscore-prefix rule
1933        // but must still avoid storage-layer column-name collisions.
1934        validate_reserved_property_name(prop_name)?;
1935        let mut guard = acquire_write(&self.schema, "schema")?;
1936        let schema = Arc::make_mut(&mut *guard);
1937        let version = schema.schema_version;
1938        let props = schema
1939            .properties
1940            .entry(label_or_type.to_string())
1941            .or_default();
1942
1943        if props.contains_key(prop_name) {
1944            return Err(anyhow!("Property '{}' already exists", prop_name));
1945        }
1946
1947        props.insert(
1948            prop_name.to_string(),
1949            PropertyMeta {
1950                r#type: data_type,
1951                nullable: true,
1952                added_in: version,
1953                state: SchemaElementState::Active,
1954                generation_expression: Some(expr),
1955                description: None,
1956            },
1957        );
1958        // Bump after stamping `added_in` with the pre-bump `version`.
1959        schema.bump_version();
1960        Ok(())
1961    }
1962
1963    pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1964        let mut guard = acquire_write(&self.schema, "schema")?;
1965        let schema = Arc::make_mut(&mut *guard);
1966        let meta = schema
1967            .labels
1968            .get_mut(name)
1969            .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1970        meta.description = description;
1971        Ok(())
1972    }
1973
1974    pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1975        let mut guard = acquire_write(&self.schema, "schema")?;
1976        let schema = Arc::make_mut(&mut *guard);
1977        let meta = schema
1978            .edge_types
1979            .get_mut(name)
1980            .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1981        meta.description = description;
1982        Ok(())
1983    }
1984
1985    pub fn set_property_description(
1986        &self,
1987        entity: &str,
1988        prop_name: &str,
1989        description: Option<String>,
1990    ) -> Result<()> {
1991        let mut guard = acquire_write(&self.schema, "schema")?;
1992        let schema = Arc::make_mut(&mut *guard);
1993        let props = schema
1994            .properties
1995            .get_mut(entity)
1996            .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1997        let meta = props
1998            .get_mut(prop_name)
1999            .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
2000        meta.description = description;
2001        Ok(())
2002    }
2003
2004    /// Register an index definition on the schema, **upsert by name**.
2005    ///
2006    /// If an index with the same `IndexDefinition::name()` already exists, it
2007    /// is replaced in place; otherwise the def is appended. Idempotent under
2008    /// repeat invocation, which makes `SchemaBuilder::apply()` re-applicable
2009    /// without bloating `schema.indexes` and lets the rebuild epilogue inside
2010    /// every `IndexManager::create_*_index` re-record metadata updates without
2011    /// duplicating entries (issue rustic-ai/uni-db#63).
2012    pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
2013        let mut guard = acquire_write(&self.schema, "schema")?;
2014        let schema = Arc::make_mut(&mut *guard);
2015        if let Some(existing) = schema
2016            .indexes
2017            .iter_mut()
2018            .find(|i| i.name() == index_def.name())
2019        {
2020            *existing = index_def;
2021        } else {
2022            schema.indexes.push(index_def);
2023        }
2024        schema.bump_version();
2025        Ok(())
2026    }
2027
2028    pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
2029        let schema = self.schema.read().expect("Schema lock poisoned");
2030        schema.indexes.iter().find(|i| i.name() == name).cloned()
2031    }
2032
2033    /// Updates the lifecycle metadata for an index by name.
2034    ///
2035    /// The closure receives a mutable reference to the index's `IndexMetadata`,
2036    /// allowing callers to update status, timestamps, etc.
2037    pub fn update_index_metadata(
2038        &self,
2039        index_name: &str,
2040        f: impl FnOnce(&mut IndexMetadata),
2041    ) -> Result<()> {
2042        let mut guard = acquire_write(&self.schema, "schema")?;
2043        let schema = Arc::make_mut(&mut *guard);
2044        let idx = schema
2045            .indexes
2046            .iter_mut()
2047            .find(|i| i.name() == index_name)
2048            .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
2049        f(idx.metadata_mut());
2050        Ok(())
2051    }
2052
2053    pub fn remove_index(&self, name: &str) -> Result<()> {
2054        let mut guard = acquire_write(&self.schema, "schema")?;
2055        let schema = Arc::make_mut(&mut *guard);
2056        if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
2057            schema.indexes.remove(pos);
2058            schema.bump_version();
2059            Ok(())
2060        } else {
2061            Err(anyhow!("Index '{}' not found", name))
2062        }
2063    }
2064
2065    pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
2066        let mut guard = acquire_write(&self.schema, "schema")?;
2067        let schema = Arc::make_mut(&mut *guard);
2068        if schema.constraints.iter().any(|c| c.name == constraint.name) {
2069            return Err(anyhow!("Constraint '{}' already exists", constraint.name));
2070        }
2071        schema.constraints.push(constraint);
2072        schema.bump_version();
2073        Ok(())
2074    }
2075
2076    pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
2077        let mut guard = acquire_write(&self.schema, "schema")?;
2078        let schema = Arc::make_mut(&mut *guard);
2079        if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
2080            schema.constraints.remove(pos);
2081            schema.bump_version();
2082            Ok(())
2083        } else if if_exists {
2084            Ok(())
2085        } else {
2086            Err(anyhow!("Constraint '{}' not found", name))
2087        }
2088    }
2089
2090    pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
2091        let mut guard = acquire_write(&self.schema, "schema")?;
2092        let schema = Arc::make_mut(&mut *guard);
2093        let Some(props) = schema.properties.get_mut(label_or_type) else {
2094            return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
2095        };
2096        if props.remove(prop_name).is_none() {
2097            return Err(anyhow!(
2098                "Property '{}' not found for '{}'",
2099                prop_name,
2100                label_or_type
2101            ));
2102        }
2103        schema.bump_version();
2104        Ok(())
2105    }
2106
2107    pub fn rename_property(
2108        &self,
2109        label_or_type: &str,
2110        old_name: &str,
2111        new_name: &str,
2112    ) -> Result<()> {
2113        let mut guard = acquire_write(&self.schema, "schema")?;
2114        let schema = Arc::make_mut(&mut *guard);
2115        let Some(props) = schema.properties.get_mut(label_or_type) else {
2116            return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
2117        };
2118        let Some(meta) = props.remove(old_name) else {
2119            return Err(anyhow!(
2120                "Property '{}' not found for '{}'",
2121                old_name,
2122                label_or_type
2123            ));
2124        };
2125        if props.contains_key(new_name) {
2126            // Rollback removal? Or just error.
2127            props.insert(old_name.to_string(), meta); // Restore
2128            return Err(anyhow!("Property '{}' already exists", new_name));
2129        }
2130        props.insert(new_name.to_string(), meta);
2131        schema.bump_version();
2132        Ok(())
2133    }
2134
2135    pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
2136        let mut guard = acquire_write(&self.schema, "schema")?;
2137        let schema = Arc::make_mut(&mut *guard);
2138        if let Some(label_meta) = schema.labels.get_mut(name) {
2139            label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
2140            // Do not remove properties; they are implicitly tombstoned by the label
2141            schema.bump_version();
2142            Ok(())
2143        } else if if_exists {
2144            Ok(())
2145        } else {
2146            Err(anyhow!("Label '{}' not found", name))
2147        }
2148    }
2149
2150    pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
2151        let mut guard = acquire_write(&self.schema, "schema")?;
2152        let schema = Arc::make_mut(&mut *guard);
2153        if let Some(edge_meta) = schema.edge_types.get_mut(name) {
2154            edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
2155            // Do not remove properties; they are implicitly tombstoned by the edge type
2156            schema.bump_version();
2157            Ok(())
2158        } else if if_exists {
2159            Ok(())
2160        } else {
2161            Err(anyhow!("Edge Type '{}' not found", name))
2162        }
2163    }
2164}
2165
2166/// Validate identifier names to prevent injection and ensure compatibility.
2167pub fn validate_identifier(name: &str) -> Result<()> {
2168    // Length check
2169    if name.is_empty() || name.len() > 64 {
2170        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
2171    }
2172
2173    // First character must be letter or underscore
2174    let first = name.chars().next().unwrap();
2175    if !first.is_alphabetic() && first != '_' {
2176        return Err(anyhow!(
2177            "Identifier '{}' must start with letter or underscore",
2178            name
2179        ));
2180    }
2181
2182    // Remaining characters: alphanumeric or underscore
2183    if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
2184        return Err(anyhow!(
2185            "Identifier '{}' must contain only alphanumeric and underscore",
2186            name
2187        ));
2188    }
2189
2190    // Reserved words
2191    const RESERVED: &[&str] = &[
2192        "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
2193        "UNION", "ORDER", "LIMIT",
2194    ];
2195    if RESERVED.contains(&name.to_uppercase().as_str()) {
2196        return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
2197    }
2198
2199    Ok(())
2200}
2201
2202/// Reject user-declared property names that collide with internal Arrow column
2203/// names used by the storage layer.
2204///
2205/// Without this, declaring a property named e.g. `ext_id` produces an Arrow
2206/// schema with two `ext_id` fields at flush time, which Lance rejects with
2207/// "Duplicate field name" — silently losing all in-session writes on shutdown.
2208pub fn validate_property_name(name: &str) -> Result<()> {
2209    if name.starts_with('_') {
2210        return Err(anyhow!(
2211            "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
2212            name
2213        ));
2214    }
2215    validate_reserved_property_name(name)
2216}
2217
2218/// Reject names that collide with storage-layer Arrow column names.
2219///
2220/// Used both by `validate_property_name` (user-facing path) and directly by
2221/// `add_generated_property` (system-generated `_gen_*` path) — the latter
2222/// needs to bypass the underscore-prefix rule but must still reject the
2223/// fixed-name collisions below.
2224fn validate_reserved_property_name(name: &str) -> Result<()> {
2225    // Unprefixed names that get appended alongside user properties in the
2226    // per-label vertex (`storage/vertex.rs`), per-edge-type edge
2227    // (`storage/edge.rs`), or per-edge-type delta (`storage/delta.rs`)
2228    // Arrow schemas — declaring one of these as a user property produces a
2229    // duplicate Arrow field and a Lance "Duplicate field name" error at
2230    // flush time. Fixed-schema-only columns (`type`, `props_json`,
2231    // `labels` in the main tables) are NOT listed: those tables don't
2232    // append user properties, so no collision can occur.
2233    const RESERVED_PROPS: &[&str] = &[
2234        "ext_id",
2235        "overflow_json",
2236        "eid",
2237        "src_vid",
2238        "dst_vid",
2239        "op",
2240        // Internal planner sentinel: a column-name marker used by
2241        // `mark_set_item_variables` (uni-query::query::planner) to request
2242        // narrow structural projection without full-schema expansion.
2243        // Reserved here defensively so an internal `add_generated_property`
2244        // path can't accidentally create a colliding user-facing column.
2245        // The user-facing `validate_property_name` already rejects this
2246        // via the underscore-prefix rule, so this is belt-and-suspenders.
2247        "__set_struct__",
2248    ];
2249    if RESERVED_PROPS.contains(&name) {
2250        return Err(anyhow!(
2251            "Property name '{}' is reserved by the storage layer; please choose a different name",
2252            name
2253        ));
2254    }
2255    Ok(())
2256}
2257
2258#[cfg(test)]
2259mod tests {
2260    use super::*;
2261    use crate::value::{TemporalValue, Value};
2262    use object_store::local::LocalFileSystem;
2263    use tempfile::tempdir;
2264
/// Pins the `DataType::accepts` matrix: nulls, exact-type matches, the
/// permitted widenings, the cases that must be rejected, and the opaque
/// `CypherValue` type.
#[test]
fn test_datatype_accepts_matrix() {
    // Builders for the two values that several assertions share.
    let zero_datetime = || {
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: None,
        })
    };
    let iso_text = || Value::String("2026-01-01T00:00:00Z".into());

    // Null is accepted by every type (nullability checked separately).
    let null = Value::Null;
    for ty in [
        DataType::String,
        DataType::Int64,
        DataType::Bool,
        DataType::DateTime,
        DataType::Float64,
    ] {
        assert!(ty.accepts(&null), "{ty:?} must accept Null");
    }

    // Exact-type matches.
    assert!(DataType::String.accepts(&Value::String("x".into())));
    assert!(DataType::Int64.accepts(&Value::Int(1)));
    assert!(DataType::Bool.accepts(&Value::Bool(true)));
    assert!(DataType::DateTime.accepts(&zero_datetime()));

    // Intentional lossless widenings remain allowed.
    assert!(
        DataType::Float64.accepts(&Value::Int(3)),
        "Int widens to Float"
    );
    assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
    assert!(DataType::Timestamp.accepts(&zero_datetime()));
    assert!(
        DataType::Timestamp.accepts(&iso_text()),
        "storage parses strings for non-struct Timestamp columns"
    );

    // The #68 data-loss cases must be rejected (coercion handles strings separately).
    assert!(
        !DataType::DateTime.accepts(&iso_text()),
        "String into a DateTime struct column nulls silently — reject here"
    );
    assert!(!DataType::Bool.accepts(&Value::Int(1)));
    assert!(!DataType::Int64.accepts(&Value::Bool(true)));
    assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
    assert!(
        !DataType::String.accepts(&Value::Int(10)),
        "no implicit stringification"
    );
    assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));

    // Opaque columns accept anything.
    assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
}
2319
2320    #[test]
2321    fn test_check_vector_dims_matrix() {
2322        let vec3 = DataType::Vector { dimensions: 3 };
2323        let multi2 = DataType::List(Box::new(DataType::Vector { dimensions: 2 }));
2324        let flist = |vals: &[f64]| Value::List(vals.iter().map(|f| Value::Float(*f)).collect());
2325
2326        // Null is accepted everywhere (nullability enforced separately).
2327        assert!(vec3.check_vector_dims(&Value::Null).is_ok());
2328        assert!(multi2.check_vector_dims(&Value::Null).is_ok());
2329
2330        // Correct-dimension values pass; Int elements are numeric.
2331        assert!(
2332            vec3.check_vector_dims(&Value::Vector(vec![1.0, 2.0, 3.0]))
2333                .is_ok()
2334        );
2335        assert!(vec3.check_vector_dims(&flist(&[1.0, 2.0, 3.0])).is_ok());
2336        assert!(
2337            vec3.check_vector_dims(&Value::List(vec![
2338                Value::Int(1),
2339                Value::Float(2.0),
2340                Value::Int(3)
2341            ]))
2342            .is_ok()
2343        );
2344
2345        // The #137 cases: wrong length, empty list, non-numeric element, wrong shape.
2346        assert_eq!(
2347            vec3.check_vector_dims(&Value::Vector(vec![1.0, 2.0])),
2348            Err(VectorDimError::WrongLength {
2349                expected: 3,
2350                actual: 2
2351            })
2352        );
2353        assert_eq!(
2354            vec3.check_vector_dims(&flist(&[1.0, 2.0, 3.0, 4.0, 5.0])),
2355            Err(VectorDimError::WrongLength {
2356                expected: 3,
2357                actual: 5
2358            })
2359        );
2360        assert_eq!(
2361            vec3.check_vector_dims(&Value::List(vec![])),
2362            Err(VectorDimError::WrongLength {
2363                expected: 3,
2364                actual: 0
2365            })
2366        );
2367        assert_eq!(
2368            vec3.check_vector_dims(&Value::List(vec![
2369                Value::Float(1.0),
2370                Value::String("x".into()),
2371                Value::Float(3.0),
2372            ])),
2373            Err(VectorDimError::NonNumericElement { index: 1 })
2374        );
2375        assert_eq!(
2376            vec3.check_vector_dims(&Value::List(vec![
2377                Value::Float(1.0),
2378                Value::Null,
2379                Value::Float(3.0)
2380            ])),
2381            Err(VectorDimError::NonNumericElement { index: 1 })
2382        );
2383        assert_eq!(
2384            vec3.check_vector_dims(&Value::String("not a vector".into())),
2385            Err(VectorDimError::NotAVector { actual: "String" })
2386        );
2387
2388        // Multi-vector: empty token list is a legal empty multi-vector; each
2389        // token must match the declared per-token dimensions.
2390        assert!(multi2.check_vector_dims(&Value::List(vec![])).is_ok());
2391        assert!(
2392            multi2
2393                .check_vector_dims(&Value::List(vec![flist(&[1.0, 2.0]), flist(&[3.0, 4.0])]))
2394                .is_ok()
2395        );
2396        assert_eq!(
2397            multi2.check_vector_dims(&Value::List(vec![
2398                flist(&[1.0, 2.0]),
2399                flist(&[9.0, 9.0, 9.0])
2400            ])),
2401            Err(VectorDimError::TokenWrongLength {
2402                token: 1,
2403                expected: 2,
2404                actual: 3
2405            })
2406        );
2407        assert_eq!(
2408            multi2.check_vector_dims(&Value::List(vec![Value::String("tok".into())])),
2409            Err(VectorDimError::TokenNotAVector {
2410                token: 0,
2411                actual: "String"
2412            })
2413        );
2414        assert_eq!(
2415            multi2.check_vector_dims(&Value::Vector(vec![1.0, 2.0])),
2416            Err(VectorDimError::NotATokenList { actual: "Vector" })
2417        );
2418
2419        // Non-vector declared types never object, so callers may check unconditionally.
2420        assert!(
2421            DataType::Int64
2422                .check_vector_dims(&Value::String("x".into()))
2423                .is_ok()
2424        );
2425        assert!(
2426            DataType::List(Box::new(DataType::Float64))
2427                .check_vector_dims(&Value::List(vec![Value::String("x".into())]))
2428                .is_ok()
2429        );
2430        assert!(
2431            DataType::SparseVector { dimensions: 8 }
2432                .check_vector_dims(&Value::Map(Default::default()))
2433                .is_ok()
2434        );
2435
2436        // Error rendering carries both lengths so write errors are actionable.
2437        let msg = VectorDimError::WrongLength {
2438            expected: 4,
2439            actual: 5,
2440        }
2441        .to_string();
2442        assert!(msg.contains('4') && msg.contains('5'), "message: {msg}");
2443    }
2444
2445    #[tokio::test]
2446    async fn test_declare_property_idempotent_and_conflicting() -> Result<()> {
2447        let dir = tempdir()?;
2448        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2449        let path = ObjectStorePath::from("schema.json");
2450        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
2451
2452        manager.add_label("Doc")?;
2453        let vec4 = DataType::Vector { dimensions: 4 };
2454
2455        // First declaration inserts.
2456        assert!(manager.declare_property("Doc", "embedding", vec4.clone(), true, None)?);
2457
2458        // Identical re-declaration is an idempotent no-op — the register-on-every-open
2459        // pattern; a differing description is docs-only and also ignored.
2460        assert!(!manager.declare_property("Doc", "embedding", vec4.clone(), true, None)?);
2461        assert!(!manager.declare_property(
2462            "Doc",
2463            "embedding",
2464            vec4.clone(),
2465            true,
2466            Some("new docs".into())
2467        )?);
2468
2469        // A dimension change is a conflict (#137 case c), and the message must not
2470        // contain "already exists" (historically string-matched and swallowed).
2471        let err = manager
2472            .declare_property(
2473                "Doc",
2474                "embedding",
2475                DataType::Vector { dimensions: 8 },
2476                true,
2477                None,
2478            )
2479            .unwrap_err()
2480            .to_string();
2481        assert!(err.contains('4') && err.contains('8'), "message: {err}");
2482        assert!(!err.contains("already exists"), "message: {err}");
2483
2484        // Nullability flips are conflicts too — they change NOT NULL enforcement.
2485        assert!(
2486            manager
2487                .declare_property("Doc", "embedding", vec4.clone(), false, None)
2488                .is_err()
2489        );
2490
2491        // The schema still holds the original declaration.
2492        let schema = manager.schema();
2493        let meta = &schema.properties["Doc"]["embedding"];
2494        assert_eq!(meta.r#type, vec4);
2495        assert!(meta.nullable);
2496        Ok(())
2497    }
2498
2499    #[tokio::test]
2500    async fn test_schema_management() -> Result<()> {
2501        let dir = tempdir()?;
2502        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2503        let path = ObjectStorePath::from("schema.json");
2504        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
2505
2506        // Labels
2507        let lid = manager.add_label("Person")?;
2508        assert_eq!(lid, 1);
2509        assert!(manager.add_label("Person").is_err());
2510
2511        // Properties
2512        manager.add_property("Person", "name", DataType::String, false)?;
2513        assert!(
2514            manager
2515                .add_property("Person", "name", DataType::String, false)
2516                .is_err()
2517        );
2518
2519        // Edge types
2520        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
2521        assert_eq!(tid, 1);
2522
2523        manager.save().await?;
2524        // Check file exists
2525        assert!(store.get(&path).await.is_ok());
2526
2527        let manager2 = SchemaManager::load_from_store(store, &path).await?;
2528        assert!(manager2.schema().labels.contains_key("Person"));
2529        assert!(
2530            manager2
2531                .schema()
2532                .properties
2533                .get("Person")
2534                .unwrap()
2535                .contains_key("name")
2536        );
2537
2538        Ok(())
2539    }
2540
2541    #[tokio::test]
2542    async fn test_reserved_property_names_rejected() -> Result<()> {
2543        let dir = tempdir()?;
2544        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2545        let path = ObjectStorePath::from("schema.json");
2546        let manager = SchemaManager::load_from_store(store, &path).await?;
2547
2548        manager.add_label("Tiny")?;
2549
2550        // Unprefixed reserved names — these collide with internal Arrow
2551        // columns in storage tables and previously caused Lance
2552        // "Duplicate field name" errors at flush time.
2553        for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
2554            let err = manager
2555                .add_property("Tiny", reserved, DataType::String, true)
2556                .expect_err(&format!("expected '{reserved}' to be rejected"));
2557            assert!(
2558                err.to_string().contains("reserved"),
2559                "error for '{reserved}' should mention 'reserved', got: {err}"
2560            );
2561        }
2562
2563        // Planner sentinel — reserved in RESERVED_PROPS (belt-and-suspenders
2564        // alongside the underscore-prefix rule). Confirms an internal
2565        // `add_generated_property` path cannot accidentally create a column
2566        // that collides with the SET-target structural-projection marker.
2567        let err = manager
2568            .add_property("Tiny", "__set_struct__", DataType::String, true)
2569            .expect_err("expected '__set_struct__' to be rejected");
2570        assert!(
2571            err.to_string().contains("reserved"),
2572            "__set_struct__ rejection should mention 'reserved', got: {err}"
2573        );
2574
2575        // Leading-underscore pattern rule.
2576        for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
2577            assert!(
2578                manager
2579                    .add_property("Tiny", reserved, DataType::String, true)
2580                    .is_err(),
2581                "expected '{reserved}' to be rejected"
2582            );
2583        }
2584
2585        // Names that merely contain a reserved substring should still be
2586        // accepted.
2587        manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
2588        manager.add_property("Tiny", "user_op", DataType::String, true)?;
2589        manager.add_property("Tiny", "type_name", DataType::String, true)?;
2590
2591        // Same check applies to edge-type properties (single dispatch).
2592        manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
2593        assert!(
2594            manager
2595                .add_property("knows", "src_vid", DataType::Int64, true)
2596                .is_err()
2597        );
2598
2599        // And to generated properties.
2600        assert!(
2601            manager
2602                .add_generated_property(
2603                    "Tiny",
2604                    "ext_id",
2605                    DataType::String,
2606                    "concat('x', name)".into()
2607                )
2608                .is_err()
2609        );
2610
2611        Ok(())
2612    }
2613
2614    #[test]
2615    fn test_normalize_function_names() {
2616        assert_eq!(
2617            SchemaManager::normalize_function_names("lower(email)"),
2618            "LOWER(email)"
2619        );
2620        assert_eq!(
2621            SchemaManager::normalize_function_names("LOWER(email)"),
2622            "LOWER(email)"
2623        );
2624        assert_eq!(
2625            SchemaManager::normalize_function_names("Lower(email)"),
2626            "LOWER(email)"
2627        );
2628        assert_eq!(
2629            SchemaManager::normalize_function_names("trim(lower(email))"),
2630            "TRIM(LOWER(email))"
2631        );
2632    }
2633
2634    #[test]
2635    fn test_generated_column_name_case_insensitive() {
2636        let col1 = SchemaManager::generated_column_name("lower(email)");
2637        let col2 = SchemaManager::generated_column_name("LOWER(email)");
2638        let col3 = SchemaManager::generated_column_name("Lower(email)");
2639        assert_eq!(col1, col2);
2640        assert_eq!(col2, col3);
2641        assert!(col1.starts_with("_gen_LOWER_email_"));
2642    }
2643
2644    #[test]
2645    fn test_index_metadata_serde_backward_compat() {
2646        // Simulate old JSON without metadata field
2647        let json = r#"{
2648            "type": "Scalar",
2649            "name": "idx_person_name",
2650            "label": "Person",
2651            "properties": ["name"],
2652            "index_type": "BTree",
2653            "where_clause": null
2654        }"#;
2655        let def: IndexDefinition = serde_json::from_str(json).unwrap();
2656        let meta = def.metadata();
2657        assert_eq!(meta.status, IndexStatus::Online);
2658        assert!(meta.last_built_at.is_none());
2659        assert!(meta.row_count_at_build.is_none());
2660    }
2661
2662    #[test]
2663    fn test_index_metadata_serde_roundtrip() {
2664        let now = Utc::now();
2665        let def = IndexDefinition::Scalar(ScalarIndexConfig {
2666            name: "idx_test".to_string(),
2667            label: "Test".to_string(),
2668            properties: vec!["prop".to_string()],
2669            index_type: ScalarIndexType::BTree,
2670            where_clause: None,
2671            metadata: IndexMetadata {
2672                status: IndexStatus::Building,
2673                last_built_at: Some(now),
2674                row_count_at_build: Some(42),
2675            },
2676        });
2677
2678        let json = serde_json::to_string(&def).unwrap();
2679        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
2680        assert_eq!(parsed.metadata().status, IndexStatus::Building);
2681        assert_eq!(parsed.metadata().row_count_at_build, Some(42));
2682        assert!(parsed.metadata().last_built_at.is_some());
2683    }
2684
2685    #[tokio::test]
2686    async fn test_update_index_metadata() -> Result<()> {
2687        let dir = tempdir()?;
2688        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2689        let path = ObjectStorePath::from("schema.json");
2690        let manager = SchemaManager::load_from_store(store, &path).await?;
2691
2692        manager.add_label("Person")?;
2693        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
2694            name: "idx_test".to_string(),
2695            label: "Person".to_string(),
2696            properties: vec!["name".to_string()],
2697            index_type: ScalarIndexType::BTree,
2698            where_clause: None,
2699            metadata: Default::default(),
2700        });
2701        manager.add_index(idx)?;
2702
2703        // Verify initial status is Online
2704        let initial = manager.get_index("idx_test").unwrap();
2705        assert_eq!(initial.metadata().status, IndexStatus::Online);
2706
2707        // Update to Building
2708        manager.update_index_metadata("idx_test", |m| {
2709            m.status = IndexStatus::Building;
2710            m.row_count_at_build = Some(100);
2711        })?;
2712
2713        let updated = manager.get_index("idx_test").unwrap();
2714        assert_eq!(updated.metadata().status, IndexStatus::Building);
2715        assert_eq!(updated.metadata().row_count_at_build, Some(100));
2716
2717        // Non-existent index should error
2718        assert!(manager.update_index_metadata("nope", |_| {}).is_err());
2719
2720        Ok(())
2721    }
2722
2723    /// `add_internal_property` reports whether THIS call inserted the property: `true` on
2724    /// first insert, `false` on idempotent re-registration, `Err` on a type conflict. The
2725    /// MUVERA backfill gates on this (only the inserter backfills), so two concurrent
2726    /// creates of the same index can't both run the full-table rewrite (issue #107).
2727    #[tokio::test]
2728    async fn add_internal_property_reports_newly_added() -> Result<()> {
2729        let dir = tempdir()?;
2730        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2731        let path = ObjectStorePath::from("schema.json");
2732        let manager = SchemaManager::load_from_store(store, &path).await?;
2733        manager.add_label("Doc")?;
2734
2735        let dt = DataType::Vector { dimensions: 16 };
2736        // First registration: newly added.
2737        assert!(manager.add_internal_property("Doc", "__fde_x", dt.clone(), true)?);
2738        // Idempotent re-registration with the same type: NOT newly added.
2739        assert!(!manager.add_internal_property("Doc", "__fde_x", dt.clone(), true)?);
2740        // Same name, conflicting type: hard error (no silent divergence).
2741        assert!(
2742            manager
2743                .add_internal_property("Doc", "__fde_x", DataType::Vector { dimensions: 8 }, true)
2744                .is_err()
2745        );
2746        Ok(())
2747    }
2748
2749    /// `add_index` is upsert-by-name (issue rustic-ai/uni-db#63). Repeat
2750    /// invocations with the same `IndexDefinition::name()` must replace
2751    /// the entry in place rather than appending. Subsequent `add_index`
2752    /// calls also reflect metadata updates from the new definition.
2753    #[tokio::test]
2754    async fn test_add_index_is_upsert_by_name() -> Result<()> {
2755        let dir = tempdir()?;
2756        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2757        let path = ObjectStorePath::from("schema.json");
2758        let manager = SchemaManager::load_from_store(store, &path).await?;
2759        manager.add_label("Person")?;
2760
2761        let initial = IndexDefinition::Scalar(ScalarIndexConfig {
2762            name: "idx_test".to_string(),
2763            label: "Person".to_string(),
2764            properties: vec!["name".to_string()],
2765            index_type: ScalarIndexType::BTree,
2766            where_clause: None,
2767            metadata: IndexMetadata {
2768                status: IndexStatus::Building,
2769                ..Default::default()
2770            },
2771        });
2772        manager.add_index(initial.clone())?;
2773        assert_eq!(manager.schema().indexes.len(), 1);
2774
2775        // Re-add the identical def — must remain a single entry.
2776        manager.add_index(initial.clone())?;
2777        assert_eq!(
2778            manager.schema().indexes.len(),
2779            1,
2780            "duplicate add_index by name must not append"
2781        );
2782
2783        // Re-add with updated metadata — must replace in place, len unchanged.
2784        let mut updated_cfg = match initial {
2785            IndexDefinition::Scalar(c) => c,
2786            _ => unreachable!(),
2787        };
2788        updated_cfg.metadata.status = IndexStatus::Online;
2789        updated_cfg.metadata.row_count_at_build = Some(42);
2790        manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
2791        assert_eq!(manager.schema().indexes.len(), 1);
2792        let stored = manager.get_index("idx_test").unwrap();
2793        assert_eq!(stored.metadata().status, IndexStatus::Online);
2794        assert_eq!(stored.metadata().row_count_at_build, Some(42));
2795
2796        // A *different* name appends as a new entry.
2797        let other = IndexDefinition::Scalar(ScalarIndexConfig {
2798            name: "idx_other".to_string(),
2799            label: "Person".to_string(),
2800            properties: vec!["age".to_string()],
2801            index_type: ScalarIndexType::BTree,
2802            where_clause: None,
2803            metadata: IndexMetadata::default(),
2804        });
2805        manager.add_index(other)?;
2806        assert_eq!(manager.schema().indexes.len(), 2);
2807
2808        Ok(())
2809    }
2810
2811    /// `load_from_store` self-heals catalogs that were bloated by the
2812    /// pre-fix `add_index` (kept the *last* def per name).
2813    #[tokio::test]
2814    async fn test_load_dedups_bloated_indexes() -> Result<()> {
2815        let dir = tempdir()?;
2816        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2817        let path = ObjectStorePath::from("schema.json");
2818
2819        // Seed disk with a hand-crafted bloated schema: 50 entries, all
2820        // sharing the same name. The last entry has distinct metadata so
2821        // we can assert "last writer wins" semantics.
2822        let mut schema = Schema::default();
2823        schema.labels.insert(
2824            "Person".to_string(),
2825            LabelMeta {
2826                id: 1,
2827                created_at: chrono::Utc::now(),
2828                state: SchemaElementState::Active,
2829                description: None,
2830            },
2831        );
2832        let make = |status: IndexStatus, count: Option<u64>| {
2833            IndexDefinition::Scalar(ScalarIndexConfig {
2834                name: "idx_dup".to_string(),
2835                label: "Person".to_string(),
2836                properties: vec!["name".to_string()],
2837                index_type: ScalarIndexType::BTree,
2838                where_clause: None,
2839                metadata: IndexMetadata {
2840                    status,
2841                    row_count_at_build: count,
2842                    ..Default::default()
2843                },
2844            })
2845        };
2846        for _ in 0..49 {
2847            schema.indexes.push(make(IndexStatus::Building, None));
2848        }
2849        schema.indexes.push(make(IndexStatus::Online, Some(123)));
2850        let json = serde_json::to_string_pretty(&schema)?;
2851        store.put(&path, json.into()).await?;
2852
2853        let manager = SchemaManager::load_from_store(store, &path).await?;
2854        let schema = manager.schema();
2855        assert_eq!(
2856            schema.indexes.len(),
2857            1,
2858            "load() must collapse 50 duplicates by name to 1"
2859        );
2860        // Last-writer-wins: the kept entry is the final push (Online, 123).
2861        assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
2862        assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));
2863
2864        Ok(())
2865    }
2866
2867    #[test]
2868    fn test_vector_index_for_property_skips_non_online() {
2869        let mut schema = Schema::default();
2870        schema.labels.insert(
2871            "Document".to_string(),
2872            LabelMeta {
2873                id: 1,
2874                created_at: chrono::Utc::now(),
2875                state: SchemaElementState::Active,
2876                description: None,
2877            },
2878        );
2879
2880        // Add a vector index with Stale status
2881        schema
2882            .indexes
2883            .push(IndexDefinition::Vector(VectorIndexConfig {
2884                name: "vec_doc_embedding".to_string(),
2885                label: "Document".to_string(),
2886                property: "embedding".to_string(),
2887                index_type: VectorIndexType::Flat,
2888                metric: DistanceMetric::Cosine,
2889                embedding_config: None,
2890                metadata: IndexMetadata {
2891                    status: IndexStatus::Stale,
2892                    ..Default::default()
2893                },
2894            }));
2895
2896        // Stale index should NOT be returned
2897        assert!(
2898            schema
2899                .vector_index_for_property("Document", "embedding")
2900                .is_none()
2901        );
2902
2903        // Set to Online — should now be returned
2904        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
2905            cfg.metadata.status = IndexStatus::Online;
2906        }
2907        let result = schema.vector_index_for_property("Document", "embedding");
2908        assert!(result.is_some());
2909        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
2910    }
2911
2912    #[tokio::test]
2913    async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
2914        use crate::core::fork::SchemaDelta;
2915
2916        let dir = tempdir()?;
2917        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2918        let path = ObjectStorePath::from("schema.json");
2919        let primary = SchemaManager::load_from_store(store, &path).await?;
2920        primary.add_label("Person")?;
2921
2922        let overlay = primary.with_overlay(&SchemaDelta::empty());
2923        assert_eq!(overlay.schema().labels.len(), 1);
2924
2925        // Phase 1 invariant: mutating the overlay manager must not bleed
2926        // into primary's schema.
2927        overlay.add_label("Forked")?;
2928        assert!(overlay.schema().labels.contains_key("Forked"));
2929        assert!(!primary.schema().labels.contains_key("Forked"));
2930
2931        Ok(())
2932    }
2933
2934    #[tokio::test]
2935    async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
2936        use crate::core::fork::SchemaDelta;
2937        use chrono::Utc;
2938
2939        let dir = tempdir()?;
2940        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2941        let path = ObjectStorePath::from("schema.json");
2942        let primary = SchemaManager::load_from_store(store, &path).await?;
2943        primary.add_label("Existing")?;
2944
2945        let label_meta = LabelMeta {
2946            id: 99,
2947            created_at: Utc::now(),
2948            state: SchemaElementState::Active,
2949            description: None,
2950        };
2951        let edge_meta = EdgeTypeMeta {
2952            id: 99,
2953            src_labels: vec!["NewLabel".into()],
2954            dst_labels: vec!["NewLabel".into()],
2955            state: SchemaElementState::Active,
2956            description: None,
2957        };
2958        let delta = SchemaDelta {
2959            added_labels: vec![("NewLabel".to_string(), label_meta)],
2960            added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
2961            added_properties: vec![],
2962        };
2963
2964        let overlay = primary.with_overlay(&delta);
2965        let merged = overlay.schema();
2966        assert!(merged.labels.contains_key("Existing"));
2967        assert!(merged.labels.contains_key("NewLabel"));
2968        assert!(merged.edge_types.contains_key("NewEdge"));
2969
2970        // Primary unchanged.
2971        assert!(!primary.schema().labels.contains_key("NewLabel"));
2972        Ok(())
2973    }
2974
2975    /// N threads racing `get_or_assign_edge_type_id` for the same new name
2976    /// must converge on a single id (the read-lock fast path double-checks
2977    /// under the write lock); a schema-defined type must win over the
2978    /// schemaless registry.
2979    #[tokio::test]
2980    async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
2981        let dir = tempdir()?;
2982        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2983        let path = ObjectStorePath::from("schema.json");
2984        let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);
2985
2986        let mut handles = Vec::new();
2987        for _ in 0..16 {
2988            let m = manager.clone();
2989            handles.push(std::thread::spawn(move || {
2990                m.get_or_assign_edge_type_id("RACED")
2991            }));
2992        }
2993        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2994        assert!(
2995            ids.iter().all(|&id| id == ids[0]),
2996            "all racers must observe one id, got {ids:?}"
2997        );
2998        // Fast path returns the same id afterwards.
2999        assert_eq!(manager.get_or_assign_edge_type_id("RACED"), ids[0]);
3000
3001        // Schema-defined type wins over the schemaless registry.
3002        manager.add_label("A")?;
3003        let declared = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
3004        assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared);
3005        Ok(())
3006    }
3007
3008    /// Minting a brand-new schemaless edge type must bump `schema_version`
3009    /// (the plan cache keys on it; untyped traversals bake `all_edge_type_ids()`
3010    /// into the plan, so a stale plan would silently drop edges of the new
3011    /// type). Re-resolving an existing type must NOT bump. (review C5)
3012    #[test]
3013    fn test_new_schemaless_edge_type_bumps_schema_version() {
3014        let mut schema = Schema::default();
3015        let v0 = schema.schema_version;
3016
3017        let id1 = schema.get_or_assign_edge_type_id("FRESH");
3018        assert_eq!(
3019            schema.schema_version,
3020            v0.wrapping_add(1),
3021            "minting a new edge type must bump schema_version"
3022        );
3023
3024        // Re-resolving the same type is a no-op — no further bump.
3025        let id1_again = schema.get_or_assign_edge_type_id("FRESH");
3026        assert_eq!(id1, id1_again);
3027        assert_eq!(
3028            schema.schema_version,
3029            v0.wrapping_add(1),
3030            "resolving an existing edge type must not bump schema_version"
3031        );
3032
3033        // A second distinct new type bumps again.
3034        let _id2 = schema.get_or_assign_edge_type_id("OTHER");
3035        assert_eq!(
3036            schema.schema_version,
3037            v0.wrapping_add(2),
3038            "a second new edge type must bump schema_version again"
3039        );
3040    }
3041
3042    /// L6: label/edge-type names with path separators, whitespace, or
3043    /// control chars are rejected at definition; benign names (incl. `.`)
3044    /// are accepted.
3045    #[test]
3046    fn validate_schema_element_name_rejects_unsafe() {
3047        for bad in ["", "   ", "a/b", "a b", "a\nb", "a\\b", "x\0y"] {
3048            assert!(
3049                SchemaManager::validate_schema_element_name("Label", bad).is_err(),
3050                "expected {bad:?} to be rejected"
3051            );
3052        }
3053        for good in ["Person", "My.Label", "edge_2", "KNOWS"] {
3054            assert!(
3055                SchemaManager::validate_schema_element_name("Label", good).is_ok(),
3056                "expected {good:?} to be accepted"
3057            );
3058        }
3059        // Over-length is rejected.
3060        let long = "x".repeat(MAX_SCHEMA_NAME_LEN + 1);
3061        assert!(SchemaManager::validate_schema_element_name("Label", &long).is_err());
3062    }
3063}