Skip to main content

uni_common/core/
schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::core::edge_type::{
5    MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6    is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
20#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
21#[non_exhaustive]
22pub enum SchemaElementState {
23    Active,
24    Hidden {
25        since: DateTime<Utc>,
26        last_active_snapshot: String, // SnapshotId
27    },
28    Tombstone {
29        since: DateTime<Utc>,
30    },
31}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35/// Returns the canonical struct field definitions for DateTime encoding in Arrow.
36///
37/// DateTime is encoded as a 3-field struct to preserve timezone information:
38/// - `nanos_since_epoch`: i64 nanoseconds since Unix epoch (UTC)
39/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
40/// - `timezone_name`: Optional IANA timezone name (e.g., "America/New_York")
41pub fn datetime_struct_fields() -> Fields {
42    Fields::from(vec![
43        Field::new(
44            "nanos_since_epoch",
45            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46            true,
47        ),
48        Field::new("offset_seconds", ArrowDataType::Int32, true),
49        Field::new("timezone_name", ArrowDataType::Utf8, true),
50    ])
51}
52
53/// Returns the canonical struct field definitions for Time encoding in Arrow.
54///
55/// Time is encoded as a 2-field struct to preserve timezone offset:
56/// - `nanos_since_midnight`: i64 nanoseconds since midnight (0-86,399,999,999,999)
57/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
58pub fn time_struct_fields() -> Fields {
59    Fields::from(vec![
60        Field::new(
61            "nanos_since_midnight",
62            ArrowDataType::Time64(TimeUnit::Nanosecond),
63            true,
64        ),
65        Field::new("offset_seconds", ArrowDataType::Int32, true),
66    ])
67}
68
69/// Detects if an Arrow DataType is the canonical DateTime struct.
70pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74/// Detects if an Arrow DataType is the canonical Time struct.
75pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
79/// The canonical Arrow struct fields for a `DataType::SparseVector` column:
80/// `Struct { indices: List<UInt32>, values: List<Float32> }`. Two parallel
81/// variable-length lists in one struct. Both lists are non-null — an empty
82/// sparse vector stores as two empty lists, never null. The write and read
83/// sides both route through this one definition so they cannot drift (the same
84/// lockstep discipline as the temporal structs above).
85pub fn sparse_vector_struct_fields() -> Fields {
86    Fields::from(vec![
87        Field::new(
88            "indices",
89            ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::UInt32, true))),
90            false,
91        ),
92        Field::new(
93            "values",
94            ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Float32, true))),
95            false,
96        ),
97    ])
98}
99
100/// Detects if an Arrow DataType is the canonical SparseVector struct.
101pub fn is_sparse_vector_struct(arrow_dt: &ArrowDataType) -> bool {
102    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == sparse_vector_struct_fields())
103}
104
105/// Field metadata marking an Arrow `LargeBinary` field as a raw `DataType::Bytes`
106/// value rather than a tagged CypherValue/Duration blob.
107///
108/// Stamped on the child field of `List(Bytes)` / `Map(_, Bytes)` container types so
109/// the read path decodes each element verbatim instead of through the tagged codec
110/// (which would read `byte[0]` as a type tag). CV-encoded containers carry no such
111/// marker and keep the codec path. See the read-side honoring in `arrow_convert`.
112pub fn raw_bytes_field_metadata() -> HashMap<String, String> {
113    HashMap::from([("uni_raw_bytes".to_string(), "true".to_string())])
114}
115
116#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
117#[non_exhaustive]
118pub enum CrdtType {
119    GCounter,
120    GSet,
121    ORSet,
122    LWWRegister,
123    LWWMap,
124    Rga,
125    VectorClock,
126    VCRegister,
127}
128
129impl CrdtType {
130    /// Returns the canonical variant name for this CRDT type.
131    ///
132    /// The returned strings must stay in sync with `uni_crdt::Crdt::type_name`,
133    /// so a written CRDT value can be validated against its schema-declared
134    /// variant (see uni-store's write-time CRDT enforcement).
135    ///
136    /// # Examples
137    /// ```
138    /// use uni_common::core::schema::CrdtType;
139    /// assert_eq!(CrdtType::GCounter.type_name(), "GCounter");
140    /// ```
141    #[must_use]
142    pub fn type_name(&self) -> &'static str {
143        match self {
144            CrdtType::GCounter => "GCounter",
145            CrdtType::GSet => "GSet",
146            CrdtType::ORSet => "ORSet",
147            CrdtType::LWWRegister => "LWWRegister",
148            CrdtType::LWWMap => "LWWMap",
149            CrdtType::Rga => "Rga",
150            CrdtType::VectorClock => "VectorClock",
151            CrdtType::VCRegister => "VCRegister",
152        }
153    }
154}
155
156#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
157pub enum PointType {
158    Geographic,  // WGS84
159    Cartesian2D, // x, y
160    Cartesian3D, // x, y, z
161}
162
163#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
164#[non_exhaustive]
165pub enum DataType {
166    String,
167    Int32,
168    Int64,
169    Float32,
170    Float64,
171    Bool,
172    Timestamp,
173    Date,
174    Time,
175    DateTime,
176    Duration,
177    CypherValue,
178    Bytes,
179    Point(PointType),
180    Vector {
181        dimensions: usize,
182    },
183    /// Learned-sparse vector (SPLADE / BGE-M3). `dimensions` is the term-space
184    /// cardinality (max term id + 1) used for validation and index config.
185    SparseVector {
186        dimensions: usize,
187    },
188    Btic,
189    Crdt(CrdtType),
190    List(Box<DataType>),
191    Map(Box<DataType>, Box<DataType>),
192}
193
194impl DataType {
195    // Alias for compatibility/convenience if needed, but preferable to use exact types.
196    #[allow(non_upper_case_globals)]
197    pub const Float: DataType = DataType::Float64;
198    #[allow(non_upper_case_globals)]
199    pub const Int: DataType = DataType::Int64;
200
201    pub fn to_arrow(&self) -> ArrowDataType {
202        match self {
203            DataType::String => ArrowDataType::Utf8,
204            DataType::Int32 => ArrowDataType::Int32,
205            DataType::Int64 => ArrowDataType::Int64,
206            DataType::Float32 => ArrowDataType::Float32,
207            DataType::Float64 => ArrowDataType::Float64,
208            DataType::Bool => ArrowDataType::Boolean,
209            DataType::Timestamp => {
210                ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
211            }
212            DataType::Date => ArrowDataType::Date32,
213            DataType::Time => ArrowDataType::Struct(time_struct_fields()),
214            DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
215            DataType::Duration => ArrowDataType::LargeBinary, // Lance doesn't support Interval(MonthDayNano); use CypherValue codec
216            DataType::CypherValue => ArrowDataType::LargeBinary, // MessagePack-tagged binary encoding
217            DataType::Bytes => ArrowDataType::LargeBinary, // raw byte buffer (no codec wrapping)
218            DataType::Point(pt) => match pt {
219                PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
220                    Field::new("latitude", ArrowDataType::Float64, false),
221                    Field::new("longitude", ArrowDataType::Float64, false),
222                    Field::new("crs", ArrowDataType::Utf8, false),
223                ])),
224                PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
225                    Field::new("x", ArrowDataType::Float64, false),
226                    Field::new("y", ArrowDataType::Float64, false),
227                    Field::new("crs", ArrowDataType::Utf8, false),
228                ])),
229                PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
230                    Field::new("x", ArrowDataType::Float64, false),
231                    Field::new("y", ArrowDataType::Float64, false),
232                    Field::new("z", ArrowDataType::Float64, false),
233                    Field::new("crs", ArrowDataType::Utf8, false),
234                ])),
235            },
236            DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
237                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
238                *dimensions as i32,
239            ),
240            DataType::SparseVector { .. } => ArrowDataType::Struct(sparse_vector_struct_fields()),
241            DataType::Btic => ArrowDataType::FixedSizeBinary(24),
242            DataType::Crdt(_) => ArrowDataType::Binary, // Store CRDT as binary MessagePack
243            DataType::List(inner) => {
244                // A raw `Bytes` element maps to Arrow `LargeBinary`, indistinguishable
245                // from a CV-encoded element by type alone; mark the child field so the
246                // read path decodes it verbatim rather than through the tagged codec.
247                let item = Field::new("item", inner.to_arrow(), true);
248                let item = if matches!(**inner, DataType::Bytes) {
249                    item.with_metadata(raw_bytes_field_metadata())
250                } else {
251                    item
252                };
253                ArrowDataType::List(Arc::new(item))
254            }
255            DataType::Map(key, value) => {
256                // The value child's Arrow storage MUST agree with `build_map_column` in
257                // uni-store: typed scalars use their own Arrow type; `Bytes` is a
258                // raw-bytes-marked `LargeBinary`; every other (nested/non-scalar) value type
259                // is CypherValue-encoded into an UNMARKED `LargeBinary` (decoded back through
260                // the tagged codec on read). Gated by `map_value_is_typed` so the two sites
261                // can't drift.
262                let value_field = if value.map_value_is_typed() {
263                    let f = Field::new("value", value.to_arrow(), true);
264                    if matches!(**value, DataType::Bytes) {
265                        f.with_metadata(raw_bytes_field_metadata())
266                    } else {
267                        f
268                    }
269                } else {
270                    Field::new("value", ArrowDataType::LargeBinary, true)
271                };
272                ArrowDataType::List(Arc::new(Field::new(
273                    "item",
274                    ArrowDataType::Struct(Fields::from(vec![
275                        Field::new("key", key.to_arrow(), false),
276                        value_field,
277                    ])),
278                    true,
279                )))
280            }
281        }
282    }
283
284    /// Whether a `Map(_, self)` VALUE is stored as a typed Arrow child (this set) versus a
285    /// CypherValue-encoded `LargeBinary` fallback child (everything else, e.g. `Vector`,
286    /// `List`, `Map`, temporal). This MUST stay in lockstep with the explicit value-type
287    /// arms of `build_map_column` in uni-store (the `_` arm there is the CV fallback).
288    pub fn map_value_is_typed(&self) -> bool {
289        matches!(
290            self,
291            DataType::String
292                | DataType::Int64
293                | DataType::Int32
294                | DataType::Float64
295                | DataType::Float32
296                | DataType::Bool
297                | DataType::Bytes
298        )
299    }
300
301    /// Returns `true` if `value` is directly storable in this column type without loss.
302    ///
303    /// This is the schema-level type guard used by the write path. `Value::Null` is
304    /// always accepted — column nullability is enforced separately by the `nullable`
305    /// flag, not here. `CypherValue`, `Crdt`, and `Point` columns accept any value.
306    /// For every other declared type, only the `Value` variants that the storage layer
307    /// persists *without silently nulling* are accepted (see the per-type converters in
308    /// `uni-store`'s `arrow_convert`), plus the intentional lossless widenings
309    /// `Int`→`Float`, `Int`→`Int32`, and `Temporal`→`Timestamp`.
310    ///
311    /// A `Value::String` destined for a `Date`/`Time`/`DateTime`/`Duration` column is
312    /// intentionally *not* accepted here: the write path first coerces such strings into
313    /// the proper `Temporal` value (matching the Cypher temporal constructors), then the
314    /// coerced value passes this check. This keeps `accepts` a pure, allocation-free
315    /// predicate.
316    ///
317    /// # Examples
318    /// ```
319    /// use uni_common::core::schema::DataType;
320    /// use uni_common::Value;
321    ///
322    /// assert!(DataType::Float64.accepts(&Value::Int(3))); // Int widens to Float
323    /// assert!(DataType::Bool.accepts(&Value::Null)); // Null always accepted
324    /// assert!(!DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())));
325    /// ```
326    pub fn accepts(&self, value: &crate::value::Value) -> bool {
327        use crate::value::{TemporalValue, Value};
328
329        // Null is universally accepted; nullability is a separate concern.
330        if matches!(value, Value::Null) {
331            return true;
332        }
333
334        match self {
335            // Opaque / dynamically-typed columns accept any value.
336            DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
337
338            DataType::String => matches!(value, Value::String(_)),
339            DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
340            // Int widens to Float losslessly for the ranges we care about.
341            DataType::Float32 | DataType::Float64 => {
342                matches!(value, Value::Int(_) | Value::Float(_))
343            }
344            DataType::Bool => matches!(value, Value::Bool(_)),
345
346            // Non-struct timestamp column: storage parses strings and accepts ints,
347            // so both are lossless here (unlike the DateTime struct column below).
348            DataType::Timestamp => matches!(
349                value,
350                Value::String(_)
351                    | Value::Int(_)
352                    | Value::Temporal(
353                        TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
354                    )
355            ),
356            DataType::DateTime => matches!(
357                value,
358                Value::Temporal(
359                    TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
360                )
361            ),
362            DataType::Date => {
363                matches!(
364                    value,
365                    Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
366                )
367            }
368            DataType::Time => matches!(
369                value,
370                Value::Int(_)
371                    | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
372            ),
373            DataType::Duration => {
374                matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
375            }
376            DataType::Bytes => matches!(value, Value::Bytes(_)),
377            // FixedSizeBinary(24) converter accepts the Btic temporal, raw strings, and lists.
378            DataType::Btic => matches!(
379                value,
380                Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
381            ),
382            DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
383            // `Value::Map` is the degraded form a `SparseVector` collapses into
384            // when round-tripped through `#[serde(untagged)]` persistence (e.g.
385            // the WAL's serde_json mutation log); accept it like `Vector` accepts
386            // `List`. The column builder re-extracts `{indices, values}`.
387            DataType::SparseVector { .. } => {
388                matches!(value, Value::SparseVector { .. } | Value::Map(_))
389            }
390            DataType::List(_) => matches!(value, Value::List(_)),
391            DataType::Map(_, _) => matches!(value, Value::Map(_)),
392        }
393    }
394}
395
396fn default_created_at() -> DateTime<Utc> {
397    Utc::now()
398}
399
400fn default_state() -> SchemaElementState {
401    SchemaElementState::Active
402}
403
404fn default_version_1() -> u32 {
405    1
406}
407
408#[derive(Clone, Debug, Serialize, Deserialize)]
409pub struct PropertyMeta {
410    pub r#type: DataType,
411    pub nullable: bool,
412    #[serde(default = "default_version_1")]
413    pub added_in: u32, // SchemaVersion
414    #[serde(default = "default_state")]
415    pub state: SchemaElementState,
416    #[serde(default)]
417    pub generation_expression: Option<String>,
418    #[serde(default, skip_serializing_if = "Option::is_none")]
419    pub description: Option<String>,
420}
421
422#[derive(Clone, Debug, Serialize, Deserialize)]
423pub struct LabelMeta {
424    pub id: u16, // LabelId
425    #[serde(default = "default_created_at")]
426    pub created_at: DateTime<Utc>,
427    #[serde(default = "default_state")]
428    pub state: SchemaElementState,
429    #[serde(default, skip_serializing_if = "Option::is_none")]
430    pub description: Option<String>,
431}
432
433#[derive(Clone, Debug, Serialize, Deserialize)]
434pub struct EdgeTypeMeta {
435    /// See [`crate::core::edge_type::EdgeTypeId`] for bit-layout details.
436    pub id: u32,
437    pub src_labels: Vec<String>,
438    pub dst_labels: Vec<String>,
439    #[serde(default = "default_state")]
440    pub state: SchemaElementState,
441    #[serde(default, skip_serializing_if = "Option::is_none")]
442    pub description: Option<String>,
443}
444
445#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
446#[non_exhaustive]
447pub enum ConstraintType {
448    Unique { properties: Vec<String> },
449    Exists { property: String },
450    Check { expression: String },
451}
452
453#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
454#[non_exhaustive]
455pub enum ConstraintTarget {
456    Label(String),
457    EdgeType(String),
458}
459
460#[derive(Clone, Debug, Serialize, Deserialize)]
461pub struct Constraint {
462    pub name: String,
463    pub constraint_type: ConstraintType,
464    pub target: ConstraintTarget,
465    pub enabled: bool,
466}
467
468/// Bidirectional registry for dynamically-assigned schemaless edge type IDs.
469///
470/// Edge types not defined in the schema are assigned IDs at runtime with
471/// bit 31 set (see [`crate::core::edge_type`]). This registry maintains
472/// the name-to-ID and ID-to-name mappings for those types.
473#[derive(Clone, Debug, Serialize, Deserialize)]
474pub struct SchemalessEdgeTypeRegistry {
475    name_to_id: HashMap<String, u32>,
476    id_to_name: HashMap<u32, String>,
477    /// Next local ID to assign (0 is reserved for invalid).
478    next_local_id: u32,
479}
480
481impl SchemalessEdgeTypeRegistry {
482    pub fn new() -> Self {
483        Self {
484            name_to_id: HashMap::new(),
485            id_to_name: HashMap::new(),
486            next_local_id: 1,
487        }
488    }
489
490    /// Returns the schemaless ID for `type_name`, assigning a new one if needed.
491    pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
492        if let Some(&id) = self.name_to_id.get(type_name) {
493            return id;
494        }
495
496        let id = make_schemaless_id(self.next_local_id);
497        self.next_local_id += 1;
498
499        self.name_to_id.insert(type_name.to_string(), id);
500        self.id_to_name.insert(id, type_name.to_string());
501
502        id
503    }
504
505    /// Looks up the edge type name for a schemaless ID.
506    pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
507        self.id_to_name.get(&type_id).map(String::as_str)
508    }
509
510    /// Returns `true` if `type_name` has already been assigned a schemaless ID.
511    pub fn contains(&self, type_name: &str) -> bool {
512        self.name_to_id.contains_key(type_name)
513    }
514
515    /// Looks up the schemaless ID for `type_name` (exact match, read-only).
516    pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
517        self.name_to_id.get(type_name).copied()
518    }
519
520    /// Looks up the edge type ID for `type_name` with case-insensitive matching.
521    pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
522        self.name_to_id
523            .iter()
524            .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
525            .map(|(_, &id)| id)
526    }
527
528    /// Returns all registered schemaless type IDs.
529    pub fn all_type_ids(&self) -> Vec<u32> {
530        self.id_to_name.keys().copied().collect()
531    }
532
533    /// Returns true if the registry has any schemaless types.
534    pub fn is_empty(&self) -> bool {
535        self.name_to_id.is_empty()
536    }
537}
538
539impl Default for SchemalessEdgeTypeRegistry {
540    fn default() -> Self {
541        Self::new()
542    }
543}
544
545/// First virtual (catalog-resolved) label ID. Label IDs in
546/// `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL` are owned by
547/// plugin-registered `CatalogProvider`s and allocated lazily by the
548/// planner via `PluginRegistry::register_virtual_label`. Native label
549/// allocation (`SchemaManager::add_label`) refuses IDs in this range.
550pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
551/// Sentinel "no label" marker, kept distinct from any allocatable ID.
552pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;
553
554/// Maximum byte length of a label or edge-type name. (L6)
555///
556/// Generous; the cap is hygiene — the name lands in on-disk dataset paths
557/// and Lance branch names — not a storage limit.
558const MAX_SCHEMA_NAME_LEN: usize = 255;
559
560/// Returns `true` if `id` is in the virtual (catalog-resolved) range.
561#[inline]
562pub fn is_virtual_label_id(id: u16) -> bool {
563    (VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL).contains(&id)
564}
565
566#[derive(Clone, Debug, Serialize, Deserialize)]
567pub struct Schema {
568    pub schema_version: u32,
569    pub labels: HashMap<String, LabelMeta>,
570    pub edge_types: HashMap<String, EdgeTypeMeta>,
571    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
572    #[serde(default)]
573    pub indexes: Vec<IndexDefinition>,
574    #[serde(default)]
575    pub constraints: Vec<Constraint>,
576    /// Registry for schemaless edge types (dynamically assigned IDs)
577    #[serde(default)]
578    pub schemaless_registry: SchemalessEdgeTypeRegistry,
579}
580
581impl Default for Schema {
582    fn default() -> Self {
583        Self {
584            schema_version: 1,
585            labels: HashMap::new(),
586            edge_types: HashMap::new(),
587            properties: HashMap::new(),
588            indexes: Vec::new(),
589            constraints: Vec::new(),
590            schemaless_registry: SchemalessEdgeTypeRegistry::new(),
591        }
592    }
593}
594
595impl Schema {
596    /// Bumps `schema_version` to invalidate cached query plans.
597    ///
598    /// Called at the end of every DDL mutation that changes the schema's
599    /// shape (labels, edge types, properties, indexes, constraints). The
600    /// plan-cache eviction guard keys on `schema_version`, so a stale plan
601    /// built against an older shape is discarded once this advances. Uses
602    /// wrapping arithmetic: the value is a coarse change token, not a count,
603    /// so wraparound only risks a missed eviction after 2^32 DDL operations.
604    fn bump_version(&mut self) {
605        self.schema_version = self.schema_version.wrapping_add(1);
606    }
607
608    /// Returns the label name for a given label ID.
609    ///
610    /// Performs a linear scan over all labels. This is efficient because
611    /// the number of labels in a schema is typically small.
612    pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
613        self.labels
614            .iter()
615            .find(|(_, meta)| meta.id == label_id)
616            .map(|(name, _)| name.as_str())
617    }
618
619    /// Returns the label ID for a given label name.
620    pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
621        self.labels.get(label_name).map(|meta| meta.id)
622    }
623
624    /// Returns the edge type name for a given type ID.
625    ///
626    /// Performs a linear scan over all edge types. This is efficient because
627    /// the number of edge types in a schema is typically small.
628    pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
629        self.edge_types
630            .iter()
631            .find(|(_, meta)| meta.id == type_id)
632            .map(|(name, _)| name.as_str())
633    }
634
635    /// Returns the edge type ID for a given type name.
636    pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
637        self.edge_types.get(type_name).map(|meta| meta.id)
638    }
639
640    /// Returns the vector index configuration for a given label and property.
641    ///
642    /// Performs a linear scan over all indexes. This is efficient because
643    /// the number of indexes in a schema is typically small.
644    pub fn vector_index_for_property(
645        &self,
646        label: &str,
647        property: &str,
648    ) -> Option<&VectorIndexConfig> {
649        self.indexes.iter().find_map(|idx| {
650            if let IndexDefinition::Vector(config) = idx
651                && config.label == label
652                && config.property == property
653                && config.metadata.status == IndexStatus::Online
654            {
655                return Some(config);
656            }
657            None
658        })
659    }
660
661    /// Returns the scored sparse-vector index configuration for a label/property.
662    pub fn sparse_index_for_property(
663        &self,
664        label: &str,
665        property: &str,
666    ) -> Option<&SparseVectorIndexConfig> {
667        self.indexes.iter().find_map(|idx| {
668            if let IndexDefinition::Sparse(config) = idx
669                && config.label == label
670                && config.property == property
671                && config.metadata.status == IndexStatus::Online
672            {
673                return Some(config);
674            }
675            None
676        })
677    }
678
679    /// Returns the full-text index configuration for a given label and property.
680    ///
681    /// A full-text index covers one or more properties. This returns the config
682    /// if the specified property is among the indexed properties.
683    pub fn fulltext_index_for_property(
684        &self,
685        label: &str,
686        property: &str,
687    ) -> Option<&FullTextIndexConfig> {
688        self.indexes.iter().find_map(|idx| {
689            if let IndexDefinition::FullText(config) = idx
690                && config.label == label
691                && config.properties.iter().any(|p| p == property)
692                && config.metadata.status == IndexStatus::Online
693            {
694                return Some(config);
695            }
696            None
697        })
698    }
699
700    /// Get label metadata with case-insensitive lookup.
701    ///
702    /// This allows queries to match labels regardless of case, providing
703    /// better user experience when label names vary in casing.
704    pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
705        self.labels
706            .iter()
707            .find(|(k, _)| k.eq_ignore_ascii_case(name))
708            .map(|(_, v)| v)
709    }
710
711    /// Get the schema-canonical spelling of a label, matched case-insensitively.
712    ///
713    /// Returns the stored label name whose spelling differs only in case from
714    /// `name`, or `None` if no such label is registered. Callers use this to
715    /// normalize a user-supplied label to the canonical form the storage tier
716    /// keys on, so case variants resolve to the same vertex table.
717    pub fn canonical_label_name(&self, name: &str) -> Option<String> {
718        self.labels
719            .iter()
720            .find(|(k, _)| k.eq_ignore_ascii_case(name))
721            .map(|(k, _)| k.clone())
722    }
723
724    /// Get label ID with case-insensitive lookup.
725    pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
726        self.get_label_case_insensitive(label_name)
727            .map(|meta| meta.id)
728    }
729
730    /// Get edge type metadata with case-insensitive lookup.
731    ///
732    /// This allows queries to match edge types regardless of case, providing
733    /// better user experience when type names vary in casing.
734    pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
735        self.edge_types
736            .iter()
737            .find(|(k, _)| k.eq_ignore_ascii_case(name))
738            .map(|(_, v)| v)
739    }
740
741    /// Get edge type ID with case-insensitive lookup (schema-defined types only).
742    pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
743        self.get_edge_type_case_insensitive(type_name)
744            .map(|meta| meta.id)
745    }
746
747    /// Get edge type ID with case-insensitive lookup, checking both
748    /// schema-defined and schemaless registries.
749    pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
750        self.edge_type_id_by_name_case_insensitive(type_name)
751            .or_else(|| {
752                self.schemaless_registry
753                    .id_by_name_case_insensitive(type_name)
754            })
755    }
756
757    /// Returns the edge type ID for `type_name`, checking the schema first
758    /// and falling back to the schemaless registry (assigning a new ID if needed).
759    ///
760    /// Requires `&mut self` because it may assign a new schemaless ID.
761    /// Use [`edge_type_id_by_name`](Self::edge_type_id_by_name) for read-only schema lookups.
762    pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
763        if let Some(id) = self.edge_type_id_unified(type_name) {
764            return id;
765        }
766        // Reaching here means the type is brand-new to *both* the schema map and
767        // the schemaless registry (the early return above mirrors exactly what
768        // `edge_type_id_unified` checks). Minting a new schemaless edge type
769        // changes the result of `all_edge_type_ids()`, which untyped traversals
770        // bake into cached plans keyed on `schema_version`. Bump the version so
771        // those stale plans are evicted — otherwise a `MATCH ()-[r]->()` plan
772        // built before this type existed silently drops edges of the new type.
773        let id = self.schemaless_registry.get_or_assign_id(type_name);
774        self.bump_version();
775        id
776    }
777
778    /// Read-only unified exact lookup: schema-defined edge type id, falling
779    /// back to an already-assigned schemaless id.
780    ///
781    /// Mirrors exactly the checks [`Self::get_or_assign_edge_type_id`]
782    /// performs before assigning, so a `Some` here means the assigning path
783    /// would be a no-op — the basis for `SchemaManager`'s read-lock fast path.
784    pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
785        self.edge_type_id_by_name(type_name)
786            .or_else(|| self.schemaless_registry.id_by_name(type_name))
787    }
788
789    /// Returns the edge type name for `type_id`, checking both the schema
790    /// and schemaless registries. Returns an owned `String` because the
791    /// name may come from either registry.
792    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
793        if is_schemaless_edge_type(type_id) {
794            self.schemaless_registry
795                .type_name_by_id(type_id)
796                .map(str::to_owned)
797        } else {
798            self.edge_type_name_by_id(type_id).map(str::to_owned)
799        }
800    }
801
802    /// Returns all edge type IDs, including both schema-defined and schemaless types.
803    /// Used when MATCH queries don't specify an edge type and need to scan all edges.
804    pub fn all_edge_type_ids(&self) -> Vec<u32> {
805        let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
806        ids.extend(self.schemaless_registry.all_type_ids());
807        ids.sort_unstable();
808        ids
809    }
810}
811
812/// Lifecycle status of an index.
813#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
814pub enum IndexStatus {
815    /// Index is up-to-date and available for queries.
816    #[default]
817    Online,
818    /// Index is currently being rebuilt.
819    Building,
820    /// Index is outdated and scheduled for rebuild.
821    Stale,
822    /// Index rebuild failed after exhausting retries.
823    Failed,
824}
825
826/// Metadata tracking the lifecycle state of an index.
827#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
828pub struct IndexMetadata {
829    /// Current lifecycle status.
830    #[serde(default)]
831    pub status: IndexStatus,
832    /// When the index was last successfully built.
833    #[serde(default)]
834    pub last_built_at: Option<DateTime<Utc>>,
835    /// Row count of the dataset when the index was last built.
836    #[serde(default)]
837    pub row_count_at_build: Option<u64>,
838}
839
840#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
841#[serde(tag = "type")]
842#[non_exhaustive]
843pub enum IndexDefinition {
844    Vector(VectorIndexConfig),
845    FullText(FullTextIndexConfig),
846    Scalar(ScalarIndexConfig),
847    Inverted(InvertedIndexConfig),
848    JsonFullText(JsonFtsIndexConfig),
849    /// Scored sparse-vector (SPLADE / learned-sparse) inverted index.
850    Sparse(SparseVectorIndexConfig),
851}
852
853impl IndexDefinition {
854    /// Returns the index name for any variant.
855    pub fn name(&self) -> &str {
856        match self {
857            IndexDefinition::Vector(c) => &c.name,
858            IndexDefinition::FullText(c) => &c.name,
859            IndexDefinition::Scalar(c) => &c.name,
860            IndexDefinition::Inverted(c) => &c.name,
861            IndexDefinition::JsonFullText(c) => &c.name,
862            IndexDefinition::Sparse(c) => &c.name,
863        }
864    }
865
866    /// Returns the label this index is defined on.
867    pub fn label(&self) -> &str {
868        match self {
869            IndexDefinition::Vector(c) => &c.label,
870            IndexDefinition::FullText(c) => &c.label,
871            IndexDefinition::Scalar(c) => &c.label,
872            IndexDefinition::Inverted(c) => &c.label,
873            IndexDefinition::JsonFullText(c) => &c.label,
874            IndexDefinition::Sparse(c) => &c.label,
875        }
876    }
877
878    /// Returns a reference to the index lifecycle metadata.
879    pub fn metadata(&self) -> &IndexMetadata {
880        match self {
881            IndexDefinition::Vector(c) => &c.metadata,
882            IndexDefinition::FullText(c) => &c.metadata,
883            IndexDefinition::Scalar(c) => &c.metadata,
884            IndexDefinition::Inverted(c) => &c.metadata,
885            IndexDefinition::JsonFullText(c) => &c.metadata,
886            IndexDefinition::Sparse(c) => &c.metadata,
887        }
888    }
889
890    /// Returns a mutable reference to the index lifecycle metadata.
891    pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
892        match self {
893            IndexDefinition::Vector(c) => &mut c.metadata,
894            IndexDefinition::FullText(c) => &mut c.metadata,
895            IndexDefinition::Scalar(c) => &mut c.metadata,
896            IndexDefinition::Inverted(c) => &mut c.metadata,
897            IndexDefinition::JsonFullText(c) => &mut c.metadata,
898            IndexDefinition::Sparse(c) => &mut c.metadata,
899        }
900    }
901}
902
903#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
904pub struct InvertedIndexConfig {
905    pub name: String,
906    pub label: String,
907    pub property: String,
908    #[serde(default = "default_normalize")]
909    pub normalize: bool,
910    #[serde(default = "default_max_terms_per_doc")]
911    pub max_terms_per_doc: usize,
912    #[serde(default)]
913    pub metadata: IndexMetadata,
914}
915
916fn default_normalize() -> bool {
917    true
918}
919
920fn default_max_terms_per_doc() -> usize {
921    10_000
922}
923
924/// Configuration for a scored sparse-vector (SPLADE / learned-sparse) index.
925///
926/// The index stores per-term postings `(term_id, vids, weights, max_impact)`
927/// and scores by dot product. `quantize` controls 8-bit weight quantization at
928/// the postings boundary (≈ lossless, ~4× smaller; default on). P2 block-max
929/// pruning knobs are added in a later milestone.
930#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
931pub struct SparseVectorIndexConfig {
932    pub name: String,
933    pub label: String,
934    pub property: String,
935    /// Term-space cardinality (max term id + 1), for validation/config.
936    pub dimensions: usize,
937    /// Quantize stored weights to 8-bit (per-term scale). Default on.
938    #[serde(default = "default_sparse_quantize")]
939    pub quantize: bool,
940    /// Auto-embedding source. When set, a declared text column is embedded into
941    /// this sparse column via the xervo sparse model at write time (and a text
942    /// query is embedded at query time) — mirrors `VectorIndexConfig`.
943    #[serde(default)]
944    pub embedding_config: Option<EmbeddingConfig>,
945    #[serde(default)]
946    pub metadata: IndexMetadata,
947}
948
949fn default_sparse_quantize() -> bool {
950    true
951}
952
953#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
954pub struct VectorIndexConfig {
955    pub name: String,
956    pub label: String,
957    pub property: String,
958    pub index_type: VectorIndexType,
959    pub metric: DistanceMetric,
960    pub embedding_config: Option<EmbeddingConfig>,
961    #[serde(default)]
962    pub metadata: IndexMetadata,
963}
964
965#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
966pub struct EmbeddingConfig {
967    /// Model alias in the Uni-Xervo catalog (for example: "embed/default").
968    pub alias: String,
969    pub source_properties: Vec<String>,
970    pub batch_size: usize,
971    /// Prefix prepended to text before embedding during auto-embed (document side).
972    /// Example: `"search_document: "` for Nomic models. Include any trailing space.
973    #[serde(default)]
974    pub document_prefix: Option<String>,
975    /// Prefix prepended to text before embedding during query-time embed calls.
976    /// Example: `"search_query: "` for Nomic models. Include any trailing space.
977    #[serde(default)]
978    pub query_prefix: Option<String>,
979}
980
981#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
982#[non_exhaustive]
983pub enum VectorIndexType {
984    Flat,
985    IvfFlat {
986        num_partitions: u32,
987    },
988    IvfPq {
989        num_partitions: u32,
990        num_sub_vectors: u32,
991        bits_per_subvector: u8,
992    },
993    IvfSq {
994        num_partitions: u32,
995    },
996    IvfRq {
997        num_partitions: u32,
998        #[serde(default)]
999        num_bits: Option<u8>,
1000    },
1001    HnswFlat {
1002        m: u32,
1003        ef_construction: u32,
1004        #[serde(default)]
1005        num_partitions: Option<u32>,
1006    },
1007    HnswSq {
1008        m: u32,
1009        ef_construction: u32,
1010        #[serde(default)]
1011        num_partitions: Option<u32>,
1012    },
1013    HnswPq {
1014        m: u32,
1015        ef_construction: u32,
1016        num_sub_vectors: u32,
1017        #[serde(default)]
1018        num_partitions: Option<u32>,
1019    },
1020    /// MUVERA (arXiv:2405.19504) Fixed-Dimensional Encoding for multi-vector
1021    /// (ColBERT/MaxSim) columns. The source multi-vector is encoded into a single
1022    /// derived `Vector<fde_dim>` column, and `inner` is the single-vector ANN index
1023    /// type built over that derived column (always with the `Dot` metric — the FDE
1024    /// inner product approximates MaxSim). The exact MaxSim re-rank still uses the
1025    /// `VectorIndexConfig.metric`. See `uni_query_functions::muvera`.
1026    Muvera {
1027        /// SimHash hyperplanes per repetition (`2^k_sim` buckets).
1028        k_sim: u32,
1029        /// Independent repetitions concatenated into the FDE.
1030        reps: u32,
1031        /// Inner-projection target dim (`0` = no projection, use the source dim).
1032        d_proj: u32,
1033        /// Master seed; persisted so query-time encoding matches doc-time encoding.
1034        seed: u64,
1035        /// The single-vector ANN index built over the derived FDE column.
1036        inner: Box<VectorIndexType>,
1037    },
1038}
1039
1040#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1041#[non_exhaustive]
1042pub enum DistanceMetric {
1043    Cosine,
1044    L2,
1045    Dot,
1046}
1047
1048impl DistanceMetric {
1049    /// Computes the distance between two vectors using this metric.
1050    ///
1051    /// All metrics follow LanceDB conventions so that lower values indicate
1052    /// greater similarity:
1053    /// - **L2**: squared Euclidean distance.
1054    /// - **Cosine**: `1.0 - cosine_similarity` (range \[0, 2\]).
1055    /// - **Dot**: negative dot product.
1056    ///
1057    /// # Panics
1058    ///
1059    /// Panics if `a` and `b` have different lengths.
1060    pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
1061        assert_eq!(a.len(), b.len(), "vector dimension mismatch");
1062        match self {
1063            DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
1064            DistanceMetric::Cosine => {
1065                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
1066                let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
1067                let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
1068                let denom = norm_a * norm_b;
1069                if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
1070            }
1071            DistanceMetric::Dot => {
1072                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
1073                -dot
1074            }
1075        }
1076    }
1077}
1078
1079#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1080pub struct FullTextIndexConfig {
1081    pub name: String,
1082    pub label: String,
1083    pub properties: Vec<String>,
1084    pub tokenizer: TokenizerConfig,
1085    pub with_positions: bool,
1086    #[serde(default)]
1087    pub metadata: IndexMetadata,
1088}
1089
1090#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1091#[non_exhaustive]
1092pub enum TokenizerConfig {
1093    Standard,
1094    Whitespace,
1095    Ngram { min: u8, max: u8 },
1096    Custom { name: String },
1097}
1098
1099#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1100pub struct JsonFtsIndexConfig {
1101    pub name: String,
1102    pub label: String,
1103    pub column: String,
1104    #[serde(default)]
1105    pub paths: Vec<String>,
1106    #[serde(default)]
1107    pub with_positions: bool,
1108    #[serde(default)]
1109    pub metadata: IndexMetadata,
1110}
1111
1112#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1113pub struct ScalarIndexConfig {
1114    pub name: String,
1115    pub label: String,
1116    pub properties: Vec<String>,
1117    pub index_type: ScalarIndexType,
1118    pub where_clause: Option<String>,
1119    #[serde(default)]
1120    pub metadata: IndexMetadata,
1121}
1122
1123#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1124#[non_exhaustive]
1125pub enum ScalarIndexType {
1126    BTree,
1127    Hash,
1128    Bitmap,
1129    LabelList,
1130}
1131
1132pub struct SchemaManager {
1133    store: Arc<dyn ObjectStore>,
1134    path: ObjectStorePath,
1135    schema: RwLock<Arc<Schema>>,
1136}
1137
1138impl SchemaManager {
1139    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
1140        let path = path.as_ref();
1141        let parent = path
1142            .parent()
1143            .ok_or_else(|| anyhow!("Invalid schema path"))?;
1144        let filename = path
1145            .file_name()
1146            .ok_or_else(|| anyhow!("Invalid schema filename"))?
1147            .to_str()
1148            .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
1149
1150        let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
1151        let obj_path = ObjectStorePath::from(filename);
1152
1153        Self::load_from_store(store, &obj_path).await
1154    }
1155
1156    pub async fn load_from_store(
1157        store: Arc<dyn ObjectStore>,
1158        path: &ObjectStorePath,
1159    ) -> Result<Self> {
1160        match store.get(path).await {
1161            Ok(result) => {
1162                let bytes = result.bytes().await?;
1163                let content = String::from_utf8(bytes.to_vec())?;
1164                let mut schema: Schema = serde_json::from_str(&content)?;
1165                // Self-heal catalogs that grew super-linearly under the
1166                // pre-fix `add_index` (issue rustic-ai/uni-db#63). Collapse
1167                // duplicate index entries by name, keeping the *last*
1168                // occurrence — matches the upsert semantics in `add_index`
1169                // and preserves whatever metadata the most recent rebuild
1170                // wrote. The dedup persists on the next mutation that
1171                // calls `save()`.
1172                let original_len = schema.indexes.len();
1173                if original_len > 0 {
1174                    let mut seen: std::collections::HashSet<String> =
1175                        std::collections::HashSet::with_capacity(original_len);
1176                    let mut dedup: Vec<IndexDefinition> = schema
1177                        .indexes
1178                        .iter()
1179                        .rev()
1180                        .filter(|idx| seen.insert(idx.name().to_string()))
1181                        .cloned()
1182                        .collect();
1183                    dedup.reverse();
1184                    if dedup.len() != original_len {
1185                        tracing::warn!(
1186                            collapsed = original_len - dedup.len(),
1187                            kept = dedup.len(),
1188                            "schema.indexes: collapsed duplicate entries on load (issue #63)"
1189                        );
1190                        schema.indexes = dedup;
1191                    }
1192                }
1193                Ok(Self {
1194                    store,
1195                    path: path.clone(),
1196                    schema: RwLock::new(Arc::new(schema)),
1197                })
1198            }
1199            Err(object_store::Error::NotFound { .. }) => Ok(Self {
1200                store,
1201                path: path.clone(),
1202                schema: RwLock::new(Arc::new(Schema::default())),
1203            }),
1204            Err(e) => Err(anyhow::Error::from(e)),
1205        }
1206    }
1207
1208    pub async fn save(&self) -> Result<()> {
1209        let content = {
1210            let schema_guard = acquire_read(&self.schema, "schema")?;
1211            serde_json::to_string_pretty(&**schema_guard)?
1212        };
1213        self.store
1214            .put(&self.path, content.into())
1215            .await
1216            .map_err(anyhow::Error::from)?;
1217        Ok(())
1218    }
1219
1220    pub fn path(&self) -> &ObjectStorePath {
1221        &self.path
1222    }
1223
1224    pub fn schema(&self) -> Arc<Schema> {
1225        self.schema
1226            .read()
1227            .expect("Schema lock poisoned - a thread panicked while holding it")
1228            .clone()
1229    }
1230
1231    /// Normalize function names in an expression to uppercase for case-insensitive matching.
1232    /// Examples: "lower(email)" -> "LOWER(email)", "trim(name)" -> "TRIM(name)"
1233    fn normalize_function_names(expr: &str) -> String {
1234        let mut result = String::with_capacity(expr.len());
1235        let mut chars = expr.chars().peekable();
1236
1237        while let Some(ch) = chars.next() {
1238            if ch.is_alphabetic() {
1239                // Collect identifier
1240                let mut ident = String::new();
1241                ident.push(ch);
1242
1243                while let Some(&next) = chars.peek() {
1244                    if next.is_alphanumeric() || next == '_' {
1245                        ident.push(chars.next().unwrap());
1246                    } else {
1247                        break;
1248                    }
1249                }
1250
1251                // If followed by '(', it's a function call - uppercase it
1252                if chars.peek() == Some(&'(') {
1253                    result.push_str(&ident.to_uppercase());
1254                } else {
1255                    result.push_str(&ident); // Keep property names as-is
1256                }
1257            } else {
1258                result.push(ch);
1259            }
1260        }
1261
1262        result
1263    }
1264
1265    /// Generate a consistent internal column name for an expression index.
1266    /// Uses a hash suffix to ensure uniqueness for different expressions that
1267    /// might sanitize to the same string (e.g., "a+b" and "a-b" both become "a_b").
1268    ///
1269    /// IMPORTANT: Uses FNV-1a hash which is stable across Rust versions and platforms.
1270    /// DefaultHasher is not guaranteed to be stable and could break persistent data
1271    /// if the hash changes after a compiler upgrade.
1272    pub fn generated_column_name(expr: &str) -> String {
1273        // Normalize function names to uppercase for case-insensitive matching
1274        let normalized = Self::normalize_function_names(expr);
1275
1276        let sanitized = normalized
1277            .replace(|c: char| !c.is_alphanumeric(), "_")
1278            .trim_matches('_')
1279            .to_string();
1280
1281        // FNV-1a 64-bit hash - stable across Rust versions and platforms
1282        const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1283        const FNV_PRIME: u64 = 1099511628211;
1284
1285        let mut hash = FNV_OFFSET_BASIS;
1286        for byte in normalized.as_bytes() {
1287            hash ^= *byte as u64;
1288            hash = hash.wrapping_mul(FNV_PRIME);
1289        }
1290
1291        format!("_gen_{}_{:x}", sanitized, hash)
1292    }
1293
1294    pub fn replace_schema(&self, new_schema: Schema) {
1295        let mut schema = self
1296            .schema
1297            .write()
1298            .expect("Schema lock poisoned - a thread panicked while holding it");
1299        *schema = Arc::new(new_schema);
1300    }
1301
1302    /// Build a fork-scoped manager whose schema is `primary ⊕ overlay`.
1303    ///
1304    /// Used by `UniInner::at_fork` to give a forked session a schema view
1305    /// that includes any labels/edge-types/properties the fork has
1306    /// introduced on top of primary. The returned manager owns its own
1307    /// in-memory `Arc<Schema>` — mutations to it never reach primary's
1308    /// schema file. The returned manager is *not* intended for `.save()`;
1309    /// fork-overlay persistence is owned by the registry layer
1310    /// (`catalog/fork_schemas/{fork_id}.json`).
1311    ///
1312    /// In Phase 1 the delta is always empty, so the merge is a clone.
1313    /// Phase 2 starts populating it when on-the-fly label creation lands.
1314    #[must_use]
1315    pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1316        let primary = self.schema();
1317        let merged = if overlay.is_empty() {
1318            (*primary).clone()
1319        } else {
1320            let mut merged = (*primary).clone();
1321            for (name, label) in &overlay.added_labels {
1322                merged.labels.insert(name.clone(), label.clone());
1323            }
1324            for (name, edge_type) in &overlay.added_edge_types {
1325                merged.edge_types.insert(name.clone(), edge_type.clone());
1326            }
1327            for addition in &overlay.added_properties {
1328                let props = merged.properties.entry(addition.owner.clone()).or_default();
1329                props.insert(
1330                    addition.property.clone(),
1331                    PropertyMeta {
1332                        r#type: addition.data_type.clone(),
1333                        nullable: addition.nullable,
1334                        added_in: merged.schema_version,
1335                        state: SchemaElementState::Active,
1336                        generation_expression: None,
1337                        description: None,
1338                    },
1339                );
1340            }
1341            merged
1342        };
1343
1344        Arc::new(Self {
1345            store: self.store.clone(),
1346            path: self.path.clone(),
1347            schema: RwLock::new(Arc::new(merged)),
1348        })
1349    }
1350
1351    pub fn next_label_id(&self) -> u16 {
1352        self.schema()
1353            .labels
1354            .values()
1355            .map(|l| l.id)
1356            .max()
1357            .unwrap_or(0)
1358            + 1
1359    }
1360
1361    pub fn next_type_id(&self) -> u32 {
1362        let max_schema_id = self
1363            .schema()
1364            .edge_types
1365            .values()
1366            .map(|t| t.id)
1367            .max()
1368            .unwrap_or(0);
1369
1370        // Ensure we stay in schema'd ID space (bit 31 = 0)
1371        if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1372            panic!("Schema edge type ID exhaustion");
1373        }
1374
1375        max_schema_id + 1
1376    }
1377
1378    /// Validate a label or edge-type name at definition time. (L6)
1379    ///
1380    /// Names flow into on-disk dataset paths (`vertices_{name}.lance`) and
1381    /// Lance branch names (`fork_{id}_{…}`); a name with a path separator,
1382    /// whitespace, or a control character corrupts those paths and breaks
1383    /// fork creation. Such names were never actually usable, so they are
1384    /// rejected up front rather than failing later. `.` is allowed
1385    /// (path-safe and common in qualified names).
1386    ///
1387    /// Public so the fork-create path can apply the same rule as a backstop
1388    /// over names that entered the schema through an infallible interning
1389    /// path (e.g. schemaless `get_or_assign_edge_type_id`).
1390    ///
1391    /// # Errors
1392    /// Returns an error if `name` is empty/all-whitespace, exceeds
1393    /// `MAX_SCHEMA_NAME_LEN` bytes, or contains a control, whitespace,
1394    /// `/`, or `\` character.
1395    pub fn validate_schema_element_name(kind: &str, name: &str) -> Result<()> {
1396        if name.is_empty() || name.chars().all(char::is_whitespace) {
1397            return Err(anyhow!(
1398                "{kind} name must be non-empty and not all whitespace"
1399            ));
1400        }
1401        if name.len() > MAX_SCHEMA_NAME_LEN {
1402            return Err(anyhow!("{kind} name exceeds {MAX_SCHEMA_NAME_LEN} bytes"));
1403        }
1404        if let Some(c) = name
1405            .chars()
1406            .find(|c| c.is_control() || c.is_whitespace() || matches!(c, '/' | '\\'))
1407        {
1408            return Err(anyhow!(
1409                "{kind} name '{name}' contains an unsafe character ({c:?})"
1410            ));
1411        }
1412        Ok(())
1413    }
1414
1415    pub fn add_label(&self, name: &str) -> Result<u16> {
1416        self.add_label_with_desc(name, None)
1417    }
1418
1419    pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1420        Self::validate_schema_element_name("Label", name)?;
1421        let mut guard = acquire_write(&self.schema, "schema")?;
1422        let schema = Arc::make_mut(&mut *guard);
1423        if schema.labels.contains_key(name) {
1424            return Err(anyhow!("Label '{}' already exists", name));
1425        }
1426
1427        let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1428        if id >= VIRTUAL_LABEL_ID_START {
1429            return Err(anyhow!(
1430                "Native label space exhausted (next id {id:#x} would enter the \
1431                 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1432                 reserved for catalog-resolved labels)"
1433            ));
1434        }
1435        schema.labels.insert(
1436            name.to_string(),
1437            LabelMeta {
1438                id,
1439                created_at: Utc::now(),
1440                state: SchemaElementState::Active,
1441                description,
1442            },
1443        );
1444        schema.bump_version();
1445        Ok(id)
1446    }
1447
1448    pub fn add_edge_type(
1449        &self,
1450        name: &str,
1451        src_labels: Vec<String>,
1452        dst_labels: Vec<String>,
1453    ) -> Result<u32> {
1454        self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1455    }
1456
1457    pub fn add_edge_type_with_desc(
1458        &self,
1459        name: &str,
1460        src_labels: Vec<String>,
1461        dst_labels: Vec<String>,
1462        description: Option<String>,
1463    ) -> Result<u32> {
1464        Self::validate_schema_element_name("Edge type", name)?;
1465        let mut guard = acquire_write(&self.schema, "schema")?;
1466        let schema = Arc::make_mut(&mut *guard);
1467        if schema.edge_types.contains_key(name) {
1468            return Err(anyhow!("Edge type '{}' already exists", name));
1469        }
1470
1471        let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1472
1473        // Stay in the schema-defined sub-range (bit 31 = 0, and below the
1474        // virtual reservation `VIRTUAL_EDGE_TYPE_ID_START`) — same bound as
1475        // `add_edge_type`, so the two entry points cannot disagree on the
1476        // legal ceiling.
1477        if id >= VIRTUAL_EDGE_TYPE_ID_START {
1478            return Err(anyhow!(
1479                "Native edge type space exhausted (next id {id:#x} would enter the \
1480                 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1481                 reserved for catalog-resolved edge types)"
1482            ));
1483        }
1484
1485        schema.edge_types.insert(
1486            name.to_string(),
1487            EdgeTypeMeta {
1488                id,
1489                src_labels,
1490                dst_labels,
1491                state: SchemaElementState::Active,
1492                description,
1493            },
1494        );
1495        schema.bump_version();
1496        Ok(id)
1497    }
1498
1499    /// Delegates to [`Schema::get_or_assign_edge_type_id`].
1500    ///
1501    /// Read-lock fast path: the type name is almost always already known
1502    /// (it is constant per statement but resolved per row by the CREATE
1503    /// executor), and the slow path's write lock + `Arc::make_mut` deep-clones
1504    /// the whole `Schema` whenever the Arc is shared — which under SSI it
1505    /// always is. Double-checked: on a miss, `Schema::get_or_assign_edge_type_id`
1506    /// re-checks under the write lock, so two racing assigners converge on one id.
1507    pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1508        {
1509            let guard = acquire_read(&self.schema, "schema")
1510                .expect("Schema lock poisoned - a thread panicked while holding it");
1511            if let Some(id) = guard.edge_type_id_unified(type_name) {
1512                return id;
1513            }
1514        }
1515        let mut guard = acquire_write(&self.schema, "schema")
1516            .expect("Schema lock poisoned - a thread panicked while holding it");
1517        let schema = Arc::make_mut(&mut *guard);
1518        schema.get_or_assign_edge_type_id(type_name)
1519    }
1520
1521    /// Delegates to [`Schema::edge_type_name_by_id_unified`].
1522    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1523        let schema = acquire_read(&self.schema, "schema")
1524            .expect("Schema lock poisoned - a thread panicked while holding it");
1525        schema.edge_type_name_by_id_unified(type_id)
1526    }
1527
1528    pub fn add_property(
1529        &self,
1530        label_or_type: &str,
1531        prop_name: &str,
1532        data_type: DataType,
1533        nullable: bool,
1534    ) -> Result<()> {
1535        self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1536    }
1537
1538    pub fn add_property_with_desc(
1539        &self,
1540        label_or_type: &str,
1541        prop_name: &str,
1542        data_type: DataType,
1543        nullable: bool,
1544        description: Option<String>,
1545    ) -> Result<()> {
1546        validate_property_name(prop_name)?;
1547        let mut guard = acquire_write(&self.schema, "schema")?;
1548        let schema = Arc::make_mut(&mut *guard);
1549        let version = schema.schema_version;
1550        let props = schema
1551            .properties
1552            .entry(label_or_type.to_string())
1553            .or_default();
1554
1555        if props.contains_key(prop_name) {
1556            return Err(anyhow!(
1557                "Property '{}' already exists for '{}'",
1558                prop_name,
1559                label_or_type
1560            ));
1561        }
1562
1563        props.insert(
1564            prop_name.to_string(),
1565            PropertyMeta {
1566                r#type: data_type,
1567                nullable,
1568                added_in: version,
1569                state: SchemaElementState::Active,
1570                generation_expression: None,
1571                description,
1572            },
1573        );
1574        // Bump after stamping `added_in` with the pre-bump `version`.
1575        schema.bump_version();
1576        Ok(())
1577    }
1578
1579    /// Register an INTERNAL property (underscore-prefixed name allowed) that is
1580    /// materialised by the storage layer, not written by the user — e.g. the MUVERA
1581    /// `__fde_*` derived column. Bypasses the user-facing underscore-prefix rule but
1582    /// still rejects storage-layer name collisions. Idempotent: a no-op if the property
1583    /// already exists with the same type (so re-creating an index is safe).
1584    ///
1585    /// Returns `true` if this call newly inserted the property, `false` if it already
1586    /// existed (idempotent). The check-and-insert is atomic under the schema write lock,
1587    /// so for concurrent callers exactly one observes `true` — letting callers gate
1588    /// expensive one-time work (e.g. the MUVERA backfill) on the winner.
1589    pub fn add_internal_property(
1590        &self,
1591        label_or_type: &str,
1592        prop_name: &str,
1593        data_type: DataType,
1594        nullable: bool,
1595    ) -> Result<bool> {
1596        validate_reserved_property_name(prop_name)?;
1597        let mut guard = acquire_write(&self.schema, "schema")?;
1598        let schema = Arc::make_mut(&mut *guard);
1599        let version = schema.schema_version;
1600        let props = schema
1601            .properties
1602            .entry(label_or_type.to_string())
1603            .or_default();
1604
1605        if let Some(existing) = props.get(prop_name) {
1606            if existing.r#type == data_type {
1607                return Ok(false); // already present (idempotent re-registration)
1608            }
1609            return Err(anyhow!(
1610                "Internal property '{}' already exists for '{}' with a different type",
1611                prop_name,
1612                label_or_type
1613            ));
1614        }
1615
1616        props.insert(
1617            prop_name.to_string(),
1618            PropertyMeta {
1619                r#type: data_type,
1620                nullable,
1621                added_in: version,
1622                state: SchemaElementState::Active,
1623                generation_expression: None,
1624                description: None,
1625            },
1626        );
1627        schema.bump_version();
1628        Ok(true)
1629    }
1630
1631    pub fn add_generated_property(
1632        &self,
1633        label_or_type: &str,
1634        prop_name: &str,
1635        data_type: DataType,
1636        expr: String,
1637    ) -> Result<()> {
1638        // System-generated `_gen_*` columns bypass the underscore-prefix rule
1639        // but must still avoid storage-layer column-name collisions.
1640        validate_reserved_property_name(prop_name)?;
1641        let mut guard = acquire_write(&self.schema, "schema")?;
1642        let schema = Arc::make_mut(&mut *guard);
1643        let version = schema.schema_version;
1644        let props = schema
1645            .properties
1646            .entry(label_or_type.to_string())
1647            .or_default();
1648
1649        if props.contains_key(prop_name) {
1650            return Err(anyhow!("Property '{}' already exists", prop_name));
1651        }
1652
1653        props.insert(
1654            prop_name.to_string(),
1655            PropertyMeta {
1656                r#type: data_type,
1657                nullable: true,
1658                added_in: version,
1659                state: SchemaElementState::Active,
1660                generation_expression: Some(expr),
1661                description: None,
1662            },
1663        );
1664        // Bump after stamping `added_in` with the pre-bump `version`.
1665        schema.bump_version();
1666        Ok(())
1667    }
1668
1669    pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1670        let mut guard = acquire_write(&self.schema, "schema")?;
1671        let schema = Arc::make_mut(&mut *guard);
1672        let meta = schema
1673            .labels
1674            .get_mut(name)
1675            .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1676        meta.description = description;
1677        Ok(())
1678    }
1679
1680    pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1681        let mut guard = acquire_write(&self.schema, "schema")?;
1682        let schema = Arc::make_mut(&mut *guard);
1683        let meta = schema
1684            .edge_types
1685            .get_mut(name)
1686            .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1687        meta.description = description;
1688        Ok(())
1689    }
1690
1691    pub fn set_property_description(
1692        &self,
1693        entity: &str,
1694        prop_name: &str,
1695        description: Option<String>,
1696    ) -> Result<()> {
1697        let mut guard = acquire_write(&self.schema, "schema")?;
1698        let schema = Arc::make_mut(&mut *guard);
1699        let props = schema
1700            .properties
1701            .get_mut(entity)
1702            .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1703        let meta = props
1704            .get_mut(prop_name)
1705            .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
1706        meta.description = description;
1707        Ok(())
1708    }
1709
1710    /// Register an index definition on the schema, **upsert by name**.
1711    ///
1712    /// If an index with the same `IndexDefinition::name()` already exists, it
1713    /// is replaced in place; otherwise the def is appended. Idempotent under
1714    /// repeat invocation, which makes `SchemaBuilder::apply()` re-applicable
1715    /// without bloating `schema.indexes` and lets the rebuild epilogue inside
1716    /// every `IndexManager::create_*_index` re-record metadata updates without
1717    /// duplicating entries (issue rustic-ai/uni-db#63).
1718    pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1719        let mut guard = acquire_write(&self.schema, "schema")?;
1720        let schema = Arc::make_mut(&mut *guard);
1721        if let Some(existing) = schema
1722            .indexes
1723            .iter_mut()
1724            .find(|i| i.name() == index_def.name())
1725        {
1726            *existing = index_def;
1727        } else {
1728            schema.indexes.push(index_def);
1729        }
1730        schema.bump_version();
1731        Ok(())
1732    }
1733
1734    pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1735        let schema = self.schema.read().expect("Schema lock poisoned");
1736        schema.indexes.iter().find(|i| i.name() == name).cloned()
1737    }
1738
1739    /// Updates the lifecycle metadata for an index by name.
1740    ///
1741    /// The closure receives a mutable reference to the index's `IndexMetadata`,
1742    /// allowing callers to update status, timestamps, etc.
1743    pub fn update_index_metadata(
1744        &self,
1745        index_name: &str,
1746        f: impl FnOnce(&mut IndexMetadata),
1747    ) -> Result<()> {
1748        let mut guard = acquire_write(&self.schema, "schema")?;
1749        let schema = Arc::make_mut(&mut *guard);
1750        let idx = schema
1751            .indexes
1752            .iter_mut()
1753            .find(|i| i.name() == index_name)
1754            .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1755        f(idx.metadata_mut());
1756        Ok(())
1757    }
1758
1759    pub fn remove_index(&self, name: &str) -> Result<()> {
1760        let mut guard = acquire_write(&self.schema, "schema")?;
1761        let schema = Arc::make_mut(&mut *guard);
1762        if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1763            schema.indexes.remove(pos);
1764            schema.bump_version();
1765            Ok(())
1766        } else {
1767            Err(anyhow!("Index '{}' not found", name))
1768        }
1769    }
1770
1771    pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1772        let mut guard = acquire_write(&self.schema, "schema")?;
1773        let schema = Arc::make_mut(&mut *guard);
1774        if schema.constraints.iter().any(|c| c.name == constraint.name) {
1775            return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1776        }
1777        schema.constraints.push(constraint);
1778        schema.bump_version();
1779        Ok(())
1780    }
1781
1782    pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1783        let mut guard = acquire_write(&self.schema, "schema")?;
1784        let schema = Arc::make_mut(&mut *guard);
1785        if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1786            schema.constraints.remove(pos);
1787            schema.bump_version();
1788            Ok(())
1789        } else if if_exists {
1790            Ok(())
1791        } else {
1792            Err(anyhow!("Constraint '{}' not found", name))
1793        }
1794    }
1795
1796    pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1797        let mut guard = acquire_write(&self.schema, "schema")?;
1798        let schema = Arc::make_mut(&mut *guard);
1799        let Some(props) = schema.properties.get_mut(label_or_type) else {
1800            return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1801        };
1802        if props.remove(prop_name).is_none() {
1803            return Err(anyhow!(
1804                "Property '{}' not found for '{}'",
1805                prop_name,
1806                label_or_type
1807            ));
1808        }
1809        schema.bump_version();
1810        Ok(())
1811    }
1812
1813    pub fn rename_property(
1814        &self,
1815        label_or_type: &str,
1816        old_name: &str,
1817        new_name: &str,
1818    ) -> Result<()> {
1819        let mut guard = acquire_write(&self.schema, "schema")?;
1820        let schema = Arc::make_mut(&mut *guard);
1821        let Some(props) = schema.properties.get_mut(label_or_type) else {
1822            return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1823        };
1824        let Some(meta) = props.remove(old_name) else {
1825            return Err(anyhow!(
1826                "Property '{}' not found for '{}'",
1827                old_name,
1828                label_or_type
1829            ));
1830        };
1831        if props.contains_key(new_name) {
1832            // Rollback removal? Or just error.
1833            props.insert(old_name.to_string(), meta); // Restore
1834            return Err(anyhow!("Property '{}' already exists", new_name));
1835        }
1836        props.insert(new_name.to_string(), meta);
1837        schema.bump_version();
1838        Ok(())
1839    }
1840
1841    pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1842        let mut guard = acquire_write(&self.schema, "schema")?;
1843        let schema = Arc::make_mut(&mut *guard);
1844        if let Some(label_meta) = schema.labels.get_mut(name) {
1845            label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1846            // Do not remove properties; they are implicitly tombstoned by the label
1847            schema.bump_version();
1848            Ok(())
1849        } else if if_exists {
1850            Ok(())
1851        } else {
1852            Err(anyhow!("Label '{}' not found", name))
1853        }
1854    }
1855
1856    pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1857        let mut guard = acquire_write(&self.schema, "schema")?;
1858        let schema = Arc::make_mut(&mut *guard);
1859        if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1860            edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1861            // Do not remove properties; they are implicitly tombstoned by the edge type
1862            schema.bump_version();
1863            Ok(())
1864        } else if if_exists {
1865            Ok(())
1866        } else {
1867            Err(anyhow!("Edge Type '{}' not found", name))
1868        }
1869    }
1870}
1871
1872/// Validate identifier names to prevent injection and ensure compatibility.
1873pub fn validate_identifier(name: &str) -> Result<()> {
1874    // Length check
1875    if name.is_empty() || name.len() > 64 {
1876        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1877    }
1878
1879    // First character must be letter or underscore
1880    let first = name.chars().next().unwrap();
1881    if !first.is_alphabetic() && first != '_' {
1882        return Err(anyhow!(
1883            "Identifier '{}' must start with letter or underscore",
1884            name
1885        ));
1886    }
1887
1888    // Remaining characters: alphanumeric or underscore
1889    if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1890        return Err(anyhow!(
1891            "Identifier '{}' must contain only alphanumeric and underscore",
1892            name
1893        ));
1894    }
1895
1896    // Reserved words
1897    const RESERVED: &[&str] = &[
1898        "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1899        "UNION", "ORDER", "LIMIT",
1900    ];
1901    if RESERVED.contains(&name.to_uppercase().as_str()) {
1902        return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1903    }
1904
1905    Ok(())
1906}
1907
1908/// Reject user-declared property names that collide with internal Arrow column
1909/// names used by the storage layer.
1910///
1911/// Without this, declaring a property named e.g. `ext_id` produces an Arrow
1912/// schema with two `ext_id` fields at flush time, which Lance rejects with
1913/// "Duplicate field name" — silently losing all in-session writes on shutdown.
1914pub fn validate_property_name(name: &str) -> Result<()> {
1915    if name.starts_with('_') {
1916        return Err(anyhow!(
1917            "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
1918            name
1919        ));
1920    }
1921    validate_reserved_property_name(name)
1922}
1923
1924/// Reject names that collide with storage-layer Arrow column names.
1925///
1926/// Used both by `validate_property_name` (user-facing path) and directly by
1927/// `add_generated_property` (system-generated `_gen_*` path) — the latter
1928/// needs to bypass the underscore-prefix rule but must still reject the
1929/// fixed-name collisions below.
1930fn validate_reserved_property_name(name: &str) -> Result<()> {
1931    // Unprefixed names that get appended alongside user properties in the
1932    // per-label vertex (`storage/vertex.rs`), per-edge-type edge
1933    // (`storage/edge.rs`), or per-edge-type delta (`storage/delta.rs`)
1934    // Arrow schemas — declaring one of these as a user property produces a
1935    // duplicate Arrow field and a Lance "Duplicate field name" error at
1936    // flush time. Fixed-schema-only columns (`type`, `props_json`,
1937    // `labels` in the main tables) are NOT listed: those tables don't
1938    // append user properties, so no collision can occur.
1939    const RESERVED_PROPS: &[&str] = &[
1940        "ext_id",
1941        "overflow_json",
1942        "eid",
1943        "src_vid",
1944        "dst_vid",
1945        "op",
1946        // Internal planner sentinel: a column-name marker used by
1947        // `mark_set_item_variables` (uni-query::query::planner) to request
1948        // narrow structural projection without full-schema expansion.
1949        // Reserved here defensively so an internal `add_generated_property`
1950        // path can't accidentally create a colliding user-facing column.
1951        // The user-facing `validate_property_name` already rejects this
1952        // via the underscore-prefix rule, so this is belt-and-suspenders.
1953        "__set_struct__",
1954    ];
1955    if RESERVED_PROPS.contains(&name) {
1956        return Err(anyhow!(
1957            "Property name '{}' is reserved by the storage layer; please choose a different name",
1958            name
1959        ));
1960    }
1961    Ok(())
1962}
1963
1964#[cfg(test)]
1965mod tests {
1966    use super::*;
1967    use crate::value::{TemporalValue, Value};
1968    use object_store::local::LocalFileSystem;
1969    use tempfile::tempdir;
1970
1971    #[test]
1972    fn test_datatype_accepts_matrix() {
1973        let dt = || TemporalValue::DateTime {
1974            nanos_since_epoch: 0,
1975            offset_seconds: 0,
1976            timezone_name: None,
1977        };
1978
1979        // Null is accepted by every type (nullability checked separately).
1980        for ty in [
1981            DataType::String,
1982            DataType::Int64,
1983            DataType::Bool,
1984            DataType::DateTime,
1985            DataType::Float64,
1986        ] {
1987            assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
1988        }
1989
1990        // Exact-type matches.
1991        assert!(DataType::String.accepts(&Value::String("x".into())));
1992        assert!(DataType::Int64.accepts(&Value::Int(1)));
1993        assert!(DataType::Bool.accepts(&Value::Bool(true)));
1994        assert!(DataType::DateTime.accepts(&Value::Temporal(dt())));
1995
1996        // Intentional lossless widenings remain allowed.
1997        assert!(
1998            DataType::Float64.accepts(&Value::Int(3)),
1999            "Int widens to Float"
2000        );
2001        assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
2002        assert!(DataType::Timestamp.accepts(&Value::Temporal(dt())));
2003        assert!(
2004            DataType::Timestamp.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
2005            "storage parses strings for non-struct Timestamp columns"
2006        );
2007
2008        // The #68 data-loss cases must be rejected (coercion handles strings separately).
2009        assert!(
2010            !DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
2011            "String into a DateTime struct column nulls silently — reject here"
2012        );
2013        assert!(!DataType::Bool.accepts(&Value::Int(1)));
2014        assert!(!DataType::Int64.accepts(&Value::Bool(true)));
2015        assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
2016        assert!(
2017            !DataType::String.accepts(&Value::Int(10)),
2018            "no implicit stringification"
2019        );
2020        assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));
2021
2022        // Opaque columns accept anything.
2023        assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
2024    }
2025
2026    #[tokio::test]
2027    async fn test_schema_management() -> Result<()> {
2028        let dir = tempdir()?;
2029        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2030        let path = ObjectStorePath::from("schema.json");
2031        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
2032
2033        // Labels
2034        let lid = manager.add_label("Person")?;
2035        assert_eq!(lid, 1);
2036        assert!(manager.add_label("Person").is_err());
2037
2038        // Properties
2039        manager.add_property("Person", "name", DataType::String, false)?;
2040        assert!(
2041            manager
2042                .add_property("Person", "name", DataType::String, false)
2043                .is_err()
2044        );
2045
2046        // Edge types
2047        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
2048        assert_eq!(tid, 1);
2049
2050        manager.save().await?;
2051        // Check file exists
2052        assert!(store.get(&path).await.is_ok());
2053
2054        let manager2 = SchemaManager::load_from_store(store, &path).await?;
2055        assert!(manager2.schema().labels.contains_key("Person"));
2056        assert!(
2057            manager2
2058                .schema()
2059                .properties
2060                .get("Person")
2061                .unwrap()
2062                .contains_key("name")
2063        );
2064
2065        Ok(())
2066    }
2067
2068    #[tokio::test]
2069    async fn test_reserved_property_names_rejected() -> Result<()> {
2070        let dir = tempdir()?;
2071        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2072        let path = ObjectStorePath::from("schema.json");
2073        let manager = SchemaManager::load_from_store(store, &path).await?;
2074
2075        manager.add_label("Tiny")?;
2076
2077        // Unprefixed reserved names — these collide with internal Arrow
2078        // columns in storage tables and previously caused Lance
2079        // "Duplicate field name" errors at flush time.
2080        for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
2081            let err = manager
2082                .add_property("Tiny", reserved, DataType::String, true)
2083                .expect_err(&format!("expected '{reserved}' to be rejected"));
2084            assert!(
2085                err.to_string().contains("reserved"),
2086                "error for '{reserved}' should mention 'reserved', got: {err}"
2087            );
2088        }
2089
2090        // Planner sentinel — reserved in RESERVED_PROPS (belt-and-suspenders
2091        // alongside the underscore-prefix rule). Confirms an internal
2092        // `add_generated_property` path cannot accidentally create a column
2093        // that collides with the SET-target structural-projection marker.
2094        let err = manager
2095            .add_property("Tiny", "__set_struct__", DataType::String, true)
2096            .expect_err("expected '__set_struct__' to be rejected");
2097        assert!(
2098            err.to_string().contains("reserved"),
2099            "__set_struct__ rejection should mention 'reserved', got: {err}"
2100        );
2101
2102        // Leading-underscore pattern rule.
2103        for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
2104            assert!(
2105                manager
2106                    .add_property("Tiny", reserved, DataType::String, true)
2107                    .is_err(),
2108                "expected '{reserved}' to be rejected"
2109            );
2110        }
2111
2112        // Names that merely contain a reserved substring should still be
2113        // accepted.
2114        manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
2115        manager.add_property("Tiny", "user_op", DataType::String, true)?;
2116        manager.add_property("Tiny", "type_name", DataType::String, true)?;
2117
2118        // Same check applies to edge-type properties (single dispatch).
2119        manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
2120        assert!(
2121            manager
2122                .add_property("knows", "src_vid", DataType::Int64, true)
2123                .is_err()
2124        );
2125
2126        // And to generated properties.
2127        assert!(
2128            manager
2129                .add_generated_property(
2130                    "Tiny",
2131                    "ext_id",
2132                    DataType::String,
2133                    "concat('x', name)".into()
2134                )
2135                .is_err()
2136        );
2137
2138        Ok(())
2139    }
2140
2141    #[test]
2142    fn test_normalize_function_names() {
2143        assert_eq!(
2144            SchemaManager::normalize_function_names("lower(email)"),
2145            "LOWER(email)"
2146        );
2147        assert_eq!(
2148            SchemaManager::normalize_function_names("LOWER(email)"),
2149            "LOWER(email)"
2150        );
2151        assert_eq!(
2152            SchemaManager::normalize_function_names("Lower(email)"),
2153            "LOWER(email)"
2154        );
2155        assert_eq!(
2156            SchemaManager::normalize_function_names("trim(lower(email))"),
2157            "TRIM(LOWER(email))"
2158        );
2159    }
2160
2161    #[test]
2162    fn test_generated_column_name_case_insensitive() {
2163        let col1 = SchemaManager::generated_column_name("lower(email)");
2164        let col2 = SchemaManager::generated_column_name("LOWER(email)");
2165        let col3 = SchemaManager::generated_column_name("Lower(email)");
2166        assert_eq!(col1, col2);
2167        assert_eq!(col2, col3);
2168        assert!(col1.starts_with("_gen_LOWER_email_"));
2169    }
2170
2171    #[test]
2172    fn test_index_metadata_serde_backward_compat() {
2173        // Simulate old JSON without metadata field
2174        let json = r#"{
2175            "type": "Scalar",
2176            "name": "idx_person_name",
2177            "label": "Person",
2178            "properties": ["name"],
2179            "index_type": "BTree",
2180            "where_clause": null
2181        }"#;
2182        let def: IndexDefinition = serde_json::from_str(json).unwrap();
2183        let meta = def.metadata();
2184        assert_eq!(meta.status, IndexStatus::Online);
2185        assert!(meta.last_built_at.is_none());
2186        assert!(meta.row_count_at_build.is_none());
2187    }
2188
2189    #[test]
2190    fn test_index_metadata_serde_roundtrip() {
2191        let now = Utc::now();
2192        let def = IndexDefinition::Scalar(ScalarIndexConfig {
2193            name: "idx_test".to_string(),
2194            label: "Test".to_string(),
2195            properties: vec!["prop".to_string()],
2196            index_type: ScalarIndexType::BTree,
2197            where_clause: None,
2198            metadata: IndexMetadata {
2199                status: IndexStatus::Building,
2200                last_built_at: Some(now),
2201                row_count_at_build: Some(42),
2202            },
2203        });
2204
2205        let json = serde_json::to_string(&def).unwrap();
2206        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
2207        assert_eq!(parsed.metadata().status, IndexStatus::Building);
2208        assert_eq!(parsed.metadata().row_count_at_build, Some(42));
2209        assert!(parsed.metadata().last_built_at.is_some());
2210    }
2211
2212    #[tokio::test]
2213    async fn test_update_index_metadata() -> Result<()> {
2214        let dir = tempdir()?;
2215        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2216        let path = ObjectStorePath::from("schema.json");
2217        let manager = SchemaManager::load_from_store(store, &path).await?;
2218
2219        manager.add_label("Person")?;
2220        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
2221            name: "idx_test".to_string(),
2222            label: "Person".to_string(),
2223            properties: vec!["name".to_string()],
2224            index_type: ScalarIndexType::BTree,
2225            where_clause: None,
2226            metadata: Default::default(),
2227        });
2228        manager.add_index(idx)?;
2229
2230        // Verify initial status is Online
2231        let initial = manager.get_index("idx_test").unwrap();
2232        assert_eq!(initial.metadata().status, IndexStatus::Online);
2233
2234        // Update to Building
2235        manager.update_index_metadata("idx_test", |m| {
2236            m.status = IndexStatus::Building;
2237            m.row_count_at_build = Some(100);
2238        })?;
2239
2240        let updated = manager.get_index("idx_test").unwrap();
2241        assert_eq!(updated.metadata().status, IndexStatus::Building);
2242        assert_eq!(updated.metadata().row_count_at_build, Some(100));
2243
2244        // Non-existent index should error
2245        assert!(manager.update_index_metadata("nope", |_| {}).is_err());
2246
2247        Ok(())
2248    }
2249
2250    /// `add_internal_property` reports whether THIS call inserted the property: `true` on
2251    /// first insert, `false` on idempotent re-registration, `Err` on a type conflict. The
2252    /// MUVERA backfill gates on this (only the inserter backfills), so two concurrent
2253    /// creates of the same index can't both run the full-table rewrite (issue #107).
2254    #[tokio::test]
2255    async fn add_internal_property_reports_newly_added() -> Result<()> {
2256        let dir = tempdir()?;
2257        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2258        let path = ObjectStorePath::from("schema.json");
2259        let manager = SchemaManager::load_from_store(store, &path).await?;
2260        manager.add_label("Doc")?;
2261
2262        let dt = DataType::Vector { dimensions: 16 };
2263        // First registration: newly added.
2264        assert!(manager.add_internal_property("Doc", "__fde_x", dt.clone(), true)?);
2265        // Idempotent re-registration with the same type: NOT newly added.
2266        assert!(!manager.add_internal_property("Doc", "__fde_x", dt.clone(), true)?);
2267        // Same name, conflicting type: hard error (no silent divergence).
2268        assert!(
2269            manager
2270                .add_internal_property("Doc", "__fde_x", DataType::Vector { dimensions: 8 }, true)
2271                .is_err()
2272        );
2273        Ok(())
2274    }
2275
2276    /// `add_index` is upsert-by-name (issue rustic-ai/uni-db#63). Repeat
2277    /// invocations with the same `IndexDefinition::name()` must replace
2278    /// the entry in place rather than appending. Subsequent `add_index`
2279    /// calls also reflect metadata updates from the new definition.
2280    #[tokio::test]
2281    async fn test_add_index_is_upsert_by_name() -> Result<()> {
2282        let dir = tempdir()?;
2283        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2284        let path = ObjectStorePath::from("schema.json");
2285        let manager = SchemaManager::load_from_store(store, &path).await?;
2286        manager.add_label("Person")?;
2287
2288        let initial = IndexDefinition::Scalar(ScalarIndexConfig {
2289            name: "idx_test".to_string(),
2290            label: "Person".to_string(),
2291            properties: vec!["name".to_string()],
2292            index_type: ScalarIndexType::BTree,
2293            where_clause: None,
2294            metadata: IndexMetadata {
2295                status: IndexStatus::Building,
2296                ..Default::default()
2297            },
2298        });
2299        manager.add_index(initial.clone())?;
2300        assert_eq!(manager.schema().indexes.len(), 1);
2301
2302        // Re-add the identical def — must remain a single entry.
2303        manager.add_index(initial.clone())?;
2304        assert_eq!(
2305            manager.schema().indexes.len(),
2306            1,
2307            "duplicate add_index by name must not append"
2308        );
2309
2310        // Re-add with updated metadata — must replace in place, len unchanged.
2311        let mut updated_cfg = match initial {
2312            IndexDefinition::Scalar(c) => c,
2313            _ => unreachable!(),
2314        };
2315        updated_cfg.metadata.status = IndexStatus::Online;
2316        updated_cfg.metadata.row_count_at_build = Some(42);
2317        manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
2318        assert_eq!(manager.schema().indexes.len(), 1);
2319        let stored = manager.get_index("idx_test").unwrap();
2320        assert_eq!(stored.metadata().status, IndexStatus::Online);
2321        assert_eq!(stored.metadata().row_count_at_build, Some(42));
2322
2323        // A *different* name appends as a new entry.
2324        let other = IndexDefinition::Scalar(ScalarIndexConfig {
2325            name: "idx_other".to_string(),
2326            label: "Person".to_string(),
2327            properties: vec!["age".to_string()],
2328            index_type: ScalarIndexType::BTree,
2329            where_clause: None,
2330            metadata: IndexMetadata::default(),
2331        });
2332        manager.add_index(other)?;
2333        assert_eq!(manager.schema().indexes.len(), 2);
2334
2335        Ok(())
2336    }
2337
2338    /// `load_from_store` self-heals catalogs that were bloated by the
2339    /// pre-fix `add_index` (kept the *last* def per name).
2340    #[tokio::test]
2341    async fn test_load_dedups_bloated_indexes() -> Result<()> {
2342        let dir = tempdir()?;
2343        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2344        let path = ObjectStorePath::from("schema.json");
2345
2346        // Seed disk with a hand-crafted bloated schema: 50 entries, all
2347        // sharing the same name. The last entry has distinct metadata so
2348        // we can assert "last writer wins" semantics.
2349        let mut schema = Schema::default();
2350        schema.labels.insert(
2351            "Person".to_string(),
2352            LabelMeta {
2353                id: 1,
2354                created_at: chrono::Utc::now(),
2355                state: SchemaElementState::Active,
2356                description: None,
2357            },
2358        );
2359        let make = |status: IndexStatus, count: Option<u64>| {
2360            IndexDefinition::Scalar(ScalarIndexConfig {
2361                name: "idx_dup".to_string(),
2362                label: "Person".to_string(),
2363                properties: vec!["name".to_string()],
2364                index_type: ScalarIndexType::BTree,
2365                where_clause: None,
2366                metadata: IndexMetadata {
2367                    status,
2368                    row_count_at_build: count,
2369                    ..Default::default()
2370                },
2371            })
2372        };
2373        for _ in 0..49 {
2374            schema.indexes.push(make(IndexStatus::Building, None));
2375        }
2376        schema.indexes.push(make(IndexStatus::Online, Some(123)));
2377        let json = serde_json::to_string_pretty(&schema)?;
2378        store.put(&path, json.into()).await?;
2379
2380        let manager = SchemaManager::load_from_store(store, &path).await?;
2381        let schema = manager.schema();
2382        assert_eq!(
2383            schema.indexes.len(),
2384            1,
2385            "load() must collapse 50 duplicates by name to 1"
2386        );
2387        // Last-writer-wins: the kept entry is the final push (Online, 123).
2388        assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
2389        assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));
2390
2391        Ok(())
2392    }
2393
2394    #[test]
2395    fn test_vector_index_for_property_skips_non_online() {
2396        let mut schema = Schema::default();
2397        schema.labels.insert(
2398            "Document".to_string(),
2399            LabelMeta {
2400                id: 1,
2401                created_at: chrono::Utc::now(),
2402                state: SchemaElementState::Active,
2403                description: None,
2404            },
2405        );
2406
2407        // Add a vector index with Stale status
2408        schema
2409            .indexes
2410            .push(IndexDefinition::Vector(VectorIndexConfig {
2411                name: "vec_doc_embedding".to_string(),
2412                label: "Document".to_string(),
2413                property: "embedding".to_string(),
2414                index_type: VectorIndexType::Flat,
2415                metric: DistanceMetric::Cosine,
2416                embedding_config: None,
2417                metadata: IndexMetadata {
2418                    status: IndexStatus::Stale,
2419                    ..Default::default()
2420                },
2421            }));
2422
2423        // Stale index should NOT be returned
2424        assert!(
2425            schema
2426                .vector_index_for_property("Document", "embedding")
2427                .is_none()
2428        );
2429
2430        // Set to Online — should now be returned
2431        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
2432            cfg.metadata.status = IndexStatus::Online;
2433        }
2434        let result = schema.vector_index_for_property("Document", "embedding");
2435        assert!(result.is_some());
2436        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
2437    }
2438
2439    #[tokio::test]
2440    async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
2441        use crate::core::fork::SchemaDelta;
2442
2443        let dir = tempdir()?;
2444        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2445        let path = ObjectStorePath::from("schema.json");
2446        let primary = SchemaManager::load_from_store(store, &path).await?;
2447        primary.add_label("Person")?;
2448
2449        let overlay = primary.with_overlay(&SchemaDelta::empty());
2450        assert_eq!(overlay.schema().labels.len(), 1);
2451
2452        // Phase 1 invariant: mutating the overlay manager must not bleed
2453        // into primary's schema.
2454        overlay.add_label("Forked")?;
2455        assert!(overlay.schema().labels.contains_key("Forked"));
2456        assert!(!primary.schema().labels.contains_key("Forked"));
2457
2458        Ok(())
2459    }
2460
2461    #[tokio::test]
2462    async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
2463        use crate::core::fork::SchemaDelta;
2464        use chrono::Utc;
2465
2466        let dir = tempdir()?;
2467        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2468        let path = ObjectStorePath::from("schema.json");
2469        let primary = SchemaManager::load_from_store(store, &path).await?;
2470        primary.add_label("Existing")?;
2471
2472        let label_meta = LabelMeta {
2473            id: 99,
2474            created_at: Utc::now(),
2475            state: SchemaElementState::Active,
2476            description: None,
2477        };
2478        let edge_meta = EdgeTypeMeta {
2479            id: 99,
2480            src_labels: vec!["NewLabel".into()],
2481            dst_labels: vec!["NewLabel".into()],
2482            state: SchemaElementState::Active,
2483            description: None,
2484        };
2485        let delta = SchemaDelta {
2486            added_labels: vec![("NewLabel".to_string(), label_meta)],
2487            added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
2488            added_properties: vec![],
2489        };
2490
2491        let overlay = primary.with_overlay(&delta);
2492        let merged = overlay.schema();
2493        assert!(merged.labels.contains_key("Existing"));
2494        assert!(merged.labels.contains_key("NewLabel"));
2495        assert!(merged.edge_types.contains_key("NewEdge"));
2496
2497        // Primary unchanged.
2498        assert!(!primary.schema().labels.contains_key("NewLabel"));
2499        Ok(())
2500    }
2501
2502    /// N threads racing `get_or_assign_edge_type_id` for the same new name
2503    /// must converge on a single id (the read-lock fast path double-checks
2504    /// under the write lock); a schema-defined type must win over the
2505    /// schemaless registry.
2506    #[tokio::test]
2507    async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
2508        let dir = tempdir()?;
2509        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2510        let path = ObjectStorePath::from("schema.json");
2511        let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);
2512
2513        let mut handles = Vec::new();
2514        for _ in 0..16 {
2515            let m = manager.clone();
2516            handles.push(std::thread::spawn(move || {
2517                m.get_or_assign_edge_type_id("RACED")
2518            }));
2519        }
2520        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2521        assert!(
2522            ids.iter().all(|&id| id == ids[0]),
2523            "all racers must observe one id, got {ids:?}"
2524        );
2525        // Fast path returns the same id afterwards.
2526        assert_eq!(manager.get_or_assign_edge_type_id("RACED"), ids[0]);
2527
2528        // Schema-defined type wins over the schemaless registry.
2529        manager.add_label("A")?;
2530        let declared = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
2531        assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared);
2532        Ok(())
2533    }
2534
2535    /// Minting a brand-new schemaless edge type must bump `schema_version`
2536    /// (the plan cache keys on it; untyped traversals bake `all_edge_type_ids()`
2537    /// into the plan, so a stale plan would silently drop edges of the new
2538    /// type). Re-resolving an existing type must NOT bump. (review C5)
2539    #[test]
2540    fn test_new_schemaless_edge_type_bumps_schema_version() {
2541        let mut schema = Schema::default();
2542        let v0 = schema.schema_version;
2543
2544        let id1 = schema.get_or_assign_edge_type_id("FRESH");
2545        assert_eq!(
2546            schema.schema_version,
2547            v0.wrapping_add(1),
2548            "minting a new edge type must bump schema_version"
2549        );
2550
2551        // Re-resolving the same type is a no-op — no further bump.
2552        let id1_again = schema.get_or_assign_edge_type_id("FRESH");
2553        assert_eq!(id1, id1_again);
2554        assert_eq!(
2555            schema.schema_version,
2556            v0.wrapping_add(1),
2557            "resolving an existing edge type must not bump schema_version"
2558        );
2559
2560        // A second distinct new type bumps again.
2561        let _id2 = schema.get_or_assign_edge_type_id("OTHER");
2562        assert_eq!(
2563            schema.schema_version,
2564            v0.wrapping_add(2),
2565            "a second new edge type must bump schema_version again"
2566        );
2567    }
2568
2569    /// L6: label/edge-type names with path separators, whitespace, or
2570    /// control chars are rejected at definition; benign names (incl. `.`)
2571    /// are accepted.
2572    #[test]
2573    fn validate_schema_element_name_rejects_unsafe() {
2574        for bad in ["", "   ", "a/b", "a b", "a\nb", "a\\b", "x\0y"] {
2575            assert!(
2576                SchemaManager::validate_schema_element_name("Label", bad).is_err(),
2577                "expected {bad:?} to be rejected"
2578            );
2579        }
2580        for good in ["Person", "My.Label", "edge_2", "KNOWS"] {
2581            assert!(
2582                SchemaManager::validate_schema_element_name("Label", good).is_ok(),
2583                "expected {good:?} to be accepted"
2584            );
2585        }
2586        // Over-length is rejected.
2587        let long = "x".repeat(MAX_SCHEMA_NAME_LEN + 1);
2588        assert!(SchemaManager::validate_schema_element_name("Label", &long).is_err());
2589    }
2590}