Skip to main content

uni_common/core/
schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::core::edge_type::{
5    MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6    is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
/// Lifecycle state recorded on schema elements.
///
/// Stored in the `state` field of [`PropertyMeta`], [`LabelMeta`] and
/// [`EdgeTypeMeta`]; when that field is absent from a serialized schema it
/// deserializes to [`SchemaElementState::Active`] via `default_state`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
    /// The element is live.
    Active,
    /// NOTE(review): presumably the element is no longer visible to new
    /// readers but still resolvable by older snapshots — confirm with the DDL code.
    Hidden {
        /// Timestamp stored alongside the hidden state.
        since: DateTime<Utc>,
        /// Snapshot identifier stored alongside the hidden state.
        last_active_snapshot: String, // SnapshotId
    },
    /// NOTE(review): presumably the element has been dropped and only a
    /// marker remains — confirm with the DDL code.
    Tombstone {
        /// Timestamp stored alongside the tombstone.
        since: DateTime<Utc>,
    },
}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35/// Returns the canonical struct field definitions for DateTime encoding in Arrow.
36///
37/// DateTime is encoded as a 3-field struct to preserve timezone information:
38/// - `nanos_since_epoch`: i64 nanoseconds since Unix epoch (UTC)
39/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
40/// - `timezone_name`: Optional IANA timezone name (e.g., "America/New_York")
41pub fn datetime_struct_fields() -> Fields {
42    Fields::from(vec![
43        Field::new(
44            "nanos_since_epoch",
45            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46            true,
47        ),
48        Field::new("offset_seconds", ArrowDataType::Int32, true),
49        Field::new("timezone_name", ArrowDataType::Utf8, true),
50    ])
51}
52
53/// Returns the canonical struct field definitions for Time encoding in Arrow.
54///
55/// Time is encoded as a 2-field struct to preserve timezone offset:
56/// - `nanos_since_midnight`: i64 nanoseconds since midnight (0-86,399,999,999,999)
57/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
58pub fn time_struct_fields() -> Fields {
59    Fields::from(vec![
60        Field::new(
61            "nanos_since_midnight",
62            ArrowDataType::Time64(TimeUnit::Nanosecond),
63            true,
64        ),
65        Field::new("offset_seconds", ArrowDataType::Int32, true),
66    ])
67}
68
69/// Detects if an Arrow DataType is the canonical DateTime struct.
70pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74/// Detects if an Arrow DataType is the canonical Time struct.
75pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
/// Builds the field metadata that flags an Arrow `LargeBinary` field as a raw
/// `DataType::Bytes` value, as opposed to a tagged CypherValue/Duration blob.
///
/// The marker is attached to the child field of `List(Bytes)` /
/// `Map(_, Bytes)` containers so the read path decodes each element verbatim
/// instead of running it through the tagged codec (which would interpret
/// `byte[0]` as a type tag). CV-encoded containers carry no such marker and
/// stay on the codec path. The read side honoring lives in `arrow_convert`.
///
/// Returns a single-entry map: `"uni_raw_bytes"` -> `"true"`.
pub fn raw_bytes_field_metadata() -> HashMap<String, String> {
    let mut metadata = HashMap::new();
    metadata.insert(String::from("uni_raw_bytes"), String::from("true"));
    metadata
}
89
/// CRDT variant that a [`DataType::Crdt`] column is declared with.
///
/// Each variant has a canonical string name returned by
/// [`CrdtType::type_name`]; the variant identifiers themselves are what serde
/// writes into a serialized schema.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    GCounter,
    GSet,
    ORSet,
    LWWRegister,
    LWWMap,
    Rga,
    VectorClock,
    VCRegister,
}
102
103impl CrdtType {
104    /// Returns the canonical variant name for this CRDT type.
105    ///
106    /// The returned strings must stay in sync with `uni_crdt::Crdt::type_name`,
107    /// so a written CRDT value can be validated against its schema-declared
108    /// variant (see uni-store's write-time CRDT enforcement).
109    ///
110    /// # Examples
111    /// ```
112    /// use uni_common::core::schema::CrdtType;
113    /// assert_eq!(CrdtType::GCounter.type_name(), "GCounter");
114    /// ```
115    #[must_use]
116    pub fn type_name(&self) -> &'static str {
117        match self {
118            CrdtType::GCounter => "GCounter",
119            CrdtType::GSet => "GSet",
120            CrdtType::ORSet => "ORSet",
121            CrdtType::LWWRegister => "LWWRegister",
122            CrdtType::LWWMap => "LWWMap",
123            CrdtType::Rga => "Rga",
124            CrdtType::VectorClock => "VectorClock",
125            CrdtType::VCRegister => "VCRegister",
126        }
127    }
128}
129
/// Coordinate layout of a [`DataType::Point`] column.
///
/// The variant selects which coordinate fields `DataType::to_arrow` emits in
/// the point struct (every layout also carries a `crs` string field).
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
    /// Stored as `latitude`, `longitude`, `crs`.
    Geographic,  // WGS84
    /// Stored as `x`, `y`, `crs`.
    Cartesian2D, // x, y
    /// Stored as `x`, `y`, `z`, `crs`.
    Cartesian3D, // x, y, z
}
136
/// Logical property type declared in the schema.
///
/// `to_arrow` defines the Arrow storage type for every variant and `accepts`
/// defines which runtime values a column of that type takes.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
    /// Stored as Arrow `Utf8`.
    String,
    /// Stored as Arrow `Int32`.
    Int32,
    /// Stored as Arrow `Int64`.
    Int64,
    /// Stored as Arrow `Float32`.
    Float32,
    /// Stored as Arrow `Float64`.
    Float64,
    /// Stored as Arrow `Boolean`.
    Bool,
    /// Stored as an Arrow nanosecond timestamp tagged `"UTC"`.
    Timestamp,
    /// Stored as Arrow `Date32`.
    Date,
    /// Stored as the struct produced by `time_struct_fields`.
    Time,
    /// Stored as the struct produced by `datetime_struct_fields`.
    DateTime,
    /// Stored as Arrow `LargeBinary` (CypherValue codec).
    Duration,
    /// Stored as Arrow `LargeBinary` (tagged binary encoding); accepts any value.
    CypherValue,
    /// Stored as Arrow `LargeBinary` holding the raw bytes, no codec wrapping.
    Bytes,
    /// Stored as a struct of `Float64` coordinates plus a `crs` string.
    Point(PointType),
    /// Stored as an Arrow `FixedSizeList` of `Float32` with `dimensions` items.
    Vector { dimensions: usize },
    /// Stored as Arrow `FixedSizeBinary(24)`.
    Btic,
    /// Stored as Arrow `Binary`; accepts any value.
    Crdt(CrdtType),
    /// Stored as an Arrow `List` of the element type.
    List(Box<DataType>),
    /// Stored as an Arrow `List` of `{key, value}` structs (key type, value type).
    Map(Box<DataType>, Box<DataType>),
}
160
161impl DataType {
    // Alias for compatibility/convenience if needed, but preferable to use exact types.
    // `non_upper_case_globals` is allowed so the aliases read like enum variants
    // (`DataType::Float`, `DataType::Int`) at call sites.
    /// Alias for [`DataType::Float64`].
    #[allow(non_upper_case_globals)]
    pub const Float: DataType = DataType::Float64;
    /// Alias for [`DataType::Int64`].
    #[allow(non_upper_case_globals)]
    pub const Int: DataType = DataType::Int64;
167
168    pub fn to_arrow(&self) -> ArrowDataType {
169        match self {
170            DataType::String => ArrowDataType::Utf8,
171            DataType::Int32 => ArrowDataType::Int32,
172            DataType::Int64 => ArrowDataType::Int64,
173            DataType::Float32 => ArrowDataType::Float32,
174            DataType::Float64 => ArrowDataType::Float64,
175            DataType::Bool => ArrowDataType::Boolean,
176            DataType::Timestamp => {
177                ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
178            }
179            DataType::Date => ArrowDataType::Date32,
180            DataType::Time => ArrowDataType::Struct(time_struct_fields()),
181            DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
182            DataType::Duration => ArrowDataType::LargeBinary, // Lance doesn't support Interval(MonthDayNano); use CypherValue codec
183            DataType::CypherValue => ArrowDataType::LargeBinary, // MessagePack-tagged binary encoding
184            DataType::Bytes => ArrowDataType::LargeBinary, // raw byte buffer (no codec wrapping)
185            DataType::Point(pt) => match pt {
186                PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
187                    Field::new("latitude", ArrowDataType::Float64, false),
188                    Field::new("longitude", ArrowDataType::Float64, false),
189                    Field::new("crs", ArrowDataType::Utf8, false),
190                ])),
191                PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
192                    Field::new("x", ArrowDataType::Float64, false),
193                    Field::new("y", ArrowDataType::Float64, false),
194                    Field::new("crs", ArrowDataType::Utf8, false),
195                ])),
196                PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
197                    Field::new("x", ArrowDataType::Float64, false),
198                    Field::new("y", ArrowDataType::Float64, false),
199                    Field::new("z", ArrowDataType::Float64, false),
200                    Field::new("crs", ArrowDataType::Utf8, false),
201                ])),
202            },
203            DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
204                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
205                *dimensions as i32,
206            ),
207            DataType::Btic => ArrowDataType::FixedSizeBinary(24),
208            DataType::Crdt(_) => ArrowDataType::Binary, // Store CRDT as binary MessagePack
209            DataType::List(inner) => {
210                // A raw `Bytes` element maps to Arrow `LargeBinary`, indistinguishable
211                // from a CV-encoded element by type alone; mark the child field so the
212                // read path decodes it verbatim rather than through the tagged codec.
213                let item = Field::new("item", inner.to_arrow(), true);
214                let item = if matches!(**inner, DataType::Bytes) {
215                    item.with_metadata(raw_bytes_field_metadata())
216                } else {
217                    item
218                };
219                ArrowDataType::List(Arc::new(item))
220            }
221            DataType::Map(key, value) => {
222                // The value child's Arrow storage MUST agree with `build_map_column` in
223                // uni-store: typed scalars use their own Arrow type; `Bytes` is a
224                // raw-bytes-marked `LargeBinary`; every other (nested/non-scalar) value type
225                // is CypherValue-encoded into an UNMARKED `LargeBinary` (decoded back through
226                // the tagged codec on read). Gated by `map_value_is_typed` so the two sites
227                // can't drift.
228                let value_field = if value.map_value_is_typed() {
229                    let f = Field::new("value", value.to_arrow(), true);
230                    if matches!(**value, DataType::Bytes) {
231                        f.with_metadata(raw_bytes_field_metadata())
232                    } else {
233                        f
234                    }
235                } else {
236                    Field::new("value", ArrowDataType::LargeBinary, true)
237                };
238                ArrowDataType::List(Arc::new(Field::new(
239                    "item",
240                    ArrowDataType::Struct(Fields::from(vec![
241                        Field::new("key", key.to_arrow(), false),
242                        value_field,
243                    ])),
244                    true,
245                )))
246            }
247        }
248    }
249
250    /// Whether a `Map(_, self)` VALUE is stored as a typed Arrow child (this set) versus a
251    /// CypherValue-encoded `LargeBinary` fallback child (everything else, e.g. `Vector`,
252    /// `List`, `Map`, temporal). This MUST stay in lockstep with the explicit value-type
253    /// arms of `build_map_column` in uni-store (the `_` arm there is the CV fallback).
254    pub fn map_value_is_typed(&self) -> bool {
255        matches!(
256            self,
257            DataType::String
258                | DataType::Int64
259                | DataType::Int32
260                | DataType::Float64
261                | DataType::Float32
262                | DataType::Bool
263                | DataType::Bytes
264        )
265    }
266
267    /// Returns `true` if `value` is directly storable in this column type without loss.
268    ///
269    /// This is the schema-level type guard used by the write path. `Value::Null` is
270    /// always accepted — column nullability is enforced separately by the `nullable`
271    /// flag, not here. `CypherValue`, `Crdt`, and `Point` columns accept any value.
272    /// For every other declared type, only the `Value` variants that the storage layer
273    /// persists *without silently nulling* are accepted (see the per-type converters in
274    /// `uni-store`'s `arrow_convert`), plus the intentional lossless widenings
275    /// `Int`→`Float`, `Int`→`Int32`, and `Temporal`→`Timestamp`.
276    ///
277    /// A `Value::String` destined for a `Date`/`Time`/`DateTime`/`Duration` column is
278    /// intentionally *not* accepted here: the write path first coerces such strings into
279    /// the proper `Temporal` value (matching the Cypher temporal constructors), then the
280    /// coerced value passes this check. This keeps `accepts` a pure, allocation-free
281    /// predicate.
282    ///
283    /// # Examples
284    /// ```
285    /// use uni_common::core::schema::DataType;
286    /// use uni_common::Value;
287    ///
288    /// assert!(DataType::Float64.accepts(&Value::Int(3))); // Int widens to Float
289    /// assert!(DataType::Bool.accepts(&Value::Null)); // Null always accepted
290    /// assert!(!DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())));
291    /// ```
292    pub fn accepts(&self, value: &crate::value::Value) -> bool {
293        use crate::value::{TemporalValue, Value};
294
295        // Null is universally accepted; nullability is a separate concern.
296        if matches!(value, Value::Null) {
297            return true;
298        }
299
300        match self {
301            // Opaque / dynamically-typed columns accept any value.
302            DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
303
304            DataType::String => matches!(value, Value::String(_)),
305            DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
306            // Int widens to Float losslessly for the ranges we care about.
307            DataType::Float32 | DataType::Float64 => {
308                matches!(value, Value::Int(_) | Value::Float(_))
309            }
310            DataType::Bool => matches!(value, Value::Bool(_)),
311
312            // Non-struct timestamp column: storage parses strings and accepts ints,
313            // so both are lossless here (unlike the DateTime struct column below).
314            DataType::Timestamp => matches!(
315                value,
316                Value::String(_)
317                    | Value::Int(_)
318                    | Value::Temporal(
319                        TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
320                    )
321            ),
322            DataType::DateTime => matches!(
323                value,
324                Value::Temporal(
325                    TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
326                )
327            ),
328            DataType::Date => {
329                matches!(
330                    value,
331                    Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
332                )
333            }
334            DataType::Time => matches!(
335                value,
336                Value::Int(_)
337                    | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
338            ),
339            DataType::Duration => {
340                matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
341            }
342            DataType::Bytes => matches!(value, Value::Bytes(_)),
343            // FixedSizeBinary(24) converter accepts the Btic temporal, raw strings, and lists.
344            DataType::Btic => matches!(
345                value,
346                Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
347            ),
348            DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
349            DataType::List(_) => matches!(value, Value::List(_)),
350            DataType::Map(_, _) => matches!(value, Value::Map(_)),
351        }
352    }
353}
354
/// Serde default for `LabelMeta::created_at`: the current UTC time.
///
/// Runs whenever the field is missing from the serialized form, so the
/// resulting timestamp is the moment of deserialization.
fn default_created_at() -> DateTime<Utc> {
    Utc::now()
}
358
/// Serde default for the `state` field of the schema metadata structs:
/// elements without a recorded state are treated as `Active`.
fn default_state() -> SchemaElementState {
    SchemaElementState::Active
}
362
/// Serde default for `PropertyMeta::added_in`: properties serialized without
/// that field are attributed to schema version 1.
fn default_version_1() -> u32 {
    const FIRST_SCHEMA_VERSION: u32 = 1;
    FIRST_SCHEMA_VERSION
}
366
/// Schema metadata for a single property.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
    /// Declared logical type of the property.
    pub r#type: DataType,
    /// Whether the property may hold null.
    pub nullable: bool,
    /// Schema version recorded for this property; defaults to 1 when absent
    /// from the serialized form.
    #[serde(default = "default_version_1")]
    pub added_in: u32, // SchemaVersion
    /// Lifecycle state; defaults to `Active` when absent from the serialized form.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional generation expression; defaults to `None` when absent.
    /// NOTE(review): presumably marks a generated/computed property — confirm.
    #[serde(default)]
    pub generation_expression: Option<String>,
    /// Optional free-form description; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
