Skip to main content

uni_common/core/
schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::core::edge_type::{
5    MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
6    is_schemaless_edge_type, make_schemaless_id,
7};
8use crate::sync::{acquire_read, acquire_write};
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11use object_store::ObjectStore;
12use object_store::ObjectStoreExt;
13use object_store::local::LocalFileSystem;
14use object_store::path::Path as ObjectStorePath;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
/// Lifecycle state recorded on a schema element.
///
/// Used as the `state` field of [`PropertyMeta`], [`LabelMeta`], and
/// [`EdgeTypeMeta`]. Records deserialized without a `state` field fall back
/// to [`SchemaElementState::Active`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
    /// The element is in normal use.
    Active,
    /// NOTE(review): presumably the element is hidden from new reads while
    /// older snapshots may still reference it — confirm against the snapshot code.
    Hidden {
        /// Timestamp attached to the transition into this state.
        since: DateTime<Utc>,
        /// Identifier of the last snapshot in which the element was active.
        last_active_snapshot: String, // SnapshotId
    },
    /// NOTE(review): presumably marks the element as logically deleted — confirm.
    Tombstone {
        /// Timestamp attached to the transition into this state.
        since: DateTime<Utc>,
    },
}
32
33use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
34
35/// Returns the canonical struct field definitions for DateTime encoding in Arrow.
36///
37/// DateTime is encoded as a 3-field struct to preserve timezone information:
38/// - `nanos_since_epoch`: i64 nanoseconds since Unix epoch (UTC)
39/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
40/// - `timezone_name`: Optional IANA timezone name (e.g., "America/New_York")
41pub fn datetime_struct_fields() -> Fields {
42    Fields::from(vec![
43        Field::new(
44            "nanos_since_epoch",
45            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
46            true,
47        ),
48        Field::new("offset_seconds", ArrowDataType::Int32, true),
49        Field::new("timezone_name", ArrowDataType::Utf8, true),
50    ])
51}
52
53/// Returns the canonical struct field definitions for Time encoding in Arrow.
54///
55/// Time is encoded as a 2-field struct to preserve timezone offset:
56/// - `nanos_since_midnight`: i64 nanoseconds since midnight (0-86,399,999,999,999)
57/// - `offset_seconds`: i32 seconds offset from UTC (e.g., +3600 for +01:00)
58pub fn time_struct_fields() -> Fields {
59    Fields::from(vec![
60        Field::new(
61            "nanos_since_midnight",
62            ArrowDataType::Time64(TimeUnit::Nanosecond),
63            true,
64        ),
65        Field::new("offset_seconds", ArrowDataType::Int32, true),
66    ])
67}
68
69/// Detects if an Arrow DataType is the canonical DateTime struct.
70pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
71    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
72}
73
74/// Detects if an Arrow DataType is the canonical Time struct.
75pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
76    matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
77}
78
/// CRDT variant a schema column can be declared as (see [`DataType::Crdt`]).
///
/// Each variant's canonical string name is exposed by `CrdtType::type_name`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    GCounter,
    GSet,
    ORSet,
    LWWRegister,
    LWWMap,
    Rga,
    VectorClock,
    VCRegister,
}
91
92impl CrdtType {
93    /// Returns the canonical variant name for this CRDT type.
94    ///
95    /// The returned strings must stay in sync with `uni_crdt::Crdt::type_name`,
96    /// so a written CRDT value can be validated against its schema-declared
97    /// variant (see uni-store's write-time CRDT enforcement).
98    ///
99    /// # Examples
100    /// ```
101    /// use uni_common::core::schema::CrdtType;
102    /// assert_eq!(CrdtType::GCounter.type_name(), "GCounter");
103    /// ```
104    #[must_use]
105    pub fn type_name(&self) -> &'static str {
106        match self {
107            CrdtType::GCounter => "GCounter",
108            CrdtType::GSet => "GSet",
109            CrdtType::ORSet => "ORSet",
110            CrdtType::LWWRegister => "LWWRegister",
111            CrdtType::LWWMap => "LWWMap",
112            CrdtType::Rga => "Rga",
113            CrdtType::VectorClock => "VectorClock",
114            CrdtType::VCRegister => "VCRegister",
115        }
116    }
117}
118
/// Coordinate system of a [`DataType::Point`] column.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
    /// Geographic coordinates (WGS84).
    Geographic,  // WGS84
    /// Two-dimensional Cartesian coordinates.
    Cartesian2D, // x, y
    /// Three-dimensional Cartesian coordinates.
    Cartesian3D, // x, y, z
}
125
/// Logical type of a schema property column.
///
/// `DataType::to_arrow` gives the Arrow storage type for each variant and
/// `DataType::accepts` is the write-path guard for values.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
    String,
    Int32,
    Int64,
    Float32,
    Float64,
    Bool,
    Timestamp,
    Date,
    Time,
    DateTime,
    Duration,
    /// Dynamically-typed column; accepts any value.
    CypherValue,
    /// Raw byte buffer.
    Bytes,
    /// Point in the given coordinate system.
    Point(PointType),
    /// Fixed-length vector with `dimensions` elements.
    Vector { dimensions: usize },
    Btic,
    /// CRDT column of the given variant.
    Crdt(CrdtType),
    /// Homogeneous list of the boxed element type.
    List(Box<DataType>),
    /// Map from the first boxed type (key) to the second (value).
    Map(Box<DataType>, Box<DataType>),
}
149
150impl DataType {
151    // Alias for compatibility/convenience if needed, but preferable to use exact types.
152    #[allow(non_upper_case_globals)]
153    pub const Float: DataType = DataType::Float64;
154    #[allow(non_upper_case_globals)]
155    pub const Int: DataType = DataType::Int64;
156
157    pub fn to_arrow(&self) -> ArrowDataType {
158        match self {
159            DataType::String => ArrowDataType::Utf8,
160            DataType::Int32 => ArrowDataType::Int32,
161            DataType::Int64 => ArrowDataType::Int64,
162            DataType::Float32 => ArrowDataType::Float32,
163            DataType::Float64 => ArrowDataType::Float64,
164            DataType::Bool => ArrowDataType::Boolean,
165            DataType::Timestamp => {
166                ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
167            }
168            DataType::Date => ArrowDataType::Date32,
169            DataType::Time => ArrowDataType::Struct(time_struct_fields()),
170            DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
171            DataType::Duration => ArrowDataType::LargeBinary, // Lance doesn't support Interval(MonthDayNano); use CypherValue codec
172            DataType::CypherValue => ArrowDataType::LargeBinary, // MessagePack-tagged binary encoding
173            DataType::Bytes => ArrowDataType::LargeBinary, // raw byte buffer (no codec wrapping)
174            DataType::Point(pt) => match pt {
175                PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
176                    Field::new("latitude", ArrowDataType::Float64, false),
177                    Field::new("longitude", ArrowDataType::Float64, false),
178                    Field::new("crs", ArrowDataType::Utf8, false),
179                ])),
180                PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
181                    Field::new("x", ArrowDataType::Float64, false),
182                    Field::new("y", ArrowDataType::Float64, false),
183                    Field::new("crs", ArrowDataType::Utf8, false),
184                ])),
185                PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
186                    Field::new("x", ArrowDataType::Float64, false),
187                    Field::new("y", ArrowDataType::Float64, false),
188                    Field::new("z", ArrowDataType::Float64, false),
189                    Field::new("crs", ArrowDataType::Utf8, false),
190                ])),
191            },
192            DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
193                Arc::new(Field::new("item", ArrowDataType::Float32, true)),
194                *dimensions as i32,
195            ),
196            DataType::Btic => ArrowDataType::FixedSizeBinary(24),
197            DataType::Crdt(_) => ArrowDataType::Binary, // Store CRDT as binary MessagePack
198            DataType::List(inner) => {
199                ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
200            }
201            DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
202                "item",
203                ArrowDataType::Struct(Fields::from(vec![
204                    Field::new("key", key.to_arrow(), false),
205                    Field::new("value", value.to_arrow(), true),
206                ])),
207                true,
208            ))),
209        }
210    }
211
212    /// Returns `true` if `value` is directly storable in this column type without loss.
213    ///
214    /// This is the schema-level type guard used by the write path. `Value::Null` is
215    /// always accepted — column nullability is enforced separately by the `nullable`
216    /// flag, not here. `CypherValue`, `Crdt`, and `Point` columns accept any value.
217    /// For every other declared type, only the `Value` variants that the storage layer
218    /// persists *without silently nulling* are accepted (see the per-type converters in
219    /// `uni-store`'s `arrow_convert`), plus the intentional lossless widenings
220    /// `Int`→`Float`, `Int`→`Int32`, and `Temporal`→`Timestamp`.
221    ///
222    /// A `Value::String` destined for a `Date`/`Time`/`DateTime`/`Duration` column is
223    /// intentionally *not* accepted here: the write path first coerces such strings into
224    /// the proper `Temporal` value (matching the Cypher temporal constructors), then the
225    /// coerced value passes this check. This keeps `accepts` a pure, allocation-free
226    /// predicate.
227    ///
228    /// # Examples
229    /// ```
230    /// use uni_common::core::schema::DataType;
231    /// use uni_common::Value;
232    ///
233    /// assert!(DataType::Float64.accepts(&Value::Int(3))); // Int widens to Float
234    /// assert!(DataType::Bool.accepts(&Value::Null)); // Null always accepted
235    /// assert!(!DataType::DateTime.accepts(&Value::String("2026-01-01T00:00:00Z".into())));
236    /// ```
237    pub fn accepts(&self, value: &crate::value::Value) -> bool {
238        use crate::value::{TemporalValue, Value};
239
240        // Null is universally accepted; nullability is a separate concern.
241        if matches!(value, Value::Null) {
242            return true;
243        }
244
245        match self {
246            // Opaque / dynamically-typed columns accept any value.
247            DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
248
249            DataType::String => matches!(value, Value::String(_)),
250            DataType::Int32 | DataType::Int64 => matches!(value, Value::Int(_)),
251            // Int widens to Float losslessly for the ranges we care about.
252            DataType::Float32 | DataType::Float64 => {
253                matches!(value, Value::Int(_) | Value::Float(_))
254            }
255            DataType::Bool => matches!(value, Value::Bool(_)),
256
257            // Non-struct timestamp column: storage parses strings and accepts ints,
258            // so both are lossless here (unlike the DateTime struct column below).
259            DataType::Timestamp => matches!(
260                value,
261                Value::String(_)
262                    | Value::Int(_)
263                    | Value::Temporal(
264                        TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
265                    )
266            ),
267            DataType::DateTime => matches!(
268                value,
269                Value::Temporal(
270                    TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. }
271                )
272            ),
273            DataType::Date => {
274                matches!(
275                    value,
276                    Value::Int(_) | Value::Temporal(TemporalValue::Date { .. })
277                )
278            }
279            DataType::Time => matches!(
280                value,
281                Value::Int(_)
282                    | Value::Temporal(TemporalValue::Time { .. } | TemporalValue::LocalTime { .. })
283            ),
284            DataType::Duration => {
285                matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
286            }
287            DataType::Bytes => matches!(value, Value::Bytes(_)),
288            // FixedSizeBinary(24) converter accepts the Btic temporal, raw strings, and lists.
289            DataType::Btic => matches!(
290                value,
291                Value::String(_) | Value::List(_) | Value::Temporal(TemporalValue::Btic { .. })
292            ),
293            DataType::Vector { .. } => matches!(value, Value::Vector(_) | Value::List(_)),
294            DataType::List(_) => matches!(value, Value::List(_)),
295            DataType::Map(_, _) => matches!(value, Value::Map(_)),
296        }
297    }
298}
299
300fn default_created_at() -> DateTime<Utc> {
301    Utc::now()
302}
303
304fn default_state() -> SchemaElementState {
305    SchemaElementState::Active
306}
307
/// Serde default for version fields that were not recorded: version 1.
fn default_version_1() -> u32 {
    const FIRST_VERSION: u32 = 1;
    FIRST_VERSION
}
311
/// Schema metadata for a single property.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
    /// Declared column type of the property.
    pub r#type: DataType,
    /// Whether the property may be null.
    pub nullable: bool,
    /// Schema version the property was added in; defaults to 1 when absent
    /// from the serialized form.
    #[serde(default = "default_version_1")]
    pub added_in: u32, // SchemaVersion
    /// Lifecycle state; defaults to `Active` when absent.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// NOTE(review): presumably the expression for a generated/computed
    /// property — confirm against the DDL handling. `None` when absent.
    #[serde(default)]
    pub generation_expression: Option<String>,
    /// Optional human-readable description; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
