Skip to main content

uni_common/core/
schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::core::edge_type::{
5    MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6    is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
20#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
21#[non_exhaustive]
22pub enum SchemaElementState {
23    Active,
24    Hidden {
25        since: DateTime<Utc>,
26        last_active_snapshot: String, // SnapshotId
27    },
28    Tombstone {
29        since: DateTime<Utc>,
30    },
31}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35/// Returns the canonical struct field definitions for DateTime encoding in Arrow.
36///
37/// DateTime is encoded as a 3-field struct to preserve timezone information:
38/// - `nanos_since_epoch`: i64 nanoseconds since Unix epoch (UTC)
39/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
40/// - `timezone_name`: Optional IANA timezone name (e.g., "America/New_York")
41pub fn datetime_struct_fields() -> Fields {
42    Fields::from(vec![
43        Field::new(
44            "nanos_since_epoch",
45            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46            true,
47        ),
48        Field::new("offset_seconds", ArrowDataType::Int32, true),
49        Field::new("timezone_name", ArrowDataType::Utf8, true),
50    ])
51}
52
53/// Returns the canonical struct field definitions for Time encoding in Arrow.
54///
55/// Time is encoded as a 2-field struct to preserve timezone offset:
56/// - `nanos_since_midnight`: i64 nanoseconds since midnight (0-86,399,999,999,999)
57/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
58pub fn time_struct_fields() -> Fields {
59    Fields::from(vec![
60        Field::new(
61            "nanos_since_midnight",
62            ArrowDataType::Time64(TimeUnit::Nanosecond),
63            true,
64        ),
65        Field::new("offset_seconds", ArrowDataType::Int32, true),
66    ])
67}
68
69/// Detects if an Arrow DataType is the canonical DateTime struct.
70pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74/// Detects if an Arrow DataType is the canonical Time struct.
75pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
79/// Field metadata marking an Arrow `LargeBinary` field as a raw `DataType::Bytes`
80/// value rather than a tagged CypherValue/Duration blob.
81///
82/// Stamped on the child field of `List(Bytes)` / `Map(_, Bytes)` container types so
83/// the read path decodes each element verbatim instead of through the tagged codec
84/// (which would read `byte[0]` as a type tag). CV-encoded containers carry no such
85/// marker and keep the codec path. See the read-side honoring in `arrow_convert`.
86pub fn raw_bytes_field_metadata() -> HashMap<String, String> {
87    HashMap::from([("uni_raw_bytes".to_string(), "true".to_string())])
88}
89
90#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
91#[non_exhaustive]
92pub enum CrdtType {
93    GCounter,
94    GSet,
95    ORSet,
96    LWWRegister,
97    LWWMap,
98    Rga,
99    VectorClock,
100    VCRegister,
101}
102
103impl CrdtType {
104    /// Returns the canonical variant name for this CRDT type.
105    ///
106    /// The returned strings must stay in sync with `uni_crdt::Crdt::type_name`,
107    /// so a written CRDT value can be validated against its schema-declared
108    /// variant (see uni-store's write-time CRDT enforcement).
109    ///
110    /// # Examples
111    /// ```
112    /// use uni_common::core::schema::CrdtType;
113    /// assert_eq!(CrdtType::GCounter.type_name(), "GCounter");
114    /// ```
115    #[must_use]
116    pub fn type_name(&self) -> &'static str {
117        match self {
118            CrdtType::GCounter => "GCounter",
119            CrdtType::GSet => "GSet",
120            CrdtType::ORSet => "ORSet",
121            CrdtType::LWWRegister => "LWWRegister",
122            CrdtType::LWWMap => "LWWMap",
123            CrdtType::Rga => "Rga",
124            CrdtType::VectorClock => "VectorClock",
125            CrdtType::VCRegister => "VCRegister",
126        }
127    }
128}
129
130#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
131pub enum PointType {
132    Geographic,  // WGS84
133    Cartesian2D, // x, y
134    Cartesian3D, // x, y, z
135}
136
137#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
138#[non_exhaustive]
139pub enum DataType {
140    String,
141    Int32,
142    Int64,
143    Float32,
144    Float64,
145    Bool,
146    Timestamp,
147    Date,
148    Time,
149    DateTime,
150    Duration,
151    CypherValue,
152    Bytes,
153    Point(PointType),
154    Vector { dimensions: usize },
155    Btic,
156    Crdt(CrdtType),
157    List(Box<DataType>),
158    Map(Box<DataType>, Box<DataType>),
159}
160
161impl DataType {
162    // Alias for compatibility/convenience if needed, but preferable to use exact types.
163    #[allow(non_upper_case_globals)]
164    pub const Float: DataType = DataType::Float64;
165    #[allow(non_upper_case_globals)]
166    pub const Int: DataType = DataType::Int64;
167
168    pub fn to_arrow(&self) -> ArrowDataType {
169        match self {
170            DataType::String => ArrowDataType::Utf8,
171            DataType::Int32 => ArrowDataType::Int32,
172            DataType::Int64 => ArrowDataType::Int64,
173            DataType::Float32 => ArrowDataType::Float32,
174            DataType::Float64 => ArrowDataType::Float64,
175            DataType::Bool => ArrowDataType::Boolean,
176            DataType::Timestamp => {
177                ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
178            }
179            DataType::Date => ArrowDataType::Date32,
180            DataType::Time => ArrowDataType::Struct(time_struct_fields()),
181            DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
182            DataType::Duration => ArrowDataType::LargeBinary, // Lance doesn't support Interval(MonthDayNano); use CypherValue codec
183            DataType::CypherValue => ArrowDataType::LargeBinary, // MessagePack-tagged binary encoding
184            DataType::Bytes => ArrowDataType::LargeBinary, // raw byte buffer (no codec wrapping)
185            DataType::Point(pt) => match pt {
186                PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
187                    Field::new("latitude", ArrowDataType::Float64, false),
188                    Field::new("longitude", ArrowDataType::Float64, false),
189                    Field::new("crs", ArrowDataType::Utf8, false),
190                ])),
191                PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
192                    Field::new("x", ArrowDataType::Float64, false),
193                    Field::new("y", ArrowDataType::Float64, false),
194                    Field::new("crs", ArrowDataType::Utf8, false),
195                ])),
196                PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
197                    Field::new("x", ArrowDataType::Float64, false),
198                    Field::new("y", ArrowDataType::Float64, false),
199                    Field::new("z", ArrowDataType::Float64, false),
200                    Field::new("crs", ArrowDataType::Utf8, false),
201                ])),
202            },
203            DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
204                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
205                *dimensions as i32,
206            ),
207            DataType::Btic => ArrowDataType::FixedSizeBinary(24),
208            DataType::Crdt(_) => ArrowDataType::Binary, // Store CRDT as binary MessagePack
209            DataType::List(inner) => {
210                // A raw `Bytes` element maps to Arrow `LargeBinary`, indistinguishable
211                // from a CV-encoded element by type alone; mark the child field so the
212                // read path decodes it verbatim rather than through the tagged codec.
213                let item = Field::new("item", inner.to_arrow(), true);
214                let item = if matches!(**inner, DataType::Bytes) {
215                    item.with_metadata(raw_bytes_field_metadata())
216                } else {
217                    item
218                };
219                ArrowDataType::List(Arc::new(item))
220            }
221            DataType::Map(key, value) => {
222                let value_field = Field::new("value", value.to_arrow(), true);
223                let value_field = if matches!(**value, DataType::Bytes) {
224                    value_field.with_metadata(raw_bytes_field_metadata())
225                } else {
226                    value_field
227                };
228                ArrowDataType::List(Arc::new(Field::new(
229                    "item",
230                    ArrowDataType::Struct(Fields::from(vec![
231                        Field::new("key", key.to_arrow(), false),
232                        value_field,
233                    ])),
234                    true,
235                )))
236            }
237        }
238    }
239
240    /// Returns `true` if `value` is directly storable in this column type without loss.
241    ///
242    /// This is the schema-level type guard used by the write path. `Value::Null` is
243    /// always accepted — column nullability is enforced separately by the `nullable`
244    /// flag, not here. `CypherValue`, `Crdt`, and `Point` columns accept any value.
245    /// For every other declared type, only the `Value` variants that the storage layer
246    /// persists *without silently nulling* are accepted (see the per-type converters in
247    /// `uni-store`'s `arrow_convert`), plus the intentional lossless widenings
248    /// `Int`→`Float`, `Int`→`Int32`, and `Temporal`→`Timestamp`.
249    ///
250    /// A `Value::String` destined for a `Date`/`Time`/`DateTime`/`Duration` column is
251    /// intentionally *not* accepted here: the write path first coerces such strings into
252    /// the proper `Temporal` value (matching the Cypher temporal constructors), then the
253    /// coerced value passes this check. This keeps `accepts` a pure, allocation-free
254    /// predicate.
255    ///
256    /// # Examples
257    /// ```
258    /// use uni_common::core::schema::DataType;
259    /// use uni_common::Value;
260    ///
261    /// assert!(DataType::Float64.accepts(&Value::Int(3))); // Int widens to Float
262    /// assert!(DataType::Bool.accepts(&Value::Null)); // Null always accepted
263    /// assert!(!DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())));
264    /// ```
265    pub fn accepts(&self, value: &crate::value::Value) -> bool {
266        use crate::value::{TemporalValue, Value};
267
268        // Null is universally accepted; nullability is a separate concern.
269        if matches!(value, Value::Null) {
270            return true;
271        }
272
273        match self {
274            // Opaque / dynamically-typed columns accept any value.
275            DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
276
277            DataType::String => matches!(value, Value::String(_)),
278            DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
279            // Int widens to Float losslessly for the ranges we care about.
280            DataType::Float32 | DataType::Float64 => {
281                matches!(value, Value::Int(_) | Value::Float(_))
282            }
283            DataType::Bool => matches!(value, Value::Bool(_)),
284
285            // Non-struct timestamp column: storage parses strings and accepts ints,
286            // so both are lossless here (unlike the DateTime struct column below).
287            DataType::Timestamp => matches!(
288                value,
289                Value::String(_)
290                    | Value::Int(_)
291                    | Value::Temporal(
292                        TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
293                    )
294            ),
295            DataType::DateTime => matches!(
296                value,
297                Value::Temporal(
298                    TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
299                )
300            ),
301            DataType::Date => {
302                matches!(
303                    value,
304                    Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
305                )
306            }
307            DataType::Time => matches!(
308                value,
309                Value::Int(_)
310                    | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
311            ),
312            DataType::Duration => {
313                matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
314            }
315            DataType::Bytes => matches!(value, Value::Bytes(_)),
316            // FixedSizeBinary(24) converter accepts the Btic temporal, raw strings, and lists.
317            DataType::Btic => matches!(
318                value,
319                Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
320            ),
321            DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
322            DataType::List(_) => matches!(value, Value::List(_)),
323            DataType::Map(_, _) => matches!(value, Value::Map(_)),
324        }
325    }
326}
327
328fn default_created_at() -> DateTime<Utc> {
329    Utc::now()
330}
331
332fn default_state() -> SchemaElementState {
333    SchemaElementState::Active
334}
335
336fn default_version_1() -> u32 {
337    1
338}
339
340#[derive(Clone, Debug, Serialize, Deserialize)]
341pub struct PropertyMeta {
342    pub r#type: DataType,
343    pub nullable: bool,
344    #[serde(default = "default_version_1")]
345    pub added_in: u32, // SchemaVersion
346    #[serde(default = "default_state")]
347    pub state: SchemaElementState,
348    #[serde(default)]
349    pub generation_expression: Option<String>,
350    #[serde(default, skip_serializing_if = "Option::is_none")]
351    pub description: Option<String>,
352}
353
354#[derive(Clone, Debug, Serialize, Deserialize)]
355pub struct LabelMeta {
356    pub id: u16, // LabelId
357    #[serde(default = "default_created_at")]
358    pub created_at: DateTime<Utc>,
359    #[serde(default = "default_state")]
360    pub state: SchemaElementState,
361    #[serde(default, skip_serializing_if = "Option::is_none")]
362    pub description: Option<String>,
363}
364
365#[derive(Clone, Debug, Serialize, Deserialize)]
366pub struct EdgeTypeMeta {
367    /// See [`crate::core::edge_type::EdgeTypeId`] for bit-layout details.
368    pub id: u32,
369    pub src_labels: Vec<String>,
370    pub dst_labels: Vec<String>,
371    #[serde(default = "default_state")]
372    pub state: SchemaElementState,
373    #[serde(default, skip_serializing_if = "Option::is_none")]
374    pub description: Option<String>,
375}
376
377#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
378#[non_exhaustive]
379pub enum ConstraintType {
380    Unique { properties: Vec<String> },
381    Exists { property: String },
382    Check { expression: String },
383}
384
385#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
386#[non_exhaustive]
387pub enum ConstraintTarget {
388    Label(String),
389    EdgeType(String),
390}
391
392#[derive(Clone, Debug, Serialize, Deserialize)]
393pub struct Constraint {
394    pub name: String,
395    pub constraint_type: ConstraintType,
396    pub target: ConstraintTarget,
397    pub enabled: bool,
398}
399
400/// Bidirectional registry for dynamically-assigned schemaless edge type IDs.
401///
402/// Edge types not defined in the schema are assigned IDs at runtime with
403/// bit 31 set (see [`crate::core::edge_type`]). This registry maintains
404/// the name-to-ID and ID-to-name mappings for those types.
405#[derive(Clone, Debug, Serialize, Deserialize)]
406pub struct SchemalessEdgeTypeRegistry {
407    name_to_id: HashMap<String, u32>,
408    id_to_name: HashMap<u32, String>,
409    /// Next local ID to assign (0 is reserved for invalid).
410    next_local_id: u32,
411}
412
413impl SchemalessEdgeTypeRegistry {
414    pub fn new() -> Self {
415        Self {
416            name_to_id: HashMap::new(),
417            id_to_name: HashMap::new(),
418            next_local_id: 1,
419        }
420    }
421
422    /// Returns the schemaless ID for `type_name`, assigning a new one if needed.
423    pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
424        if let Some(&id) = self.name_to_id.get(type_name) {
425            return id;
426        }
427
428        let id = make_schemaless_id(self.next_local_id);
429        self.next_local_id += 1;
430
431        self.name_to_id.insert(type_name.to_string(), id);
432        self.id_to_name.insert(id, type_name.to_string());
433
434        id
435    }
436
437    /// Looks up the edge type name for a schemaless ID.
438    pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
439        self.id_to_name.get(&type_id).map(String::as_str)
440    }
441
442    /// Returns `true` if `type_name` has already been assigned a schemaless ID.
443    pub fn contains(&self, type_name: &str) -> bool {
444        self.name_to_id.contains_key(type_name)
445    }
446
447    /// Looks up the schemaless ID for `type_name` (exact match, read-only).
448    pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
449        self.name_to_id.get(type_name).copied()
450    }
451
452    /// Looks up the edge type ID for `type_name` with case-insensitive matching.
453    pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
454        self.name_to_id
455            .iter()
456            .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
457            .map(|(_, &id)| id)
458    }
459
460    /// Returns all registered schemaless type IDs.
461    pub fn all_type_ids(&self) -> Vec<u32> {
462        self.id_to_name.keys().copied().collect()
463    }
464
465    /// Returns true if the registry has any schemaless types.
466    pub fn is_empty(&self) -> bool {
467        self.name_to_id.is_empty()
468    }
469}
470
471impl Default for SchemalessEdgeTypeRegistry {
472    fn default() -> Self {
473        Self::new()
474    }
475}
476
477/// First virtual (catalog-resolved) label ID. Label IDs in
478/// `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL` are owned by
479/// plugin-registered `CatalogProvider`s and allocated lazily by the
480/// planner via `PluginRegistry::register_virtual_label`. Native label
481/// allocation (`SchemaManager::add_label`) refuses IDs in this range.
482pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
483/// Sentinel "no label" marker, kept distinct from any allocatable ID.
484pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;
485
486/// Maximum byte length of a label or edge-type name. (L6)
487///
488/// Generous; the cap is hygiene — the name lands in on-disk dataset paths
489/// and Lance branch names — not a storage limit.
490const MAX_SCHEMA_NAME_LEN: usize = 255;
491
492/// Returns `true` if `id` is in the virtual (catalog-resolved) range.
493#[inline]
494pub fn is_virtual_label_id(id: u16) -> bool {
495    (VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL).contains(&id)
496}
497
498#[derive(Clone, Debug, Serialize, Deserialize)]
499pub struct Schema {
500    pub schema_version: u32,
501    pub labels: HashMap<String, LabelMeta>,
502    pub edge_types: HashMap<String, EdgeTypeMeta>,
503    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
504    #[serde(default)]
505    pub indexes: Vec<IndexDefinition>,
506    #[serde(default)]
507    pub constraints: Vec<Constraint>,
508    /// Registry for schemaless edge types (dynamically assigned IDs)
509    #[serde(default)]
510    pub schemaless_registry: SchemalessEdgeTypeRegistry,
511}
512
513impl Default for Schema {
514    fn default() -> Self {
515        Self {
516            schema_version: 1,
517            labels: HashMap::new(),
518            edge_types: HashMap::new(),
519            properties: HashMap::new(),
520            indexes: Vec::new(),
521            constraints: Vec::new(),
522            schemaless_registry: SchemalessEdgeTypeRegistry::new(),
523        }
524    }
525}
526
527impl Schema {
528    /// Bumps `schema_version` to invalidate cached query plans.
529    ///
530    /// Called at the end of every DDL mutation that changes the schema's
531    /// shape (labels, edge types, properties, indexes, constraints). The
532    /// plan-cache eviction guard keys on `schema_version`, so a stale plan
533    /// built against an older shape is discarded once this advances. Uses
534    /// wrapping arithmetic: the value is a coarse change token, not a count,
535    /// so wraparound only risks a missed eviction after 2^32 DDL operations.
536    fn bump_version(&mut self) {
537        self.schema_version = self.schema_version.wrapping_add(1);
538    }
539
540    /// Returns the label name for a given label ID.
541    ///
542    /// Performs a linear scan over all labels. This is efficient because
543    /// the number of labels in a schema is typically small.
544    pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
545        self.labels
546            .iter()
547            .find(|(_, meta)| meta.id == label_id)
548            .map(|(name, _)| name.as_str())
549    }
550
551    /// Returns the label ID for a given label name.
552    pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
553        self.labels.get(label_name).map(|meta| meta.id)
554    }
555
556    /// Returns the edge type name for a given type ID.
557    ///
558    /// Performs a linear scan over all edge types. This is efficient because
559    /// the number of edge types in a schema is typically small.
560    pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
561        self.edge_types
562            .iter()
563            .find(|(_, meta)| meta.id == type_id)
564            .map(|(name, _)| name.as_str())
565    }
566
567    /// Returns the edge type ID for a given type name.
568    pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
569        self.edge_types.get(type_name).map(|meta| meta.id)
570    }
571
572    /// Returns the vector index configuration for a given label and property.
573    ///
574    /// Performs a linear scan over all indexes. This is efficient because
575    /// the number of indexes in a schema is typically small.
576    pub fn vector_index_for_property(
577        &self,
578        label: &str,
579        property: &str,
580    ) -> Option<&VectorIndexConfig> {
581        self.indexes.iter().find_map(|idx| {
582            if let IndexDefinition::Vector(config) = idx
583                && config.label == label
584                && config.property == property
585                && config.metadata.status == IndexStatus::Online
586            {
587                return Some(config);
588            }
589            None
590        })
591    }
592
593    /// Returns the full-text index configuration for a given label and property.
594    ///
595    /// A full-text index covers one or more properties. This returns the config
596    /// if the specified property is among the indexed properties.
597    pub fn fulltext_index_for_property(
598        &self,
599        label: &str,
600        property: &str,
601    ) -> Option<&FullTextIndexConfig> {
602        self.indexes.iter().find_map(|idx| {
603            if let IndexDefinition::FullText(config) = idx
604                && config.label == label
605                && config.properties.iter().any(|p| p == property)
606                && config.metadata.status == IndexStatus::Online
607            {
608                return Some(config);
609            }
610            None
611        })
612    }
613
614    /// Get label metadata with case-insensitive lookup.
615    ///
616    /// This allows queries to match labels regardless of case, providing
617    /// better user experience when label names vary in casing.
618    pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
619        self.labels
620            .iter()
621            .find(|(k, _)| k.eq_ignore_ascii_case(name))
622            .map(|(_, v)| v)
623    }
624
625    /// Get the schema-canonical spelling of a label, matched case-insensitively.
626    ///
627    /// Returns the stored label name whose spelling differs only in case from
628    /// `name`, or `None` if no such label is registered. Callers use this to
629    /// normalize a user-supplied label to the canonical form the storage tier
630    /// keys on, so case variants resolve to the same vertex table.
631    pub fn canonical_label_name(&self, name: &str) -> Option<String> {
632        self.labels
633            .iter()
634            .find(|(k, _)| k.eq_ignore_ascii_case(name))
635            .map(|(k, _)| k.clone())
636    }
637
638    /// Get label ID with case-insensitive lookup.
639    pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
640        self.get_label_case_insensitive(label_name)
641            .map(|meta| meta.id)
642    }
643
644    /// Get edge type metadata with case-insensitive lookup.
645    ///
646    /// This allows queries to match edge types regardless of case, providing
647    /// better user experience when type names vary in casing.
648    pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
649        self.edge_types
650            .iter()
651            .find(|(k, _)| k.eq_ignore_ascii_case(name))
652            .map(|(_, v)| v)
653    }
654
655    /// Get edge type ID with case-insensitive lookup (schema-defined types only).
656    pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
657        self.get_edge_type_case_insensitive(type_name)
658            .map(|meta| meta.id)
659    }
660
661    /// Get edge type ID with case-insensitive lookup, checking both
662    /// schema-defined and schemaless registries.
663    pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
664        self.edge_type_id_by_name_case_insensitive(type_name)
665            .or_else(|| {
666                self.schemaless_registry
667                    .id_by_name_case_insensitive(type_name)
668            })
669    }
670
671    /// Returns the edge type ID for `type_name`, checking the schema first
672    /// and falling back to the schemaless registry (assigning a new ID if needed).
673    ///
674    /// Requires `&mut self` because it may assign a new schemaless ID.
675    /// Use [`edge_type_id_by_name`](Self::edge_type_id_by_name) for read-only schema lookups.
676    pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
677        if let Some(id) = self.edge_type_id_unified(type_name) {
678            return id;
679        }
680        // Reaching here means the type is brand-new to *both* the schema map and
681        // the schemaless registry (the early return above mirrors exactly what
682        // `edge_type_id_unified` checks). Minting a new schemaless edge type
683        // changes the result of `all_edge_type_ids()`, which untyped traversals
684        // bake into cached plans keyed on `schema_version`. Bump the version so
685        // those stale plans are evicted — otherwise a `MATCH ()-[r]->()` plan
686        // built before this type existed silently drops edges of the new type.
687        let id = self.schemaless_registry.get_or_assign_id(type_name);
688        self.bump_version();
689        id
690    }
691
692    /// Read-only unified exact lookup: schema-defined edge type id, falling
693    /// back to an already-assigned schemaless id.
694    ///
695    /// Mirrors exactly the checks [`Self::get_or_assign_edge_type_id`]
696    /// performs before assigning, so a `Some` here means the assigning path
697    /// would be a no-op — the basis for `SchemaManager`'s read-lock fast path.
698    pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
699        self.edge_type_id_by_name(type_name)
700            .or_else(|| self.schemaless_registry.id_by_name(type_name))
701    }
702
703    /// Returns the edge type name for `type_id`, checking both the schema
704    /// and schemaless registries. Returns an owned `String` because the
705    /// name may come from either registry.
706    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
707        if is_schemaless_edge_type(type_id) {
708            self.schemaless_registry
709                .type_name_by_id(type_id)
710                .map(str::to_owned)
711        } else {
712            self.edge_type_name_by_id(type_id).map(str::to_owned)
713        }
714    }
715
716    /// Returns all edge type IDs, including both schema-defined and schemaless types.
717    /// Used when MATCH queries don't specify an edge type and need to scan all edges.
718    pub fn all_edge_type_ids(&self) -> Vec<u32> {
719        let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
720        ids.extend(self.schemaless_registry.all_type_ids());
721        ids.sort_unstable();
722        ids
723    }
724}
725
726/// Lifecycle status of an index.
727#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
728pub enum IndexStatus {
729    /// Index is up-to-date and available for queries.
730    #[default]
731    Online,
732    /// Index is currently being rebuilt.
733    Building,
734    /// Index is outdated and scheduled for rebuild.
735    Stale,
736    /// Index rebuild failed after exhausting retries.
737    Failed,
738}
739
740/// Metadata tracking the lifecycle state of an index.
741#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
742pub struct IndexMetadata {
743    /// Current lifecycle status.
744    #[serde(default)]
745    pub status: IndexStatus,
746    /// When the index was last successfully built.
747    #[serde(default)]
748    pub last_built_at: Option<DateTime<Utc>>,
749    /// Row count of the dataset when the index was last built.
750    #[serde(default)]
751    pub row_count_at_build: Option<u64>,
752}
753
754#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
755#[serde(tag = "type")]
756#[non_exhaustive]
757pub enum IndexDefinition {
758    Vector(VectorIndexConfig),
759    FullText(FullTextIndexConfig),
760    Scalar(ScalarIndexConfig),
761    Inverted(InvertedIndexConfig),
762    JsonFullText(JsonFtsIndexConfig),
763}
764
765impl IndexDefinition {
766    /// Returns the index name for any variant.
767    pub fn name(&self) -> &str {
768        match self {
769            IndexDefinition::Vector(c) => &c.name,
770            IndexDefinition::FullText(c) => &c.name,
771            IndexDefinition::Scalar(c) => &c.name,
772            IndexDefinition::Inverted(c) => &c.name,
773            IndexDefinition::JsonFullText(c) => &c.name,
774        }
775    }
776
777    /// Returns the label this index is defined on.
778    pub fn label(&self) -> &str {
779        match self {
780            IndexDefinition::Vector(c) => &c.label,
781            IndexDefinition::FullText(c) => &c.label,
782            IndexDefinition::Scalar(c) => &c.label,
783            IndexDefinition::Inverted(c) => &c.label,
784            IndexDefinition::JsonFullText(c) => &c.label,
785        }
786    }
787
788    /// Returns a reference to the index lifecycle metadata.
789    pub fn metadata(&self) -> &IndexMetadata {
790        match self {
791            IndexDefinition::Vector(c) => &c.metadata,
792            IndexDefinition::FullText(c) => &c.metadata,
793            IndexDefinition::Scalar(c) => &c.metadata,
794            IndexDefinition::Inverted(c) => &c.metadata,
795            IndexDefinition::JsonFullText(c) => &c.metadata,
796        }
797    }
798
799    /// Returns a mutable reference to the index lifecycle metadata.
800    pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
801        match self {
802            IndexDefinition::Vector(c) => &mut c.metadata,
803            IndexDefinition::FullText(c) => &mut c.metadata,
804            IndexDefinition::Scalar(c) => &mut c.metadata,
805            IndexDefinition::Inverted(c) => &mut c.metadata,
806            IndexDefinition::JsonFullText(c) => &mut c.metadata,
807        }
808    }
809}
810
811#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
812pub struct InvertedIndexConfig {
813    pub name: String,
814    pub label: String,
815    pub property: String,
816    #[serde(default = "default_normalize")]
817    pub normalize: bool,
818    #[serde(default = "default_max_terms_per_doc")]
819    pub max_terms_per_doc: usize,
820    #[serde(default)]
821    pub metadata: IndexMetadata,
822}
823
824fn default_normalize() -> bool {
825    true
826}
827
828fn default_max_terms_per_doc() -> usize {
829    10_000
830}
831
832#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
833pub struct VectorIndexConfig {
834    pub name: String,
835    pub label: String,
836    pub property: String,
837    pub index_type: VectorIndexType,
838    pub metric: DistanceMetric,
839    pub embedding_config: Option<EmbeddingConfig>,
840    #[serde(default)]
841    pub metadata: IndexMetadata,
842}
843
844#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
845pub struct EmbeddingConfig {
846    /// Model alias in the Uni-Xervo catalog (for example: "embed/default").
847    pub alias: String,
848    pub source_properties: Vec<String>,
849    pub batch_size: usize,
850    /// Prefix prepended to text before embedding during auto-embed (document side).
851    /// Example: `"search_document: "` for Nomic models. Include any trailing space.
852    #[serde(default)]
853    pub document_prefix: Option<String>,
854    /// Prefix prepended to text before embedding during query-time embed calls.
855    /// Example: `"search_query: "` for Nomic models. Include any trailing space.
856    #[serde(default)]
857    pub query_prefix: Option<String>,
858}
859
860#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
861#[non_exhaustive]
862pub enum VectorIndexType {
863    Flat,
864    IvfFlat {
865        num_partitions: u32,
866    },
867    IvfPq {
868        num_partitions: u32,
869        num_sub_vectors: u32,
870        bits_per_subvector: u8,
871    },
872    IvfSq {
873        num_partitions: u32,
874    },
875    IvfRq {
876        num_partitions: u32,
877        #[serde(default)]
878        num_bits: Option<u8>,
879    },
880    HnswFlat {
881        m: u32,
882        ef_construction: u32,
883        #[serde(default)]
884        num_partitions: Option<u32>,
885    },
886    HnswSq {
887        m: u32,
888        ef_construction: u32,
889        #[serde(default)]
890        num_partitions: Option<u32>,
891    },
892    HnswPq {
893        m: u32,
894        ef_construction: u32,
895        num_sub_vectors: u32,
896        #[serde(default)]
897        num_partitions: Option<u32>,
898    },
899}
900
901#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
902#[non_exhaustive]
903pub enum DistanceMetric {
904    Cosine,
905    L2,
906    Dot,
907}
908
909impl DistanceMetric {
910    /// Computes the distance between two vectors using this metric.
911    ///
912    /// All metrics follow LanceDB conventions so that lower values indicate
913    /// greater similarity:
914    /// - **L2**: squared Euclidean distance.
915    /// - **Cosine**: `1.0 - cosine_similarity` (range \[0, 2\]).
916    /// - **Dot**: negative dot product.
917    ///
918    /// # Panics
919    ///
920    /// Panics if `a` and `b` have different lengths.
921    pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
922        assert_eq!(a.len(), b.len(), "vector dimension mismatch");
923        match self {
924            DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
925            DistanceMetric::Cosine => {
926                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
927                let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
928                let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
929                let denom = norm_a * norm_b;
930                if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
931            }
932            DistanceMetric::Dot => {
933                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
934                -dot
935            }
936        }
937    }
938}
939
940#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
941pub struct FullTextIndexConfig {
942    pub name: String,
943    pub label: String,
944    pub properties: Vec<String>,
945    pub tokenizer: TokenizerConfig,
946    pub with_positions: bool,
947    #[serde(default)]
948    pub metadata: IndexMetadata,
949}
950
951#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
952#[non_exhaustive]
953pub enum TokenizerConfig {
954    Standard,
955    Whitespace,
956    Ngram { min: u8, max: u8 },
957    Custom { name: String },
958}
959
960#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
961pub struct JsonFtsIndexConfig {
962    pub name: String,
963    pub label: String,
964    pub column: String,
965    #[serde(default)]
966    pub paths: Vec<String>,
967    #[serde(default)]
968    pub with_positions: bool,
969    #[serde(default)]
970    pub metadata: IndexMetadata,
971}
972
973#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
974pub struct ScalarIndexConfig {
975    pub name: String,
976    pub label: String,
977    pub properties: Vec<String>,
978    pub index_type: ScalarIndexType,
979    pub where_clause: Option<String>,
980    #[serde(default)]
981    pub metadata: IndexMetadata,
982}
983
984#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
985#[non_exhaustive]
986pub enum ScalarIndexType {
987    BTree,
988    Hash,
989    Bitmap,
990    LabelList,
991}
992
993pub struct SchemaManager {
994    store: Arc<dyn ObjectStore>,
995    path: ObjectStorePath,
996    schema: RwLock<Arc<Schema>>,
997}
998
999impl SchemaManager {
1000    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
1001        let path = path.as_ref();
1002        let parent = path
1003            .parent()
1004            .ok_or_else(|| anyhow!("Invalid schema path"))?;
1005        let filename = path
1006            .file_name()
1007            .ok_or_else(|| anyhow!("Invalid schema filename"))?
1008            .to_str()
1009            .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
1010
1011        let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
1012        let obj_path = ObjectStorePath::from(filename);
1013
1014        Self::load_from_store(store, &obj_path).await
1015    }
1016
1017    pub async fn load_from_store(
1018        store: Arc<dyn ObjectStore>,
1019        path: &ObjectStorePath,
1020    ) -> Result<Self> {
1021        match store.get(path).await {
1022            Ok(result) => {
1023                let bytes = result.bytes().await?;
1024                let content = String::from_utf8(bytes.to_vec())?;
1025                let mut schema: Schema = serde_json::from_str(&content)?;
1026                // Self-heal catalogs that grew super-linearly under the
1027                // pre-fix `add_index` (issue rustic-ai/uni-db#63). Collapse
1028                // duplicate index entries by name, keeping the *last*
1029                // occurrence — matches the upsert semantics in `add_index`
1030                // and preserves whatever metadata the most recent rebuild
1031                // wrote. The dedup persists on the next mutation that
1032                // calls `save()`.
1033                let original_len = schema.indexes.len();
1034                if original_len > 0 {
1035                    let mut seen: std::collections::HashSet<String> =
1036                        std::collections::HashSet::with_capacity(original_len);
1037                    let mut dedup: Vec<IndexDefinition> = schema
1038                        .indexes
1039                        .iter()
1040                        .rev()
1041                        .filter(|idx| seen.insert(idx.name().to_string()))
1042                        .cloned()
1043                        .collect();
1044                    dedup.reverse();
1045                    if dedup.len() != original_len {
1046                        tracing::warn!(
1047                            collapsed = original_len - dedup.len(),
1048                            kept = dedup.len(),
1049                            "schema.indexes: collapsed duplicate entries on load (issue #63)"
1050                        );
1051                        schema.indexes = dedup;
1052                    }
1053                }
1054                Ok(Self {
1055                    store,
1056                    path: path.clone(),
1057                    schema: RwLock::new(Arc::new(schema)),
1058                })
1059            }
1060            Err(object_store::Error::NotFound { .. }) => Ok(Self {
1061                store,
1062                path: path.clone(),
1063                schema: RwLock::new(Arc::new(Schema::default())),
1064            }),
1065            Err(e) => Err(anyhow::Error::from(e)),
1066        }
1067    }
1068
1069    pub async fn save(&self) -> Result<()> {
1070        let content = {
1071            let schema_guard = acquire_read(&self.schema, "schema")?;
1072            serde_json::to_string_pretty(&**schema_guard)?
1073        };
1074        self.store
1075            .put(&self.path, content.into())
1076            .await
1077            .map_err(anyhow::Error::from)?;
1078        Ok(())
1079    }
1080
1081    pub fn path(&self) -> &ObjectStorePath {
1082        &self.path
1083    }
1084
1085    pub fn schema(&self) -> Arc<Schema> {
1086        self.schema
1087            .read()
1088            .expect("Schema lock poisoned - a thread panicked while holding it")
1089            .clone()
1090    }
1091
1092    /// Normalize function names in an expression to uppercase for case-insensitive matching.
1093    /// Examples: "lower(email)" -> "LOWER(email)", "trim(name)" -> "TRIM(name)"
1094    fn normalize_function_names(expr: &str) -> String {
1095        let mut result = String::with_capacity(expr.len());
1096        let mut chars = expr.chars().peekable();
1097
1098        while let Some(ch) = chars.next() {
1099            if ch.is_alphabetic() {
1100                // Collect identifier
1101                let mut ident = String::new();
1102                ident.push(ch);
1103
1104                while let Some(&next) = chars.peek() {
1105                    if next.is_alphanumeric() || next == '_' {
1106                        ident.push(chars.next().unwrap());
1107                    } else {
1108                        break;
1109                    }
1110                }
1111
1112                // If followed by '(', it's a function call - uppercase it
1113                if chars.peek() == Some(&'(') {
1114                    result.push_str(&ident.to_uppercase());
1115                } else {
1116                    result.push_str(&ident); // Keep property names as-is
1117                }
1118            } else {
1119                result.push(ch);
1120            }
1121        }
1122
1123        result
1124    }
1125
1126    /// Generate a consistent internal column name for an expression index.
1127    /// Uses a hash suffix to ensure uniqueness for different expressions that
1128    /// might sanitize to the same string (e.g., "a+b" and "a-b" both become "a_b").
1129    ///
1130    /// IMPORTANT: Uses FNV-1a hash which is stable across Rust versions and platforms.
1131    /// DefaultHasher is not guaranteed to be stable and could break persistent data
1132    /// if the hash changes after a compiler upgrade.
1133    pub fn generated_column_name(expr: &str) -> String {
1134        // Normalize function names to uppercase for case-insensitive matching
1135        let normalized = Self::normalize_function_names(expr);
1136
1137        let sanitized = normalized
1138            .replace(|c: char| !c.is_alphanumeric(), "_")
1139            .trim_matches('_')
1140            .to_string();
1141
1142        // FNV-1a 64-bit hash - stable across Rust versions and platforms
1143        const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1144        const FNV_PRIME: u64 = 1099511628211;
1145
1146        let mut hash = FNV_OFFSET_BASIS;
1147        for byte in normalized.as_bytes() {
1148            hash ^= *byte as u64;
1149            hash = hash.wrapping_mul(FNV_PRIME);
1150        }
1151
1152        format!("_gen_{}_{:x}", sanitized, hash)
1153    }
1154
1155    pub fn replace_schema(&self, new_schema: Schema) {
1156        let mut schema = self
1157            .schema
1158            .write()
1159            .expect("Schema lock poisoned - a thread panicked while holding it");
1160        *schema = Arc::new(new_schema);
1161    }
1162
1163    /// Build a fork-scoped manager whose schema is `primary ⊕ overlay`.
1164    ///
1165    /// Used by `UniInner::at_fork` to give a forked session a schema view
1166    /// that includes any labels/edge-types/properties the fork has
1167    /// introduced on top of primary. The returned manager owns its own
1168    /// in-memory `Arc<Schema>` — mutations to it never reach primary's
1169    /// schema file. The returned manager is *not* intended for `.save()`;
1170    /// fork-overlay persistence is owned by the registry layer
1171    /// (`catalog/fork_schemas/{fork_id}.json`).
1172    ///
1173    /// In Phase 1 the delta is always empty, so the merge is a clone.
1174    /// Phase 2 starts populating it when on-the-fly label creation lands.
1175    #[must_use]
1176    pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1177        let primary = self.schema();
1178        let merged = if overlay.is_empty() {
1179            (*primary).clone()
1180        } else {
1181            let mut merged = (*primary).clone();
1182            for (name, label) in &overlay.added_labels {
1183                merged.labels.insert(name.clone(), label.clone());
1184            }
1185            for (name, edge_type) in &overlay.added_edge_types {
1186                merged.edge_types.insert(name.clone(), edge_type.clone());
1187            }
1188            for addition in &overlay.added_properties {
1189                let props = merged.properties.entry(addition.owner.clone()).or_default();
1190                props.insert(
1191                    addition.property.clone(),
1192                    PropertyMeta {
1193                        r#type: addition.data_type.clone(),
1194                        nullable: addition.nullable,
1195                        added_in: merged.schema_version,
1196                        state: SchemaElementState::Active,
1197                        generation_expression: None,
1198                        description: None,
1199                    },
1200                );
1201            }
1202            merged
1203        };
1204
1205        Arc::new(Self {
1206            store: self.store.clone(),
1207            path: self.path.clone(),
1208            schema: RwLock::new(Arc::new(merged)),
1209        })
1210    }
1211
1212    pub fn next_label_id(&self) -> u16 {
1213        self.schema()
1214            .labels
1215            .values()
1216            .map(|l| l.id)
1217            .max()
1218            .unwrap_or(0)
1219            + 1
1220    }
1221
1222    pub fn next_type_id(&self) -> u32 {
1223        let max_schema_id = self
1224            .schema()
1225            .edge_types
1226            .values()
1227            .map(|t| t.id)
1228            .max()
1229            .unwrap_or(0);
1230
1231        // Ensure we stay in schema'd ID space (bit 31 = 0)
1232        if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1233            panic!("Schema edge type ID exhaustion");
1234        }
1235
1236        max_schema_id + 1
1237    }
1238
1239    /// Validate a label or edge-type name at definition time. (L6)
1240    ///
1241    /// Names flow into on-disk dataset paths (`vertices_{name}.lance`) and
1242    /// Lance branch names (`fork_{id}_{…}`); a name with a path separator,
1243    /// whitespace, or a control character corrupts those paths and breaks
1244    /// fork creation. Such names were never actually usable, so they are
1245    /// rejected up front rather than failing later. `.` is allowed
1246    /// (path-safe and common in qualified names).
1247    ///
1248    /// Public so the fork-create path can apply the same rule as a backstop
1249    /// over names that entered the schema through an infallible interning
1250    /// path (e.g. schemaless `get_or_assign_edge_type_id`).
1251    ///
1252    /// # Errors
1253    /// Returns an error if `name` is empty/all-whitespace, exceeds
1254    /// `MAX_SCHEMA_NAME_LEN` bytes, or contains a control, whitespace,
1255    /// `/`, or `\` character.
1256    pub fn validate_schema_element_name(kind: &str, name: &str) -> Result<()> {
1257        if name.is_empty() || name.chars().all(char::is_whitespace) {
1258            return Err(anyhow!(
1259                "{kind} name must be non-empty and not all whitespace"
1260            ));
1261        }
1262        if name.len() > MAX_SCHEMA_NAME_LEN {
1263            return Err(anyhow!("{kind} name exceeds {MAX_SCHEMA_NAME_LEN} bytes"));
1264        }
1265        if let Some(c) = name
1266            .chars()
1267            .find(|c| c.is_control() || c.is_whitespace() || matches!(c, '/' | '\\'))
1268        {
1269            return Err(anyhow!(
1270                "{kind} name '{name}' contains an unsafe character ({c:?})"
1271            ));
1272        }
1273        Ok(())
1274    }
1275
1276    pub fn add_label(&self, name: &str) -> Result<u16> {
1277        self.add_label_with_desc(name, None)
1278    }
1279
1280    pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1281        Self::validate_schema_element_name("Label", name)?;
1282        let mut guard = acquire_write(&self.schema, "schema")?;
1283        let schema = Arc::make_mut(&mut *guard);
1284        if schema.labels.contains_key(name) {
1285            return Err(anyhow!("Label '{}' already exists", name));
1286        }
1287
1288        let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1289        if id >= VIRTUAL_LABEL_ID_START {
1290            return Err(anyhow!(
1291                "Native label space exhausted (next id {id:#x} would enter the \
1292                 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1293                 reserved for catalog-resolved labels)"
1294            ));
1295        }
1296        schema.labels.insert(
1297            name.to_string(),
1298            LabelMeta {
1299                id,
1300                created_at: Utc::now(),
1301                state: SchemaElementState::Active,
1302                description,
1303            },
1304        );
1305        schema.bump_version();
1306        Ok(id)
1307    }
1308
1309    pub fn add_edge_type(
1310        &self,
1311        name: &str,
1312        src_labels: Vec<String>,
1313        dst_labels: Vec<String>,
1314    ) -> Result<u32> {
1315        self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1316    }
1317
1318    pub fn add_edge_type_with_desc(
1319        &self,
1320        name: &str,
1321        src_labels: Vec<String>,
1322        dst_labels: Vec<String>,
1323        description: Option<String>,
1324    ) -> Result<u32> {
1325        Self::validate_schema_element_name("Edge type", name)?;
1326        let mut guard = acquire_write(&self.schema, "schema")?;
1327        let schema = Arc::make_mut(&mut *guard);
1328        if schema.edge_types.contains_key(name) {
1329            return Err(anyhow!("Edge type '{}' already exists", name));
1330        }
1331
1332        let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1333
1334        // Stay in the schema-defined sub-range (bit 31 = 0, and below the
1335        // virtual reservation `VIRTUAL_EDGE_TYPE_ID_START`) — same bound as
1336        // `add_edge_type`, so the two entry points cannot disagree on the
1337        // legal ceiling.
1338        if id >= VIRTUAL_EDGE_TYPE_ID_START {
1339            return Err(anyhow!(
1340                "Native edge type space exhausted (next id {id:#x} would enter the \
1341                 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1342                 reserved for catalog-resolved edge types)"
1343            ));
1344        }
1345
1346        schema.edge_types.insert(
1347            name.to_string(),
1348            EdgeTypeMeta {
1349                id,
1350                src_labels,
1351                dst_labels,
1352                state: SchemaElementState::Active,
1353                description,
1354            },
1355        );
1356        schema.bump_version();
1357        Ok(id)
1358    }
1359
1360    /// Delegates to [`Schema::get_or_assign_edge_type_id`].
1361    ///
1362    /// Read-lock fast path: the type name is almost always already known
1363    /// (it is constant per statement but resolved per row by the CREATE
1364    /// executor), and the slow path's write lock + `Arc::make_mut` deep-clones
1365    /// the whole `Schema` whenever the Arc is shared — which under SSI it
1366    /// always is. Double-checked: on a miss, `Schema::get_or_assign_edge_type_id`
1367    /// re-checks under the write lock, so two racing assigners converge on one id.
1368    pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1369        {
1370            let guard = acquire_read(&self.schema, "schema")
1371                .expect("Schema lock poisoned - a thread panicked while holding it");
1372            if let Some(id) = guard.edge_type_id_unified(type_name) {
1373                return id;
1374            }
1375        }
1376        let mut guard = acquire_write(&self.schema, "schema")
1377            .expect("Schema lock poisoned - a thread panicked while holding it");
1378        let schema = Arc::make_mut(&mut *guard);
1379        schema.get_or_assign_edge_type_id(type_name)
1380    }
1381
1382    /// Delegates to [`Schema::edge_type_name_by_id_unified`].
1383    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1384        let schema = acquire_read(&self.schema, "schema")
1385            .expect("Schema lock poisoned - a thread panicked while holding it");
1386        schema.edge_type_name_by_id_unified(type_id)
1387    }
1388
1389    pub fn add_property(
1390        &self,
1391        label_or_type: &str,
1392        prop_name: &str,
1393        data_type: DataType,
1394        nullable: bool,
1395    ) -> Result<()> {
1396        self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1397    }
1398
1399    pub fn add_property_with_desc(
1400        &self,
1401        label_or_type: &str,
1402        prop_name: &str,
1403        data_type: DataType,
1404        nullable: bool,
1405        description: Option<String>,
1406    ) -> Result<()> {
1407        validate_property_name(prop_name)?;
1408        let mut guard = acquire_write(&self.schema, "schema")?;
1409        let schema = Arc::make_mut(&mut *guard);
1410        let version = schema.schema_version;
1411        let props = schema
1412            .properties
1413            .entry(label_or_type.to_string())
1414            .or_default();
1415
1416        if props.contains_key(prop_name) {
1417            return Err(anyhow!(
1418                "Property '{}' already exists for '{}'",
1419                prop_name,
1420                label_or_type
1421            ));
1422        }
1423
1424        props.insert(
1425            prop_name.to_string(),
1426            PropertyMeta {
1427                r#type: data_type,
1428                nullable,
1429                added_in: version,
1430                state: SchemaElementState::Active,
1431                generation_expression: None,
1432                description,
1433            },
1434        );
1435        // Bump after stamping `added_in` with the pre-bump `version`.
1436        schema.bump_version();
1437        Ok(())
1438    }
1439
1440    pub fn add_generated_property(
1441        &self,
1442        label_or_type: &str,
1443        prop_name: &str,
1444        data_type: DataType,
1445        expr: String,
1446    ) -> Result<()> {
1447        // System-generated `_gen_*` columns bypass the underscore-prefix rule
1448        // but must still avoid storage-layer column-name collisions.
1449        validate_reserved_property_name(prop_name)?;
1450        let mut guard = acquire_write(&self.schema, "schema")?;
1451        let schema = Arc::make_mut(&mut *guard);
1452        let version = schema.schema_version;
1453        let props = schema
1454            .properties
1455            .entry(label_or_type.to_string())
1456            .or_default();
1457
1458        if props.contains_key(prop_name) {
1459            return Err(anyhow!("Property '{}' already exists", prop_name));
1460        }
1461
1462        props.insert(
1463            prop_name.to_string(),
1464            PropertyMeta {
1465                r#type: data_type,
1466                nullable: true,
1467                added_in: version,
1468                state: SchemaElementState::Active,
1469                generation_expression: Some(expr),
1470                description: None,
1471            },
1472        );
1473        // Bump after stamping `added_in` with the pre-bump `version`.
1474        schema.bump_version();
1475        Ok(())
1476    }
1477
1478    pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1479        let mut guard = acquire_write(&self.schema, "schema")?;
1480        let schema = Arc::make_mut(&mut *guard);
1481        let meta = schema
1482            .labels
1483            .get_mut(name)
1484            .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1485        meta.description = description;
1486        Ok(())
1487    }
1488
1489    pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1490        let mut guard = acquire_write(&self.schema, "schema")?;
1491        let schema = Arc::make_mut(&mut *guard);
1492        let meta = schema
1493            .edge_types
1494            .get_mut(name)
1495            .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1496        meta.description = description;
1497        Ok(())
1498    }
1499
1500    pub fn set_property_description(
1501        &self,
1502        entity: &str,
1503        prop_name: &str,
1504        description: Option<String>,
1505    ) -> Result<()> {
1506        let mut guard = acquire_write(&self.schema, "schema")?;
1507        let schema = Arc::make_mut(&mut *guard);
1508        let props = schema
1509            .properties
1510            .get_mut(entity)
1511            .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1512        let meta = props
1513            .get_mut(prop_name)
1514            .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
1515        meta.description = description;
1516        Ok(())
1517    }
1518
1519    /// Register an index definition on the schema, **upsert by name**.
1520    ///
1521    /// If an index with the same `IndexDefinition::name()` already exists, it
1522    /// is replaced in place; otherwise the def is appended. Idempotent under
1523    /// repeat invocation, which makes `SchemaBuilder::apply()` re-applicable
1524    /// without bloating `schema.indexes` and lets the rebuild epilogue inside
1525    /// every `IndexManager::create_*_index` re-record metadata updates without
1526    /// duplicating entries (issue rustic-ai/uni-db#63).
1527    pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1528        let mut guard = acquire_write(&self.schema, "schema")?;
1529        let schema = Arc::make_mut(&mut *guard);
1530        if let Some(existing) = schema
1531            .indexes
1532            .iter_mut()
1533            .find(|i| i.name() == index_def.name())
1534        {
1535            *existing = index_def;
1536        } else {
1537            schema.indexes.push(index_def);
1538        }
1539        schema.bump_version();
1540        Ok(())
1541    }
1542
1543    pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1544        let schema = self.schema.read().expect("Schema lock poisoned");
1545        schema.indexes.iter().find(|i| i.name() == name).cloned()
1546    }
1547
1548    /// Updates the lifecycle metadata for an index by name.
1549    ///
1550    /// The closure receives a mutable reference to the index's `IndexMetadata`,
1551    /// allowing callers to update status, timestamps, etc.
1552    pub fn update_index_metadata(
1553        &self,
1554        index_name: &str,
1555        f: impl FnOnce(&mut IndexMetadata),
1556    ) -> Result<()> {
1557        let mut guard = acquire_write(&self.schema, "schema")?;
1558        let schema = Arc::make_mut(&mut *guard);
1559        let idx = schema
1560            .indexes
1561            .iter_mut()
1562            .find(|i| i.name() == index_name)
1563            .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1564        f(idx.metadata_mut());
1565        Ok(())
1566    }
1567
1568    pub fn remove_index(&self, name: &str) -> Result<()> {
1569        let mut guard = acquire_write(&self.schema, "schema")?;
1570        let schema = Arc::make_mut(&mut *guard);
1571        if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1572            schema.indexes.remove(pos);
1573            schema.bump_version();
1574            Ok(())
1575        } else {
1576            Err(anyhow!("Index '{}' not found", name))
1577        }
1578    }
1579
1580    pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1581        let mut guard = acquire_write(&self.schema, "schema")?;
1582        let schema = Arc::make_mut(&mut *guard);
1583        if schema.constraints.iter().any(|c| c.name == constraint.name) {
1584            return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1585        }
1586        schema.constraints.push(constraint);
1587        schema.bump_version();
1588        Ok(())
1589    }
1590
1591    pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1592        let mut guard = acquire_write(&self.schema, "schema")?;
1593        let schema = Arc::make_mut(&mut *guard);
1594        if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1595            schema.constraints.remove(pos);
1596            schema.bump_version();
1597            Ok(())
1598        } else if if_exists {
1599            Ok(())
1600        } else {
1601            Err(anyhow!("Constraint '{}' not found", name))
1602        }
1603    }
1604
1605    pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1606        let mut guard = acquire_write(&self.schema, "schema")?;
1607        let schema = Arc::make_mut(&mut *guard);
1608        let Some(props) = schema.properties.get_mut(label_or_type) else {
1609            return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1610        };
1611        if props.remove(prop_name).is_none() {
1612            return Err(anyhow!(
1613                "Property '{}' not found for '{}'",
1614                prop_name,
1615                label_or_type
1616            ));
1617        }
1618        schema.bump_version();
1619        Ok(())
1620    }
1621
1622    pub fn rename_property(
1623        &self,
1624        label_or_type: &str,
1625        old_name: &str,
1626        new_name: &str,
1627    ) -> Result<()> {
1628        let mut guard = acquire_write(&self.schema, "schema")?;
1629        let schema = Arc::make_mut(&mut *guard);
1630        let Some(props) = schema.properties.get_mut(label_or_type) else {
1631            return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1632        };
1633        let Some(meta) = props.remove(old_name) else {
1634            return Err(anyhow!(
1635                "Property '{}' not found for '{}'",
1636                old_name,
1637                label_or_type
1638            ));
1639        };
1640        if props.contains_key(new_name) {
1641            // Rollback removal? Or just error.
1642            props.insert(old_name.to_string(), meta); // Restore
1643            return Err(anyhow!("Property '{}' already exists", new_name));
1644        }
1645        props.insert(new_name.to_string(), meta);
1646        schema.bump_version();
1647        Ok(())
1648    }
1649
1650    pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1651        let mut guard = acquire_write(&self.schema, "schema")?;
1652        let schema = Arc::make_mut(&mut *guard);
1653        if let Some(label_meta) = schema.labels.get_mut(name) {
1654            label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1655            // Do not remove properties; they are implicitly tombstoned by the label
1656            schema.bump_version();
1657            Ok(())
1658        } else if if_exists {
1659            Ok(())
1660        } else {
1661            Err(anyhow!("Label '{}' not found", name))
1662        }
1663    }
1664
1665    pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1666        let mut guard = acquire_write(&self.schema, "schema")?;
1667        let schema = Arc::make_mut(&mut *guard);
1668        if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1669            edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1670            // Do not remove properties; they are implicitly tombstoned by the edge type
1671            schema.bump_version();
1672            Ok(())
1673        } else if if_exists {
1674            Ok(())
1675        } else {
1676            Err(anyhow!("Edge Type '{}' not found", name))
1677        }
1678    }
1679}
1680
1681/// Validate identifier names to prevent injection and ensure compatibility.
1682pub fn validate_identifier(name: &str) -> Result<()> {
1683    // Length check
1684    if name.is_empty() || name.len() > 64 {
1685        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1686    }
1687
1688    // First character must be letter or underscore
1689    let first = name.chars().next().unwrap();
1690    if !first.is_alphabetic() && first != '_' {
1691        return Err(anyhow!(
1692            "Identifier '{}' must start with letter or underscore",
1693            name
1694        ));
1695    }
1696
1697    // Remaining characters: alphanumeric or underscore
1698    if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1699        return Err(anyhow!(
1700            "Identifier '{}' must contain only alphanumeric and underscore",
1701            name
1702        ));
1703    }
1704
1705    // Reserved words
1706    const RESERVED: &[&str] = &[
1707        "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1708        "UNION", "ORDER", "LIMIT",
1709    ];
1710    if RESERVED.contains(&name.to_uppercase().as_str()) {
1711        return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1712    }
1713
1714    Ok(())
1715}
1716
1717/// Reject user-declared property names that collide with internal Arrow column
1718/// names used by the storage layer.
1719///
1720/// Without this, declaring a property named e.g. `ext_id` produces an Arrow
1721/// schema with two `ext_id` fields at flush time, which Lance rejects with
1722/// "Duplicate field name" — silently losing all in-session writes on shutdown.
1723pub fn validate_property_name(name: &str) -> Result<()> {
1724    if name.starts_with('_') {
1725        return Err(anyhow!(
1726            "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
1727            name
1728        ));
1729    }
1730    validate_reserved_property_name(name)
1731}
1732
1733/// Reject names that collide with storage-layer Arrow column names.
1734///
1735/// Used both by `validate_property_name` (user-facing path) and directly by
1736/// `add_generated_property` (system-generated `_gen_*` path) — the latter
1737/// needs to bypass the underscore-prefix rule but must still reject the
1738/// fixed-name collisions below.
1739fn validate_reserved_property_name(name: &str) -> Result<()> {
1740    // Unprefixed names that get appended alongside user properties in the
1741    // per-label vertex (`storage/vertex.rs`), per-edge-type edge
1742    // (`storage/edge.rs`), or per-edge-type delta (`storage/delta.rs`)
1743    // Arrow schemas — declaring one of these as a user property produces a
1744    // duplicate Arrow field and a Lance "Duplicate field name" error at
1745    // flush time. Fixed-schema-only columns (`type`, `props_json`,
1746    // `labels` in the main tables) are NOT listed: those tables don't
1747    // append user properties, so no collision can occur.
1748    const RESERVED_PROPS: &[&str] = &[
1749        "ext_id",
1750        "overflow_json",
1751        "eid",
1752        "src_vid",
1753        "dst_vid",
1754        "op",
1755        // Internal planner sentinel: a column-name marker used by
1756        // `mark_set_item_variables` (uni-query::query::planner) to request
1757        // narrow structural projection without full-schema expansion.
1758        // Reserved here defensively so an internal `add_generated_property`
1759        // path can't accidentally create a colliding user-facing column.
1760        // The user-facing `validate_property_name` already rejects this
1761        // via the underscore-prefix rule, so this is belt-and-suspenders.
1762        "__set_struct__",
1763    ];
1764    if RESERVED_PROPS.contains(&name) {
1765        return Err(anyhow!(
1766            "Property name '{}' is reserved by the storage layer; please choose a different name",
1767            name
1768        ));
1769    }
1770    Ok(())
1771}
1772
1773#[cfg(test)]
1774mod tests {
1775    use super::*;
1776    use crate::value::{TemporalValue, Value};
1777    use object_store::local::LocalFileSystem;
1778    use tempfile::tempdir;
1779
1780    #[test]
1781    fn test_datatype_accepts_matrix() {
1782        let dt = || TemporalValue::DateTime {
1783            nanos_since_epoch: 0,
1784            offset_seconds: 0,
1785            timezone_name: None,
1786        };
1787
1788        // Null is accepted by every type (nullability checked separately).
1789        for ty in [
1790            DataType::String,
1791            DataType::Int64,
1792            DataType::Bool,
1793            DataType::DateTime,
1794            DataType::Float64,
1795        ] {
1796            assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
1797        }
1798
1799        // Exact-type matches.
1800        assert!(DataType::String.accepts(&Value::String("x".into())));
1801        assert!(DataType::Int64.accepts(&Value::Int(1)));
1802        assert!(DataType::Bool.accepts(&Value::Bool(true)));
1803        assert!(DataType::DateTime.accepts(&Value::Temporal(dt())));
1804
1805        // Intentional lossless widenings remain allowed.
1806        assert!(
1807            DataType::Float64.accepts(&Value::Int(3)),
1808            "Int widens to Float"
1809        );
1810        assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");
1811        assert!(DataType::Timestamp.accepts(&Value::Temporal(dt())));
1812        assert!(
1813            DataType::Timestamp.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
1814            "storage parses strings for non-struct Timestamp columns"
1815        );
1816
1817        // The #68 data-loss cases must be rejected (coercion handles strings separately).
1818        assert!(
1819            !DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())),
1820            "String into a DateTime struct column nulls silently — reject here"
1821        );
1822        assert!(!DataType::Bool.accepts(&Value::Int(1)));
1823        assert!(!DataType::Int64.accepts(&Value::Bool(true)));
1824        assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
1825        assert!(
1826            !DataType::String.accepts(&Value::Int(10)),
1827            "no implicit stringification"
1828        );
1829        assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));
1830
1831        // Opaque columns accept anything.
1832        assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
1833    }
1834
1835    #[tokio::test]
1836    async fn test_schema_management() -> Result<()> {
1837        let dir = tempdir()?;
1838        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1839        let path = ObjectStorePath::from("schema.json");
1840        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
1841
1842        // Labels
1843        let lid = manager.add_label("Person")?;
1844        assert_eq!(lid, 1);
1845        assert!(manager.add_label("Person").is_err());
1846
1847        // Properties
1848        manager.add_property("Person", "name", DataType::String, false)?;
1849        assert!(
1850            manager
1851                .add_property("Person", "name", DataType::String, false)
1852                .is_err()
1853        );
1854
1855        // Edge types
1856        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
1857        assert_eq!(tid, 1);
1858
1859        manager.save().await?;
1860        // Check file exists
1861        assert!(store.get(&path).await.is_ok());
1862
1863        let manager2 = SchemaManager::load_from_store(store, &path).await?;
1864        assert!(manager2.schema().labels.contains_key("Person"));
1865        assert!(
1866            manager2
1867                .schema()
1868                .properties
1869                .get("Person")
1870                .unwrap()
1871                .contains_key("name")
1872        );
1873
1874        Ok(())
1875    }
1876
1877    #[tokio::test]
1878    async fn test_reserved_property_names_rejected() -> Result<()> {
1879        let dir = tempdir()?;
1880        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1881        let path = ObjectStorePath::from("schema.json");
1882        let manager = SchemaManager::load_from_store(store, &path).await?;
1883
1884        manager.add_label("Tiny")?;
1885
1886        // Unprefixed reserved names — these collide with internal Arrow
1887        // columns in storage tables and previously caused Lance
1888        // "Duplicate field name" errors at flush time.
1889        for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
1890            let err = manager
1891                .add_property("Tiny", reserved, DataType::String, true)
1892                .expect_err(&format!("expected '{reserved}' to be rejected"));
1893            assert!(
1894                err.to_string().contains("reserved"),
1895                "error for '{reserved}' should mention 'reserved', got: {err}"
1896            );
1897        }
1898
1899        // Planner sentinel — reserved in RESERVED_PROPS (belt-and-suspenders
1900        // alongside the underscore-prefix rule). Confirms an internal
1901        // `add_generated_property` path cannot accidentally create a column
1902        // that collides with the SET-target structural-projection marker.
1903        let err = manager
1904            .add_property("Tiny", "__set_struct__", DataType::String, true)
1905            .expect_err("expected '__set_struct__' to be rejected");
1906        assert!(
1907            err.to_string().contains("reserved"),
1908            "__set_struct__ rejection should mention 'reserved', got: {err}"
1909        );
1910
1911        // Leading-underscore pattern rule.
1912        for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
1913            assert!(
1914                manager
1915                    .add_property("Tiny", reserved, DataType::String, true)
1916                    .is_err(),
1917                "expected '{reserved}' to be rejected"
1918            );
1919        }
1920
1921        // Names that merely contain a reserved substring should still be
1922        // accepted.
1923        manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
1924        manager.add_property("Tiny", "user_op", DataType::String, true)?;
1925        manager.add_property("Tiny", "type_name", DataType::String, true)?;
1926
1927        // Same check applies to edge-type properties (single dispatch).
1928        manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
1929        assert!(
1930            manager
1931                .add_property("knows", "src_vid", DataType::Int64, true)
1932                .is_err()
1933        );
1934
1935        // And to generated properties.
1936        assert!(
1937            manager
1938                .add_generated_property(
1939                    "Tiny",
1940                    "ext_id",
1941                    DataType::String,
1942                    "concat('x', name)".into()
1943                )
1944                .is_err()
1945        );
1946
1947        Ok(())
1948    }
1949
1950    #[test]
1951    fn test_normalize_function_names() {
1952        assert_eq!(
1953            SchemaManager::normalize_function_names("lower(email)"),
1954            "LOWER(email)"
1955        );
1956        assert_eq!(
1957            SchemaManager::normalize_function_names("LOWER(email)"),
1958            "LOWER(email)"
1959        );
1960        assert_eq!(
1961            SchemaManager::normalize_function_names("Lower(email)"),
1962            "LOWER(email)"
1963        );
1964        assert_eq!(
1965            SchemaManager::normalize_function_names("trim(lower(email))"),
1966            "TRIM(LOWER(email))"
1967        );
1968    }
1969
1970    #[test]
1971    fn test_generated_column_name_case_insensitive() {
1972        let col1 = SchemaManager::generated_column_name("lower(email)");
1973        let col2 = SchemaManager::generated_column_name("LOWER(email)");
1974        let col3 = SchemaManager::generated_column_name("Lower(email)");
1975        assert_eq!(col1, col2);
1976        assert_eq!(col2, col3);
1977        assert!(col1.starts_with("_gen_LOWER_email_"));
1978    }
1979
1980    #[test]
1981    fn test_index_metadata_serde_backward_compat() {
1982        // Simulate old JSON without metadata field
1983        let json = r#"{
1984            "type": "Scalar",
1985            "name": "idx_person_name",
1986            "label": "Person",
1987            "properties": ["name"],
1988            "index_type": "BTree",
1989            "where_clause": null
1990        }"#;
1991        let def: IndexDefinition = serde_json::from_str(json).unwrap();
1992        let meta = def.metadata();
1993        assert_eq!(meta.status, IndexStatus::Online);
1994        assert!(meta.last_built_at.is_none());
1995        assert!(meta.row_count_at_build.is_none());
1996    }
1997
1998    #[test]
1999    fn test_index_metadata_serde_roundtrip() {
2000        let now = Utc::now();
2001        let def = IndexDefinition::Scalar(ScalarIndexConfig {
2002            name: "idx_test".to_string(),
2003            label: "Test".to_string(),
2004            properties: vec!["prop".to_string()],
2005            index_type: ScalarIndexType::BTree,
2006            where_clause: None,
2007            metadata: IndexMetadata {
2008                status: IndexStatus::Building,
2009                last_built_at: Some(now),
2010                row_count_at_build: Some(42),
2011            },
2012        });
2013
2014        let json = serde_json::to_string(&def).unwrap();
2015        let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
2016        assert_eq!(parsed.metadata().status, IndexStatus::Building);
2017        assert_eq!(parsed.metadata().row_count_at_build, Some(42));
2018        assert!(parsed.metadata().last_built_at.is_some());
2019    }
2020
2021    #[tokio::test]
2022    async fn test_update_index_metadata() -> Result<()> {
2023        let dir = tempdir()?;
2024        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2025        let path = ObjectStorePath::from("schema.json");
2026        let manager = SchemaManager::load_from_store(store, &path).await?;
2027
2028        manager.add_label("Person")?;
2029        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
2030            name: "idx_test".to_string(),
2031            label: "Person".to_string(),
2032            properties: vec!["name".to_string()],
2033            index_type: ScalarIndexType::BTree,
2034            where_clause: None,
2035            metadata: Default::default(),
2036        });
2037        manager.add_index(idx)?;
2038
2039        // Verify initial status is Online
2040        let initial = manager.get_index("idx_test").unwrap();
2041        assert_eq!(initial.metadata().status, IndexStatus::Online);
2042
2043        // Update to Building
2044        manager.update_index_metadata("idx_test", |m| {
2045            m.status = IndexStatus::Building;
2046            m.row_count_at_build = Some(100);
2047        })?;
2048
2049        let updated = manager.get_index("idx_test").unwrap();
2050        assert_eq!(updated.metadata().status, IndexStatus::Building);
2051        assert_eq!(updated.metadata().row_count_at_build, Some(100));
2052
2053        // Non-existent index should error
2054        assert!(manager.update_index_metadata("nope", |_| {}).is_err());
2055
2056        Ok(())
2057    }
2058
2059    /// `add_index` is upsert-by-name (issue rustic-ai/uni-db#63). Repeat
2060    /// invocations with the same `IndexDefinition::name()` must replace
2061    /// the entry in place rather than appending. Subsequent `add_index`
2062    /// calls also reflect metadata updates from the new definition.
2063    #[tokio::test]
2064    async fn test_add_index_is_upsert_by_name() -> Result<()> {
2065        let dir = tempdir()?;
2066        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2067        let path = ObjectStorePath::from("schema.json");
2068        let manager = SchemaManager::load_from_store(store, &path).await?;
2069        manager.add_label("Person")?;
2070
2071        let initial = IndexDefinition::Scalar(ScalarIndexConfig {
2072            name: "idx_test".to_string(),
2073            label: "Person".to_string(),
2074            properties: vec!["name".to_string()],
2075            index_type: ScalarIndexType::BTree,
2076            where_clause: None,
2077            metadata: IndexMetadata {
2078                status: IndexStatus::Building,
2079                ..Default::default()
2080            },
2081        });
2082        manager.add_index(initial.clone())?;
2083        assert_eq!(manager.schema().indexes.len(), 1);
2084
2085        // Re-add the identical def — must remain a single entry.
2086        manager.add_index(initial.clone())?;
2087        assert_eq!(
2088            manager.schema().indexes.len(),
2089            1,
2090            "duplicate add_index by name must not append"
2091        );
2092
2093        // Re-add with updated metadata — must replace in place, len unchanged.
2094        let mut updated_cfg = match initial {
2095            IndexDefinition::Scalar(c) => c,
2096            _ => unreachable!(),
2097        };
2098        updated_cfg.metadata.status = IndexStatus::Online;
2099        updated_cfg.metadata.row_count_at_build = Some(42);
2100        manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
2101        assert_eq!(manager.schema().indexes.len(), 1);
2102        let stored = manager.get_index("idx_test").unwrap();
2103        assert_eq!(stored.metadata().status, IndexStatus::Online);
2104        assert_eq!(stored.metadata().row_count_at_build, Some(42));
2105
2106        // A *different* name appends as a new entry.
2107        let other = IndexDefinition::Scalar(ScalarIndexConfig {
2108            name: "idx_other".to_string(),
2109            label: "Person".to_string(),
2110            properties: vec!["age".to_string()],
2111            index_type: ScalarIndexType::BTree,
2112            where_clause: None,
2113            metadata: IndexMetadata::default(),
2114        });
2115        manager.add_index(other)?;
2116        assert_eq!(manager.schema().indexes.len(), 2);
2117
2118        Ok(())
2119    }
2120
2121    /// `load_from_store` self-heals catalogs that were bloated by the
2122    /// pre-fix `add_index` (kept the *last* def per name).
2123    #[tokio::test]
2124    async fn test_load_dedups_bloated_indexes() -> Result<()> {
2125        let dir = tempdir()?;
2126        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2127        let path = ObjectStorePath::from("schema.json");
2128
2129        // Seed disk with a hand-crafted bloated schema: 50 entries, all
2130        // sharing the same name. The last entry has distinct metadata so
2131        // we can assert "last writer wins" semantics.
2132        let mut schema = Schema::default();
2133        schema.labels.insert(
2134            "Person".to_string(),
2135            LabelMeta {
2136                id: 1,
2137                created_at: chrono::Utc::now(),
2138                state: SchemaElementState::Active,
2139                description: None,
2140            },
2141        );
2142        let make = |status: IndexStatus, count: Option<u64>| {
2143            IndexDefinition::Scalar(ScalarIndexConfig {
2144                name: "idx_dup".to_string(),
2145                label: "Person".to_string(),
2146                properties: vec!["name".to_string()],
2147                index_type: ScalarIndexType::BTree,
2148                where_clause: None,
2149                metadata: IndexMetadata {
2150                    status,
2151                    row_count_at_build: count,
2152                    ..Default::default()
2153                },
2154            })
2155        };
2156        for _ in 0..49 {
2157            schema.indexes.push(make(IndexStatus::Building, None));
2158        }
2159        schema.indexes.push(make(IndexStatus::Online, Some(123)));
2160        let json = serde_json::to_string_pretty(&schema)?;
2161        store.put(&path, json.into()).await?;
2162
2163        let manager = SchemaManager::load_from_store(store, &path).await?;
2164        let schema = manager.schema();
2165        assert_eq!(
2166            schema.indexes.len(),
2167            1,
2168            "load() must collapse 50 duplicates by name to 1"
2169        );
2170        // Last-writer-wins: the kept entry is the final push (Online, 123).
2171        assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
2172        assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));
2173
2174        Ok(())
2175    }
2176
2177    #[test]
2178    fn test_vector_index_for_property_skips_non_online() {
2179        let mut schema = Schema::default();
2180        schema.labels.insert(
2181            "Document".to_string(),
2182            LabelMeta {
2183                id: 1,
2184                created_at: chrono::Utc::now(),
2185                state: SchemaElementState::Active,
2186                description: None,
2187            },
2188        );
2189
2190        // Add a vector index with Stale status
2191        schema
2192            .indexes
2193            .push(IndexDefinition::Vector(VectorIndexConfig {
2194                name: "vec_doc_embedding".to_string(),
2195                label: "Document".to_string(),
2196                property: "embedding".to_string(),
2197                index_type: VectorIndexType::Flat,
2198                metric: DistanceMetric::Cosine,
2199                embedding_config: None,
2200                metadata: IndexMetadata {
2201                    status: IndexStatus::Stale,
2202                    ..Default::default()
2203                },
2204            }));
2205
2206        // Stale index should NOT be returned
2207        assert!(
2208            schema
2209                .vector_index_for_property("Document", "embedding")
2210                .is_none()
2211        );
2212
2213        // Set to Online — should now be returned
2214        if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
2215            cfg.metadata.status = IndexStatus::Online;
2216        }
2217        let result = schema.vector_index_for_property("Document", "embedding");
2218        assert!(result.is_some());
2219        assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
2220    }
2221
2222    #[tokio::test]
2223    async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
2224        use crate::core::fork::SchemaDelta;
2225
2226        let dir = tempdir()?;
2227        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2228        let path = ObjectStorePath::from("schema.json");
2229        let primary = SchemaManager::load_from_store(store, &path).await?;
2230        primary.add_label("Person")?;
2231
2232        let overlay = primary.with_overlay(&SchemaDelta::empty());
2233        assert_eq!(overlay.schema().labels.len(), 1);
2234
2235        // Phase 1 invariant: mutating the overlay manager must not bleed
2236        // into primary's schema.
2237        overlay.add_label("Forked")?;
2238        assert!(overlay.schema().labels.contains_key("Forked"));
2239        assert!(!primary.schema().labels.contains_key("Forked"));
2240
2241        Ok(())
2242    }
2243
2244    #[tokio::test]
2245    async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
2246        use crate::core::fork::SchemaDelta;
2247        use chrono::Utc;
2248
2249        let dir = tempdir()?;
2250        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2251        let path = ObjectStorePath::from("schema.json");
2252        let primary = SchemaManager::load_from_store(store, &path).await?;
2253        primary.add_label("Existing")?;
2254
2255        let label_meta = LabelMeta {
2256            id: 99,
2257            created_at: Utc::now(),
2258            state: SchemaElementState::Active,
2259            description: None,
2260        };
2261        let edge_meta = EdgeTypeMeta {
2262            id: 99,
2263            src_labels: vec!["NewLabel".into()],
2264            dst_labels: vec!["NewLabel".into()],
2265            state: SchemaElementState::Active,
2266            description: None,
2267        };
2268        let delta = SchemaDelta {
2269            added_labels: vec![("NewLabel".to_string(), label_meta)],
2270            added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
2271            added_properties: vec![],
2272        };
2273
2274        let overlay = primary.with_overlay(&delta);
2275        let merged = overlay.schema();
2276        assert!(merged.labels.contains_key("Existing"));
2277        assert!(merged.labels.contains_key("NewLabel"));
2278        assert!(merged.edge_types.contains_key("NewEdge"));
2279
2280        // Primary unchanged.
2281        assert!(!primary.schema().labels.contains_key("NewLabel"));
2282        Ok(())
2283    }
2284
2285    /// N threads racing `get_or_assign_edge_type_id` for the same new name
2286    /// must converge on a single id (the read-lock fast path double-checks
2287    /// under the write lock); a schema-defined type must win over the
2288    /// schemaless registry.
2289    #[tokio::test]
2290    async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
2291        let dir = tempdir()?;
2292        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2293        let path = ObjectStorePath::from("schema.json");
2294        let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);
2295
2296        let mut handles = Vec::new();
2297        for _ in 0..16 {
2298            let m = manager.clone();
2299            handles.push(std::thread::spawn(move || {
2300                m.get_or_assign_edge_type_id("RACED")
2301            }));
2302        }
2303        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2304        assert!(
2305            ids.iter().all(|&id| id == ids[0]),
2306            "all racers must observe one id, got {ids:?}"
2307        );
2308        // Fast path returns the same id afterwards.
2309        assert_eq!(manager.get_or_assign_edge_type_id("RACED"), ids[0]);
2310
2311        // Schema-defined type wins over the schemaless registry.
2312        manager.add_label("A")?;
2313        let declared = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
2314        assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared);
2315        Ok(())
2316    }
2317
2318    /// Minting a brand-new schemaless edge type must bump `schema_version`
2319    /// (the plan cache keys on it; untyped traversals bake `all_edge_type_ids()`
2320    /// into the plan, so a stale plan would silently drop edges of the new
2321    /// type). Re-resolving an existing type must NOT bump. (review C5)
2322    #[test]
2323    fn test_new_schemaless_edge_type_bumps_schema_version() {
2324        let mut schema = Schema::default();
2325        let v0 = schema.schema_version;
2326
2327        let id1 = schema.get_or_assign_edge_type_id("FRESH");
2328        assert_eq!(
2329            schema.schema_version,
2330            v0.wrapping_add(1),
2331            "minting a new edge type must bump schema_version"
2332        );
2333
2334        // Re-resolving the same type is a no-op — no further bump.
2335        let id1_again = schema.get_or_assign_edge_type_id("FRESH");
2336        assert_eq!(id1, id1_again);
2337        assert_eq!(
2338            schema.schema_version,
2339            v0.wrapping_add(1),
2340            "resolving an existing edge type must not bump schema_version"
2341        );
2342
2343        // A second distinct new type bumps again.
2344        let _id2 = schema.get_or_assign_edge_type_id("OTHER");
2345        assert_eq!(
2346            schema.schema_version,
2347            v0.wrapping_add(2),
2348            "a second new edge type must bump schema_version again"
2349        );
2350    }
2351
2352    /// L6: label/edge-type names with path separators, whitespace, or
2353    /// control chars are rejected at definition; benign names (incl. `.`)
2354    /// are accepted.
2355    #[test]
2356    fn validate_schema_element_name_rejects_unsafe() {
2357        for bad in ["", "   ", "a/b", "a b", "a\nb", "a\\b", "x\0y"] {
2358            assert!(
2359                SchemaManager::validate_schema_element_name("Label", bad).is_err(),
2360                "expected {bad:?} to be rejected"
2361            );
2362        }
2363        for good in ["Person", "My.Label", "edge_2", "KNOWS"] {
2364            assert!(
2365                SchemaManager::validate_schema_element_name("Label", good).is_ok(),
2366                "expected {good:?} to be accepted"
2367            );
2368        }
2369        // Over-length is rejected.
2370        let long = "x".repeat(MAX_SCHEMA_NAME_LEN + 1);
2371        assert!(SchemaManager::validate_schema_element_name("Label", &long).is_err());
2372    }
2373}