380
/// Schema metadata for a vertex label.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
    /// Numeric label identifier.
    pub id: u16, // LabelId
    /// Creation timestamp; when absent from the serialized form it defaults
    /// to the current UTC time at deserialization.
    #[serde(default = "default_created_at")]
    pub created_at: DateTime<Utc>,
    /// Lifecycle state; defaults to `Active` when absent from the serialized form.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional free-form description; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
391
/// Schema metadata for a schema-defined edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
    /// See [`crate::core::edge_type::EdgeTypeId`] for bit-layout details.
    pub id: u32,
    /// NOTE(review): presumably the label names allowed on the source vertex — confirm.
    pub src_labels: Vec<String>,
    /// NOTE(review): presumably the label names allowed on the destination vertex — confirm.
    pub dst_labels: Vec<String>,
    /// Lifecycle state; defaults to `Active` when absent from the serialized form.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional free-form description; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
403
/// Kind of constraint together with the data it applies to.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
    /// Constraint over the listed property names.
    /// NOTE(review): presumably uniqueness of the combined values — confirm with the enforcing code.
    Unique { properties: Vec<String> },
    /// Constraint over a single property name.
    /// NOTE(review): presumably requires the property to be present — confirm.
    Exists { property: String },
    /// Constraint carrying an expression string.
    /// NOTE(review): expression language and evaluation are defined elsewhere.
    Check { expression: String },
}
411
/// Schema element a [`Constraint`] is attached to, identified by name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
    /// A vertex label name.
    Label(String),
    /// An edge type name.
    EdgeType(String),
}
418
/// A named constraint stored in [`Schema::constraints`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
    /// Constraint name.
    pub name: String,
    /// What kind of constraint this is and which properties/expression it covers.
    pub constraint_type: ConstraintType,
    /// The label or edge type the constraint is attached to.
    pub target: ConstraintTarget,
    /// Enabled flag. NOTE(review): presumably disabled constraints are kept
    /// but not enforced — confirm with the enforcing code.
    pub enabled: bool,
}
426
/// Bidirectional registry for dynamically-assigned schemaless edge type IDs.
///
/// Edge types not defined in the schema are assigned IDs at runtime with
/// bit 31 set (see [`crate::core::edge_type`]). This registry maintains
/// the name-to-ID and ID-to-name mappings for those types.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    /// Edge type name -> assigned schemaless ID.
    name_to_id: HashMap<String, u32>,
    /// Assigned schemaless ID -> edge type name (inverse of `name_to_id`).
    id_to_name: HashMap<u32, String>,
    /// Next local ID to assign (0 is reserved for invalid).
    next_local_id: u32,
}
439
440impl SchemalessEdgeTypeRegistry {
441    pub fn new() -> Self {
442        Self {
443            name_to_id: HashMap::new(),
444            id_to_name: HashMap::new(),
445            next_local_id: 1,
446        }
447    }
448
449    /// Returns the schemaless ID for `type_name`, assigning a new one if needed.
450    pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
451        if let Some(&id) = self.name_to_id.get(type_name) {
452            return id;
453        }
454
455        let id = make_schemaless_id(self.next_local_id);
456        self.next_local_id += 1;
457
458        self.name_to_id.insert(type_name.to_string(), id);
459        self.id_to_name.insert(id, type_name.to_string());
460
461        id
462    }
463
464    /// Looks up the edge type name for a schemaless ID.
465    pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
466        self.id_to_name.get(&type_id).map(String::as_str)
467    }
468
469    /// Returns `true` if `type_name` has already been assigned a schemaless ID.
470    pub fn contains(&self, type_name: &str) -> bool {
471        self.name_to_id.contains_key(type_name)
472    }
473
474    /// Looks up the schemaless ID for `type_name` (exact match, read-only).
475    pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
476        self.name_to_id.get(type_name).copied()
477    }
478
479    /// Looks up the edge type ID for `type_name` with case-insensitive matching.
480    pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
481        self.name_to_id
482            .iter()
483            .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
484            .map(|(_, &id)| id)
485    }
486
487    /// Returns all registered schemaless type IDs.
488    pub fn all_type_ids(&self) -> Vec<u32> {
489        self.id_to_name.keys().copied().collect()
490    }
491
492    /// Returns true if the registry has any schemaless types.
493    pub fn is_empty(&self) -> bool {
494        self.name_to_id.is_empty()
495    }
496}
497
/// `Default` delegates to [`SchemalessEdgeTypeRegistry::new`]; it is what
/// `#[serde(default)]` on `Schema::schemaless_registry` uses when the field
/// is missing from a serialized schema.
impl Default for SchemalessEdgeTypeRegistry {
    fn default() -> Self {
        Self::new()
    }
}
503
/// First virtual (catalog-resolved) label ID. Label IDs in
/// `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL` are owned by
/// plugin-registered `CatalogProvider`s and allocated lazily by the
/// planner via `PluginRegistry::register_virtual_label`. Native label
/// allocation (`SchemaManager::add_label`) refuses IDs in this range.
pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
/// Sentinel "no label" marker, kept distinct from any allocatable ID.
pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;

/// Maximum byte length of a label or edge-type name. (L6)
///
/// Generous; the cap is hygiene — the name lands in on-disk dataset paths
/// and Lance branch names — not a storage limit.
const MAX_SCHEMA_NAME_LEN: usize = 255;

/// Tells whether `id` falls in the virtual (catalog-resolved) label range.
///
/// The range is half-open: `VIRTUAL_LABEL_ID_START` is included and the
/// `VIRTUAL_LABEL_ID_SENTINEL` marker is not.
#[inline]
pub fn is_virtual_label_id(id: u16) -> bool {
    let virtual_ids = VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL;
    virtual_ids.contains(&id)
}
524
/// The full schema: labels, edge types, their properties, indexes and
/// constraints, plus the registry of schemaless edge types.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
    /// Change token advanced by `bump_version`; starts at 1 in `Schema::default`.
    pub schema_version: u32,
    /// Label name -> label metadata.
    pub labels: HashMap<String, LabelMeta>,
    /// Edge type name -> edge type metadata (schema-defined types only).
    pub edge_types: HashMap<String, EdgeTypeMeta>,
    /// Two-level property map.
    /// NOTE(review): presumably label/edge-type name -> property name -> metadata — confirm.
    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
    /// Index definitions; defaults to empty when absent from the serialized form.
    #[serde(default)]
    pub indexes: Vec<IndexDefinition>,
    /// Constraints; defaults to empty when absent from the serialized form.
    #[serde(default)]
    pub constraints: Vec<Constraint>,
    /// Registry for schemaless edge types (dynamically assigned IDs)
    #[serde(default)]
    pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