325
/// Schema metadata for a vertex label.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
    /// Numeric label identifier.
    pub id: u16, // LabelId
    /// Creation timestamp. When the field is missing from the serialized form
    /// it is filled with the time of deserialization, not the original
    /// creation time.
    #[serde(default = "default_created_at")]
    pub created_at: DateTime<Utc>,
    /// Lifecycle state; defaults to `Active` when absent.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional human-readable description; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
336
/// Schema metadata for a schema-defined edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
    /// See [`crate::core::edge_type::EdgeTypeId`] for bit-layout details.
    pub id: u32,
    /// Label names recorded for the source side of the edge.
    pub src_labels: Vec<String>,
    /// Label names recorded for the destination side of the edge.
    pub dst_labels: Vec<String>,
    /// Lifecycle state; defaults to `Active` when absent.
    #[serde(default = "default_state")]
    pub state: SchemaElementState,
    /// Optional human-readable description; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
348
/// Kind of rule a [`Constraint`] enforces, with its kind-specific payload.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
    /// Uniqueness over the listed properties.
    Unique { properties: Vec<String> },
    /// The named property must exist.
    Exists { property: String },
    /// A check expression, stored as text.
    Check { expression: String },
}
356
/// Schema element a [`Constraint`] applies to, identified by name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
    /// A vertex label, by name.
    Label(String),
    /// An edge type, by name.
    EdgeType(String),
}
363
/// A named schema constraint.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
    /// Constraint name.
    pub name: String,
    /// What the constraint enforces.
    pub constraint_type: ConstraintType,
    /// The label or edge type the constraint is attached to.
    pub target: ConstraintTarget,
    /// Whether the constraint is enabled.
    pub enabled: bool,
}
371
/// Bidirectional registry for dynamically-assigned schemaless edge type IDs.
///
/// Edge types not defined in the schema are assigned IDs at runtime with
/// bit 31 set (see [`crate::core::edge_type`]). This registry maintains
/// the name-to-ID and ID-to-name mappings for those types.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    /// Forward mapping: edge type name to its assigned schemaless ID.
    name_to_id: HashMap<String, u32>,
    /// Reverse mapping: schemaless ID back to the edge type name.
    id_to_name: HashMap<u32, String>,
    /// Next local ID to assign (0 is reserved for invalid).
    next_local_id: u32,
}
384
385impl SchemalessEdgeTypeRegistry {
386    pub fn new() -> Self {
387        Self {
388            name_to_id: HashMap::new(),
389            id_to_name: HashMap::new(),
390            next_local_id: 1,
391        }
392    }
393
394    /// Returns the schemaless ID for `type_name`, assigning a new one if needed.
395    pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
396        if let Some(&id) = self.name_to_id.get(type_name) {
397            return id;
398        }
399
400        let id = make_schemaless_id(self.next_local_id);
401        self.next_local_id += 1;
402
403        self.name_to_id.insert(type_name.to_string(), id);
404        self.id_to_name.insert(id, type_name.to_string());
405
406        id
407    }
408
409    /// Looks up the edge type name for a schemaless ID.
410    pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
411        self.id_to_name.get(&type_id).map(String::as_str)
412    }
413
414    /// Returns `true` if `type_name` has already been assigned a schemaless ID.
415    pub fn contains(&self, type_name: &str) -> bool {
416        self.name_to_id.contains_key(type_name)
417    }
418
419    /// Looks up the schemaless ID for `type_name` (exact match, read-only).
420    pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
421        self.name_to_id.get(type_name).copied()
422    }
423
424    /// Looks up the edge type ID for `type_name` with case-insensitive matching.
425    pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
426        self.name_to_id
427            .iter()
428            .find(|(k, _)| k.eq_ignore_ascii_case(type_name))
429            .map(|(_, &id)| id)
430    }
431
432    /// Returns all registered schemaless type IDs.
433    pub fn all_type_ids(&self) -> Vec<u32> {
434        self.id_to_name.keys().copied().collect()
435    }
436
437    /// Returns true if the registry has any schemaless types.
438    pub fn is_empty(&self) -> bool {
439        self.name_to_id.is_empty()
440    }
441}
442
443impl Default for SchemalessEdgeTypeRegistry {
444    fn default() -> Self {
445        Self::new()
446    }
447}
448
/// Lower bound (inclusive) of the virtual, catalog-resolved label ID range.
///
/// Label IDs in `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL` belong to
/// plugin-registered `CatalogProvider`s and are allocated lazily by the
/// planner via `PluginRegistry::register_virtual_label`. Native label
/// allocation (`SchemaManager::add_label`) refuses IDs in this range.
pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
/// Sentinel "no label" marker; never allocatable, and excluded from the
/// virtual range.
pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;

/// Returns `true` if `id` lies in the virtual (catalog-resolved) range, i.e.
/// at or above the start and strictly below the sentinel.
#[inline]
pub fn is_virtual_label_id(id: u16) -> bool {
    VIRTUAL_LABEL_ID_START <= id && id < VIRTUAL_LABEL_ID_SENTINEL
}
463
/// The full schema: labels, edge types, properties, indexes, and constraints.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
    /// Change token bumped on schema-shape mutations; starts at 1 in the
    /// default schema.
    pub schema_version: u32,
    /// Label metadata keyed by label name.
    pub labels: HashMap<String, LabelMeta>,
    /// Edge type metadata keyed by edge type name.
    pub edge_types: HashMap<String, EdgeTypeMeta>,
    /// Property metadata in a two-level map.
    /// NOTE(review): presumably outer key = label/edge type name, inner key =
    /// property name — confirm against the writers of this map.
    pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
    /// Index definitions; empty when absent from the serialized form.
    #[serde(default)]
    pub indexes: Vec<IndexDefinition>,
    /// Constraints; empty when absent from the serialized form.
    #[serde(default)]
    pub constraints: Vec<Constraint>,
    /// Registry for schemaless edge types (dynamically assigned IDs)
    #[serde(default)]
    pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