539
540impl Default for Schema {
541    fn default() -> Self {
542        Self {
543            schema_version: 1,
544            labels: HashMap::new(),
545            edge_types: HashMap::new(),
546            properties: HashMap::new(),
547            indexes: Vec::new(),
548            constraints: Vec::new(),
549            schemaless_registry: SchemalessEdgeTypeRegistry::new(),
550        }
551    }
552}
553
554impl Schema {
555    /// Bumps `schema_version` to invalidate cached query plans.
556    ///
557    /// Called at the end of every DDL mutation that changes the schema's
558    /// shape (labels, edge types, properties, indexes, constraints). The
559    /// plan-cache eviction guard keys on `schema_version`, so a stale plan
560    /// built against an older shape is discarded once this advances. Uses
561    /// wrapping arithmetic: the value is a coarse change token, not a count,
562    /// so wraparound only risks a missed eviction after 2^32 DDL operations.
563    fn bump_version(&mut self) {
564        self.schema_version = self.schema_version.wrapping_add(1);
565    }
566
567    /// Returns the label name for a given label ID.
568    ///
569    /// Performs a linear scan over all labels. This is efficient because
570    /// the number of labels in a schema is typically small.
571    pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
572        self.labels
573            .iter()
574            .find(|(_, meta)| meta.id == label_id)
575            .map(|(name, _)| name.as_str())
576    }
577
578    /// Returns the label ID for a given label name.
579    pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
580        self.labels.get(label_name).map(|meta| meta.id)
581    }
582
583    /// Returns the edge type name for a given type ID.
584    ///
585    /// Performs a linear scan over all edge types. This is efficient because
586    /// the number of edge types in a schema is typically small.
587    pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
588        self.edge_types
589            .iter()
590            .find(|(_, meta)| meta.id == type_id)
591            .map(|(name, _)| name.as_str())
592    }
593
594    /// Returns the edge type ID for a given type name.
595    pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
596        self.edge_types.get(type_name).map(|meta| meta.id)
597    }
598
599    /// Returns the vector index configuration for a given label and property.
600    ///
601    /// Performs a linear scan over all indexes. This is efficient because
602    /// the number of indexes in a schema is typically small.
603    pub fn vector_index_for_property(
604        &self,
605        label: &str,
606        property: &str,
607    ) -> Option<&VectorIndexConfig> {
608        self.indexes.iter().find_map(|idx| {
609            if let IndexDefinition::Vector(config) = idx
610                && config.label == label
611                && config.property == property
612                && config.metadata.status == IndexStatus::Online
613            {
614                return Some(config);
615            }
616            None
617        })
618    }
619
620    /// Returns the full-text index configuration for a given label and property.
621    ///
622    /// A full-text index covers one or more properties. This returns the config
623    /// if the specified property is among the indexed properties.
624    pub fn fulltext_index_for_property(
625        &self,
626        label: &str,
627        property: &str,
628    ) -> Option<&FullTextIndexConfig> {
629        self.indexes.iter().find_map(|idx| {
630            if let IndexDefinition::FullText(config) = idx
631                && config.label == label
632                && config.properties.iter().any(|p| p == property)
633                && config.metadata.status == IndexStatus::Online
634            {
635                return Some(config);
636            }
637            None
638        })
639    }
640
641    /// Get label metadata with case-insensitive lookup.
642    ///
643    /// This allows queries to match labels regardless of case, providing
644    /// better user experience when label names vary in casing.
645    pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
646        self.labels
647            .iter()
648            .find(|(k, _)| k.eq_ignore_ascii_case(name))
649            .map(|(_, v)| v)
650    }
651
652    /// Get the schema-canonical spelling of a label, matched case-insensitively.
653    ///
654    /// Returns the stored label name whose spelling differs only in case from
655    /// `name`, or `None` if no such label is registered. Callers use this to
656    /// normalize a user-supplied label to the canonical form the storage tier
657    /// keys on, so case variants resolve to the same vertex table.
658    pub fn canonical_label_name(&self, name: &str) -> Option<String> {
659        self.labels
660            .iter()
661            .find(|(k, _)| k.eq_ignore_ascii_case(name))
662            .map(|(k, _)| k.clone())
663    }
664
665    /// Get label ID with case-insensitive lookup.
666    pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
667        self.get_label_case_insensitive(label_name)
668            .map(|meta| meta.id)
669    }
670
671    /// Get edge type metadata with case-insensitive lookup.
672    ///
673    /// This allows queries to match edge types regardless of case, providing
674    /// better user experience when type names vary in casing.
675    pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
676        self.edge_types
677            .iter()
678            .find(|(k, _)| k.eq_ignore_ascii_case(name))
679            .map(|(_, v)| v)
680    }
681
682    /// Get edge type ID with case-insensitive lookup (schema-defined types only).
683    pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
684        self.get_edge_type_case_insensitive(type_name)
685            .map(|meta| meta.id)
686    }
687
688    /// Get edge type ID with case-insensitive lookup, checking both
689    /// schema-defined and schemaless registries.
690    pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
691        self.edge_type_id_by_name_case_insensitive(type_name)
692            .or_else(|| {
693                self.schemaless_registry
694                    .id_by_name_case_insensitive(type_name)
695            })
696    }
697
698    /// Returns the edge type ID for `type_name`, checking the schema first
699    /// and falling back to the schemaless registry (assigning a new ID if needed).
700    ///
701    /// Requires `&mut self` because it may assign a new schemaless ID.
702    /// Use [`edge_type_id_by_name`](Self::edge_type_id_by_name) for read-only schema lookups.
703    pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
704        if let Some(id) = self.edge_type_id_unified(type_name) {
705            return id;
706        }
707        // Reaching here means the type is brand-new to *both* the schema map and
708        // the schemaless registry (the early return above mirrors exactly what
709        // `edge_type_id_unified` checks). Minting a new schemaless edge type
710        // changes the result of `all_edge_type_ids()`, which untyped traversals
711        // bake into cached plans keyed on `schema_version`. Bump the version so
712        // those stale plans are evicted — otherwise a `MATCH ()-[r]->()` plan
713        // built before this type existed silently drops edges of the new type.
714        let id = self.schemaless_registry.get_or_assign_id(type_name);
715        self.bump_version();
716        id
717    }
718
719    /// Read-only unified exact lookup: schema-defined edge type id, falling
720    /// back to an already-assigned schemaless id.
721    ///
722    /// Mirrors exactly the checks [`Self::get_or_assign_edge_type_id`]
723    /// performs before assigning, so a `Some` here means the assigning path
724    /// would be a no-op — the basis for `SchemaManager`'s read-lock fast path.
725    pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
726        self.edge_type_id_by_name(type_name)
727            .or_else(|| self.schemaless_registry.id_by_name(type_name))
728    }
729
730    /// Returns the edge type name for `type_id`, checking both the schema
731    /// and schemaless registries. Returns an owned `String` because the
732    /// name may come from either registry.
733    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
734        if is_schemaless_edge_type(type_id) {
735            self.schemaless_registry
736                .type_name_by_id(type_id)
737                .map(str::to_owned)
738        } else {
739            self.edge_type_name_by_id(type_id).map(str::to_owned)
740        }
741    }
742
743    /// Returns all edge type IDs, including both schema-defined and schemaless types.
744    /// Used when MATCH queries don't specify an edge type and need to scan all edges.
745    pub fn all_edge_type_ids(&self) -> Vec<u32> {
746        let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
747        ids.extend(self.schemaless_registry.all_type_ids());
748        ids.sort_unstable();
749        ids
750    }
751}
752
/// Lifecycle status of an index.
///
/// `Online` is the `Default`, so index metadata deserialized without a
/// status is treated as online. The per-property index lookups on `Schema`
/// only return indexes whose status is `Online`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
    /// Index is up-to-date and available for queries.
    #[default]
    Online,
    /// Index is currently being rebuilt.
    Building,
    /// Index is outdated and scheduled for rebuild.
    Stale,
    /// Index rebuild failed after exhausting retries.
    Failed,
}
766
/// Metadata tracking the lifecycle state of an index.
///
/// Every field is `#[serde(default)]`, so serialized metadata may omit any of
/// them; a missing field deserializes to its `Default` value
/// ([`IndexStatus::Online`] for `status`, `None` for the optional fields).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
    /// Current lifecycle status.
    #[serde(default)]
    pub status: IndexStatus,
    /// When the index was last successfully built.
    #[serde(default)]
    pub last_built_at: Option<DateTime<Utc>>,
    /// Row count of the dataset when the index was last built.
    #[serde(default)]
    pub row_count_at_build: Option<u64>,
}
780
/// A persisted index definition, one variant per index kind.
///
/// Serialized with serde's internally tagged representation
/// (`#[serde(tag = "type")]`): the variant name is stored in a `"type"` field
/// next to the fields of the wrapped config struct.
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a wildcard
/// arm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    /// Vector index described by a [`VectorIndexConfig`].
    Vector(VectorIndexConfig),
    /// Full-text index described by a [`FullTextIndexConfig`].
    FullText(FullTextIndexConfig),
    /// Scalar index described by a [`ScalarIndexConfig`].
    Scalar(ScalarIndexConfig),
    /// Inverted index described by an [`InvertedIndexConfig`].
    Inverted(InvertedIndexConfig),
    /// JSON full-text index described by a [`JsonFtsIndexConfig`].
    JsonFullText(JsonFtsIndexConfig),
}
791
792impl IndexDefinition {
793    /// Returns the index name for any variant.
794    pub fn name(&self) -> &str {
795        match self {
796            IndexDefinition::Vector(c) => &c.name,
797            IndexDefinition::FullText(c) => &c.name,
798            IndexDefinition::Scalar(c) => &c.name,
799            IndexDefinition::Inverted(c) => &c.name,
800            IndexDefinition::JsonFullText(c) => &c.name,
801        }
802    }
803
804    /// Returns the label this index is defined on.
805    pub fn label(&self) -> &str {
806        match self {
807            IndexDefinition::Vector(c) => &c.label,
808            IndexDefinition::FullText(c) => &c.label,
809            IndexDefinition::Scalar(c) => &c.label,
810            IndexDefinition::Inverted(c) => &c.label,
811            IndexDefinition::JsonFullText(c) => &c.label,
812        }
813    }
814
815    /// Returns a reference to the index lifecycle metadata.
816    pub fn metadata(&self) -> &IndexMetadata {
817        match self {
818            IndexDefinition::Vector(c) => &c.metadata,
819            IndexDefinition::FullText(c) => &c.metadata,
820            IndexDefinition::Scalar(c) => &c.metadata,
821            IndexDefinition::Inverted(c) => &c.metadata,
822            IndexDefinition::JsonFullText(c) => &c.metadata,
823        }
824    }
825
826    /// Returns a mutable reference to the index lifecycle metadata.
827    pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
828        match self {
829            IndexDefinition::Vector(c) => &mut c.metadata,
830            IndexDefinition::FullText(c) => &mut c.metadata,
831            IndexDefinition::Scalar(c) => &mut c.metadata,
832            IndexDefinition::Inverted(c) => &mut c.metadata,
833            IndexDefinition::JsonFullText(c) => &mut c.metadata,
834        }
835    }
836}
837
/// Configuration of an inverted index, wrapped by
/// [`IndexDefinition::Inverted`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
    /// Index name (what [`IndexDefinition::name`] returns).
    pub name: String,
    /// Label the index is defined on (what [`IndexDefinition::label`] returns).
    pub label: String,
    /// Name of the indexed property.
    pub property: String,
    /// Defaults to `true` when absent from the serialized form.
    /// NOTE(review): the exact normalization this enables is not visible
    /// here — confirm against the index builder.
    #[serde(default = "default_normalize")]
    pub normalize: bool,
    /// Defaults to `10_000` when absent from the serialized form.
    /// Presumably an upper bound on indexed terms per document — TODO confirm.
    #[serde(default = "default_max_terms_per_doc")]
    pub max_terms_per_doc: usize,
    /// Lifecycle metadata; defaults to `IndexMetadata::default()` when absent.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
850
/// Serde default for `InvertedIndexConfig::normalize`: normalization is on
/// unless the serialized config says otherwise.
fn default_normalize() -> bool {
    const NORMALIZE_BY_DEFAULT: bool = true;
    NORMALIZE_BY_DEFAULT
}
854
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`.
fn default_max_terms_per_doc() -> usize {
    const DEFAULT_MAX_TERMS_PER_DOC: usize = 10_000;
    DEFAULT_MAX_TERMS_PER_DOC
}
858
/// Configuration of a vector index, wrapped by [`IndexDefinition::Vector`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
    /// Index name (what [`IndexDefinition::name`] returns).
    pub name: String,
    /// Label the index is defined on (what [`IndexDefinition::label`] returns).
    pub label: String,
    /// Name of the indexed property.
    pub property: String,
    /// Kind of vector index and its build parameters.
    pub index_type: VectorIndexType,
    /// Distance metric used by this index.
    pub metric: DistanceMetric,
    /// Optional embedding settings.
    /// NOTE(review): presumably `None` means vectors are supplied by the
    /// caller rather than auto-embedded — confirm.
    pub embedding_config: Option<EmbeddingConfig>,
    /// Lifecycle metadata; defaults to `IndexMetadata::default()` when absent.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
870
/// Embedding settings attached to a [`VectorIndexConfig`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
    /// Model alias in the Uni-Xervo catalog (for example: "embed/default").
    pub alias: String,
    /// Names of the properties used as embedding input.
    /// NOTE(review): how multiple properties are combined is not visible
    /// here — confirm.
    pub source_properties: Vec<String>,
    /// Presumably the number of texts embedded per model call — TODO confirm.
    pub batch_size: usize,
    /// Prefix prepended to text before embedding during auto-embed (document side).
    /// Example: `"search_document: "` for Nomic models. Include any trailing space.
    #[serde(default)]
    pub document_prefix: Option<String>,
    /// Prefix prepended to text before embedding during query-time embed calls.
    /// Example: `"search_query: "` for Nomic models. Include any trailing space.
    #[serde(default)]
    pub query_prefix: Option<String>,
}
886
/// Kind of vector index to build, together with its build parameters.
///
/// Marked `#[non_exhaustive]`. Fields annotated `#[serde(default)]` may be
/// omitted from the serialized form and then deserialize to `None`.
///
/// NOTE(review): apart from `Muvera`, the variants carry no documentation in
/// this file; the parameter meanings are implied by the variant and field
/// names only — confirm against the index builder before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
    /// Takes no build parameters.
    Flat,
    /// Parameterised by a partition count.
    IvfFlat {
        num_partitions: u32,
    },
    /// Parameterised by partition count, sub-vector count and bits per
    /// sub-vector.
    IvfPq {
        num_partitions: u32,
        num_sub_vectors: u32,
        bits_per_subvector: u8,
    },
    /// Parameterised by a partition count.
    IvfSq {
        num_partitions: u32,
    },
    /// Parameterised by a partition count and an optional bit width.
    IvfRq {
        num_partitions: u32,
        #[serde(default)]
        num_bits: Option<u8>,
    },
    /// Parameterised by `m`, `ef_construction` and an optional partition count.
    HnswFlat {
        m: u32,
        ef_construction: u32,
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    /// Parameterised by `m`, `ef_construction` and an optional partition count.
    HnswSq {
        m: u32,
        ef_construction: u32,
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    /// Parameterised by `m`, `ef_construction`, a sub-vector count and an
    /// optional partition count.
    HnswPq {
        m: u32,
        ef_construction: u32,
        num_sub_vectors: u32,
        #[serde(default)]
        num_partitions: Option<u32>,
    },
    /// MUVERA (arXiv:2405.19504) Fixed-Dimensional Encoding for multi-vector
    /// (ColBERT/MaxSim) columns. The source multi-vector is encoded into a single
    /// derived `Vector<fde_dim>` column, and `inner` is the single-vector ANN index
    /// type built over that derived column (always with the `Dot` metric — the FDE
    /// inner product approximates MaxSim). The exact MaxSim re-rank still uses the
    /// `VectorIndexConfig.metric`. See `uni_query_functions::muvera`.
    Muvera {
        /// SimHash hyperplanes per repetition (`2^k_sim` buckets).
        k_sim: u32,
        /// Independent repetitions concatenated into the FDE.
        reps: u32,
        /// Inner-projection target dim (`0` = no projection, use the source dim).
        d_proj: u32,
        /// Master seed; persisted so query-time encoding matches doc-time encoding.
        seed: u64,
        /// The single-vector ANN index built over the derived FDE column.
        inner: Box<VectorIndexType>,
    },
}
945
/// Distance metric used to compare two vectors.
///
/// The values produced by [`DistanceMetric::compute_distance`] are oriented so
/// that lower means more similar for every variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
    /// `1.0 - cosine_similarity`; `1.0` when either vector has zero norm.
    Cosine,
    /// Squared Euclidean distance.
    L2,
    /// Negative dot product.
    Dot,
}
953
954impl DistanceMetric {
955    /// Computes the distance between two vectors using this metric.
956    ///
957    /// All metrics follow LanceDB conventions so that lower values indicate
958    /// greater similarity:
959    /// - **L2**: squared Euclidean distance.
960    /// - **Cosine**: `1.0 - cosine_similarity` (range \[0, 2\]).
961    /// - **Dot**: negative dot product.
962    ///
963    /// # Panics
964    ///
965    /// Panics if `a` and `b` have different lengths.
966    pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
967        assert_eq!(a.len(), b.len(), "vector dimension mismatch");
968        match self {
969            DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
970            DistanceMetric::Cosine => {
971                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
972                let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
973                let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
974                let denom = norm_a * norm_b;
975                if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
976            }
977            DistanceMetric::Dot => {
978                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
979                -dot
980            }
981        }
982    }
983}
984
/// Configuration of a full-text index, wrapped by
/// [`IndexDefinition::FullText`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
    /// Index name (what [`IndexDefinition::name`] returns).
    pub name: String,
    /// Label the index is defined on (what [`IndexDefinition::label`] returns).
    pub label: String,
    /// Names of the indexed properties.
    pub properties: Vec<String>,
    /// Tokenizer selection for the indexed text.
    pub tokenizer: TokenizerConfig,
    /// NOTE(review): presumably controls whether token positions are stored
    /// (e.g. for phrase queries) — confirm against the index builder.
    pub with_positions: bool,
    /// Lifecycle metadata; defaults to `IndexMetadata::default()` when absent.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
995
/// Tokenizer selection for a [`FullTextIndexConfig`].
///
/// Marked `#[non_exhaustive]`.
/// NOTE(review): the behavior of each tokenizer is defined by the index
/// implementation, not in this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
    /// Takes no parameters.
    Standard,
    /// Takes no parameters.
    Whitespace,
    /// Parameterised by `min` and `max` (presumably the n-gram length bounds —
    /// TODO confirm).
    Ngram { min: u8, max: u8 },
    /// Refers to a tokenizer by `name`.
    Custom { name: String },
}
1004
/// Configuration of a JSON full-text index, wrapped by
/// [`IndexDefinition::JsonFullText`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
    /// Index name (what [`IndexDefinition::name`] returns).
    pub name: String,
    /// Label the index is defined on (what [`IndexDefinition::label`] returns).
    pub label: String,
    /// Name of the indexed column.
    pub column: String,
    /// Defaults to an empty list when absent from the serialized form.
    /// NOTE(review): presumably JSON paths restricting what is indexed, with
    /// empty meaning "everything" — confirm.
    #[serde(default)]
    pub paths: Vec<String>,
    /// Defaults to `false` when absent from the serialized form.
    #[serde(default)]
    pub with_positions: bool,
    /// Lifecycle metadata; defaults to `IndexMetadata::default()` when absent.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1017
/// Configuration of a scalar index, wrapped by [`IndexDefinition::Scalar`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
    /// Index name (what [`IndexDefinition::name`] returns).
    pub name: String,
    /// Label the index is defined on (what [`IndexDefinition::label`] returns).
    pub label: String,
    /// Names of the indexed properties.
    pub properties: Vec<String>,
    /// Kind of scalar index to build.
    pub index_type: ScalarIndexType,
    /// Optional filter expression, stored as text.
    /// NOTE(review): presumably makes this a partial index — confirm.
    pub where_clause: Option<String>,
    /// Lifecycle metadata; defaults to `IndexMetadata::default()` when absent.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
1028
/// Kind of scalar index selected by a [`ScalarIndexConfig`].
///
/// Marked `#[non_exhaustive]`; none of the variants carries parameters.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
    BTree,
    Hash,
    Bitmap,
    LabelList,
}
1037
/// Owns the current [`Schema`] together with the object-store location it is
/// loaded from and saved to.
///
/// The schema is held as `RwLock<Arc<Schema>>`: readers obtain a cheap `Arc`
/// clone via [`SchemaManager::schema`], while mutating methods take the write
/// lock and go through `Arc::make_mut`.
pub struct SchemaManager {
    /// Object store that holds the serialized schema.
    store: Arc<dyn ObjectStore>,
    /// Location of the schema object inside `store`.
    path: ObjectStorePath,
    /// Current in-memory schema.
    schema: RwLock<Arc<Schema>>,
}
1043
1044impl SchemaManager {
1045    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
1046        let path = path.as_ref();
1047        let parent = path
1048            .parent()
1049            .ok_or_else(|| anyhow!("Invalid schema path"))?;
1050        let filename = path
1051            .file_name()
1052            .ok_or_else(|| anyhow!("Invalid schema filename"))?
1053            .to_str()
1054            .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
1055
1056        let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
1057        let obj_path = ObjectStorePath::from(filename);
1058
1059        Self::load_from_store(store, &obj_path).await
1060    }
1061
1062    pub async fn load_from_store(
1063        store: Arc<dyn ObjectStore>,
1064        path: &ObjectStorePath,
1065    ) -> Result<Self> {
1066        match store.get(path).await {
1067            Ok(result) => {
1068                let bytes = result.bytes().await?;
1069                let content = String::from_utf8(bytes.to_vec())?;
1070                let mut schema: Schema = serde_json::from_str(&content)?;
1071                // Self-heal catalogs that grew super-linearly under the
1072                // pre-fix `add_index` (issue rustic-ai/uni-db#63). Collapse
1073                // duplicate index entries by name, keeping the *last*
1074                // occurrence — matches the upsert semantics in `add_index`
1075                // and preserves whatever metadata the most recent rebuild
1076                // wrote. The dedup persists on the next mutation that
1077                // calls `save()`.
1078                let original_len = schema.indexes.len();
1079                if original_len > 0 {
1080                    let mut seen: std::collections::HashSet<String> =
1081                        std::collections::HashSet::with_capacity(original_len);
1082                    let mut dedup: Vec<IndexDefinition> = schema
1083                        .indexes
1084                        .iter()
1085                        .rev()
1086                        .filter(|idx| seen.insert(idx.name().to_string()))
1087                        .cloned()
1088                        .collect();
1089                    dedup.reverse();
1090                    if dedup.len() != original_len {
1091                        tracing::warn!(
1092                            collapsed = original_len - dedup.len(),
1093                            kept = dedup.len(),
1094                            "schema.indexes: collapsed duplicate entries on load (issue #63)"
1095                        );
1096                        schema.indexes = dedup;
1097                    }
1098                }
1099                Ok(Self {
1100                    store,
1101                    path: path.clone(),
1102                    schema: RwLock::new(Arc::new(schema)),
1103                })
1104            }
1105            Err(object_store::Error::NotFound { .. }) => Ok(Self {
1106                store,
1107                path: path.clone(),
1108                schema: RwLock::new(Arc::new(Schema::default())),
1109            }),
1110            Err(e) => Err(anyhow::Error::from(e)),
1111        }
1112    }
1113
1114    pub async fn save(&self) -> Result<()> {
1115        let content = {
1116            let schema_guard = acquire_read(&self.schema, "schema")?;
1117            serde_json::to_string_pretty(&**schema_guard)?
1118        };
1119        self.store
1120            .put(&self.path, content.into())
1121            .await
1122            .map_err(anyhow::Error::from)?;
1123        Ok(())
1124    }
1125
    /// Returns the object-store path this manager's schema is written to by
    /// [`Self::save`].
    pub fn path(&self) -> &ObjectStorePath {
        &self.path
    }