478
479impl Default for Schema {
480    fn default() -> Self {
481        Self {
482            schema_version: 1,
483            labels: HashMap::new(),
484            edge_types: HashMap::new(),
485            properties: HashMap::new(),
486            indexes: Vec::new(),
487            constraints: Vec::new(),
488            schemaless_registry: SchemalessEdgeTypeRegistry::new(),
489        }
490    }
491}
492
493impl Schema {
494    /// Bumps `schema_version` to invalidate cached query plans.
495    ///
496    /// Called at the end of every DDL mutation that changes the schema's
497    /// shape (labels, edge types, properties, indexes, constraints). The
498    /// plan-cache eviction guard keys on `schema_version`, so a stale plan
499    /// built against an older shape is discarded once this advances. Uses
500    /// wrapping arithmetic: the value is a coarse change token, not a count,
501    /// so wraparound only risks a missed eviction after 2^32 DDL operations.
502    fn bump_version(&mut self) {
503        self.schema_version = self.schema_version.wrapping_add(1);
504    }
505
506    /// Returns the label name for a given label ID.
507    ///
508    /// Performs a linear scan over all labels. This is efficient because
509    /// the number of labels in a schema is typically small.
510    pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
511        self.labels
512            .iter()
513            .find(|(_, meta)| meta.id == label_id)
514            .map(|(name, _)| name.as_str())
515    }
516
517    /// Returns the label ID for a given label name.
518    pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
519        self.labels.get(label_name).map(|meta| meta.id)
520    }
521
522    /// Returns the edge type name for a given type ID.
523    ///
524    /// Performs a linear scan over all edge types. This is efficient because
525    /// the number of edge types in a schema is typically small.
526    pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
527        self.edge_types
528            .iter()
529            .find(|(_, meta)| meta.id == type_id)
530            .map(|(name, _)| name.as_str())
531    }
532
533    /// Returns the edge type ID for a given type name.
534    pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
535        self.edge_types.get(type_name).map(|meta| meta.id)
536    }
537
538    /// Returns the vector index configuration for a given label and property.
539    ///
540    /// Performs a linear scan over all indexes. This is efficient because
541    /// the number of indexes in a schema is typically small.
542    pub fn vector_index_for_property(
543        &self,
544        label: &str,
545        property: &str,
546    ) -> Option<&VectorIndexConfig> {
547        self.indexes.iter().find_map(|idx| {
548            if let IndexDefinition::Vector(config) = idx
549                && config.label == label
550                && config.property == property
551                && config.metadata.status == IndexStatus::Online
552            {
553                return Some(config);
554            }
555            None
556        })
557    }
558
559    /// Returns the full-text index configuration for a given label and property.
560    ///
561    /// A full-text index covers one or more properties. This returns the config
562    /// if the specified property is among the indexed properties.
563    pub fn fulltext_index_for_property(
564        &self,
565        label: &str,
566        property: &str,
567    ) -> Option<&FullTextIndexConfig> {
568        self.indexes.iter().find_map(|idx| {
569            if let IndexDefinition::FullText(config) = idx
570                && config.label == label
571                && config.properties.iter().any(|p| p == property)
572                && config.metadata.status == IndexStatus::Online
573            {
574                return Some(config);
575            }
576            None
577        })
578    }
579
580    /// Get label metadata with case-insensitive lookup.
581    ///
582    /// This allows queries to match labels regardless of case, providing
583    /// better user experience when label names vary in casing.
584    pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
585        self.labels
586            .iter()
587            .find(|(k, _)| k.eq_ignore_ascii_case(name))
588            .map(|(_, v)| v)
589    }
590
591    /// Get the schema-canonical spelling of a label, matched case-insensitively.
592    ///
593    /// Returns the stored label name whose spelling differs only in case from
594    /// `name`, or `None` if no such label is registered. Callers use this to
595    /// normalize a user-supplied label to the canonical form the storage tier
596    /// keys on, so case variants resolve to the same vertex table.
597    pub fn canonical_label_name(&self, name: &str) -> Option<String> {
598        self.labels
599            .iter()
600            .find(|(k, _)| k.eq_ignore_ascii_case(name))
601            .map(|(k, _)| k.clone())
602    }
603
604    /// Get label ID with case-insensitive lookup.
605    pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
606        self.get_label_case_insensitive(label_name)
607            .map(|meta| meta.id)
608    }
609
610    /// Get edge type metadata with case-insensitive lookup.
611    ///
612    /// This allows queries to match edge types regardless of case, providing
613    /// better user experience when type names vary in casing.
614    pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
615        self.edge_types
616            .iter()
617            .find(|(k, _)| k.eq_ignore_ascii_case(name))
618            .map(|(_, v)| v)
619    }
620
621    /// Get edge type ID with case-insensitive lookup (schema-defined types only).
622    pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
623        self.get_edge_type_case_insensitive(type_name)
624            .map(|meta| meta.id)
625    }
626
627    /// Get edge type ID with case-insensitive lookup, checking both
628    /// schema-defined and schemaless registries.
629    pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
630        self.edge_type_id_by_name_case_insensitive(type_name)
631            .or_else(|| {
632                self.schemaless_registry
633                    .id_by_name_case_insensitive(type_name)
634            })
635    }
636
637    /// Returns the edge type ID for `type_name`, checking the schema first
638    /// and falling back to the schemaless registry (assigning a new ID if needed).
639    ///
640    /// Requires `&mut self` because it may assign a new schemaless ID.
641    /// Use [`edge_type_id_by_name`](Self::edge_type_id_by_name) for read-only schema lookups.
642    pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
643        if let Some(id) = self.edge_type_id_unified(type_name) {
644            return id;
645        }
646        // Reaching here means the type is brand-new to *both* the schema map and
647        // the schemaless registry (the early return above mirrors exactly what
648        // `edge_type_id_unified` checks). Minting a new schemaless edge type
649        // changes the result of `all_edge_type_ids()`, which untyped traversals
650        // bake into cached plans keyed on `schema_version`. Bump the version so
651        // those stale plans are evicted — otherwise a `MATCH ()-[r]->()` plan
652        // built before this type existed silently drops edges of the new type.
653        let id = self.schemaless_registry.get_or_assign_id(type_name);
654        self.bump_version();
655        id
656    }
657
658    /// Read-only unified exact lookup: schema-defined edge type id, falling
659    /// back to an already-assigned schemaless id.
660    ///
661    /// Mirrors exactly the checks [`Self::get_or_assign_edge_type_id`]
662    /// performs before assigning, so a `Some` here means the assigning path
663    /// would be a no-op — the basis for `SchemaManager`'s read-lock fast path.
664    pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
665        self.edge_type_id_by_name(type_name)
666            .or_else(|| self.schemaless_registry.id_by_name(type_name))
667    }
668
669    /// Returns the edge type name for `type_id`, checking both the schema
670    /// and schemaless registries. Returns an owned `String` because the
671    /// name may come from either registry.
672    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
673        if is_schemaless_edge_type(type_id) {
674            self.schemaless_registry
675                .type_name_by_id(type_id)
676                .map(str::to_owned)
677        } else {
678            self.edge_type_name_by_id(type_id).map(str::to_owned)
679        }
680    }
681
682    /// Returns all edge type IDs, including both schema-defined and schemaless types.
683    /// Used when MATCH queries don't specify an edge type and need to scan all edges.
684    pub fn all_edge_type_ids(&self) -> Vec<u32> {
685        let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
686        ids.extend(self.schemaless_registry.all_type_ids());
687        ids.sort_unstable();
688        ids
689    }
690}
691
/// Lifecycle status of an index.
///
/// `Online` is the default, so index metadata deserialized without a status
/// is treated as online.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
    /// Index is up-to-date and available for queries.
    #[default]
    Online,
    /// Index is currently being rebuilt.
    Building,
    /// Index is outdated and scheduled for rebuild.
    Stale,
    /// Index rebuild failed after exhausting retries.
    Failed,
}
705
/// Metadata tracking the lifecycle state of an index.
///
/// Every field is optional in the serialized form; the derived default is
/// `Online` status with no build timestamp and no row count.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
    /// Current lifecycle status.
    #[serde(default)]
    pub status: IndexStatus,
    /// When the index was last successfully built.
    #[serde(default)]
    pub last_built_at: Option<DateTime<Utc>>,
    /// Row count of the dataset when the index was last built.
    #[serde(default)]
    pub row_count_at_build: Option<u64>,
}
719
/// Definition of one index, one variant per index kind.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `"type"` field alongside the variant's configuration fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    /// Vector index.
    Vector(VectorIndexConfig),
    /// Full-text index.
    FullText(FullTextIndexConfig),
    /// Scalar index.
    Scalar(ScalarIndexConfig),
    /// Inverted index.
    Inverted(InvertedIndexConfig),
    /// JSON full-text index.
    JsonFullText(JsonFtsIndexConfig),
}
730
731impl IndexDefinition {
732    /// Returns the index name for any variant.
733    pub fn name(&self) -> &str {
734        match self {
735            IndexDefinition::Vector(c) => &c.name,
736            IndexDefinition::FullText(c) => &c.name,
737            IndexDefinition::Scalar(c) => &c.name,
738            IndexDefinition::Inverted(c) => &c.name,
739            IndexDefinition::JsonFullText(c) => &c.name,
740        }
741    }
742
743    /// Returns the label this index is defined on.
744    pub fn label(&self) -> &str {
745        match self {
746            IndexDefinition::Vector(c) => &c.label,
747            IndexDefinition::FullText(c) => &c.label,
748            IndexDefinition::Scalar(c) => &c.label,
749            IndexDefinition::Inverted(c) => &c.label,
750            IndexDefinition::JsonFullText(c) => &c.label,
751        }
752    }
753
754    /// Returns a reference to the index lifecycle metadata.
755    pub fn metadata(&self) -> &IndexMetadata {
756        match self {
757            IndexDefinition::Vector(c) => &c.metadata,
758            IndexDefinition::FullText(c) => &c.metadata,
759            IndexDefinition::Scalar(c) => &c.metadata,
760            IndexDefinition::Inverted(c) => &c.metadata,
761            IndexDefinition::JsonFullText(c) => &c.metadata,
762        }
763    }
764
765    /// Returns a mutable reference to the index lifecycle metadata.
766    pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
767        match self {
768            IndexDefinition::Vector(c) => &mut c.metadata,
769            IndexDefinition::FullText(c) => &mut c.metadata,
770            IndexDefinition::Scalar(c) => &mut c.metadata,
771            IndexDefinition::Inverted(c) => &mut c.metadata,
772            IndexDefinition::JsonFullText(c) => &mut c.metadata,
773        }
774    }
775}
776
/// Configuration of an inverted index on a single property.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
    /// Index name.
    pub name: String,
    /// Label the index is defined on.
    pub label: String,
    /// Indexed property.
    pub property: String,
    /// Normalization flag; defaults to `true` when absent.
    /// NOTE(review): the exact normalization applied is not visible here — confirm.
    #[serde(default = "default_normalize")]
    pub normalize: bool,
    /// Per-document term limit; defaults to 10,000 when absent.
    #[serde(default = "default_max_terms_per_doc")]
    pub max_terms_per_doc: usize,
    /// Lifecycle metadata; defaults when absent.
    #[serde(default)]
    pub metadata: IndexMetadata,
}
789
/// Serde default for `InvertedIndexConfig::normalize`: normalization is on.
fn default_normalize() -> bool {
    const NORMALIZE_BY_DEFAULT: bool = true;
    NORMALIZE_BY_DEFAULT
}
793
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`: 10,000 terms.
fn default_max_terms_per_doc() -> usize {
    const DEFAULT_MAX_TERMS: usize = 10_000;
    DEFAULT_MAX_TERMS
}
797
798#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
799pub struct VectorIndexConfig {
800    pub name: String,
801    pub label: String,
802    pub property: String,
803    pub index_type: VectorIndexType,
804    pub metric: DistanceMetric,
805    pub embedding_config: Option<EmbeddingConfig>,
806    #[serde(default)]
807    pub metadata: IndexMetadata,
808}
809
810#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
811pub struct EmbeddingConfig {
812    /// Model alias in the Uni-Xervo catalog (for example: "embed/default").
813    pub alias: String,
814    pub source_properties: Vec<String>,
815    pub batch_size: usize,
816    /// Prefix prepended to text before embedding during auto-embed (document side).
817    /// Example: `"search_document: "` for Nomic models. Include any trailing space.
818    #[serde(default)]
819    pub document_prefix: Option<String>,
820    /// Prefix prepended to text before embedding during query-time embed calls.
821    /// Example: `"search_query: "` for Nomic models. Include any trailing space.
822    #[serde(default)]
823    pub query_prefix: Option<String>,
824}
825
826#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
827#[non_exhaustive]
828pub enum VectorIndexType {
829    Flat,
830    IvfFlat {
831        num_partitions: u32,
832    },
833    IvfPq {
834        num_partitions: u32,
835        num_sub_vectors: u32,
836        bits_per_subvector: u8,
837    },
838    IvfSq {
839        num_partitions: u32,
840    },
841    IvfRq {
842        num_partitions: u32,
843        #[serde(default)]
844        num_bits: Option<u8>,
845    },
846    HnswFlat {
847        m: u32,
848        ef_construction: u32,
849        #[serde(default)]
850        num_partitions: Option<u32>,
851    },
852    HnswSq {
853        m: u32,
854        ef_construction: u32,
855        #[serde(default)]
856        num_partitions: Option<u32>,
857    },
858    HnswPq {
859        m: u32,
860        ef_construction: u32,
861        num_sub_vectors: u32,
862        #[serde(default)]
863        num_partitions: Option<u32>,
864    },
865}
866
867#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
868#[non_exhaustive]
869pub enum DistanceMetric {
870    Cosine,
871    L2,
872    Dot,
873}
874
875impl DistanceMetric {
876    /// Computes the distance between two vectors using this metric.
877    ///
878    /// All metrics follow LanceDB conventions so that lower values indicate
879    /// greater similarity:
880    /// - **L2**: squared Euclidean distance.
881    /// - **Cosine**: `1.0 - cosine_similarity` (range \[0, 2\]).
882    /// - **Dot**: negative dot product.
883    ///
884    /// # Panics
885    ///
886    /// Panics if `a` and `b` have different lengths.
887    pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
888        assert_eq!(a.len(), b.len(), "vector dimension mismatch");
889        match self {
890            DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
891            DistanceMetric::Cosine => {
892                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
893                let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
894                let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
895                let denom = norm_a * norm_b;
896                if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
897            }
898            DistanceMetric::Dot => {
899                let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
900                -dot
901            }
902        }
903    }
904}
905
906#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
907pub struct FullTextIndexConfig {
908    pub name: String,
909    pub label: String,
910    pub properties: Vec<String>,
911    pub tokenizer: TokenizerConfig,
912    pub with_positions: bool,
913    #[serde(default)]
914    pub metadata: IndexMetadata,
915}
916
917#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
918#[non_exhaustive]
919pub enum TokenizerConfig {
920    Standard,
921    Whitespace,
922    Ngram { min: u8, max: u8 },
923    Custom { name: String },
924}
925
926#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
927pub struct JsonFtsIndexConfig {
928    pub name: String,
929    pub label: String,
930    pub column: String,
931    #[serde(default)]
932    pub paths: Vec<String>,
933    #[serde(default)]
934    pub with_positions: bool,
935    #[serde(default)]
936    pub metadata: IndexMetadata,
937}
938
939#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
940pub struct ScalarIndexConfig {
941    pub name: String,
942    pub label: String,
943    pub properties: Vec<String>,
944    pub index_type: ScalarIndexType,
945    pub where_clause: Option<String>,
946    #[serde(default)]
947    pub metadata: IndexMetadata,
948}
949
950#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
951#[non_exhaustive]
952pub enum ScalarIndexType {
953    BTree,
954    Hash,
955    Bitmap,
956    LabelList,
957}
958
959pub struct SchemaManager {
960    store: Arc<dyn ObjectStore>,
961    path: ObjectStorePath,
962    schema: RwLock<Arc<Schema>>,
963}
964
965impl SchemaManager {
966    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
967        let path = path.as_ref();
968        let parent = path
969            .parent()
970            .ok_or_else(|| anyhow!("Invalid schema path"))?;
971        let filename = path
972            .file_name()
973            .ok_or_else(|| anyhow!("Invalid schema filename"))?
974            .to_str()
975            .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
976
977        let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
978        let obj_path = ObjectStorePath::from(filename);
979
980        Self::load_from_store(store, &obj_path).await
981    }
982
983    pub async fn load_from_store(
984        store: Arc<dyn ObjectStore>,
985        path: &ObjectStorePath,
986    ) -> Result<Self> {
987        match store.get(path).await {
988            Ok(result) => {
989                let bytes = result.bytes().await?;
990                let content = String::from_utf8(bytes.to_vec())?;
991                let mut schema: Schema = serde_json::from_str(&content)?;
992                // Self-heal catalogs that grew super-linearly under the
993                // pre-fix `add_index` (issue rustic-ai/uni-db#63). Collapse
994                // duplicate index entries by name, keeping the *last*
995                // occurrence — matches the upsert semantics in `add_index`
996                // and preserves whatever metadata the most recent rebuild
997                // wrote. The dedup persists on the next mutation that
998                // calls `save()`.
999                let original_len = schema.indexes.len();
1000                if original_len > 0 {
1001                    let mut seen: std::collections::HashSet<String> =
1002                        std::collections::HashSet::with_capacity(original_len);
1003                    let mut dedup: Vec<IndexDefinition> = schema
1004                        .indexes
1005                        .iter()
1006                        .rev()
1007                        .filter(|idx| seen.insert(idx.name().to_string()))
1008                        .cloned()
1009                        .collect();
1010                    dedup.reverse();
1011                    if dedup.len() != original_len {
1012                        tracing::warn!(
1013                            collapsed = original_len - dedup.len(),
1014                            kept = dedup.len(),
1015                            "schema.indexes: collapsed duplicate entries on load (issue #63)"
1016                        );
1017                        schema.indexes = dedup;
1018                    }
1019                }
1020                Ok(Self {
1021                    store,
1022                    path: path.clone(),
1023                    schema: RwLock::new(Arc::new(schema)),
1024                })
1025            }
1026            Err(object_store::Error::NotFound { .. }) => Ok(Self {
1027                store,
1028                path: path.clone(),
1029                schema: RwLock::new(Arc::new(Schema::default())),
1030            }),
1031            Err(e) => Err(anyhow::Error::from(e)),
1032        }
1033    }
1034
1035    pub async fn save(&self) -> Result<()> {
1036        let content = {
1037            let schema_guard = acquire_read(&self.schema, "schema")?;
1038            serde_json::to_string_pretty(&**schema_guard)?
1039        };
1040        self.store
1041            .put(&self.path, content.into())
1042            .await
1043            .map_err(anyhow::Error::from)?;
1044        Ok(())
1045    }
1046
1047    pub fn path(&self) -> &ObjectStorePath {
1048        &self.path
1049    }
1050
1051    pub fn schema(&self) -> Arc<Schema> {
1052        self.schema
1053            .read()
1054            .expect("Schema lock poisoned - a thread panicked while holding it")
1055            .clone()
1056    }
1057
/// Uppercases function names in `expr` so that expressions differing only in
/// function-name case normalize to the same text.
///
/// A "function name" is a run that starts with an alphabetic character,
/// continues with alphanumerics or `_`, and is *immediately* followed by `(`.
/// Every other character, including identifiers not followed by `(`, is
/// copied through unchanged.
///
/// Examples: `"lower(email)"` -> `"LOWER(email)"`, `"trim(name)"` -> `"TRIM(name)"`.
fn normalize_function_names(expr: &str) -> String {
    let mut out = String::with_capacity(expr.len());
    let mut rest = expr.chars().peekable();

    while let Some(first) = rest.next() {
        if !first.is_alphabetic() {
            out.push(first);
            continue;
        }

        // Gather the whole identifier that starts at `first`.
        let mut word = String::from(first);
        while let Some(c) = rest.next_if(|c| c.is_alphanumeric() || *c == '_') {
            word.push(c);
        }

        // Only a directly following '(' marks a call; anything else (property
        // names, keywords, names followed by whitespace) keeps its case.
        if matches!(rest.peek(), Some(&'(')) {
            out.push_str(&word.to_uppercase());
        } else {
            out.push_str(&word);
        }
    }

    out
}
1091
1092    /// Generate a consistent internal column name for an expression index.
1093    /// Uses a hash suffix to ensure uniqueness for different expressions that
1094    /// might sanitize to the same string (e.g., "a+b" and "a-b" both become "a_b").
1095    ///
1096    /// IMPORTANT: Uses FNV-1a hash which is stable across Rust versions and platforms.
1097    /// DefaultHasher is not guaranteed to be stable and could break persistent data
1098    /// if the hash changes after a compiler upgrade.
1099    pub fn generated_column_name(expr: &str) -> String {
1100        // Normalize function names to uppercase for case-insensitive matching
1101        let normalized = Self::normalize_function_names(expr);
1102
1103        let sanitized = normalized
1104            .replace(|c: char| !c.is_alphanumeric(), "_")
1105            .trim_matches('_')
1106            .to_string();
1107
1108        // FNV-1a 64-bit hash - stable across Rust versions and platforms
1109        const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
1110        const FNV_PRIME: u64 = 1099511628211;
1111
1112        let mut hash = FNV_OFFSET_BASIS;
1113        for byte in normalized.as_bytes() {
1114            hash ^= *byte as u64;
1115            hash = hash.wrapping_mul(FNV_PRIME);
1116        }
1117
1118        format!("_gen_{}_{:x}", sanitized, hash)
1119    }
1120
1121    pub fn replace_schema(&self, new_schema: Schema) {
1122        let mut schema = self
1123            .schema
1124            .write()
1125            .expect("Schema lock poisoned - a thread panicked while holding it");
1126        *schema = Arc::new(new_schema);
1127    }
1128
1129    /// Build a fork-scoped manager whose schema is `primary ⊕ overlay`.
1130    ///
1131    /// Used by `UniInner::at_fork` to give a forked session a schema view
1132    /// that includes any labels/edge-types/properties the fork has
1133    /// introduced on top of primary. The returned manager owns its own
1134    /// in-memory `Arc<Schema>` — mutations to it never reach primary's
1135    /// schema file. The returned manager is *not* intended for `.save()`;
1136    /// fork-overlay persistence is owned by the registry layer
1137    /// (`catalog/fork_schemas/{fork_id}.json`).
1138    ///
1139    /// In Phase 1 the delta is always empty, so the merge is a clone.
1140    /// Phase 2 starts populating it when on-the-fly label creation lands.
1141    #[must_use]
1142    pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
1143        let primary = self.schema();
1144        let merged = if overlay.is_empty() {
1145            (*primary).clone()
1146        } else {
1147            let mut merged = (*primary).clone();
1148            for (name, label) in &overlay.added_labels {
1149                merged.labels.insert(name.clone(), label.clone());
1150            }
1151            for (name, edge_type) in &overlay.added_edge_types {
1152                merged.edge_types.insert(name.clone(), edge_type.clone());
1153            }
1154            for addition in &overlay.added_properties {
1155                let props = merged.properties.entry(addition.owner.clone()).or_default();
1156                props.insert(
1157                    addition.property.clone(),
1158                    PropertyMeta {
1159                        r#type: addition.data_type.clone(),
1160                        nullable: addition.nullable,
1161                        added_in: merged.schema_version,
1162                        state: SchemaElementState::Active,
1163                        generation_expression: None,
1164                        description: None,
1165                    },
1166                );
1167            }
1168            merged
1169        };
1170
1171        Arc::new(Self {
1172            store: self.store.clone(),
1173            path: self.path.clone(),
1174            schema: RwLock::new(Arc::new(merged)),
1175        })
1176    }
1177
1178    pub fn next_label_id(&self) -> u16 {
1179        self.schema()
1180            .labels
1181            .values()
1182            .map(|l| l.id)
1183            .max()
1184            .unwrap_or(0)
1185            + 1
1186    }
1187
1188    pub fn next_type_id(&self) -> u32 {
1189        let max_schema_id = self
1190            .schema()
1191            .edge_types
1192            .values()
1193            .map(|t| t.id)
1194            .max()
1195            .unwrap_or(0);
1196
1197        // Ensure we stay in schema'd ID space (bit 31 = 0)
1198        if max_schema_id >= MAX_SCHEMA_TYPE_ID {
1199            panic!("Schema edge type ID exhaustion");
1200        }
1201
1202        max_schema_id + 1
1203    }
1204
1205    pub fn add_label(&self, name: &str) -> Result<u16> {
1206        self.add_label_with_desc(name, None)
1207    }
1208
1209    pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
1210        let mut guard = acquire_write(&self.schema, "schema")?;
1211        let schema = Arc::make_mut(&mut *guard);
1212        if schema.labels.contains_key(name) {
1213            return Err(anyhow!("Label '{}' already exists", name));
1214        }
1215
1216        let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
1217        if id >= VIRTUAL_LABEL_ID_START {
1218            return Err(anyhow!(
1219                "Native label space exhausted (next id {id:#x} would enter the \
1220                 virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
1221                 reserved for catalog-resolved labels)"
1222            ));
1223        }
1224        schema.labels.insert(
1225            name.to_string(),
1226            LabelMeta {
1227                id,
1228                created_at: Utc::now(),
1229                state: SchemaElementState::Active,
1230                description,
1231            },
1232        );
1233        schema.bump_version();
1234        Ok(id)
1235    }
1236
1237    pub fn add_edge_type(
1238        &self,
1239        name: &str,
1240        src_labels: Vec<String>,
1241        dst_labels: Vec<String>,
1242    ) -> Result<u32> {
1243        self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
1244    }
1245
1246    pub fn add_edge_type_with_desc(
1247        &self,
1248        name: &str,
1249        src_labels: Vec<String>,
1250        dst_labels: Vec<String>,
1251        description: Option<String>,
1252    ) -> Result<u32> {
1253        let mut guard = acquire_write(&self.schema, "schema")?;
1254        let schema = Arc::make_mut(&mut *guard);
1255        if schema.edge_types.contains_key(name) {
1256            return Err(anyhow!("Edge type '{}' already exists", name));
1257        }
1258
1259        let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
1260
1261        // Stay in the schema-defined sub-range (bit 31 = 0, and below the
1262        // virtual reservation `VIRTUAL_EDGE_TYPE_ID_START`) — same bound as
1263        // `add_edge_type`, so the two entry points cannot disagree on the
1264        // legal ceiling.
1265        if id >= VIRTUAL_EDGE_TYPE_ID_START {
1266            return Err(anyhow!(
1267                "Native edge type space exhausted (next id {id:#x} would enter the \
1268                 virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
1269                 reserved for catalog-resolved edge types)"
1270            ));
1271        }
1272
1273        schema.edge_types.insert(
1274            name.to_string(),
1275            EdgeTypeMeta {
1276                id,
1277                src_labels,
1278                dst_labels,
1279                state: SchemaElementState::Active,
1280                description,
1281            },
1282        );
1283        schema.bump_version();
1284        Ok(id)
1285    }
1286
1287    /// Delegates to [`Schema::get_or_assign_edge_type_id`].
1288    ///
1289    /// Read-lock fast path: the type name is almost always already known
1290    /// (it is constant per statement but resolved per row by the CREATE
1291    /// executor), and the slow path's write lock + `Arc::make_mut` deep-clones
1292    /// the whole `Schema` whenever the Arc is shared — which under SSI it
1293    /// always is. Double-checked: on a miss, `Schema::get_or_assign_edge_type_id`
1294    /// re-checks under the write lock, so two racing assigners converge on one id.
1295    pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
1296        {
1297            let guard = acquire_read(&self.schema, "schema")
1298                .expect("Schema lock poisoned - a thread panicked while holding it");
1299            if let Some(id) = guard.edge_type_id_unified(type_name) {
1300                return id;
1301            }
1302        }
1303        let mut guard = acquire_write(&self.schema, "schema")
1304            .expect("Schema lock poisoned - a thread panicked while holding it");
1305        let schema = Arc::make_mut(&mut *guard);
1306        schema.get_or_assign_edge_type_id(type_name)
1307    }
1308
1309    /// Delegates to [`Schema::edge_type_name_by_id_unified`].
1310    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
1311        let schema = acquire_read(&self.schema, "schema")
1312            .expect("Schema lock poisoned - a thread panicked while holding it");
1313        schema.edge_type_name_by_id_unified(type_id)
1314    }
1315
1316    pub fn add_property(
1317        &self,
1318        label_or_type: &str,
1319        prop_name: &str,
1320        data_type: DataType,
1321        nullable: bool,
1322    ) -> Result<()> {
1323        self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
1324    }
1325
1326    pub fn add_property_with_desc(
1327        &self,
1328        label_or_type: &str,
1329        prop_name: &str,
1330        data_type: DataType,
1331        nullable: bool,
1332        description: Option<String>,
1333    ) -> Result<()> {
1334        validate_property_name(prop_name)?;
1335        let mut guard = acquire_write(&self.schema, "schema")?;
1336        let schema = Arc::make_mut(&mut *guard);
1337        let version = schema.schema_version;
1338        let props = schema
1339            .properties
1340            .entry(label_or_type.to_string())
1341            .or_default();
1342
1343        if props.contains_key(prop_name) {
1344            return Err(anyhow!(
1345                "Property '{}' already exists for '{}'",
1346                prop_name,
1347                label_or_type
1348            ));
1349        }
1350
1351        props.insert(
1352            prop_name.to_string(),
1353            PropertyMeta {
1354                r#type: data_type,
1355                nullable,
1356                added_in: version,
1357                state: SchemaElementState::Active,
1358                generation_expression: None,
1359                description,
1360            },
1361        );
1362        // Bump after stamping `added_in` with the pre-bump `version`.
1363        schema.bump_version();
1364        Ok(())
1365    }
1366
1367    pub fn add_generated_property(
1368        &self,
1369        label_or_type: &str,
1370        prop_name: &str,
1371        data_type: DataType,
1372        expr: String,
1373    ) -> Result<()> {
1374        // System-generated `_gen_*` columns bypass the underscore-prefix rule
1375        // but must still avoid storage-layer column-name collisions.
1376        validate_reserved_property_name(prop_name)?;
1377        let mut guard = acquire_write(&self.schema, "schema")?;
1378        let schema = Arc::make_mut(&mut *guard);
1379        let version = schema.schema_version;
1380        let props = schema
1381            .properties
1382            .entry(label_or_type.to_string())
1383            .or_default();
1384
1385        if props.contains_key(prop_name) {
1386            return Err(anyhow!("Property '{}' already exists", prop_name));
1387        }
1388
1389        props.insert(
1390            prop_name.to_string(),
1391            PropertyMeta {
1392                r#type: data_type,
1393                nullable: true,
1394                added_in: version,
1395                state: SchemaElementState::Active,
1396                generation_expression: Some(expr),
1397                description: None,
1398            },
1399        );
1400        // Bump after stamping `added_in` with the pre-bump `version`.
1401        schema.bump_version();
1402        Ok(())
1403    }
1404
1405    pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
1406        let mut guard = acquire_write(&self.schema, "schema")?;
1407        let schema = Arc::make_mut(&mut *guard);
1408        let meta = schema
1409            .labels
1410            .get_mut(name)
1411            .ok_or_else(|| anyhow!("Label '{}' does not exist", name))?;
1412        meta.description = description;
1413        Ok(())
1414    }
1415
1416    pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
1417        let mut guard = acquire_write(&self.schema, "schema")?;
1418        let schema = Arc::make_mut(&mut *guard);
1419        let meta = schema
1420            .edge_types
1421            .get_mut(name)
1422            .ok_or_else(|| anyhow!("Edge type '{}' does not exist", name))?;
1423        meta.description = description;
1424        Ok(())
1425    }
1426
1427    pub fn set_property_description(
1428        &self,
1429        entity: &str,
1430        prop_name: &str,
1431        description: Option<String>,
1432    ) -> Result<()> {
1433        let mut guard = acquire_write(&self.schema, "schema")?;
1434        let schema = Arc::make_mut(&mut *guard);
1435        let props = schema
1436            .properties
1437            .get_mut(entity)
1438            .ok_or_else(|| anyhow!("Entity '{}' does not exist", entity))?;
1439        let meta = props
1440            .get_mut(prop_name)
1441            .ok_or_else(|| anyhow!("Property '{}' does not exist on '{}'", prop_name, entity))?;
1442        meta.description = description;
1443        Ok(())
1444    }
1445
1446    /// Register an index definition on the schema, **upsert by name**.
1447    ///
1448    /// If an index with the same `IndexDefinition::name()` already exists, it
1449    /// is replaced in place; otherwise the def is appended. Idempotent under
1450    /// repeat invocation, which makes `SchemaBuilder::apply()` re-applicable
1451    /// without bloating `schema.indexes` and lets the rebuild epilogue inside
1452    /// every `IndexManager::create_*_index` re-record metadata updates without
1453    /// duplicating entries (issue rustic-ai/uni-db#63).
1454    pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
1455        let mut guard = acquire_write(&self.schema, "schema")?;
1456        let schema = Arc::make_mut(&mut *guard);
1457        if let Some(existing) = schema
1458            .indexes
1459            .iter_mut()
1460            .find(|i| i.name() == index_def.name())
1461        {
1462            *existing = index_def;
1463        } else {
1464            schema.indexes.push(index_def);
1465        }
1466        schema.bump_version();
1467        Ok(())
1468    }
1469
1470    pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
1471        let schema = self.schema.read().expect("Schema lock poisoned");
1472        schema.indexes.iter().find(|i| i.name() == name).cloned()
1473    }
1474
1475    /// Updates the lifecycle metadata for an index by name.
1476    ///
1477    /// The closure receives a mutable reference to the index's `IndexMetadata`,
1478    /// allowing callers to update status, timestamps, etc.
1479    pub fn update_index_metadata(
1480        &self,
1481        index_name: &str,
1482        f: impl FnOnce(&mut IndexMetadata),
1483    ) -> Result<()> {
1484        let mut guard = acquire_write(&self.schema, "schema")?;
1485        let schema = Arc::make_mut(&mut *guard);
1486        let idx = schema
1487            .indexes
1488            .iter_mut()
1489            .find(|i| i.name() == index_name)
1490            .ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
1491        f(idx.metadata_mut());
1492        Ok(())
1493    }
1494
1495    pub fn remove_index(&self, name: &str) -> Result<()> {
1496        let mut guard = acquire_write(&self.schema, "schema")?;
1497        let schema = Arc::make_mut(&mut *guard);
1498        if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
1499            schema.indexes.remove(pos);
1500            schema.bump_version();
1501            Ok(())
1502        } else {
1503            Err(anyhow!("Index '{}' not found", name))
1504        }
1505    }
1506
1507    pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
1508        let mut guard = acquire_write(&self.schema, "schema")?;
1509        let schema = Arc::make_mut(&mut *guard);
1510        if schema.constraints.iter().any(|c| c.name == constraint.name) {
1511            return Err(anyhow!("Constraint '{}' already exists", constraint.name));
1512        }
1513        schema.constraints.push(constraint);
1514        schema.bump_version();
1515        Ok(())
1516    }
1517
1518    pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
1519        let mut guard = acquire_write(&self.schema, "schema")?;
1520        let schema = Arc::make_mut(&mut *guard);
1521        if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
1522            schema.constraints.remove(pos);
1523            schema.bump_version();
1524            Ok(())
1525        } else if if_exists {
1526            Ok(())
1527        } else {
1528            Err(anyhow!("Constraint '{}' not found", name))
1529        }
1530    }
1531
1532    pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
1533        let mut guard = acquire_write(&self.schema, "schema")?;
1534        let schema = Arc::make_mut(&mut *guard);
1535        let Some(props) = schema.properties.get_mut(label_or_type) else {
1536            return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1537        };
1538        if props.remove(prop_name).is_none() {
1539            return Err(anyhow!(
1540                "Property '{}' not found for '{}'",
1541                prop_name,
1542                label_or_type
1543            ));
1544        }
1545        schema.bump_version();
1546        Ok(())
1547    }
1548
1549    pub fn rename_property(
1550        &self,
1551        label_or_type: &str,
1552        old_name: &str,
1553        new_name: &str,
1554    ) -> Result<()> {
1555        let mut guard = acquire_write(&self.schema, "schema")?;
1556        let schema = Arc::make_mut(&mut *guard);
1557        let Some(props) = schema.properties.get_mut(label_or_type) else {
1558            return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
1559        };
1560        let Some(meta) = props.remove(old_name) else {
1561            return Err(anyhow!(
1562                "Property '{}' not found for '{}'",
1563                old_name,
1564                label_or_type
1565            ));
1566        };
1567        if props.contains_key(new_name) {
1568            // Rollback removal? Or just error.
1569            props.insert(old_name.to_string(), meta); // Restore
1570            return Err(anyhow!("Property '{}' already exists", new_name));
1571        }
1572        props.insert(new_name.to_string(), meta);
1573        schema.bump_version();
1574        Ok(())
1575    }
1576
1577    pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
1578        let mut guard = acquire_write(&self.schema, "schema")?;
1579        let schema = Arc::make_mut(&mut *guard);
1580        if let Some(label_meta) = schema.labels.get_mut(name) {
1581            label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1582            // Do not remove properties; they are implicitly tombstoned by the label
1583            schema.bump_version();
1584            Ok(())
1585        } else if if_exists {
1586            Ok(())
1587        } else {
1588            Err(anyhow!("Label '{}' not found", name))
1589        }
1590    }
1591
1592    pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
1593        let mut guard = acquire_write(&self.schema, "schema")?;
1594        let schema = Arc::make_mut(&mut *guard);
1595        if let Some(edge_meta) = schema.edge_types.get_mut(name) {
1596            edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
1597            // Do not remove properties; they are implicitly tombstoned by the edge type
1598            schema.bump_version();
1599            Ok(())
1600        } else if if_exists {
1601            Ok(())
1602        } else {
1603            Err(anyhow!("Edge Type '{}' not found", name))
1604        }
1605    }
1606}
1607
1608/// Validate identifier names to prevent injection and ensure compatibility.
1609pub fn validate_identifier(name: &str) -> Result<()> {
1610    // Length check
1611    if name.is_empty() || name.len() > 64 {
1612        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
1613    }
1614
1615    // First character must be letter or underscore
1616    let first = name.chars().next().unwrap();
1617    if !first.is_alphabetic() && first != '_' {
1618        return Err(anyhow!(
1619            "Identifier '{}' must start with letter or underscore",
1620            name
1621        ));
1622    }
1623
1624    // Remaining characters: alphanumeric or underscore
1625    if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
1626        return Err(anyhow!(
1627            "Identifier '{}' must contain only alphanumeric and underscore",
1628            name
1629        ));
1630    }
1631
1632    // Reserved words
1633    const RESERVED: &[&str] = &[
1634        "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
1635        "UNION", "ORDER", "LIMIT",
1636    ];
1637    if RESERVED.contains(&name.to_uppercase().as_str()) {
1638        return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
1639    }
1640
1641    Ok(())
1642}
1643
1644/// Reject user-declared property names that collide with internal Arrow column
1645/// names used by the storage layer.
1646///
1647/// Without this, declaring a property named e.g. `ext_id` produces an Arrow
1648/// schema with two `ext_id` fields at flush time, which Lance rejects with
1649/// "Duplicate field name" — silently losing all in-session writes on shutdown.
1650pub fn validate_property_name(name: &str) -> Result<()> {
1651    if name.starts_with('_') {
1652        return Err(anyhow!(
1653            "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
1654            name
1655        ));
1656    }
1657    validate_reserved_property_name(name)
1658}
1659
1660/// Reject names that collide with storage-layer Arrow column names.
1661///
1662/// Used both by `validate_property_name` (user-facing path) and directly by
1663/// `add_generated_property` (system-generated `_gen_*` path) — the latter
1664/// needs to bypass the underscore-prefix rule but must still reject the
1665/// fixed-name collisions below.
1666fn validate_reserved_property_name(name: &str) -> Result<()> {
1667    // Unprefixed names that get appended alongside user properties in the
1668    // per-label vertex (`storage/vertex.rs`), per-edge-type edge
1669    // (`storage/edge.rs`), or per-edge-type delta (`storage/delta.rs`)
1670    // Arrow schemas — declaring one of these as a user property produces a
1671    // duplicate Arrow field and a Lance "Duplicate field name" error at
1672    // flush time. Fixed-schema-only columns (`type`, `props_json`,
1673    // `labels` in the main tables) are NOT listed: those tables don't
1674    // append user properties, so no collision can occur.
1675    const RESERVED_PROPS: &[&str] = &[
1676        "ext_id",
1677        "overflow_json",
1678        "eid",
1679        "src_vid",
1680        "dst_vid",
1681        "op",
1682        // Internal planner sentinel: a column-name marker used by
1683        // `mark_set_item_variables` (uni-query::query::planner) to request
1684        // narrow structural projection without full-schema expansion.
1685        // Reserved here defensively so an internal `add_generated_property`
1686        // path can't accidentally create a colliding user-facing column.
1687        // The user-facing `validate_property_name` already rejects this
1688        // via the underscore-prefix rule, so this is belt-and-suspenders.
1689        "__set_struct__",
1690    ];
1691    if RESERVED_PROPS.contains(&name) {
1692        return Err(anyhow!(
1693            "Property name '{}' is reserved by the storage layer; please choose a different name",
1694            name
1695        ));
1696    }
1697    Ok(())
1698}
1699
1700#[cfg(test)]
1701mod tests {
1702    use super::*;
1703    use crate::value::{TemporalValue, Value};
1704    use object_store::local::LocalFileSystem;
1705    use tempfile::tempdir;
1706
/// Walks the acceptance matrix of `DataType::accepts`: universal null
/// acceptance, exact matches, permitted widenings, and the rejections that
/// guard against silent data loss.
#[test]
fn test_datatype_accepts_matrix() {
    // DateTime value at the Unix epoch, zero offset, no named zone.
    let epoch = || {
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: None,
        })
    };
    // ISO-8601 text form of a timestamp.
    let iso_text = || Value::String("2026-01-01T00:00:00Z".into());

    // Every type takes Null; nullability is enforced elsewhere.
    let null_takers = [
        DataType::String,
        DataType::Int64,
        DataType::Bool,
        DataType::DateTime,
        DataType::Float64,
    ];
    for ty in null_takers {
        assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
    }

    // Values whose type matches the column exactly.
    assert!(DataType::String.accepts(&Value::String("x".into())));
    assert!(DataType::Int64.accepts(&Value::Int(1)));
    assert!(DataType::Bool.accepts(&Value::Bool(true)));
    assert!(DataType::DateTime.accepts(&epoch()));

    // Deliberate lossless widenings stay permitted.
    let three = Value::Int(3);
    assert!(DataType::Float64.accepts(&three), "Int widens to Float");
    assert!(DataType::Int32.accepts(&three), "Int fits Int32");
    assert!(DataType::Timestamp.accepts(&epoch()));
    assert!(
        DataType::Timestamp.accepts(&iso_text()),
        "storage parses strings for non-struct Timestamp columns"
    );

    // The #68 data-loss cases must be refused (string coercion lives elsewhere).
    assert!(
        !DataType::DateTime.accepts(&iso_text()),
        "String into a DateTime struct column nulls silently — reject here"
    );
    assert!(!DataType::Bool.accepts(&Value::Int(1)));
    assert!(!DataType::Int64.accepts(&Value::Bool(true)));
    assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
    assert!(
        !DataType::String.accepts(&Value::Int(10)),
        "no implicit stringification"
    );
    assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));

    // Opaque columns take any value.
    assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));
}
1761
1762    #[tokio::test]
1763    async fn test_schema_management() -> Result<()> {
1764        let dir = tempdir()?;
1765        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1766        let path = ObjectStorePath::from("schema.json");
1767        let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
1768
1769        // Labels
1770        let lid = manager.add_label("Person")?;
1771        assert_eq!(lid, 1);
1772        assert!(manager.add_label("Person").is_err());
1773
1774        // Properties
1775        manager.add_property("Person", "name", DataType::String, false)?;
1776        assert!(
1777            manager
1778                .add_property("Person", "name", DataType::String, false)
1779                .is_err()
1780        );
1781
1782        // Edge types
1783        let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
1784        assert_eq!(tid, 1);
1785
1786        manager.save().await?;
1787        // Check file exists
1788        assert!(store.get(&path).await.is_ok());
1789
1790        let manager2 = SchemaManager::load_from_store(store, &path).await?;
1791        assert!(manager2.schema().labels.contains_key("Person"));
1792        assert!(
1793            manager2
1794                .schema()
1795                .properties
1796                .get("Person")
1797                .unwrap()
1798                .contains_key("name")
1799        );
1800
1801        Ok(())
1802    }
1803
1804    #[tokio::test]
1805    async fn test_reserved_property_names_rejected() -> Result<()> {
1806        let dir = tempdir()?;
1807        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1808        let path = ObjectStorePath::from("schema.json");
1809        let manager = SchemaManager::load_from_store(store, &path).await?;
1810
1811        manager.add_label("Tiny")?;
1812
1813        // Unprefixed reserved names — these collide with internal Arrow
1814        // columns in storage tables and previously caused Lance
1815        // "Duplicate field name" errors at flush time.
1816        for reserved in &["ext_id", "overflow_json", "eid", "src_vid", "dst_vid", "op"] {
1817            let err = manager
1818                .add_property("Tiny", reserved, DataType::String, true)
1819                .expect_err(&format!("expected '{reserved}' to be rejected"));
1820            assert!(
1821                err.to_string().contains("reserved"),
1822                "error for '{reserved}' should mention 'reserved', got: {err}"
1823            );
1824        }
1825
1826        // Planner sentinel — reserved in RESERVED_PROPS (belt-and-suspenders
1827        // alongside the underscore-prefix rule). Confirms an internal
1828        // `add_generated_property` path cannot accidentally create a column
1829        // that collides with the SET-target structural-projection marker.
1830        let err = manager
1831            .add_property("Tiny", "__set_struct__", DataType::String, true)
1832            .expect_err("expected '__set_struct__' to be rejected");
1833        assert!(
1834            err.to_string().contains("reserved"),
1835            "__set_struct__ rejection should mention 'reserved', got: {err}"
1836        );
1837
1838        // Leading-underscore pattern rule.
1839        for reserved in &["_vid", "_uid", "_eid", "_version", "_created_at"] {
1840            assert!(
1841                manager
1842                    .add_property("Tiny", reserved, DataType::String, true)
1843                    .is_err(),
1844                "expected '{reserved}' to be rejected"
1845            );
1846        }
1847
1848        // Names that merely contain a reserved substring should still be
1849        // accepted.
1850        manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
1851        manager.add_property("Tiny", "user_op", DataType::String, true)?;
1852        manager.add_property("Tiny", "type_name", DataType::String, true)?;
1853
1854        // Same check applies to edge-type properties (single dispatch).
1855        manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
1856        assert!(
1857            manager
1858                .add_property("knows", "src_vid", DataType::Int64, true)
1859                .is_err()
1860        );
1861
1862        // And to generated properties.
1863        assert!(
1864            manager
1865                .add_generated_property(
1866                    "Tiny",
1867                    "ext_id",
1868                    DataType::String,
1869                    "concat('x', name)".into()
1870                )
1871                .is_err()
1872        );
1873
1874        Ok(())
1875    }
1876
/// Function names are upper-cased regardless of input casing, including
/// nested calls; arguments keep their original spelling.
#[test]
fn test_normalize_function_names() {
    let cases = [
        ("lower(email)", "LOWER(email)"),
        ("LOWER(email)", "LOWER(email)"),
        ("Lower(email)", "LOWER(email)"),
        ("trim(lower(email))", "TRIM(LOWER(email))"),
    ];

    for (raw, normalized) in cases {
        assert_eq!(SchemaManager::normalize_function_names(raw), normalized);
    }
}
1896
/// Expressions that differ only in function-name casing must map to the
/// same generated column name, carrying the normalized `_gen_LOWER_email_`
/// prefix.
#[test]
fn test_generated_column_name_case_insensitive() {
    let from_lower = SchemaManager::generated_column_name("lower(email)");
    let from_upper = SchemaManager::generated_column_name("LOWER(email)");
    let from_mixed = SchemaManager::generated_column_name("Lower(email)");

    assert_eq!(from_lower, from_upper);
    assert_eq!(from_upper, from_mixed);
    assert!(from_lower.starts_with("_gen_LOWER_email_"));
}
1906
/// An index definition serialized before the `metadata` field existed must
/// still deserialize, falling back to default metadata.
#[test]
fn test_index_metadata_serde_backward_compat() {
    // Legacy payload: no `metadata` key at all.
    let legacy_json = r#"{
            "type": "Scalar",
            "name": "idx_person_name",
            "label": "Person",
            "properties": ["name"],
            "index_type": "BTree",
            "where_clause": null
        }"#;

    let parsed = serde_json::from_str::<IndexDefinition>(legacy_json).unwrap();
    let defaults = parsed.metadata();

    assert_eq!(defaults.status, IndexStatus::Online);
    assert!(defaults.last_built_at.is_none());
    assert!(defaults.row_count_at_build.is_none());
}
1924
/// Non-default index metadata must survive a JSON serialize/deserialize
/// round trip.
#[test]
fn test_index_metadata_serde_roundtrip() {
    let built_at = Utc::now();
    let original = IndexDefinition::Scalar(ScalarIndexConfig {
        name: "idx_test".to_string(),
        label: "Test".to_string(),
        properties: vec!["prop".to_string()],
        index_type: ScalarIndexType::BTree,
        where_clause: None,
        metadata: IndexMetadata {
            status: IndexStatus::Building,
            last_built_at: Some(built_at),
            row_count_at_build: Some(42),
        },
    });

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: IndexDefinition = serde_json::from_str(&encoded).unwrap();
    let meta = decoded.metadata();

    assert_eq!(meta.status, IndexStatus::Building);
    assert_eq!(meta.row_count_at_build, Some(42));
    assert!(meta.last_built_at.is_some());
}
1947
1948    #[tokio::test]
1949    async fn test_update_index_metadata() -> Result<()> {
1950        let dir = tempdir()?;
1951        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1952        let path = ObjectStorePath::from("schema.json");
1953        let manager = SchemaManager::load_from_store(store, &path).await?;
1954
1955        manager.add_label("Person")?;
1956        let idx = IndexDefinition::Scalar(ScalarIndexConfig {
1957            name: "idx_test".to_string(),
1958            label: "Person".to_string(),
1959            properties: vec!["name".to_string()],
1960            index_type: ScalarIndexType::BTree,
1961            where_clause: None,
1962            metadata: Default::default(),
1963        });
1964        manager.add_index(idx)?;
1965
1966        // Verify initial status is Online
1967        let initial = manager.get_index("idx_test").unwrap();
1968        assert_eq!(initial.metadata().status, IndexStatus::Online);
1969
1970        // Update to Building
1971        manager.update_index_metadata("idx_test", |m| {
1972            m.status = IndexStatus::Building;
1973            m.row_count_at_build = Some(100);
1974        })?;
1975
1976        let updated = manager.get_index("idx_test").unwrap();
1977        assert_eq!(updated.metadata().status, IndexStatus::Building);
1978        assert_eq!(updated.metadata().row_count_at_build, Some(100));
1979
1980        // Non-existent index should error
1981        assert!(manager.update_index_metadata("nope", |_| {}).is_err());
1982
1983        Ok(())
1984    }
1985
1986    /// `add_index` is upsert-by-name (issue rustic-ai/uni-db#63). Repeat
1987    /// invocations with the same `IndexDefinition::name()` must replace
1988    /// the entry in place rather than appending. Subsequent `add_index`
1989    /// calls also reflect metadata updates from the new definition.
1990    #[tokio::test]
1991    async fn test_add_index_is_upsert_by_name() -> Result<()> {
1992        let dir = tempdir()?;
1993        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1994        let path = ObjectStorePath::from("schema.json");
1995        let manager = SchemaManager::load_from_store(store, &path).await?;
1996        manager.add_label("Person")?;
1997
1998        let initial = IndexDefinition::Scalar(ScalarIndexConfig {
1999            name: "idx_test".to_string(),
2000            label: "Person".to_string(),
2001            properties: vec!["name".to_string()],
2002            index_type: ScalarIndexType::BTree,
2003            where_clause: None,
2004            metadata: IndexMetadata {
2005                status: IndexStatus::Building,
2006                ..Default::default()
2007            },
2008        });
2009        manager.add_index(initial.clone())?;
2010        assert_eq!(manager.schema().indexes.len(), 1);
2011
2012        // Re-add the identical def — must remain a single entry.
2013        manager.add_index(initial.clone())?;
2014        assert_eq!(
2015            manager.schema().indexes.len(),
2016            1,
2017            "duplicate add_index by name must not append"
2018        );
2019
2020        // Re-add with updated metadata — must replace in place, len unchanged.
2021        let mut updated_cfg = match initial {
2022            IndexDefinition::Scalar(c) => c,
2023            _ => unreachable!(),
2024        };
2025        updated_cfg.metadata.status = IndexStatus::Online;
2026        updated_cfg.metadata.row_count_at_build = Some(42);
2027        manager.add_index(IndexDefinition::Scalar(updated_cfg))?;
2028        assert_eq!(manager.schema().indexes.len(), 1);
2029        let stored = manager.get_index("idx_test").unwrap();
2030        assert_eq!(stored.metadata().status, IndexStatus::Online);
2031        assert_eq!(stored.metadata().row_count_at_build, Some(42));
2032
2033        // A *different* name appends as a new entry.
2034        let other = IndexDefinition::Scalar(ScalarIndexConfig {
2035            name: "idx_other".to_string(),
2036            label: "Person".to_string(),
2037            properties: vec!["age".to_string()],
2038            index_type: ScalarIndexType::BTree,
2039            where_clause: None,
2040            metadata: IndexMetadata::default(),
2041        });
2042        manager.add_index(other)?;
2043        assert_eq!(manager.schema().indexes.len(), 2);
2044
2045        Ok(())
2046    }
2047
2048    /// `load_from_store` self-heals catalogs that were bloated by the
2049    /// pre-fix `add_index` (kept the *last* def per name).
2050    #[tokio::test]
2051    async fn test_load_dedups_bloated_indexes() -> Result<()> {
2052        let dir = tempdir()?;
2053        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2054        let path = ObjectStorePath::from("schema.json");
2055
2056        // Seed disk with a hand-crafted bloated schema: 50 entries, all
2057        // sharing the same name. The last entry has distinct metadata so
2058        // we can assert "last writer wins" semantics.
2059        let mut schema = Schema::default();
2060        schema.labels.insert(
2061            "Person".to_string(),
2062            LabelMeta {
2063                id: 1,
2064                created_at: chrono::Utc::now(),
2065                state: SchemaElementState::Active,
2066                description: None,
2067            },
2068        );
2069        let make = |status: IndexStatus, count: Option<u64>| {
2070            IndexDefinition::Scalar(ScalarIndexConfig {
2071                name: "idx_dup".to_string(),
2072                label: "Person".to_string(),
2073                properties: vec!["name".to_string()],
2074                index_type: ScalarIndexType::BTree,
2075                where_clause: None,
2076                metadata: IndexMetadata {
2077                    status,
2078                    row_count_at_build: count,
2079                    ..Default::default()
2080                },
2081            })
2082        };
2083        for _ in 0..49 {
2084            schema.indexes.push(make(IndexStatus::Building, None));
2085        }
2086        schema.indexes.push(make(IndexStatus::Online, Some(123)));
2087        let json = serde_json::to_string_pretty(&schema)?;
2088        store.put(&path, json.into()).await?;
2089
2090        let manager = SchemaManager::load_from_store(store, &path).await?;
2091        let schema = manager.schema();
2092        assert_eq!(
2093            schema.indexes.len(),
2094            1,
2095            "load() must collapse 50 duplicates by name to 1"
2096        );
2097        // Last-writer-wins: the kept entry is the final push (Online, 123).
2098        assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
2099        assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));
2100
2101        Ok(())
2102    }
2103
/// `vector_index_for_property` must ignore a vector index whose status is
/// not Online and return it once the status becomes Online.
#[test]
fn test_vector_index_for_property_skips_non_online() {
    let mut schema = Schema::default();
    schema.labels.insert(
        "Document".to_string(),
        LabelMeta {
            id: 1,
            created_at: chrono::Utc::now(),
            state: SchemaElementState::Active,
            description: None,
        },
    );

    // Register a vector index that starts out Stale.
    let stale_index = IndexDefinition::Vector(VectorIndexConfig {
        name: "vec_doc_embedding".to_string(),
        label: "Document".to_string(),
        property: "embedding".to_string(),
        index_type: VectorIndexType::Flat,
        metric: DistanceMetric::Cosine,
        embedding_config: None,
        metadata: IndexMetadata {
            status: IndexStatus::Stale,
            ..Default::default()
        },
    });
    schema.indexes.push(stale_index);

    // While Stale, the lookup finds nothing.
    let while_stale = schema.vector_index_for_property("Document", "embedding");
    assert!(while_stale.is_none());

    // Promote to Online; the lookup now succeeds.
    if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
        cfg.metadata.status = IndexStatus::Online;
    }
    let once_online = schema.vector_index_for_property("Document", "embedding");
    assert!(once_online.is_some());
    assert_eq!(once_online.unwrap().metric, DistanceMetric::Cosine);
}
2148
2149    #[tokio::test]
2150    async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
2151        use crate::core::fork::SchemaDelta;
2152
2153        let dir = tempdir()?;
2154        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2155        let path = ObjectStorePath::from("schema.json");
2156        let primary = SchemaManager::load_from_store(store, &path).await?;
2157        primary.add_label("Person")?;
2158
2159        let overlay = primary.with_overlay(&SchemaDelta::empty());
2160        assert_eq!(overlay.schema().labels.len(), 1);
2161
2162        // Phase 1 invariant: mutating the overlay manager must not bleed
2163        // into primary's schema.
2164        overlay.add_label("Forked")?;
2165        assert!(overlay.schema().labels.contains_key("Forked"));
2166        assert!(!primary.schema().labels.contains_key("Forked"));
2167
2168        Ok(())
2169    }
2170
2171    #[tokio::test]
2172    async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
2173        use crate::core::fork::SchemaDelta;
2174        use chrono::Utc;
2175
2176        let dir = tempdir()?;
2177        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2178        let path = ObjectStorePath::from("schema.json");
2179        let primary = SchemaManager::load_from_store(store, &path).await?;
2180        primary.add_label("Existing")?;
2181
2182        let label_meta = LabelMeta {
2183            id: 99,
2184            created_at: Utc::now(),
2185            state: SchemaElementState::Active,
2186            description: None,
2187        };
2188        let edge_meta = EdgeTypeMeta {
2189            id: 99,
2190            src_labels: vec!["NewLabel".into()],
2191            dst_labels: vec!["NewLabel".into()],
2192            state: SchemaElementState::Active,
2193            description: None,
2194        };
2195        let delta = SchemaDelta {
2196            added_labels: vec![("NewLabel".to_string(), label_meta)],
2197            added_edge_types: vec![("NewEdge".to_string(), edge_meta)],
2198            added_properties: vec![],
2199        };
2200
2201        let overlay = primary.with_overlay(&delta);
2202        let merged = overlay.schema();
2203        assert!(merged.labels.contains_key("Existing"));
2204        assert!(merged.labels.contains_key("NewLabel"));
2205        assert!(merged.edge_types.contains_key("NewEdge"));
2206
2207        // Primary unchanged.
2208        assert!(!primary.schema().labels.contains_key("NewLabel"));
2209        Ok(())
2210    }
2211
2212    /// N threads racing `get_or_assign_edge_type_id` for the same new name
2213    /// must converge on a single id (the read-lock fast path double-checks
2214    /// under the write lock); a schema-defined type must win over the
2215    /// schemaless registry.
2216    #[tokio::test]
2217    async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
2218        let dir = tempdir()?;
2219        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2220        let path = ObjectStorePath::from("schema.json");
2221        let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);
2222
2223        let mut handles = Vec::new();
2224        for _ in 0..16 {
2225            let m = manager.clone();
2226            handles.push(std::thread::spawn(move || {
2227                m.get_or_assign_edge_type_id("RACED")
2228            }));
2229        }
2230        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2231        assert!(
2232            ids.iter().all(|&id| id == ids[0]),
2233            "all racers must observe one id, got {ids:?}"
2234        );
2235        // Fast path returns the same id afterwards.
2236        assert_eq!(manager.get_or_assign_edge_type_id("RACED"), ids[0]);
2237
2238        // Schema-defined type wins over the schemaless registry.
2239        manager.add_label("A")?;
2240        let declared = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
2241        assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared);
2242        Ok(())
2243    }
2244
/// Creating a brand-new schemaless edge type must bump `schema_version`:
/// the plan cache keys on it, and untyped traversals bake
/// `all_edge_type_ids()` into the plan, so a stale plan would silently drop
/// edges of the new type. Resolving an already-known type must NOT bump.
/// (review C5)
#[test]
fn test_new_schemaless_edge_type_bumps_schema_version() {
    let mut schema = Schema::default();
    let base = schema.schema_version;
    let after_one = base.wrapping_add(1);
    let after_two = base.wrapping_add(2);

    let fresh_id = schema.get_or_assign_edge_type_id("FRESH");
    assert_eq!(
        schema.schema_version, after_one,
        "minting a new edge type must bump schema_version"
    );

    // Looking the same type up again changes nothing.
    let fresh_id_again = schema.get_or_assign_edge_type_id("FRESH");
    assert_eq!(fresh_id, fresh_id_again);
    assert_eq!(
        schema.schema_version, after_one,
        "resolving an existing edge type must not bump schema_version"
    );

    // Another distinct new type bumps once more.
    let _other_id = schema.get_or_assign_edge_type_id("OTHER");
    assert_eq!(
        schema.schema_version, after_two,
        "a second new edge type must bump schema_version again"
    );
}
2278}