1129
1130    pub fn schema(&self) -> Arc<Schema> {
1131        self.schema
1132            .read()
1133            .expect("Schema lock poisoned - a thread panicked while holding it")
1134            .clone()
1135    }
1136
1137    /// Normalize function names in an expression to uppercase for case-insensitive matching.
1138    /// Examples: "lower(email)" -> "LOWER(email)", "trim(name)" -> "TRIM(name)"
1139    fn normalize_function_names(expr: &str) -> String {
1140        let mut result = String::with_capacity(expr.len());
1141        let mut chars = expr.chars().peekable();
1142
1143        while let Some(ch) = chars.next() {
1144            if ch.is_alphabetic() {
1145                // Collect identifier
1146                let mut ident = String::new();
1147                ident.push(ch);
1148
1149                while let Some(&next) = chars.peek() {
1150                    if next.is_alphanumeric() || next == '_' {
1151                        ident.push(chars.next().unwrap());
1152                    } else {
1153                        break;
1154                    }
1155                }
1156
1157                // If followed by '(', it's a function call - uppercase it
1158                if chars.peek() == Some(&'(') {
1159                    result.push_str(&ident.to_uppercase());
1160                } else {
1161                    result.push_str(&ident); // Keep property names as-is
1162                }
1163            } else {
1164                result.push(ch);
1165            }
1166        }
1167
1168        result
1169    }
1170
1171    /// Generate a consistent internal column name for an expression index.
1172    /// Uses a hash suffix to ensure uniqueness for different expressions that
1173    /// might sanitize to the same string (e.g., "a+b" and "a-b" both become "a_b").
1174    ///
1175    /// IMPORTANT: Uses FNV-1a hash which is stable across Rust versions and platforms.
1176    /// DefaultHasher is not guaranteed to be stable and could break persistent data
1177    /// if the hash changes after a compiler upgrade.
1178    pub fn generated_column_name(expr: &str) -> String {
1179        // Normalize function names to uppercase for case-insensitive matching
1180        let normalized = Self::normalize_function_names(expr);
1181
1182        let sanitized = normalized
1183            .replace(|c: char| !c.is_alphanumeric(), "_")
1184            .trim_matches('_')
1185            .to_string();
1186
1187        // FNV-1a 64-bit hash - stable across Rust versions and platforms
1188        const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1189        const FNV_PRIME: u64 = 1099511628211;
1190
1191        let mut hash = FNV_OFFSET_BASIS;
1192        for byte in normalized.as_bytes() {
1193            hash ^= *byte as u64;
1194            hash = hash.wrapping_mul(FNV_PRIME);
1195        }
1196
1197        format!("_gen_{}_{:x}", sanitized, hash)
1198    }
1199
1200    pub fn replace_schema(&self, new_schema: Schema) {
1201        let mut schema = self
1202            .schema
1203            .write()
1204            .expect("Schema lock poisoned - a thread panicked while holding it");
1205        *schema = Arc::new(new_schema);
1206    }
1207
1208    /// Build a fork-scoped manager whose schema is `primary ⊕ overlay`.
1209    ///
1210    /// Used by `UniInner::at_fork` to give a forked session a schema view
1211    /// that includes any labels/edge-types/properties the fork has
1212    /// introduced on top of primary. The returned manager owns its own
1213    /// in-memory `Arc<Schema>` — mutations to it never reach primary's
1214    /// schema file. The returned manager is *not* intended for `.save()`;
1215    /// fork-overlay persistence is owned by the registry layer
1216    /// (`catalog/fork_schemas/{fork_id}.json`).
1217    ///
1218    /// In Phase 1 the delta is always empty, so the merge is a clone.
1219    /// Phase 2 starts populating it when on-the-fly label creation lands.
1220    #[must_use]
1221    pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1222        let primary = self.schema();
1223        let merged = if overlay.is_empty() {
1224            (*primary).clone()
1225        } else {
1226            let mut merged = (*primary).clone();
1227            for (name, label) in &overlay.added_labels {
1228                merged.labels.insert(name.clone(), label.clone());
1229            }
1230            for (name, edge_type) in &overlay.added_edge_types {
1231                merged.edge_types.insert(name.clone(), edge_type.clone());
1232            }
1233            for addition in &overlay.added_properties {
1234                let props = merged.properties.entry(addition.owner.clone()).or_default();
1235                props.insert(
1236                    addition.property.clone(),
1237                    PropertyMeta {
1238                        r#type: addition.data_type.clone(),
1239                        nullable: addition.nullable,
1240                        added_in: merged.schema_version,
1241                        state: SchemaElementState::Active,
1242                        generation_expression: None,
1243                        description: None,
1244                    },
1245                );
1246            }
1247            merged
1248        };
1249
1250        Arc::new(Self {
1251            store: self.store.clone(),
1252            path: self.path.clone(),
1253            schema: RwLock::new(Arc::new(merged)),
1254        })
1255    }
1256
1257    pub fn next_label_id(&self) -> u16 {
1258        self.schema()
1259            .labels
1260            .values()
1261            .map(|l| l.id)
1262            .max()
1263            .unwrap_or(0)
1264            + 1
1265    }
1266
1267    pub fn next_type_id(&self) -> u32 {
1268        let max_schema_id = self
1269            .schema()
1270            .edge_types
1271            .values()
1272            .map(|t| t.id)
1273            .max()
1274            .unwrap_or(0);
1275
1276        // Ensure we stay in schema'd ID space (bit 31 = 0)
1277        if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1278            panic!("Schema edge type ID exhaustion");
1279        }
1280
1281        max_schema_id + 1
1282    }
1283
1284    /// Validate a label or edge-type name at definition time. (L6)
1285    ///
1286    /// Names flow into on-disk dataset paths (`vertices_{name}.lance`) and
1287    /// Lance branch names (`fork_{id}_{…}`); a name with a path separator,
1288    /// whitespace, or a control character corrupts those paths and breaks
1289    /// fork creation. Such names were never actually usable, so they are
1290    /// rejected up front rather than failing later. `.` is allowed
1291    /// (path-safe and common in qualified names).
1292    ///
1293    /// Public so the fork-create path can apply the same rule as a backstop
1294    /// over names that entered the schema through an infallible interning
1295    /// path (e.g. schemaless `get_or_assign_edge_type_id`).
1296    ///
1297    /// # Errors
1298    /// Returns an error if `name` is empty/all-whitespace, exceeds
1299    /// `MAX_SCHEMA_NAME_LEN` bytes, or contains a control, whitespace,
1300    /// `/`, or `\` character.
1301    pub fn validate_schema_element_name(kind: &str, name: &str) -> Result<()> {
1302        if name.is_empty() || name.chars().all(char::is_whitespace) {
1303            return Err(anyhow!(
1304                "{kind} name must be non-empty and not all whitespace"
1305            ));
1306        }
1307        if name.len() > MAX_SCHEMA_NAME_LEN {
1308            return Err(anyhow!("{kind} name exceeds {MAX_SCHEMA_NAME_LEN} bytes"));
1309        }
1310        if let Some(c) = name
1311            .chars()
1312            .find(|c| c.is_control() || c.is_whitespace() || matches!(c, '/' | '\\'))
1313        {
1314            return Err(anyhow!(
1315                "{kind} name '{name}' contains an unsafe character ({c:?})"
1316            ));
1317        }
1318        Ok(())
1319    }
1320
    /// Adds a label without a description.
    ///
    /// Shorthand for [`Self::add_label_with_desc`] with `description = None`;
    /// see that method for validation, errors and the returned id.
    pub fn add_label(&self, name: &str) -> Result<u16> {
        self.add_label_with_desc(name, None)
    }
1324
1325    pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1326        Self::validate_schema_element_name("Label", name)?;
1327        let mut guard = acquire_write(&self.schema, "schema")?;
1328        let schema = Arc::make_mut(&mut *guard);
1329        if schema.labels.contains_key(name) {
1330            return Err(anyhow!("Label '{}' already exists", name));
1331        }
1332
1333        let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1334        if id >= VIRTUAL_LABEL_ID_START {
1335            return Err(anyhow!(
1336                "Native label space exhausted (next id {id:#x} would enter the \
1337                 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1338                 reserved for catalog-resolved labels)"
1339            ));
1340        }
1341        schema.labels.insert(
1342            name.to_string(),
1343            LabelMeta {
1344                id,
1345                created_at: Utc::now(),
1346                state: SchemaElementState::Active,
1347                description,
1348            },
1349        );
1350        schema.bump_version();
1351        Ok(id)
1352    }
1353
    /// Adds an edge type without a description.
    ///
    /// Shorthand for [`Self::add_edge_type_with_desc`] with
    /// `description = None`; see that method for validation, errors and the
    /// returned id.
    pub fn add_edge_type(
        &self,
        name: &str,
        src_labels: Vec<String>,
        dst_labels: Vec<String>,
    ) -> Result<u32> {
        self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
    }
1362
1363    pub fn add_edge_type_with_desc(
1364        &self,
1365        name: &str,
1366        src_labels: Vec<String>,
1367        dst_labels: Vec<String>,
1368        description: Option<String>,
1369    ) -> Result<u32> {
1370        Self::validate_schema_element_name("Edge type", name)?;
1371        let mut guard = acquire_write(&self.schema, "schema")?;
1372        let schema = Arc::make_mut(&mut *guard);
1373        if schema.edge_types.contains_key(name) {
1374            return Err(anyhow!("Edge type '{}' already exists", name));
1375        }
1376
1377        let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1378
1379        // Stay in the schema-defined sub-range (bit 31 = 0, and below the
1380        // virtual reservation `VIRTUAL_EDGE_TYPE_ID_START`) — same bound as
1381        // `add_edge_type`, so the two entry points cannot disagree on the
1382        // legal ceiling.
1383        if id >= VIRTUAL_EDGE_TYPE_ID_START {
1384            return Err(anyhow!(
1385                "Native edge type space exhausted (next id {id:#x} would enter the \
1386                 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1387                 reserved for catalog-resolved edge types)"
1388            ));
1389        }
1390
1391        schema.edge_types.insert(
1392            name.to_string(),
1393            EdgeTypeMeta {
1394                id,
1395                src_labels,
1396                dst_labels,
1397                state: SchemaElementState::Active,
1398                description,
1399            },
1400        );
1401        schema.bump_version();
1402        Ok(id)
1403    }
1404
1405    /// Delegates to [`Schema::get_or_assign_edge_type_id`].
1406    ///
1407    /// Read-lock fast path: the type name is almost always already known
1408    /// (it is constant per statement but resolved per row by the CREATE
1409    /// executor), and the slow path's write lock + `Arc::make_mut` deep-clones
1410    /// the whole `Schema` whenever the Arc is shared — which under SSI it
1411    /// always is. Double-checked: on a miss, `Schema::get_or_assign_edge_type_id`
1412    /// re-checks under the write lock, so two racing assigners converge on one id.
1413    pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1414        {
1415            let guard = acquire_read(&self.schema, "schema")
1416                .expect("Schema lock poisoned - a thread panicked while holding it");
1417            if let Some(id) = guard.edge_type_id_unified(type_name) {
1418                return id;
1419            }
1420        }
1421        let mut guard = acquire_write(&self.schema, "schema")
1422            .expect("Schema lock poisoned - a thread panicked while holding it");
1423        let schema = Arc::make_mut(&mut *guard);
1424        schema.get_or_assign_edge_type_id(type_name)
1425    }
1426
1427    /// Delegates to [`Schema::edge_type_name_by_id_unified`].
1428    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1429        let schema = acquire_read(&self.schema, "schema")
1430            .expect("Schema lock poisoned - a thread panicked while holding it");
1431        schema.edge_type_name_by_id_unified(type_id)
1432    }
1433
    /// Adds a property without a description.
    ///
    /// Shorthand for [`Self::add_property_with_desc`] with
    /// `description = None`; see that method for validation and errors.
    pub fn add_property(
        &self,
        label_or_type: &str,
        prop_name: &str,
        data_type: DataType,
        nullable: bool,
    ) -> Result<()> {
        self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
    }
1443
1444    pub fn add_property_with_desc(
1445        &self,
1446        label_or_type: &str,
1447        prop_name: &str,
1448        data_type: DataType,
1449        nullable: bool,
1450        description: Option<String>,
1451    ) -> Result<()> {
1452        validate_property_name(prop_name)?;
1453        let mut guard = acquire_write(&self.schema, "schema")?;
1454        let schema = Arc::make_mut(&mut *guard);
1455        let version = schema.schema_version;
1456        let props = schema
1457            .properties
1458            .entry(label_or_type.to_string())
1459            .or_default();
1460
1461        if props.contains_key(prop_name) {
1462            return Err(anyhow!(
1463                "Property '{}' already exists for '{}'",
1464                prop_name,
1465                label_or_type
1466            ));
1467        }
1468
1469        props.insert(
1470            prop_name.to_string(),
1471            PropertyMeta {
1472                r#type: data_type,
1473                nullable,
1474                added_in: version,
1475                state: SchemaElementState::Active,
1476                generation_expression: None,
1477                description,
1478            },
1479        );
1480        // Bump after stamping `added_in` with the pre-bump `version`.
1481        schema.bump_version();
1482        Ok(())
1483    }
1484
1485    /// Register an INTERNAL property (underscore-prefixed name allowed) that is
1486    /// materialised by the storage layer, not written by the user — e.g. the MUVERA
1487    /// `__fde_*` derived column. Bypasses the user-facing underscore-prefix rule but
1488    /// still rejects storage-layer name collisions. Idempotent: a no-op if the property
1489    /// already exists with the same type (so re-creating an index is safe).
1490    ///
1491    /// Returns `true` if this call newly inserted the property, `false` if it already
1492    /// existed (idempotent). The check-and-insert is atomic under the schema write lock,
1493    /// so for concurrent callers exactly one observes `true` — letting callers gate
1494    /// expensive one-time work (e.g. the MUVERA backfill) on the winner.
1495    pub fn add_internal_property(
1496        &self,
1497        label_or_type: &str,
1498        prop_name: &str,
1499        data_type: DataType,
1500        nullable: bool,
1501    ) -> Result<bool> {
1502        validate_reserved_property_name(prop_name)?;
1503        let mut guard = acquire_write(&self.schema, "schema")?;
1504        let schema = Arc::make_mut(&mut *guard);
1505        let version = schema.schema_version;
1506        let props = schema
1507            .properties
1508            .entry(label_or_type.to_string())
1509            .or_default();
1510
1511        if let Some(existing) = props.get(prop_name) {
1512            if existing.r#type == data_type {
1513                return Ok(false); // already present (idempotent re-registration)
1514            }
1515            return Err(anyhow!(
1516                "Internal property '{}' already exists for '{}' with a different type",
1517                prop_name,
1518                label_or_type
1519            ));
1520        }
1521
1522        props.insert(
1523            prop_name.to_string(),
1524            PropertyMeta {
1525                r#type: data_type,
1526                nullable,
1527                added_in: version,
1528                state: SchemaElementState::Active,
1529                generation_expression: None,
1530                description: None,
1531            },
1532        );
1533        schema.bump_version();
1534        Ok(true)
1535    }
1536
1537    pub fn add_generated_property(
1538        &self,
1539        label_or_type: &str,
1540        prop_name: &str,
1541        data_type: DataType,
1542        expr: String,
1543    ) -> Result<()> {
1544        // System-generated `_gen_*` columns bypass the underscore-prefix rule
1545        // but must still avoid storage-layer column-name collisions.
1546        validate_reserved_property_name(prop_name)?;
1547        let mut guard = acquire_write(&self.schema, "schema")?;
1548        let schema = Arc::make_mut(&mut *guard);
1549        let version = schema.schema_version;
1550        let props = schema
1551            .properties
1552            .entry(label_or_type.to_string())
1553            .or_default();
1554
1555        if props.contains_key(prop_name) {
1556            return Err(anyhow!("Property '{}' already exists", prop_name));
1557        }
1558
1559        props.insert(
1560            prop_name.to_string(),
1561            PropertyMeta {
1562                r#type: data_type,
1563                nullable: true,
1564                added_in: version,
1565                state: SchemaElementState::Active,
1566                generation_expression: Some(expr),
1567                description: None,
1568            },
1569        );
1570        // Bump after stamping `added_in` with the pre-bump `version`.
1571        schema.bump_version();
1572        Ok(())
1573    }
1574
1575    pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1576        let mut guard = acquire_write(&self.schema, "schema")?;
1577        let schema = Arc::make_mut(&mut *guard);
1578        let meta = schema
1579            .labels
1580            .get_mut(name)
1581            .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1582        meta.description = description;
1583        Ok(())
1584    }
1585
1586    pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1587        let mut guard = acquire_write(&self.schema, "schema")?;
1588        let schema = Arc::make_mut(&mut *guard);
1589        let meta = schema
1590            .edge_types
1591            .get_mut(name)
1592            .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1593        meta.description = description;
1594        Ok(())
1595    }
1596
1597    pub fn set_property_description(
1598        &self,
1599        entity: &str,
1600        prop_name: &str,
1601        description: Option<String>,
1602    ) -> Result<()> {
1603        let mut guard = acquire_write(&self.schema, "schema")?;
1604        let schema = Arc::make_mut(&mut *guard);
1605        let props = schema
1606            .properties
1607            .get_mut(entity)
1608            .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1609        let meta = props
1610            .get_mut(prop_name)
1611            .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
1612        meta.description = description;
1613        Ok(())
1614    }
1615
1616    /// Register an index definition on the schema, **upsert by name**.
1617    ///
1618    /// If an index with the same `IndexDefinition::name()` already exists, it
1619    /// is replaced in place; otherwise the def is appended. Idempotent under
1620    /// repeat invocation, which makes `SchemaBuilder::apply()` re-applicable
1621    /// without bloating `schema.indexes` and lets the rebuild epilogue inside
1622    /// every `IndexManager::create_*_index` re-record metadata updates without
1623    /// duplicating entries (issue rustic-ai/uni-db#63).
1624    pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1625        let mut guard = acquire_write(&self.schema, "schema")?;
1626        let schema = Arc::make_mut(&mut *guard);
1627        if let Some(existing) = schema
1628            .indexes
1629            .iter_mut()
1630            .find(|i| i.name() == index_def.name())
1631        {
1632            *existing = index_def;
1633        } else {
1634            schema.indexes.push(index_def);
1635        }
1636        schema.bump_version();
1637        Ok(())
1638    }
1639
1640    pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1641        let schema = self.schema.read().expect("Schema lock poisoned");
1642        schema.indexes.iter().find(|i| i.name() == name).cloned()
1643    }
1644
1645    /// Updates the lifecycle metadata for an index by name.
1646    ///
1647    /// The closure receives a mutable reference to the index's `IndexMetadata`,
1648    /// allowing callers to update status, timestamps, etc.
1649    pub fn update_index_metadata(
1650        &self,
1651        index_name: &str,
1652        f: impl FnOnce(&mut IndexMetadata),
1653    ) -> Result<()> {
1654        let mut guard = acquire_write(&self.schema, "schema")?;
1655        let schema = Arc::make_mut(&mut *guard);
1656        let idx = schema
1657            .indexes
1658            .iter_mut()
1659            .find(|i| i.name() == index_name)
1660            .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1661        f(idx.metadata_mut());
1662        Ok(())
1663    }
1664
1665    pub fn remove_index(&self, name: &str) -> Result<()> {
1666        let mut guard = acquire_write(&self.schema, "schema")?;
1667        let schema = Arc::make_mut(&mut *guard);
1668        if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1669            schema.indexes.remove(pos);
1670            schema.bump_version();
1671            Ok(())
1672        } else {
1673            Err(anyhow!("Index '{}' not found", name))
1674        }
1675    }
1676
1677    pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1678        let mut guard = acquire_write(&self.schema, "schema")?;
1679        let schema = Arc::make_mut(&mut *guard);
1680        if schema.constraints.iter().any(|c| c.name == constraint.name) {
1681            return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1682        }
1683        schema.constraints.push(constraint);
1684        schema.bump_version();
1685        Ok(())
1686    }
1687
1688    pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1689        let mut guard = acquire_write(&self.schema, "schema")?;
1690        let schema = Arc::make_mut(&mut *guard);
1691        if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1692            schema.constraints.remove(pos);
1693            schema.bump_version();
1694            Ok(())
1695        } else if if_exists {
1696            Ok(())
1697        } else {
1698            Err(anyhow!("Constraint '{}' not found", name))
1699        }
1700    }
1701
1702    pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1703        let mut guard = acquire_write(&self.schema, "schema")?;
1704        let schema = Arc::make_mut(&mut *guard);
1705        let Some(props) = schema.properties.get_mut(label_or_type) else {
1706            return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1707        };
1708        if props.remove(prop_name).is_none() {
1709            return Err(anyhow!(
1710                "Property '{}' not found for '{}'",
1711                prop_name,
1712                label_or_type
1713            ));
1714        }
1715        schema.bump_version();
1716        Ok(())
1717    }
1718
1719    pub fn rename_property(
1720        &self,
1721        label_or_type: &str,
1722        old_name: &str,
1723        new_name: &str,
1724    ) -> Result<()> {
1725        let mut guard = acquire_write(&self.schema, "schema")?;
1726        let schema = Arc::make_mut(&mut *guard);
1727        let Some(props) = schema.properties.get_mut(label_or_type) else {
1728            return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1729        };
1730        let Some(meta) = props.remove(old_name) else {
1731            return Err(anyhow!(
1732                "Property '{}' not found for '{}'",
1733                old_name,
1734                label_or_type
1735            ));
1736        };
1737        if props.contains_key(new_name) {
1738            // Rollback removal? Or just error.
1739            props.insert(old_name.to_string(), meta); // Restore
1740            return Err(anyhow!("Property '{}' already exists", new_name));
1741        }
1742        props.insert(new_name.to_string(), meta);
1743        schema.bump_version();
1744        Ok(())
1745    }
1746
1747    pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1748        let mut guard = acquire_write(&self.schema, "schema")?;
1749        let schema = Arc::make_mut(&mut *guard);
1750        if let Some(label_meta) = schema.labels.get_mut(name) {
1751            label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1752            // Do not remove properties; they are implicitly tombstoned by the label
1753            schema.bump_version();
1754            Ok(())
1755        } else if if_exists {
1756            Ok(())
1757        } else {
1758            Err(anyhow!("Label '{}' not found", name))
1759        }
1760    }
1761
1762    pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1763        let mut guard = acquire_write(&self.schema, "schema")?;
1764        let schema = Arc::make_mut(&mut *guard);
1765        if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1766            edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1767            // Do not remove properties; they are implicitly tombstoned by the edge type
1768            schema.bump_version();
1769            Ok(())
1770        } else if if_exists {
1771            Ok(())
1772        } else {
1773            Err(anyhow!("Edge Type '{}' not found", name))
1774        }
1775    }
1776}
1777
1778/// Validate identifier names to prevent injection and ensure compatibility.
1779pub fn validate_identifier(name: &str) -> Result<()> {
1780    // Length check
1781    if name.is_empty() || name.len() > 64 {
1782        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1783    }
1784
1785    // First character must be letter or underscore
1786    let first = name.chars().next().unwrap();
1787    if !first.is_alphabetic() && first != '_' {
1788        return Err(anyhow!(
1789            "Identifier '{}' must start with letter or underscore",
1790            name
1791        ));
1792    }
1793
1794    // Remaining characters: alphanumeric or underscore
1795    if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1796        return Err(anyhow!(
1797            "Identifier '{}' must contain only alphanumeric and underscore",
1798            name
1799        ));
1800    }
1801
1802    // Reserved words
1803    const RESERVED: &[&str] = &[
1804        "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1805        "UNION", "ORDER", "LIMIT",
1806    ];
1807    if RESERVED.contains(&name.to_uppercase().as_str()) {
1808        return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1809    }
1810
1811    Ok(())
1812}
1813
1814/// Reject user-declared property names that collide with internal Arrow column
1815/// names used by the storage layer.
1816///
1817/// Without this, declaring a property named e.g. `ext_id` produces an Arrow
1818/// schema with two `ext_id` fields at flush time, which Lance rejects with
1819/// "Duplicate field name" — silently losing all in-session writes on shutdown.
1820pub fn validate_property_name(name: &str) -> Result<()> {
1821    if name.starts_with('_') {
1822        return Err(anyhow!(
1823            "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
1824            name
1825        ));
1826    }
1827    validate_reserved_property_name(name)
1828}
1829
1830/// Reject names that collide with storage-layer Arrow column names.
1831///
1832/// Used both by `validate_property_name` (user-facing path) and directly by
1833/// `add_generated_property` (system-generated `_gen_*` path) — the latter
1834/// needs to bypass the underscore-prefix rule but must still reject the
1835/// fixed-name collisions below.
1836fn validate_reserved_property_name(name: &str) -> Result<()> {
1837    // Unprefixed names that get appended alongside user properties in the
1838    // per-label vertex (`storage/vertex.rs`), per-edge-type edge
1839    // (`storage/edge.rs`), or per-edge-type delta (`storage/delta.rs`)
1840    // Arrow schemas — declaring one of these as a user property produces a
1841    // duplicate Arrow field and a Lance "Duplicate field name" error at
1842    // flush time. Fixed-schema-only columns (`type`, `props_json`,
1843    // `labels` in the main tables) are NOT listed: those tables don't
1844    // append user properties, so no collision can occur.
1845    const RESERVED_PROPS: &[&str] = &[
1846        "ext_id",
1847        "overflow_json",
1848        "eid",
1849        "src_vid",
1850        "dst_vid",
1851        "op",
1852        // Internal planner sentinel: a column-name marker used by
1853        // `mark_set_item_variables` (uni-query::query::planner) to request
1854        // narrow structural projection without full-schema expansion.
1855        // Reserved here defensively so an internal `add_generated_property`
1856        // path can't accidentally create a colliding user-facing column.
1857        // The user-facing `validate_property_name` already rejects this
1858        // via the underscore-prefix rule, so this is belt-and-suspenders.
1859        "__set_struct__",
1860    ];
1861    if RESERVED_PROPS.contains(&name) {
1862        return Err(anyhow!(
1863            "Property name '{}' is reserved by the storage layer; please choose a different name",
1864            name
1865        ));
1866    }
1867    Ok(())
1868}
1869
1870#[cfg(test)]
1871mod tests {
1872    use super::*;
1873    use crate::value::{TemporalValue, Value};
1874    use object_store::local::LocalFileSystem;
1875    use tempfile::tempdir;
1876
1877    #[test]
1878    fn test_datatype_accepts_matrix() {
1879        let dt = || TemporalValue::DateTime {
1880            nanos_since_epoch: 0,
1881            offset_seconds: 0,
1882            timezone_name: None,
1883        };
1884
1885        // Null is accepted by every type (nullability checked separately).
1886        for ty in [
1887            DataType::String,
1888            DataType::Int64,
1889            DataType::Bool,
1890            DataType::DateTime,
1891            DataType::Float64,
1892        ] {
1893            assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
1894        }
1895
1896        // Exact-type matches.
1897        assert!(DataType::String.accepts(&Value::String("x".into())));
1898        assert!(DataType::Int64.accepts(&Value::Int(1)));
1899        assert!(DataType::Bool.accepts(&Value::Bool(true)));
1900        assert!(DataType::DateTime.accepts(&Value::Temporal(dt())));
1901
1902        // Intentional lossless widenings remain allowed.
1903        assert!(
1904            DataType::Float64.accepts(&Value::Int(3)),
1905            "Int widens to Float"
1906        );
1907        assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
1908        assert!(DataType::Timestamp.accepts(&Value::Temporal(dt())));
1909        assert!(
1910            DataType::Timestamp.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
1911            "storage parses strings for non-struct Timestamp columns"
1912        );
1913
1914        // The #68 data-loss cases must be rejected (coercion handles strings separately).
1915        assert!(
1916            !DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
1917            "String into a DateTime struct column nulls silently — reject here"
1918        );
1919        assert!(!DataType::Bool.accepts(&Value::Int(1)));
1920        assert!(!DataType::Int64.accepts(&Value::Bool(true)));
1921        assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
1922        assert!(
1923            !DataType::String.accepts(&Value::Int(10)),
1924            "no implicit stringification"
1925        );
1926        assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));
1927
1928        // Opaque columns accept anything.
1929        assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
1930    }
1931
1932    #[tokio::test]
1933    async fn test_schema_management() -> Result<()> {
1934        let dir = tempdir()?;
1935        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1936        let path = ObjectStorePath::from("schema.json");
1937        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
1938
1939        // Labels
1940        let lid = manager.add_label("Person")?;
1941        assert_eq!(lid, 1);
1942        assert!(manager.add_label("Person").is_err());
1943
1944        // Properties
1945        manager.add_property("Person", "name", DataType::String, false)?;
1946        assert!(
1947            manager
1948                .add_property("Person", "name", DataType::String, false)
1949                .is_err()
1950        );
1951
1952        // Edge types
1953        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
1954        assert_eq!(tid, 1);
1955
1956        manager.save().await?;
1957        // Check file exists
1958        assert!(store.get(&path).await.is_ok());
1959
1960        let manager2 = SchemaManager::load_from_store(store, &path).await?;
1961        assert!(manager2.schema().labels.contains_key("Person"));
1962        assert!(
1963            manager2
1964                .schema()
1965                .properties
1966                .get("Person")
1967                .unwrap()
1968                .contains_key("name")
1969        );
1970
1971        Ok(())
1972    }
1973
1974    #[tokio::test]
1975    async fn test_reserved_property_names_rejected() -> Result<()> {
1976        let dir = tempdir()?;
1977        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1978        let path = ObjectStorePath::from("schema.json");
1979        let manager = SchemaManager::load_from_store(store, &path).await?;
1980
1981        manager.add_label("Tiny")?;
1982
1983        // Unprefixed reserved names — these collide with internal Arrow
1984        // columns in storage tables and previously caused Lance
1985        // "Duplicate field name" errors at flush time.
1986        for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
1987            let err = manager
1988                .add_property("Tiny", reserved, DataType::String, true)
1989                .expect_err(&format!("expected '{reserved}' to be rejected"));
1990            assert!(
1991                err.to_string().contains("reserved"),
1992                "error for '{reserved}' should mention 'reserved', got: {err}"
1993            );
1994        }
1995
1996        // Planner sentinel — reserved in RESERVED_PROPS (belt-and-suspenders
1997        // alongside the underscore-prefix rule). Confirms an internal
1998        // `add_generated_property` path cannot accidentally create a column
1999        // that collides with the SET-target structural-projection marker.
2000        let err = manager
2001            .add_property("Tiny", "__set_struct__", DataType::String, true)
2002            .expect_err("expected '__set_struct__' to be rejected");
2003        assert!(
2004            err.to_string().contains("reserved"),
2005            "__set_struct__ rejection should mention 'reserved', got: {err}"
2006        );
2007
2008        // Leading-underscore pattern rule.
2009        for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
2010            assert!(
2011                manager
2012                    .add_property("Tiny", reserved, DataType::String, true)
2013                    .is_err(),
2014                "expected '{reserved}' to be rejected"
2015            );
2016        }
2017
2018        // Names that merely contain a reserved substring should still be
2019        // accepted.
2020        manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
2021        manager.add_property("Tiny", "user_op", DataType::String, true)?;
2022        manager.add_property("Tiny", "type_name", DataType::String, true)?;
2023
2024        // Same check applies to edge-type properties (single dispatch).
2025        manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
2026        assert!(
2027            manager
2028                .add_property("knows", "src_vid", DataType::Int64, true)
2029                .is_err()
2030        );
2031
2032        // And to generated properties.
2033        assert!(
2034            manager
2035                .add_generated_property(
2036                    "Tiny",
2037                    "ext_id",
2038                    DataType::String,
2039                    "concat('x', name)".into()
2040                )
2041                .is_err()
2042        );
2043
2044        Ok(())
2045    }
2046
2047    #[test]
2048    fn test_normalize_function_names() {
2049        assert_eq!(
2050            SchemaManager::normalize_function_names("lower(email)"),
2051            "LOWER(email)"
2052        );
2053        assert_eq!(
2054            SchemaManager::normalize_function_names("LOWER(email)"),
2055            "LOWER(email)"
2056        );
2057        assert_eq!(
2058            SchemaManager::normalize_function_names("Lower(email)"),
2059            "LOWER(email)"
2060        );
2061        assert_eq!(
2062            SchemaManager::normalize_function_names("trim(lower(email))"),
2063            "TRIM(LOWER(email))"
2064        );
2065    }
2066
2067    #[test]
2068    fn test_generated_column_name_case_insensitive() {
2069        let col1 = SchemaManager::generated_column_name("lower(email)");
2070        let col2 = SchemaManager::generated_column_name("LOWER(email)");
2071        let col3 = SchemaManager::generated_column_name("Lower(email)");
2072        assert_eq!(col1, col2);
2073        assert_eq!(col2, col3);
2074        assert!(col1.starts_with("_gen_LOWER_email_"));
2075    }
2076
2077    #[test]
2078    fn test_index_metadata_serde_backward_compat() {
2079        // Simulate old JSON without metadata field
2080        let json = r#"{
2081            "type": "Scalar",
2082            "name": "idx_person_name",
2083            "label": "Person",
2084            "properties": ["name"],
2085            "index_type": "BTree",
2086            "where_clause": null
2087        }"#;
2088        let def: IndexDefinition = serde_json::from_str(json).unwrap();
2089        let meta = def.metadata();
2090        assert_eq!(meta.status, IndexStatus::Online);
2091        assert!(meta.last_built_at.is_none());
2092        assert!(meta.row_count_at_build.is_none());
2093    }
2094
2095    #[test]
2096    fn test_index_metadata_serde_roundtrip() {
2097        let now = Utc::now();
2098        let def = IndexDefinition::Scalar(ScalarIndexConfig {
2099            name: "idx_test".to_string(),
2100            label: "Test".to_string(),
2101            properties: vec!["prop".to_string()],
2102            index_type: ScalarIndexType::BTree,
2103            where_clause: None,
2104            metadata: IndexMetadata {
2105                status: IndexStatus::Building,
2106                last_built_at: Some(now),
2107                row_count_at_build: Some(42),
2108            },
2109        });
2110
2111        let json = serde_json::to_string(&def).unwrap();
2112        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
2113        assert_eq!(parsed.metadata().status, IndexStatus::Building);
2114        assert_eq!(parsed.metadata().row_count_at_build, Some(42));
2115        assert!(parsed.metadata().last_built_at.is_some());
2116    }
2117
2118    #[tokio::test]
2119    async fn test_update_index_metadata() -> Result<()> {
2120        let dir = tempdir()?;
2121        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2122        let path = ObjectStorePath::from("schema.json");
2123        let manager = SchemaManager::load_from_store(store, &path).await?;
2124
2125        manager.add_label("Person")?;
2126        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
2127            name: "idx_test".to_string(),
2128            label: "Person".to_string(),
2129            properties: vec!["name".to_string()],
2130            index_type: ScalarIndexType::BTree,
2131            where_clause: None,
2132            metadata: Default::default(),
2133        });
2134        manager.add_index(idx)?;
2135
2136        // Verify initial status is Online
2137        let initial = manager.get_index("idx_test").unwrap();
2138        assert_eq!(initial.metadata().status, IndexStatus::Online);
2139
2140        // Update to Building
2141        manager.update_index_metadata("idx_test", |m| {
2142            m.status = IndexStatus::Building;
2143            m.row_count_at_build = Some(100);
2144        })?;
2145
2146        let updated = manager.get_index("idx_test").unwrap();
2147        assert_eq!(updated.metadata().status, IndexStatus::Building);
2148        assert_eq!(updated.metadata().row_count_at_build, Some(100));
2149
2150        // Non-existent index should error
2151        assert!(manager.update_index_metadata("nope", |_| {}).is_err());
2152
2153        Ok(())
2154    }
2155
2156    /// `add_internal_property` reports whether THIS call inserted the property: `true` on
2157    /// first insert, `false` on idempotent re-registration, `Err` on a type conflict. The
2158    /// MUVERA backfill gates on this (only the inserter backfills), so two concurrent
2159    /// creates of the same index can't both run the full-table rewrite (issue #107).
2160    #[tokio::test]
2161    async fn add_internal_property_reports_newly_added() -> Result<()> {
2162        let dir = tempdir()?;
2163        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2164        let path = ObjectStorePath::from("schema.json");
2165        let manager = SchemaManager::load_from_store(store, &path).await?;
2166        manager.add_label("Doc")?;
2167
2168        let dt = DataType::Vector { dimensions: 16 };
2169        // First registration: newly added.
2170        assert!(manager.add_internal_property("Doc", "__fde_x", dt.clone(), true)?);
2171        // Idempotent re-registration with the same type: NOT newly added.
2172        assert!(!manager.add_internal_property("Doc", "__fde_x", dt.clone(), true)?);
2173        // Same name, conflicting type: hard error (no silent divergence).
2174        assert!(
2175            manager
2176                .add_internal_property("Doc", "__fde_x", DataType::Vector { dimensions: 8 }, true)
2177                .is_err()
2178        );
2179        Ok(())
2180    }
2181
2182    /// `add_index` is upsert-by-name (issue rustic-ai/uni-db#63). Repeat
2183    /// invocations with the same `IndexDefinition::name()` must replace
2184    /// the entry in place rather than appending. Subsequent `add_index`
2185    /// calls also reflect metadata updates from the new definition.
2186    #[tokio::test]
2187    async fn test_add_index_is_upsert_by_name() -> Result<()> {
2188        let dir = tempdir()?;
2189        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2190        let path = ObjectStorePath::from("schema.json");
2191        let manager = SchemaManager::load_from_store(store, &path).await?;
2192        manager.add_label("Person")?;
2193
2194        let initial = IndexDefinition::Scalar(ScalarIndexConfig {
2195            name: "idx_test".to_string(),
2196            label: "Person".to_string(),
2197            properties: vec!["name".to_string()],
2198            index_type: ScalarIndexType::BTree,
2199            where_clause: None,
2200            metadata: IndexMetadata {
2201                status: IndexStatus::Building,
2202                ..Default::default()
2203            },
2204        });
2205        manager.add_index(initial.clone())?;
2206        assert_eq!(manager.schema().indexes.len(), 1);
2207
2208        // Re-add the identical def — must remain a single entry.
2209        manager.add_index(initial.clone())?;
2210        assert_eq!(
2211            manager.schema().indexes.len(),
2212            1,
2213            "duplicate add_index by name must not append"
2214        );
2215
2216        // Re-add with updated metadata — must replace in place, len unchanged.
2217        let mut updated_cfg = match initial {
2218            IndexDefinition::Scalar(c) => c,
2219            _ => unreachable!(),
2220        };
2221        updated_cfg.metadata.status = IndexStatus::Online;
2222        updated_cfg.metadata.row_count_at_build = Some(42);
2223        manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
2224        assert_eq!(manager.schema().indexes.len(), 1);
2225        let stored = manager.get_index("idx_test").unwrap();
2226        assert_eq!(stored.metadata().status, IndexStatus::Online);
2227        assert_eq!(stored.metadata().row_count_at_build, Some(42));
2228
2229        // A *different* name appends as a new entry.
2230        let other = IndexDefinition::Scalar(ScalarIndexConfig {
2231            name: "idx_other".to_string(),
2232            label: "Person".to_string(),
2233            properties: vec!["age".to_string()],
2234            index_type: ScalarIndexType::BTree,
2235            where_clause: None,
2236            metadata: IndexMetadata::default(),
2237        });
2238        manager.add_index(other)?;
2239        assert_eq!(manager.schema().indexes.len(), 2);
2240
2241        Ok(())
2242    }
2243
2244    /// `load_from_store` self-heals catalogs that were bloated by the
2245    /// pre-fix `add_index` (kept the *last* def per name).
2246    #[tokio::test]
2247    async fn test_load_dedups_bloated_indexes() -> Result<()> {
2248        let dir = tempdir()?;
2249        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2250        let path = ObjectStorePath::from("schema.json");
2251
2252        // Seed disk with a hand-crafted bloated schema: 50 entries, all
2253        // sharing the same name. The last entry has distinct metadata so
2254        // we can assert "last writer wins" semantics.
2255        let mut schema = Schema::default();
2256        schema.labels.insert(
2257            "Person".to_string(),
2258            LabelMeta {
2259                id: 1,
2260                created_at: chrono::Utc::now(),
2261                state: SchemaElementState::Active,
2262                description: None,
2263            },
2264        );
2265        let make = |status: IndexStatus, count: Option<u64>| {
2266            IndexDefinition::Scalar(ScalarIndexConfig {
2267                name: "idx_dup".to_string(),
2268                label: "Person".to_string(),
2269                properties: vec!["name".to_string()],
2270                index_type: ScalarIndexType::BTree,
2271                where_clause: None,
2272                metadata: IndexMetadata {
2273                    status,
2274                    row_count_at_build: count,
2275                    ..Default::default()
2276                },
2277            })
2278        };
2279        for _ in 0..49 {
2280            schema.indexes.push(make(IndexStatus::Building, None));
2281        }
2282        schema.indexes.push(make(IndexStatus::Online, Some(123)));
2283        let json = serde_json::to_string_pretty(&schema)?;
2284        store.put(&path, json.into()).await?;
2285
2286        let manager = SchemaManager::load_from_store(store, &path).await?;
2287        let schema = manager.schema();
2288        assert_eq!(
2289            schema.indexes.len(),
2290            1,
2291            "load() must collapse 50 duplicates by name to 1"
2292        );
2293        // Last-writer-wins: the kept entry is the final push (Online, 123).
2294        assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
2295        assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));
2296
2297        Ok(())
2298    }
2299
2300    #[test]
2301    fn test_vector_index_for_property_skips_non_online() {
2302        let mut schema = Schema::default();
2303        schema.labels.insert(
2304            "Document".to_string(),
2305            LabelMeta {
2306                id: 1,
2307                created_at: chrono::Utc::now(),
2308                state: SchemaElementState::Active,
2309                description: None,
2310            },
2311        );
2312
2313        // Add a vector index with Stale status
2314        schema
2315            .indexes
2316            .push(IndexDefinition::Vector(VectorIndexConfig {
2317                name: "vec_doc_embedding".to_string(),
2318                label: "Document".to_string(),
2319                property: "embedding".to_string(),
2320                index_type: VectorIndexType::Flat,
2321                metric: DistanceMetric::Cosine,
2322                embedding_config: None,
2323                metadata: IndexMetadata {
2324                    status: IndexStatus::Stale,
2325                    ..Default::default()
2326                },
2327            }));
2328
2329        // Stale index should NOT be returned
2330        assert!(
2331            schema
2332                .vector_index_for_property("Document", "embedding")
2333                .is_none()
2334        );
2335
2336        // Set to Online — should now be returned
2337        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
2338            cfg.metadata.status = IndexStatus::Online;
2339        }
2340        let result = schema.vector_index_for_property("Document", "embedding");
2341        assert!(result.is_some());
2342        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
2343    }
2344
2345    #[tokio::test]
2346    async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
2347        use crate::core::fork::SchemaDelta;
2348
2349        let dir = tempdir()?;
2350        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2351        let path = ObjectStorePath::from("schema.json");
2352        let primary = SchemaManager::load_from_store(store, &path).await?;
2353        primary.add_label("Person")?;
2354
2355        let overlay = primary.with_overlay(&SchemaDelta::empty());
2356        assert_eq!(overlay.schema().labels.len(), 1);
2357
2358        // Phase 1 invariant: mutating the overlay manager must not bleed
2359        // into primary's schema.
2360        overlay.add_label("Forked")?;
2361        assert!(overlay.schema().labels.contains_key("Forked"));
2362        assert!(!primary.schema().labels.contains_key("Forked"));
2363
2364        Ok(())
2365    }
2366
2367    #[tokio::test]
2368    async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
2369        use crate::core::fork::SchemaDelta;
2370        use chrono::Utc;
2371
2372        let dir = tempdir()?;
2373        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2374        let path = ObjectStorePath::from("schema.json");
2375        let primary = SchemaManager::load_from_store(store, &path).await?;
2376        primary.add_label("Existing")?;
2377
2378        let label_meta = LabelMeta {
2379            id: 99,
2380            created_at: Utc::now(),
2381            state: SchemaElementState::Active,
2382            description: None,
2383        };
2384        let edge_meta = EdgeTypeMeta {
2385            id: 99,
2386            src_labels: vec!["NewLabel".into()],
2387            dst_labels: vec!["NewLabel".into()],
2388            state: SchemaElementState::Active,
2389            description: None,
2390        };
2391        let delta = SchemaDelta {
2392            added_labels: vec![("NewLabel".to_string(), label_meta)],
2393            added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
2394            added_properties: vec![],
2395        };
2396
2397        let overlay = primary.with_overlay(&delta);
2398        let merged = overlay.schema();
2399        assert!(merged.labels.contains_key("Existing"));
2400        assert!(merged.labels.contains_key("NewLabel"));
2401        assert!(merged.edge_types.contains_key("NewEdge"));
2402
2403        // Primary unchanged.
2404        assert!(!primary.schema().labels.contains_key("NewLabel"));
2405        Ok(())
2406    }
2407
2408    /// N threads racing `get_or_assign_edge_type_id` for the same new name
2409    /// must converge on a single id (the read-lock fast path double-checks
2410    /// under the write lock); a schema-defined type must win over the
2411    /// schemaless registry.
2412    #[tokio::test]
2413    async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
2414        let dir = tempdir()?;
2415        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2416        let path = ObjectStorePath::from("schema.json");
2417        let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);
2418
2419        let mut handles = Vec::new();
2420        for _ in 0..16 {
2421            let m = manager.clone();
2422            handles.push(std::thread::spawn(move || {
2423                m.get_or_assign_edge_type_id("RACED")
2424            }));
2425        }
2426        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2427        assert!(
2428            ids.iter().all(|&id| id == ids[0]),
2429            "all racers must observe one id, got {ids:?}"
2430        );
2431        // Fast path returns the same id afterwards.
2432        assert_eq!(manager.get_or_assign_edge_type_id("RACED"), ids[0]);
2433
2434        // Schema-defined type wins over the schemaless registry.
2435        manager.add_label("A")?;
2436        let declared = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
2437        assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared);
2438        Ok(())
2439    }
2440
2441    /// Minting a brand-new schemaless edge type must bump `schema_version`
2442    /// (the plan cache keys on it; untyped traversals bake `all_edge_type_ids()`
2443    /// into the plan, so a stale plan would silently drop edges of the new
2444    /// type). Re-resolving an existing type must NOT bump. (review C5)
2445    #[test]
2446    fn test_new_schemaless_edge_type_bumps_schema_version() {
2447        let mut schema = Schema::default();
2448        let v0 = schema.schema_version;
2449
2450        let id1 = schema.get_or_assign_edge_type_id("FRESH");
2451        assert_eq!(
2452            schema.schema_version,
2453            v0.wrapping_add(1),
2454            "minting a new edge type must bump schema_version"
2455        );
2456
2457        // Re-resolving the same type is a no-op — no further bump.
2458        let id1_again = schema.get_or_assign_edge_type_id("FRESH");
2459        assert_eq!(id1, id1_again);
2460        assert_eq!(
2461            schema.schema_version,
2462            v0.wrapping_add(1),
2463            "resolving an existing edge type must not bump schema_version"
2464        );
2465
2466        // A second distinct new type bumps again.
2467        let _id2 = schema.get_or_assign_edge_type_id("OTHER");
2468        assert_eq!(
2469            schema.schema_version,
2470            v0.wrapping_add(2),
2471            "a second new edge type must bump schema_version again"
2472        );
2473    }
2474
2475    /// L6: label/edge-type names with path separators, whitespace, or
2476    /// control chars are rejected at definition; benign names (incl. `.`)
2477    /// are accepted.
2478    #[test]
2479    fn validate_schema_element_name_rejects_unsafe() {
2480        for bad in ["", "   ", "a/b", "a b", "a\nb", "a\\b", "x\0y"] {
2481            assert!(
2482                SchemaManager::validate_schema_element_name("Label", bad).is_err(),
2483                "expected {bad:?} to be rejected"
2484            );
2485        }
2486        for good in ["Person", "My.Label", "edge_2", "KNOWS"] {
2487            assert!(
2488                SchemaManager::validate_schema_element_name("Label", good).is_ok(),
2489                "expected {good:?} to be accepted"
2490            );
2491        }
2492        // Over-length is rejected.
2493        let long = "x".repeat(MAX_SCHEMA_NAME_LEN + 1);
2494        assert!(SchemaManager::validate_schema_element_name("Label", &long).is_err());
2495    }
2496}