Skip to main content

grafeo_engine/catalog/
mod.rs

1//! Schema metadata - what labels, properties, and indexes exist.
2//!
3//! The catalog is the "dictionary" of your database. When you write `(:Person)`,
4//! the catalog maps "Person" to an internal LabelId. This indirection keeps
5//! storage compact while names stay readable.
6//!
7//! | What it tracks | Why it matters |
8//! | -------------- | -------------- |
9//! | Labels | Maps "Person" → LabelId for efficient storage |
10//! | Property keys | Maps "name" → PropertyKeyId |
11//! | Edge types | Maps "KNOWS" → EdgeTypeId |
12//! | Indexes | Which properties are indexed for fast lookups |
13
14mod check_eval;
15
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicU32, Ordering};
19
20use parking_lot::{Mutex, RwLock};
21
22use grafeo_common::collections::{GrafeoConcurrentMap, grafeo_concurrent_map};
23use grafeo_common::types::{EdgeTypeId, IndexId, LabelId, PropertyKeyId, Value};
24
25/// The database's schema dictionary - maps names to compact internal IDs.
26///
27/// You rarely interact with this directly. The query processor uses it to
28/// resolve names like "Person" and "name" to internal IDs.
29pub struct Catalog {
30    /// Label name-to-ID mappings.
31    labels: LabelCatalog,
32    /// Property key name-to-ID mappings.
33    property_keys: PropertyCatalog,
34    /// Edge type name-to-ID mappings.
35    edge_types: EdgeTypeCatalog,
36    /// Index definitions.
37    indexes: IndexCatalog,
38    /// Optional schema constraints.
39    schema: Option<SchemaCatalog>,
40}
41
42impl Catalog {
43    /// Creates a new empty catalog with schema support enabled.
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            labels: LabelCatalog::new(),
48            property_keys: PropertyCatalog::new(),
49            edge_types: EdgeTypeCatalog::new(),
50            indexes: IndexCatalog::new(),
51            schema: Some(SchemaCatalog::new()),
52        }
53    }
54
55    /// Creates a new catalog with schema constraints enabled.
56    ///
57    /// This is now equivalent to `new()` since schema is always enabled.
58    #[must_use]
59    pub fn with_schema() -> Self {
60        Self::new()
61    }
62
63    // === Label Operations ===
64
65    /// Gets or creates a label ID for the given label name.
66    pub fn get_or_create_label(&self, name: &str) -> LabelId {
67        self.labels.get_or_create(name)
68    }
69
70    /// Gets the label ID for a label name, if it exists.
71    #[must_use]
72    pub fn get_label_id(&self, name: &str) -> Option<LabelId> {
73        self.labels.get_id(name)
74    }
75
76    /// Gets the label name for a label ID, if it exists.
77    #[must_use]
78    pub fn get_label_name(&self, id: LabelId) -> Option<Arc<str>> {
79        self.labels.get_name(id)
80    }
81
82    /// Returns the number of distinct labels.
83    #[must_use]
84    pub fn label_count(&self) -> usize {
85        self.labels.count()
86    }
87
88    /// Returns all label names.
89    #[must_use]
90    pub fn all_labels(&self) -> Vec<Arc<str>> {
91        self.labels.all_names()
92    }
93
94    // === Property Key Operations ===
95
96    /// Gets or creates a property key ID for the given property key name.
97    pub fn get_or_create_property_key(&self, name: &str) -> PropertyKeyId {
98        self.property_keys.get_or_create(name)
99    }
100
101    /// Gets the property key ID for a property key name, if it exists.
102    #[must_use]
103    pub fn get_property_key_id(&self, name: &str) -> Option<PropertyKeyId> {
104        self.property_keys.get_id(name)
105    }
106
107    /// Gets the property key name for a property key ID, if it exists.
108    #[must_use]
109    pub fn get_property_key_name(&self, id: PropertyKeyId) -> Option<Arc<str>> {
110        self.property_keys.get_name(id)
111    }
112
113    /// Returns the number of distinct property keys.
114    #[must_use]
115    pub fn property_key_count(&self) -> usize {
116        self.property_keys.count()
117    }
118
119    /// Returns all property key names.
120    #[must_use]
121    pub fn all_property_keys(&self) -> Vec<Arc<str>> {
122        self.property_keys.all_names()
123    }
124
125    // === Edge Type Operations ===
126
127    /// Gets or creates an edge type ID for the given edge type name.
128    pub fn get_or_create_edge_type(&self, name: &str) -> EdgeTypeId {
129        self.edge_types.get_or_create(name)
130    }
131
132    /// Gets the edge type ID for an edge type name, if it exists.
133    #[must_use]
134    pub fn get_edge_type_id(&self, name: &str) -> Option<EdgeTypeId> {
135        self.edge_types.get_id(name)
136    }
137
138    /// Gets the edge type name for an edge type ID, if it exists.
139    #[must_use]
140    pub fn get_edge_type_name(&self, id: EdgeTypeId) -> Option<Arc<str>> {
141        self.edge_types.get_name(id)
142    }
143
144    /// Returns the number of distinct edge types.
145    #[must_use]
146    pub fn edge_type_count(&self) -> usize {
147        self.edge_types.count()
148    }
149
150    /// Returns all edge type names.
151    #[must_use]
152    pub fn all_edge_types(&self) -> Vec<Arc<str>> {
153        self.edge_types.all_names()
154    }
155
156    // === Index Operations ===
157
158    /// Creates a new index on a label and property key.
159    pub fn create_index(
160        &self,
161        label: LabelId,
162        property_key: PropertyKeyId,
163        index_type: IndexType,
164    ) -> IndexId {
165        self.indexes.create(label, property_key, index_type)
166    }
167
168    /// Drops an index by ID.
169    pub fn drop_index(&self, id: IndexId) -> bool {
170        self.indexes.drop(id)
171    }
172
173    /// Gets the index definition for an index ID.
174    #[must_use]
175    pub fn get_index(&self, id: IndexId) -> Option<IndexDefinition> {
176        self.indexes.get(id)
177    }
178
179    /// Finds indexes for a given label.
180    #[must_use]
181    pub fn indexes_for_label(&self, label: LabelId) -> Vec<IndexId> {
182        self.indexes.for_label(label)
183    }
184
185    /// Finds indexes for a given label and property key.
186    #[must_use]
187    pub fn indexes_for_label_property(
188        &self,
189        label: LabelId,
190        property_key: PropertyKeyId,
191    ) -> Vec<IndexId> {
192        self.indexes.for_label_property(label, property_key)
193    }
194
195    /// Returns all index definitions.
196    #[must_use]
197    pub fn all_indexes(&self) -> Vec<IndexDefinition> {
198        self.indexes.all()
199    }
200
201    /// Returns the number of indexes.
202    #[must_use]
203    pub fn index_count(&self) -> usize {
204        self.indexes.count()
205    }
206
207    // === Schema Operations ===
208
209    /// Returns whether schema constraints are enabled.
210    #[must_use]
211    pub fn has_schema(&self) -> bool {
212        self.schema.is_some()
213    }
214
215    /// Adds a uniqueness constraint.
216    ///
217    /// Returns an error if schema is not enabled or constraint already exists.
218    pub fn add_unique_constraint(
219        &self,
220        label: LabelId,
221        property_key: PropertyKeyId,
222    ) -> Result<(), CatalogError> {
223        match &self.schema {
224            Some(schema) => schema.add_unique_constraint(label, property_key),
225            None => Err(CatalogError::SchemaNotEnabled),
226        }
227    }
228
229    /// Adds a required property constraint (NOT NULL).
230    ///
231    /// Returns an error if schema is not enabled or constraint already exists.
232    pub fn add_required_property(
233        &self,
234        label: LabelId,
235        property_key: PropertyKeyId,
236    ) -> Result<(), CatalogError> {
237        match &self.schema {
238            Some(schema) => schema.add_required_property(label, property_key),
239            None => Err(CatalogError::SchemaNotEnabled),
240        }
241    }
242
243    /// Checks if a property is required for a label.
244    #[must_use]
245    pub fn is_property_required(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
246        self.schema
247            .as_ref()
248            .is_some_and(|s| s.is_property_required(label, property_key))
249    }
250
251    /// Checks if a property must be unique for a label.
252    #[must_use]
253    pub fn is_property_unique(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
254        self.schema
255            .as_ref()
256            .is_some_and(|s| s.is_property_unique(label, property_key))
257    }
258
259    // === Type Definition Operations ===
260
261    /// Returns a reference to the schema catalog.
262    #[must_use]
263    pub fn schema(&self) -> Option<&SchemaCatalog> {
264        self.schema.as_ref()
265    }
266
267    /// Registers a node type definition.
268    pub fn register_node_type(&self, def: NodeTypeDefinition) -> Result<(), CatalogError> {
269        match &self.schema {
270            Some(schema) => schema.register_node_type(def),
271            None => Err(CatalogError::SchemaNotEnabled),
272        }
273    }
274
275    /// Registers or replaces a node type definition.
276    pub fn register_or_replace_node_type(&self, def: NodeTypeDefinition) {
277        if let Some(schema) = &self.schema {
278            schema.register_or_replace_node_type(def);
279        }
280    }
281
282    /// Drops a node type definition.
283    pub fn drop_node_type(&self, name: &str) -> Result<(), CatalogError> {
284        match &self.schema {
285            Some(schema) => schema.drop_node_type(name),
286            None => Err(CatalogError::SchemaNotEnabled),
287        }
288    }
289
290    /// Gets a node type definition by name.
291    #[must_use]
292    pub fn get_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
293        self.schema.as_ref().and_then(|s| s.get_node_type(name))
294    }
295
296    /// Gets a resolved node type with inherited properties from parents.
297    #[must_use]
298    pub fn resolved_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
299        self.schema
300            .as_ref()
301            .and_then(|s| s.resolved_node_type(name))
302    }
303
304    /// Returns all registered node type names.
305    #[must_use]
306    pub fn all_node_type_names(&self) -> Vec<String> {
307        self.schema
308            .as_ref()
309            .map(SchemaCatalog::all_node_types)
310            .unwrap_or_default()
311    }
312
313    /// Returns all registered edge type definition names.
314    #[must_use]
315    pub fn all_edge_type_names(&self) -> Vec<String> {
316        self.schema
317            .as_ref()
318            .map(SchemaCatalog::all_edge_types)
319            .unwrap_or_default()
320    }
321
322    /// Registers an edge type definition.
323    pub fn register_edge_type_def(&self, def: EdgeTypeDefinition) -> Result<(), CatalogError> {
324        match &self.schema {
325            Some(schema) => schema.register_edge_type(def),
326            None => Err(CatalogError::SchemaNotEnabled),
327        }
328    }
329
330    /// Registers or replaces an edge type definition.
331    pub fn register_or_replace_edge_type_def(&self, def: EdgeTypeDefinition) {
332        if let Some(schema) = &self.schema {
333            schema.register_or_replace_edge_type(def);
334        }
335    }
336
337    /// Drops an edge type definition.
338    pub fn drop_edge_type_def(&self, name: &str) -> Result<(), CatalogError> {
339        match &self.schema {
340            Some(schema) => schema.drop_edge_type(name),
341            None => Err(CatalogError::SchemaNotEnabled),
342        }
343    }
344
345    /// Gets an edge type definition by name.
346    #[must_use]
347    pub fn get_edge_type_def(&self, name: &str) -> Option<EdgeTypeDefinition> {
348        self.schema.as_ref().and_then(|s| s.get_edge_type(name))
349    }
350
351    /// Registers a graph type definition.
352    pub fn register_graph_type(&self, def: GraphTypeDefinition) -> Result<(), CatalogError> {
353        match &self.schema {
354            Some(schema) => schema.register_graph_type(def),
355            None => Err(CatalogError::SchemaNotEnabled),
356        }
357    }
358
359    /// Drops a graph type definition.
360    pub fn drop_graph_type(&self, name: &str) -> Result<(), CatalogError> {
361        match &self.schema {
362            Some(schema) => schema.drop_graph_type(name),
363            None => Err(CatalogError::SchemaNotEnabled),
364        }
365    }
366
367    /// Returns all registered graph type names.
368    #[must_use]
369    pub fn all_graph_type_names(&self) -> Vec<String> {
370        self.schema
371            .as_ref()
372            .map(SchemaCatalog::all_graph_types)
373            .unwrap_or_default()
374    }
375
376    /// Gets a graph type definition by name.
377    #[must_use]
378    pub fn get_graph_type_def(&self, name: &str) -> Option<GraphTypeDefinition> {
379        self.schema.as_ref().and_then(|s| s.get_graph_type(name))
380    }
381
382    /// Registers a schema namespace.
383    pub fn register_schema_namespace(&self, name: String) -> Result<(), CatalogError> {
384        match &self.schema {
385            Some(schema) => schema.register_schema(name),
386            None => Err(CatalogError::SchemaNotEnabled),
387        }
388    }
389
390    /// Drops a schema namespace.
391    pub fn drop_schema_namespace(&self, name: &str) -> Result<(), CatalogError> {
392        match &self.schema {
393            Some(schema) => schema.drop_schema(name),
394            None => Err(CatalogError::SchemaNotEnabled),
395        }
396    }
397
398    /// Adds a constraint to an existing node type, creating a minimal type if needed.
399    pub fn add_constraint_to_type(
400        &self,
401        label: &str,
402        constraint: TypeConstraint,
403    ) -> Result<(), CatalogError> {
404        match &self.schema {
405            Some(schema) => schema.add_constraint_to_type(label, constraint),
406            None => Err(CatalogError::SchemaNotEnabled),
407        }
408    }
409
410    /// Adds a property to a node type.
411    pub fn alter_node_type_add_property(
412        &self,
413        type_name: &str,
414        property: TypedProperty,
415    ) -> Result<(), CatalogError> {
416        match &self.schema {
417            Some(schema) => schema.alter_node_type_add_property(type_name, property),
418            None => Err(CatalogError::SchemaNotEnabled),
419        }
420    }
421
422    /// Drops a property from a node type.
423    pub fn alter_node_type_drop_property(
424        &self,
425        type_name: &str,
426        property_name: &str,
427    ) -> Result<(), CatalogError> {
428        match &self.schema {
429            Some(schema) => schema.alter_node_type_drop_property(type_name, property_name),
430            None => Err(CatalogError::SchemaNotEnabled),
431        }
432    }
433
434    /// Adds a property to an edge type.
435    pub fn alter_edge_type_add_property(
436        &self,
437        type_name: &str,
438        property: TypedProperty,
439    ) -> Result<(), CatalogError> {
440        match &self.schema {
441            Some(schema) => schema.alter_edge_type_add_property(type_name, property),
442            None => Err(CatalogError::SchemaNotEnabled),
443        }
444    }
445
446    /// Drops a property from an edge type.
447    pub fn alter_edge_type_drop_property(
448        &self,
449        type_name: &str,
450        property_name: &str,
451    ) -> Result<(), CatalogError> {
452        match &self.schema {
453            Some(schema) => schema.alter_edge_type_drop_property(type_name, property_name),
454            None => Err(CatalogError::SchemaNotEnabled),
455        }
456    }
457
458    /// Adds a node type to a graph type.
459    pub fn alter_graph_type_add_node_type(
460        &self,
461        graph_type_name: &str,
462        node_type: String,
463    ) -> Result<(), CatalogError> {
464        match &self.schema {
465            Some(schema) => schema.alter_graph_type_add_node_type(graph_type_name, node_type),
466            None => Err(CatalogError::SchemaNotEnabled),
467        }
468    }
469
470    /// Drops a node type from a graph type.
471    pub fn alter_graph_type_drop_node_type(
472        &self,
473        graph_type_name: &str,
474        node_type: &str,
475    ) -> Result<(), CatalogError> {
476        match &self.schema {
477            Some(schema) => schema.alter_graph_type_drop_node_type(graph_type_name, node_type),
478            None => Err(CatalogError::SchemaNotEnabled),
479        }
480    }
481
482    /// Adds an edge type to a graph type.
483    pub fn alter_graph_type_add_edge_type(
484        &self,
485        graph_type_name: &str,
486        edge_type: String,
487    ) -> Result<(), CatalogError> {
488        match &self.schema {
489            Some(schema) => schema.alter_graph_type_add_edge_type(graph_type_name, edge_type),
490            None => Err(CatalogError::SchemaNotEnabled),
491        }
492    }
493
494    /// Drops an edge type from a graph type.
495    pub fn alter_graph_type_drop_edge_type(
496        &self,
497        graph_type_name: &str,
498        edge_type: &str,
499    ) -> Result<(), CatalogError> {
500        match &self.schema {
501            Some(schema) => schema.alter_graph_type_drop_edge_type(graph_type_name, edge_type),
502            None => Err(CatalogError::SchemaNotEnabled),
503        }
504    }
505
506    /// Binds a graph instance to a graph type.
507    pub fn bind_graph_type(
508        &self,
509        graph_name: &str,
510        graph_type: String,
511    ) -> Result<(), CatalogError> {
512        match &self.schema {
513            Some(schema) => {
514                // Verify the graph type exists
515                if schema.get_graph_type(&graph_type).is_none() {
516                    return Err(CatalogError::TypeNotFound(graph_type));
517                }
518                schema
519                    .graph_type_bindings
520                    .write()
521                    .insert(graph_name.to_string(), graph_type);
522                Ok(())
523            }
524            None => Err(CatalogError::SchemaNotEnabled),
525        }
526    }
527
528    /// Gets the graph type binding for a graph instance.
529    pub fn get_graph_type_binding(&self, graph_name: &str) -> Option<String> {
530        self.schema
531            .as_ref()?
532            .graph_type_bindings
533            .read()
534            .get(graph_name)
535            .cloned()
536    }
537
538    /// Registers a stored procedure.
539    pub fn register_procedure(&self, def: ProcedureDefinition) -> Result<(), CatalogError> {
540        match &self.schema {
541            Some(schema) => schema.register_procedure(def),
542            None => Err(CatalogError::SchemaNotEnabled),
543        }
544    }
545
546    /// Replaces or creates a stored procedure.
547    pub fn replace_procedure(&self, def: ProcedureDefinition) -> Result<(), CatalogError> {
548        match &self.schema {
549            Some(schema) => {
550                schema.replace_procedure(def);
551                Ok(())
552            }
553            None => Err(CatalogError::SchemaNotEnabled),
554        }
555    }
556
557    /// Drops a stored procedure.
558    pub fn drop_procedure(&self, name: &str) -> Result<(), CatalogError> {
559        match &self.schema {
560            Some(schema) => schema.drop_procedure(name),
561            None => Err(CatalogError::SchemaNotEnabled),
562        }
563    }
564
565    /// Gets a stored procedure by name.
566    pub fn get_procedure(&self, name: &str) -> Option<ProcedureDefinition> {
567        self.schema.as_ref()?.get_procedure(name)
568    }
569}
570
571impl Default for Catalog {
572    fn default() -> Self {
573        Self::new()
574    }
575}
576
577// === Label Catalog ===
578
579/// Bidirectional mapping between label names and IDs.
580///
581/// Uses `DashMap` (shard-level locking) for `name_to_id` so concurrent
582/// readers never block each other. A separate `Mutex` serializes the rare
583/// create path to keep `id_to_name` consistent.
584struct LabelCatalog {
585    name_to_id: GrafeoConcurrentMap<Arc<str>, LabelId>,
586    id_to_name: RwLock<Vec<Arc<str>>>,
587    next_id: AtomicU32,
588    create_lock: Mutex<()>,
589}
590
591impl LabelCatalog {
592    fn new() -> Self {
593        Self {
594            name_to_id: grafeo_concurrent_map(),
595            id_to_name: RwLock::new(Vec::new()),
596            next_id: AtomicU32::new(0),
597            create_lock: Mutex::new(()),
598        }
599    }
600
601    fn get_or_create(&self, name: &str) -> LabelId {
602        // Fast path: shard-level read (no global lock)
603        if let Some(id) = self.name_to_id.get(name) {
604            return *id;
605        }
606
607        // Slow path: serialize creates to keep id_to_name consistent
608        let _guard = self.create_lock.lock();
609        if let Some(id) = self.name_to_id.get(name) {
610            return *id;
611        }
612
613        let id = LabelId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
614        let name: Arc<str> = name.into();
615        self.id_to_name.write().push(Arc::clone(&name));
616        self.name_to_id.insert(name, id);
617        id
618    }
619
620    fn get_id(&self, name: &str) -> Option<LabelId> {
621        self.name_to_id.get(name).map(|r| *r)
622    }
623
624    fn get_name(&self, id: LabelId) -> Option<Arc<str>> {
625        self.id_to_name.read().get(id.as_u32() as usize).cloned()
626    }
627
628    fn count(&self) -> usize {
629        self.id_to_name.read().len()
630    }
631
632    fn all_names(&self) -> Vec<Arc<str>> {
633        self.id_to_name.read().clone()
634    }
635}
636
637// === Property Catalog ===
638
639/// Bidirectional mapping between property key names and IDs.
640struct PropertyCatalog {
641    name_to_id: GrafeoConcurrentMap<Arc<str>, PropertyKeyId>,
642    id_to_name: RwLock<Vec<Arc<str>>>,
643    next_id: AtomicU32,
644    create_lock: Mutex<()>,
645}
646
647impl PropertyCatalog {
648    fn new() -> Self {
649        Self {
650            name_to_id: grafeo_concurrent_map(),
651            id_to_name: RwLock::new(Vec::new()),
652            next_id: AtomicU32::new(0),
653            create_lock: Mutex::new(()),
654        }
655    }
656
657    fn get_or_create(&self, name: &str) -> PropertyKeyId {
658        // Fast path: shard-level read (no global lock)
659        if let Some(id) = self.name_to_id.get(name) {
660            return *id;
661        }
662
663        // Slow path: serialize creates to keep id_to_name consistent
664        let _guard = self.create_lock.lock();
665        if let Some(id) = self.name_to_id.get(name) {
666            return *id;
667        }
668
669        let id = PropertyKeyId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
670        let name: Arc<str> = name.into();
671        self.id_to_name.write().push(Arc::clone(&name));
672        self.name_to_id.insert(name, id);
673        id
674    }
675
676    fn get_id(&self, name: &str) -> Option<PropertyKeyId> {
677        self.name_to_id.get(name).map(|r| *r)
678    }
679
680    fn get_name(&self, id: PropertyKeyId) -> Option<Arc<str>> {
681        self.id_to_name.read().get(id.as_u32() as usize).cloned()
682    }
683
684    fn count(&self) -> usize {
685        self.id_to_name.read().len()
686    }
687
688    fn all_names(&self) -> Vec<Arc<str>> {
689        self.id_to_name.read().clone()
690    }
691}
692
693// === Edge Type Catalog ===
694
695/// Bidirectional mapping between edge type names and IDs.
696struct EdgeTypeCatalog {
697    name_to_id: GrafeoConcurrentMap<Arc<str>, EdgeTypeId>,
698    id_to_name: RwLock<Vec<Arc<str>>>,
699    next_id: AtomicU32,
700    create_lock: Mutex<()>,
701}
702
703impl EdgeTypeCatalog {
704    fn new() -> Self {
705        Self {
706            name_to_id: grafeo_concurrent_map(),
707            id_to_name: RwLock::new(Vec::new()),
708            next_id: AtomicU32::new(0),
709            create_lock: Mutex::new(()),
710        }
711    }
712
713    fn get_or_create(&self, name: &str) -> EdgeTypeId {
714        // Fast path: shard-level read (no global lock)
715        if let Some(id) = self.name_to_id.get(name) {
716            return *id;
717        }
718
719        // Slow path: serialize creates to keep id_to_name consistent
720        let _guard = self.create_lock.lock();
721        if let Some(id) = self.name_to_id.get(name) {
722            return *id;
723        }
724
725        let id = EdgeTypeId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
726        let name: Arc<str> = name.into();
727        self.id_to_name.write().push(Arc::clone(&name));
728        self.name_to_id.insert(name, id);
729        id
730    }
731
732    fn get_id(&self, name: &str) -> Option<EdgeTypeId> {
733        self.name_to_id.get(name).map(|r| *r)
734    }
735
736    fn get_name(&self, id: EdgeTypeId) -> Option<Arc<str>> {
737        self.id_to_name.read().get(id.as_u32() as usize).cloned()
738    }
739
740    fn count(&self) -> usize {
741        self.id_to_name.read().len()
742    }
743
744    fn all_names(&self) -> Vec<Arc<str>> {
745        self.id_to_name.read().clone()
746    }
747}
748
749// === Index Catalog ===
750
751/// Type of index.
752#[derive(Debug, Clone, Copy, PartialEq, Eq)]
753pub enum IndexType {
754    /// Hash index for equality lookups.
755    Hash,
756    /// BTree index for range queries.
757    BTree,
758    /// Full-text index for text search.
759    FullText,
760}
761
762/// Index definition.
763#[derive(Debug, Clone)]
764pub struct IndexDefinition {
765    /// The index ID.
766    pub id: IndexId,
767    /// The label this index applies to.
768    pub label: LabelId,
769    /// The property key being indexed.
770    pub property_key: PropertyKeyId,
771    /// The type of index.
772    pub index_type: IndexType,
773}
774
775/// Manages index definitions.
776struct IndexCatalog {
777    indexes: RwLock<HashMap<IndexId, IndexDefinition>>,
778    label_indexes: RwLock<HashMap<LabelId, Vec<IndexId>>>,
779    label_property_indexes: RwLock<HashMap<(LabelId, PropertyKeyId), Vec<IndexId>>>,
780    next_id: AtomicU32,
781}
782
783impl IndexCatalog {
784    fn new() -> Self {
785        Self {
786            indexes: RwLock::new(HashMap::new()),
787            label_indexes: RwLock::new(HashMap::new()),
788            label_property_indexes: RwLock::new(HashMap::new()),
789            next_id: AtomicU32::new(0),
790        }
791    }
792
793    fn create(
794        &self,
795        label: LabelId,
796        property_key: PropertyKeyId,
797        index_type: IndexType,
798    ) -> IndexId {
799        let id = IndexId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
800        let definition = IndexDefinition {
801            id,
802            label,
803            property_key,
804            index_type,
805        };
806
807        let mut indexes = self.indexes.write();
808        let mut label_indexes = self.label_indexes.write();
809        let mut label_property_indexes = self.label_property_indexes.write();
810
811        indexes.insert(id, definition);
812        label_indexes.entry(label).or_default().push(id);
813        label_property_indexes
814            .entry((label, property_key))
815            .or_default()
816            .push(id);
817
818        id
819    }
820
821    fn drop(&self, id: IndexId) -> bool {
822        let mut indexes = self.indexes.write();
823        let mut label_indexes = self.label_indexes.write();
824        let mut label_property_indexes = self.label_property_indexes.write();
825
826        if let Some(definition) = indexes.remove(&id) {
827            // Remove from label index
828            if let Some(ids) = label_indexes.get_mut(&definition.label) {
829                ids.retain(|&i| i != id);
830            }
831            // Remove from label-property index
832            if let Some(ids) =
833                label_property_indexes.get_mut(&(definition.label, definition.property_key))
834            {
835                ids.retain(|&i| i != id);
836            }
837            true
838        } else {
839            false
840        }
841    }
842
843    fn get(&self, id: IndexId) -> Option<IndexDefinition> {
844        self.indexes.read().get(&id).cloned()
845    }
846
847    fn for_label(&self, label: LabelId) -> Vec<IndexId> {
848        self.label_indexes
849            .read()
850            .get(&label)
851            .cloned()
852            .unwrap_or_default()
853    }
854
855    fn for_label_property(&self, label: LabelId, property_key: PropertyKeyId) -> Vec<IndexId> {
856        self.label_property_indexes
857            .read()
858            .get(&(label, property_key))
859            .cloned()
860            .unwrap_or_default()
861    }
862
863    fn count(&self) -> usize {
864        self.indexes.read().len()
865    }
866
867    fn all(&self) -> Vec<IndexDefinition> {
868        self.indexes.read().values().cloned().collect()
869    }
870}
871
872// === Type Definitions ===
873
874/// Data type for a typed property in a node or edge type definition.
875#[derive(Debug, Clone, PartialEq, Eq)]
876pub enum PropertyDataType {
877    /// UTF-8 string.
878    String,
879    /// 64-bit signed integer.
880    Int64,
881    /// 64-bit floating point.
882    Float64,
883    /// Boolean.
884    Bool,
885    /// Calendar date.
886    Date,
887    /// Time of day.
888    Time,
889    /// Timestamp (date + time).
890    Timestamp,
891    /// Duration / interval.
892    Duration,
893    /// Ordered list of values (untyped).
894    List,
895    /// Typed list: `LIST<element_type>` (ISO sec 4.16.9).
896    ListTyped(Box<PropertyDataType>),
897    /// Key-value map.
898    Map,
899    /// Raw bytes.
900    Bytes,
901    /// Node reference type (ISO sec 4.15.1).
902    Node,
903    /// Edge reference type (ISO sec 4.15.1).
904    Edge,
905    /// Any type (no enforcement).
906    Any,
907}
908
909impl PropertyDataType {
910    /// Parses a type name string (case-insensitive) into a `PropertyDataType`.
911    #[must_use]
912    pub fn from_type_name(name: &str) -> Self {
913        let upper = name.to_uppercase();
914        // Handle parameterized LIST<element_type>
915        if let Some(inner) = upper
916            .strip_prefix("LIST<")
917            .and_then(|s| s.strip_suffix('>'))
918        {
919            return Self::ListTyped(Box::new(Self::from_type_name(inner)));
920        }
921        match upper.as_str() {
922            "STRING" | "VARCHAR" | "TEXT" => Self::String,
923            "INT" | "INT64" | "INTEGER" | "BIGINT" => Self::Int64,
924            "FLOAT" | "FLOAT64" | "DOUBLE" | "REAL" => Self::Float64,
925            "BOOL" | "BOOLEAN" => Self::Bool,
926            "DATE" => Self::Date,
927            "TIME" => Self::Time,
928            "TIMESTAMP" | "DATETIME" => Self::Timestamp,
929            "DURATION" | "INTERVAL" => Self::Duration,
930            "LIST" | "ARRAY" => Self::List,
931            "MAP" | "RECORD" => Self::Map,
932            "BYTES" | "BINARY" | "BLOB" => Self::Bytes,
933            "NODE" => Self::Node,
934            "EDGE" | "RELATIONSHIP" => Self::Edge,
935            _ => Self::Any,
936        }
937    }
938
939    /// Checks whether a value conforms to this type.
940    #[must_use]
941    pub fn matches(&self, value: &Value) -> bool {
942        match (self, value) {
943            (Self::Any, _) | (_, Value::Null) => true,
944            (Self::String, Value::String(_)) => true,
945            (Self::Int64, Value::Int64(_)) => true,
946            (Self::Float64, Value::Float64(_)) => true,
947            (Self::Bool, Value::Bool(_)) => true,
948            (Self::Date, Value::Date(_)) => true,
949            (Self::Time, Value::Time(_)) => true,
950            (Self::Timestamp, Value::Timestamp(_)) => true,
951            (Self::Duration, Value::Duration(_)) => true,
952            (Self::List, Value::List(_)) => true,
953            (Self::ListTyped(elem_type), Value::List(items)) => {
954                items.iter().all(|item| elem_type.matches(item))
955            }
956            (Self::Bytes, Value::Bytes(_)) => true,
957            // Node/Edge reference types match Map values (graph elements are
958            // represented as maps with _id, _labels/_type, and properties)
959            (Self::Node | Self::Edge, Value::Map(_)) => true,
960            _ => false,
961        }
962    }
963}
964
965impl std::fmt::Display for PropertyDataType {
966    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
967        match self {
968            Self::String => write!(f, "STRING"),
969            Self::Int64 => write!(f, "INT64"),
970            Self::Float64 => write!(f, "FLOAT64"),
971            Self::Bool => write!(f, "BOOLEAN"),
972            Self::Date => write!(f, "DATE"),
973            Self::Time => write!(f, "TIME"),
974            Self::Timestamp => write!(f, "TIMESTAMP"),
975            Self::Duration => write!(f, "DURATION"),
976            Self::List => write!(f, "LIST"),
977            Self::ListTyped(elem) => write!(f, "LIST<{elem}>"),
978            Self::Map => write!(f, "MAP"),
979            Self::Bytes => write!(f, "BYTES"),
980            Self::Node => write!(f, "NODE"),
981            Self::Edge => write!(f, "EDGE"),
982            Self::Any => write!(f, "ANY"),
983        }
984    }
985}
986
987/// A typed property within a node or edge type definition.
988#[derive(Debug, Clone)]
989pub struct TypedProperty {
990    /// Property name.
991    pub name: String,
992    /// Expected data type.
993    pub data_type: PropertyDataType,
994    /// Whether NULL values are allowed.
995    pub nullable: bool,
996    /// Default value (used when property is not explicitly set).
997    pub default_value: Option<Value>,
998}
999
1000/// A constraint on a node or edge type.
1001#[derive(Debug, Clone)]
1002pub enum TypeConstraint {
1003    /// Primary key (implies UNIQUE + NOT NULL).
1004    PrimaryKey(Vec<String>),
1005    /// Uniqueness constraint on one or more properties.
1006    Unique(Vec<String>),
1007    /// NOT NULL constraint on a single property.
1008    NotNull(String),
1009    /// CHECK constraint with a named expression string.
1010    Check {
1011        /// Optional constraint name.
1012        name: Option<String>,
1013        /// Expression (stored as string for now).
1014        expression: String,
1015    },
1016}
1017
1018/// Definition of a node type (label schema).
1019#[derive(Debug, Clone)]
1020pub struct NodeTypeDefinition {
1021    /// Type name (corresponds to a label).
1022    pub name: String,
1023    /// Typed property definitions.
1024    pub properties: Vec<TypedProperty>,
1025    /// Type-level constraints.
1026    pub constraints: Vec<TypeConstraint>,
1027    /// Parent type names for inheritance (GQL `EXTENDS`).
1028    pub parent_types: Vec<String>,
1029}
1030
1031/// Definition of an edge type (relationship type schema).
1032#[derive(Debug, Clone)]
1033pub struct EdgeTypeDefinition {
1034    /// Type name (corresponds to an edge type / relationship type).
1035    pub name: String,
1036    /// Typed property definitions.
1037    pub properties: Vec<TypedProperty>,
1038    /// Type-level constraints.
1039    pub constraints: Vec<TypeConstraint>,
1040    /// Allowed source node types (empty = any).
1041    pub source_node_types: Vec<String>,
1042    /// Allowed target node types (empty = any).
1043    pub target_node_types: Vec<String>,
1044}
1045
1046/// Definition of a graph type (constrains which node/edge types a graph allows).
1047#[derive(Debug, Clone)]
1048pub struct GraphTypeDefinition {
1049    /// Graph type name.
1050    pub name: String,
1051    /// Allowed node types (empty = open).
1052    pub allowed_node_types: Vec<String>,
1053    /// Allowed edge types (empty = open).
1054    pub allowed_edge_types: Vec<String>,
1055    /// Whether unlisted types are permitted.
1056    pub open: bool,
1057}
1058
1059/// Definition of a stored procedure.
1060#[derive(Debug, Clone)]
1061pub struct ProcedureDefinition {
1062    /// Procedure name.
1063    pub name: String,
1064    /// Parameter definitions: (name, type).
1065    pub params: Vec<(String, String)>,
1066    /// Return column definitions: (name, type).
1067    pub returns: Vec<(String, String)>,
1068    /// Raw GQL query body.
1069    pub body: String,
1070}
1071
1072// === Schema Catalog ===
1073
1074/// Schema constraints and type definitions.
1075pub struct SchemaCatalog {
1076    /// Properties that must be unique for a given label.
1077    unique_constraints: RwLock<HashSet<(LabelId, PropertyKeyId)>>,
1078    /// Properties that are required (NOT NULL) for a given label.
1079    required_properties: RwLock<HashSet<(LabelId, PropertyKeyId)>>,
1080    /// Registered node type definitions.
1081    node_types: RwLock<HashMap<String, NodeTypeDefinition>>,
1082    /// Registered edge type definitions.
1083    edge_types: RwLock<HashMap<String, EdgeTypeDefinition>>,
1084    /// Registered graph type definitions.
1085    graph_types: RwLock<HashMap<String, GraphTypeDefinition>>,
1086    /// Schema namespaces.
1087    schemas: RwLock<Vec<String>>,
1088    /// Graph instance to graph type bindings.
1089    graph_type_bindings: RwLock<HashMap<String, String>>,
1090    /// Stored procedure definitions.
1091    procedures: RwLock<HashMap<String, ProcedureDefinition>>,
1092}
1093
1094impl SchemaCatalog {
1095    fn new() -> Self {
1096        Self {
1097            unique_constraints: RwLock::new(HashSet::new()),
1098            required_properties: RwLock::new(HashSet::new()),
1099            node_types: RwLock::new(HashMap::new()),
1100            edge_types: RwLock::new(HashMap::new()),
1101            graph_types: RwLock::new(HashMap::new()),
1102            schemas: RwLock::new(Vec::new()),
1103            graph_type_bindings: RwLock::new(HashMap::new()),
1104            procedures: RwLock::new(HashMap::new()),
1105        }
1106    }
1107
1108    // --- Node type operations ---
1109
1110    /// Registers a new node type definition.
1111    pub fn register_node_type(&self, def: NodeTypeDefinition) -> Result<(), CatalogError> {
1112        let mut types = self.node_types.write();
1113        if types.contains_key(&def.name) {
1114            return Err(CatalogError::TypeAlreadyExists(def.name));
1115        }
1116        types.insert(def.name.clone(), def);
1117        Ok(())
1118    }
1119
1120    /// Registers or replaces a node type definition.
1121    pub fn register_or_replace_node_type(&self, def: NodeTypeDefinition) {
1122        self.node_types.write().insert(def.name.clone(), def);
1123    }
1124
1125    /// Drops a node type definition by name.
1126    pub fn drop_node_type(&self, name: &str) -> Result<(), CatalogError> {
1127        let mut types = self.node_types.write();
1128        if types.remove(name).is_none() {
1129            return Err(CatalogError::TypeNotFound(name.to_string()));
1130        }
1131        Ok(())
1132    }
1133
1134    /// Gets a node type definition by name.
1135    #[must_use]
1136    pub fn get_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
1137        self.node_types.read().get(name).cloned()
1138    }
1139
1140    /// Gets a resolved node type with inherited properties and constraints from parents.
1141    ///
1142    /// Walks the parent chain depth-first, collecting properties and constraints.
1143    /// Detects cycles via a visited set. Child properties override parent ones
1144    /// with the same name.
1145    #[must_use]
1146    pub fn resolved_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
1147        let types = self.node_types.read();
1148        let base = types.get(name)?;
1149        if base.parent_types.is_empty() {
1150            return Some(base.clone());
1151        }
1152        let mut visited = HashSet::new();
1153        visited.insert(name.to_string());
1154        let mut all_properties = Vec::new();
1155        let mut all_constraints = Vec::new();
1156        Self::collect_inherited(
1157            &types,
1158            name,
1159            &mut visited,
1160            &mut all_properties,
1161            &mut all_constraints,
1162        );
1163        Some(NodeTypeDefinition {
1164            name: base.name.clone(),
1165            properties: all_properties,
1166            constraints: all_constraints,
1167            parent_types: base.parent_types.clone(),
1168        })
1169    }
1170
1171    /// Recursively collects properties and constraints from a type and its parents.
1172    fn collect_inherited(
1173        types: &HashMap<String, NodeTypeDefinition>,
1174        name: &str,
1175        visited: &mut HashSet<String>,
1176        properties: &mut Vec<TypedProperty>,
1177        constraints: &mut Vec<TypeConstraint>,
1178    ) {
1179        let Some(def) = types.get(name) else { return };
1180        // Walk parents first (depth-first) so child properties override
1181        for parent in &def.parent_types {
1182            if visited.insert(parent.clone()) {
1183                Self::collect_inherited(types, parent, visited, properties, constraints);
1184            }
1185        }
1186        // Add own properties, overriding parent ones with same name
1187        for prop in &def.properties {
1188            if let Some(pos) = properties.iter().position(|p| p.name == prop.name) {
1189                properties[pos] = prop.clone();
1190            } else {
1191                properties.push(prop.clone());
1192            }
1193        }
1194        // Append own constraints (no dedup, constraints are additive)
1195        constraints.extend(def.constraints.iter().cloned());
1196    }
1197
1198    /// Returns all registered node type names.
1199    #[must_use]
1200    pub fn all_node_types(&self) -> Vec<String> {
1201        self.node_types.read().keys().cloned().collect()
1202    }
1203
1204    // --- Edge type operations ---
1205
1206    /// Registers a new edge type definition.
1207    pub fn register_edge_type(&self, def: EdgeTypeDefinition) -> Result<(), CatalogError> {
1208        let mut types = self.edge_types.write();
1209        if types.contains_key(&def.name) {
1210            return Err(CatalogError::TypeAlreadyExists(def.name));
1211        }
1212        types.insert(def.name.clone(), def);
1213        Ok(())
1214    }
1215
1216    /// Registers or replaces an edge type definition.
1217    pub fn register_or_replace_edge_type(&self, def: EdgeTypeDefinition) {
1218        self.edge_types.write().insert(def.name.clone(), def);
1219    }
1220
1221    /// Drops an edge type definition by name.
1222    pub fn drop_edge_type(&self, name: &str) -> Result<(), CatalogError> {
1223        let mut types = self.edge_types.write();
1224        if types.remove(name).is_none() {
1225            return Err(CatalogError::TypeNotFound(name.to_string()));
1226        }
1227        Ok(())
1228    }
1229
1230    /// Gets an edge type definition by name.
1231    #[must_use]
1232    pub fn get_edge_type(&self, name: &str) -> Option<EdgeTypeDefinition> {
1233        self.edge_types.read().get(name).cloned()
1234    }
1235
1236    /// Returns all registered edge type names.
1237    #[must_use]
1238    pub fn all_edge_types(&self) -> Vec<String> {
1239        self.edge_types.read().keys().cloned().collect()
1240    }
1241
1242    // --- Graph type operations ---
1243
1244    /// Registers a new graph type definition.
1245    pub fn register_graph_type(&self, def: GraphTypeDefinition) -> Result<(), CatalogError> {
1246        let mut types = self.graph_types.write();
1247        if types.contains_key(&def.name) {
1248            return Err(CatalogError::TypeAlreadyExists(def.name));
1249        }
1250        types.insert(def.name.clone(), def);
1251        Ok(())
1252    }
1253
1254    /// Drops a graph type definition by name.
1255    pub fn drop_graph_type(&self, name: &str) -> Result<(), CatalogError> {
1256        let mut types = self.graph_types.write();
1257        if types.remove(name).is_none() {
1258            return Err(CatalogError::TypeNotFound(name.to_string()));
1259        }
1260        Ok(())
1261    }
1262
1263    /// Gets a graph type definition by name.
1264    #[must_use]
1265    pub fn get_graph_type(&self, name: &str) -> Option<GraphTypeDefinition> {
1266        self.graph_types.read().get(name).cloned()
1267    }
1268
1269    /// Returns all registered graph type names.
1270    #[must_use]
1271    pub fn all_graph_types(&self) -> Vec<String> {
1272        self.graph_types.read().keys().cloned().collect()
1273    }
1274
1275    // --- Schema namespace operations ---
1276
1277    /// Registers a schema namespace.
1278    pub fn register_schema(&self, name: String) -> Result<(), CatalogError> {
1279        let mut schemas = self.schemas.write();
1280        if schemas.contains(&name) {
1281            return Err(CatalogError::SchemaAlreadyExists(name));
1282        }
1283        schemas.push(name);
1284        Ok(())
1285    }
1286
1287    /// Drops a schema namespace.
1288    pub fn drop_schema(&self, name: &str) -> Result<(), CatalogError> {
1289        let mut schemas = self.schemas.write();
1290        if let Some(pos) = schemas.iter().position(|s| s == name) {
1291            schemas.remove(pos);
1292            Ok(())
1293        } else {
1294            Err(CatalogError::SchemaNotFound(name.to_string()))
1295        }
1296    }
1297
1298    // --- ALTER operations ---
1299
1300    /// Adds a constraint to an existing node type, creating a minimal type if needed.
1301    pub fn add_constraint_to_type(
1302        &self,
1303        label: &str,
1304        constraint: TypeConstraint,
1305    ) -> Result<(), CatalogError> {
1306        let mut types = self.node_types.write();
1307        if let Some(def) = types.get_mut(label) {
1308            def.constraints.push(constraint);
1309        } else {
1310            // Auto-create a minimal type definition for the label
1311            types.insert(
1312                label.to_string(),
1313                NodeTypeDefinition {
1314                    name: label.to_string(),
1315                    properties: Vec::new(),
1316                    constraints: vec![constraint],
1317                    parent_types: Vec::new(),
1318                },
1319            );
1320        }
1321        Ok(())
1322    }
1323
1324    /// Adds a property to an existing node type.
1325    pub fn alter_node_type_add_property(
1326        &self,
1327        type_name: &str,
1328        property: TypedProperty,
1329    ) -> Result<(), CatalogError> {
1330        let mut types = self.node_types.write();
1331        let def = types
1332            .get_mut(type_name)
1333            .ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
1334        if def.properties.iter().any(|p| p.name == property.name) {
1335            return Err(CatalogError::TypeAlreadyExists(format!(
1336                "property {} on {}",
1337                property.name, type_name
1338            )));
1339        }
1340        def.properties.push(property);
1341        Ok(())
1342    }
1343
1344    /// Drops a property from an existing node type.
1345    pub fn alter_node_type_drop_property(
1346        &self,
1347        type_name: &str,
1348        property_name: &str,
1349    ) -> Result<(), CatalogError> {
1350        let mut types = self.node_types.write();
1351        let def = types
1352            .get_mut(type_name)
1353            .ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
1354        let len_before = def.properties.len();
1355        def.properties.retain(|p| p.name != property_name);
1356        if def.properties.len() == len_before {
1357            return Err(CatalogError::TypeNotFound(format!(
1358                "property {} on {}",
1359                property_name, type_name
1360            )));
1361        }
1362        Ok(())
1363    }
1364
1365    /// Adds a property to an existing edge type.
1366    pub fn alter_edge_type_add_property(
1367        &self,
1368        type_name: &str,
1369        property: TypedProperty,
1370    ) -> Result<(), CatalogError> {
1371        let mut types = self.edge_types.write();
1372        let def = types
1373            .get_mut(type_name)
1374            .ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
1375        if def.properties.iter().any(|p| p.name == property.name) {
1376            return Err(CatalogError::TypeAlreadyExists(format!(
1377                "property {} on {}",
1378                property.name, type_name
1379            )));
1380        }
1381        def.properties.push(property);
1382        Ok(())
1383    }
1384
1385    /// Drops a property from an existing edge type.
1386    pub fn alter_edge_type_drop_property(
1387        &self,
1388        type_name: &str,
1389        property_name: &str,
1390    ) -> Result<(), CatalogError> {
1391        let mut types = self.edge_types.write();
1392        let def = types
1393            .get_mut(type_name)
1394            .ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
1395        let len_before = def.properties.len();
1396        def.properties.retain(|p| p.name != property_name);
1397        if def.properties.len() == len_before {
1398            return Err(CatalogError::TypeNotFound(format!(
1399                "property {} on {}",
1400                property_name, type_name
1401            )));
1402        }
1403        Ok(())
1404    }
1405
1406    /// Adds a node type to a graph type.
1407    pub fn alter_graph_type_add_node_type(
1408        &self,
1409        graph_type_name: &str,
1410        node_type: String,
1411    ) -> Result<(), CatalogError> {
1412        let mut types = self.graph_types.write();
1413        let def = types
1414            .get_mut(graph_type_name)
1415            .ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
1416        if !def.allowed_node_types.contains(&node_type) {
1417            def.allowed_node_types.push(node_type);
1418        }
1419        Ok(())
1420    }
1421
1422    /// Drops a node type from a graph type.
1423    pub fn alter_graph_type_drop_node_type(
1424        &self,
1425        graph_type_name: &str,
1426        node_type: &str,
1427    ) -> Result<(), CatalogError> {
1428        let mut types = self.graph_types.write();
1429        let def = types
1430            .get_mut(graph_type_name)
1431            .ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
1432        def.allowed_node_types.retain(|t| t != node_type);
1433        Ok(())
1434    }
1435
1436    /// Adds an edge type to a graph type.
1437    pub fn alter_graph_type_add_edge_type(
1438        &self,
1439        graph_type_name: &str,
1440        edge_type: String,
1441    ) -> Result<(), CatalogError> {
1442        let mut types = self.graph_types.write();
1443        let def = types
1444            .get_mut(graph_type_name)
1445            .ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
1446        if !def.allowed_edge_types.contains(&edge_type) {
1447            def.allowed_edge_types.push(edge_type);
1448        }
1449        Ok(())
1450    }
1451
1452    /// Drops an edge type from a graph type.
1453    pub fn alter_graph_type_drop_edge_type(
1454        &self,
1455        graph_type_name: &str,
1456        edge_type: &str,
1457    ) -> Result<(), CatalogError> {
1458        let mut types = self.graph_types.write();
1459        let def = types
1460            .get_mut(graph_type_name)
1461            .ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
1462        def.allowed_edge_types.retain(|t| t != edge_type);
1463        Ok(())
1464    }
1465
1466    // --- Procedure operations ---
1467
1468    /// Registers a stored procedure.
1469    pub fn register_procedure(&self, def: ProcedureDefinition) -> Result<(), CatalogError> {
1470        let mut procs = self.procedures.write();
1471        if procs.contains_key(&def.name) {
1472            return Err(CatalogError::TypeAlreadyExists(def.name.clone()));
1473        }
1474        procs.insert(def.name.clone(), def);
1475        Ok(())
1476    }
1477
1478    /// Replaces or creates a stored procedure.
1479    pub fn replace_procedure(&self, def: ProcedureDefinition) {
1480        self.procedures.write().insert(def.name.clone(), def);
1481    }
1482
1483    /// Drops a stored procedure.
1484    pub fn drop_procedure(&self, name: &str) -> Result<(), CatalogError> {
1485        let mut procs = self.procedures.write();
1486        if procs.remove(name).is_none() {
1487            return Err(CatalogError::TypeNotFound(name.to_string()));
1488        }
1489        Ok(())
1490    }
1491
1492    /// Gets a stored procedure by name.
1493    pub fn get_procedure(&self, name: &str) -> Option<ProcedureDefinition> {
1494        self.procedures.read().get(name).cloned()
1495    }
1496
1497    fn add_unique_constraint(
1498        &self,
1499        label: LabelId,
1500        property_key: PropertyKeyId,
1501    ) -> Result<(), CatalogError> {
1502        let mut constraints = self.unique_constraints.write();
1503        let key = (label, property_key);
1504        if !constraints.insert(key) {
1505            return Err(CatalogError::ConstraintAlreadyExists);
1506        }
1507        Ok(())
1508    }
1509
1510    fn add_required_property(
1511        &self,
1512        label: LabelId,
1513        property_key: PropertyKeyId,
1514    ) -> Result<(), CatalogError> {
1515        let mut required = self.required_properties.write();
1516        let key = (label, property_key);
1517        if !required.insert(key) {
1518            return Err(CatalogError::ConstraintAlreadyExists);
1519        }
1520        Ok(())
1521    }
1522
1523    fn is_property_required(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
1524        self.required_properties
1525            .read()
1526            .contains(&(label, property_key))
1527    }
1528
1529    fn is_property_unique(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
1530        self.unique_constraints
1531            .read()
1532            .contains(&(label, property_key))
1533    }
1534}
1535
1536// === Errors ===
1537
1538/// Catalog-related errors.
1539#[derive(Debug, Clone, PartialEq, Eq)]
1540pub enum CatalogError {
1541    /// Schema constraints are not enabled.
1542    SchemaNotEnabled,
1543    /// The constraint already exists.
1544    ConstraintAlreadyExists,
1545    /// The label does not exist.
1546    LabelNotFound(String),
1547    /// The property key does not exist.
1548    PropertyKeyNotFound(String),
1549    /// The edge type does not exist.
1550    EdgeTypeNotFound(String),
1551    /// The index does not exist.
1552    IndexNotFound(IndexId),
1553    /// A type with this name already exists.
1554    TypeAlreadyExists(String),
1555    /// No type with this name exists.
1556    TypeNotFound(String),
1557    /// A schema with this name already exists.
1558    SchemaAlreadyExists(String),
1559    /// No schema with this name exists.
1560    SchemaNotFound(String),
1561}
1562
1563impl std::fmt::Display for CatalogError {
1564    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1565        match self {
1566            Self::SchemaNotEnabled => write!(f, "Schema constraints are not enabled"),
1567            Self::ConstraintAlreadyExists => write!(f, "Constraint already exists"),
1568            Self::LabelNotFound(name) => write!(f, "Label not found: {name}"),
1569            Self::PropertyKeyNotFound(name) => write!(f, "Property key not found: {name}"),
1570            Self::EdgeTypeNotFound(name) => write!(f, "Edge type not found: {name}"),
1571            Self::IndexNotFound(id) => write!(f, "Index not found: {id}"),
1572            Self::TypeAlreadyExists(name) => write!(f, "Type already exists: {name}"),
1573            Self::TypeNotFound(name) => write!(f, "Type not found: {name}"),
1574            Self::SchemaAlreadyExists(name) => write!(f, "Schema already exists: {name}"),
1575            Self::SchemaNotFound(name) => write!(f, "Schema not found: {name}"),
1576        }
1577    }
1578}
1579
1580impl std::error::Error for CatalogError {}
1581
1582// === Constraint Validator ===
1583
1584use grafeo_core::execution::operators::ConstraintValidator;
1585use grafeo_core::execution::operators::OperatorError;
1586
1587/// Validates schema constraints during mutation operations using the Catalog.
1588///
1589/// Checks type definitions, NOT NULL constraints, and UNIQUE constraints
1590/// against registered node/edge type definitions.
1591pub struct CatalogConstraintValidator {
1592    catalog: Arc<Catalog>,
1593    /// Optional graph name for graph-type-bound validation.
1594    graph_name: Option<String>,
1595    /// Optional graph store for UNIQUE constraint enforcement via index lookup.
1596    store: Option<Arc<dyn grafeo_core::graph::GraphStoreMut>>,
1597}
1598
1599impl CatalogConstraintValidator {
1600    /// Creates a new validator wrapping the given catalog.
1601    pub fn new(catalog: Arc<Catalog>) -> Self {
1602        Self {
1603            catalog,
1604            graph_name: None,
1605            store: None,
1606        }
1607    }
1608
1609    /// Sets the graph name for graph-type-bound validation.
1610    pub fn with_graph_name(mut self, name: String) -> Self {
1611        self.graph_name = Some(name);
1612        self
1613    }
1614
1615    /// Attaches a graph store for UNIQUE constraint enforcement.
1616    pub fn with_store(mut self, store: Arc<dyn grafeo_core::graph::GraphStoreMut>) -> Self {
1617        self.store = Some(store);
1618        self
1619    }
1620}
1621
1622impl ConstraintValidator for CatalogConstraintValidator {
1623    fn validate_node_property(
1624        &self,
1625        labels: &[String],
1626        key: &str,
1627        value: &Value,
1628    ) -> Result<(), OperatorError> {
1629        for label in labels {
1630            if let Some(type_def) = self.catalog.resolved_node_type(label)
1631                && let Some(typed_prop) = type_def.properties.iter().find(|p| p.name == key)
1632            {
1633                // Check NOT NULL
1634                if !typed_prop.nullable && *value == Value::Null {
1635                    return Err(OperatorError::ConstraintViolation(format!(
1636                        "property '{key}' on :{label} is NOT NULL, cannot set to null"
1637                    )));
1638                }
1639                // Check type compatibility
1640                if *value != Value::Null && !typed_prop.data_type.matches(value) {
1641                    return Err(OperatorError::ConstraintViolation(format!(
1642                        "property '{key}' on :{label} expects {:?}, got {:?}",
1643                        typed_prop.data_type, value
1644                    )));
1645                }
1646            }
1647        }
1648        Ok(())
1649    }
1650
1651    fn validate_node_complete(
1652        &self,
1653        labels: &[String],
1654        properties: &[(String, Value)],
1655    ) -> Result<(), OperatorError> {
1656        let prop_names: std::collections::HashSet<&str> =
1657            properties.iter().map(|(n, _)| n.as_str()).collect();
1658
1659        for label in labels {
1660            if let Some(type_def) = self.catalog.resolved_node_type(label) {
1661                // Check that all NOT NULL properties are present
1662                for typed_prop in &type_def.properties {
1663                    if !typed_prop.nullable
1664                        && typed_prop.default_value.is_none()
1665                        && !prop_names.contains(typed_prop.name.as_str())
1666                    {
1667                        return Err(OperatorError::ConstraintViolation(format!(
1668                            "missing required property '{}' on :{label}",
1669                            typed_prop.name
1670                        )));
1671                    }
1672                }
1673                // Check type-level constraints
1674                for constraint in &type_def.constraints {
1675                    match constraint {
1676                        TypeConstraint::NotNull(prop_name) => {
1677                            if !prop_names.contains(prop_name.as_str()) {
1678                                return Err(OperatorError::ConstraintViolation(format!(
1679                                    "missing required property '{prop_name}' on :{label} (NOT NULL constraint)"
1680                                )));
1681                            }
1682                        }
1683                        TypeConstraint::PrimaryKey(key_props) => {
1684                            for pk in key_props {
1685                                if !prop_names.contains(pk.as_str()) {
1686                                    return Err(OperatorError::ConstraintViolation(format!(
1687                                        "missing primary key property '{pk}' on :{label}"
1688                                    )));
1689                                }
1690                            }
1691                        }
1692                        TypeConstraint::Check { name, expression } => {
1693                            match check_eval::evaluate_check(expression, properties) {
1694                                Ok(true) => {}
1695                                Ok(false) => {
1696                                    let constraint_name = name.as_deref().unwrap_or("unnamed");
1697                                    return Err(OperatorError::ConstraintViolation(format!(
1698                                        "CHECK constraint '{constraint_name}' violated on :{label}"
1699                                    )));
1700                                }
1701                                Err(err) => {
1702                                    return Err(OperatorError::ConstraintViolation(format!(
1703                                        "CHECK constraint evaluation error: {err}"
1704                                    )));
1705                                }
1706                            }
1707                        }
1708                        TypeConstraint::Unique(_) => {}
1709                    }
1710                }
1711            }
1712        }
1713        Ok(())
1714    }
1715
1716    fn check_unique_node_property(
1717        &self,
1718        labels: &[String],
1719        key: &str,
1720        value: &Value,
1721    ) -> Result<(), OperatorError> {
1722        // Skip uniqueness check for NULL values (NULLs are never duplicates)
1723        if *value == Value::Null {
1724            return Ok(());
1725        }
1726        for label in labels {
1727            if let Some(type_def) = self.catalog.resolved_node_type(label) {
1728                for constraint in &type_def.constraints {
1729                    let is_unique = match constraint {
1730                        TypeConstraint::Unique(props) => props.iter().any(|p| p == key),
1731                        TypeConstraint::PrimaryKey(props) => props.iter().any(|p| p == key),
1732                        _ => false,
1733                    };
1734                    if is_unique && let Some(ref store) = self.store {
1735                        let existing = store.find_nodes_by_property(key, value);
1736                        for node_id in existing {
1737                            if let Some(node) = store.get_node(node_id) {
1738                                let has_label = node.labels.iter().any(|l| l.as_str() == label);
1739                                if has_label {
1740                                    return Err(OperatorError::ConstraintViolation(format!(
1741                                        "UNIQUE constraint violation: property '{key}' \
1742                                             with value {value:?} already exists on :{label}"
1743                                    )));
1744                                }
1745                            }
1746                        }
1747                    }
1748                }
1749            }
1750        }
1751        Ok(())
1752    }
1753
1754    fn validate_edge_property(
1755        &self,
1756        edge_type: &str,
1757        key: &str,
1758        value: &Value,
1759    ) -> Result<(), OperatorError> {
1760        if let Some(type_def) = self.catalog.get_edge_type_def(edge_type)
1761            && let Some(typed_prop) = type_def.properties.iter().find(|p| p.name == key)
1762        {
1763            // Check NOT NULL
1764            if !typed_prop.nullable && *value == Value::Null {
1765                return Err(OperatorError::ConstraintViolation(format!(
1766                    "property '{key}' on :{edge_type} is NOT NULL, cannot set to null"
1767                )));
1768            }
1769            // Check type compatibility
1770            if *value != Value::Null && !typed_prop.data_type.matches(value) {
1771                return Err(OperatorError::ConstraintViolation(format!(
1772                    "property '{key}' on :{edge_type} expects {:?}, got {:?}",
1773                    typed_prop.data_type, value
1774                )));
1775            }
1776        }
1777        Ok(())
1778    }
1779
1780    fn validate_edge_complete(
1781        &self,
1782        edge_type: &str,
1783        properties: &[(String, Value)],
1784    ) -> Result<(), OperatorError> {
1785        if let Some(type_def) = self.catalog.get_edge_type_def(edge_type) {
1786            let prop_names: std::collections::HashSet<&str> =
1787                properties.iter().map(|(n, _)| n.as_str()).collect();
1788
1789            for typed_prop in &type_def.properties {
1790                if !typed_prop.nullable
1791                    && typed_prop.default_value.is_none()
1792                    && !prop_names.contains(typed_prop.name.as_str())
1793                {
1794                    return Err(OperatorError::ConstraintViolation(format!(
1795                        "missing required property '{}' on :{edge_type}",
1796                        typed_prop.name
1797                    )));
1798                }
1799            }
1800
1801            for constraint in &type_def.constraints {
1802                if let TypeConstraint::Check { name, expression } = constraint {
1803                    match check_eval::evaluate_check(expression, properties) {
1804                        Ok(true) => {}
1805                        Ok(false) => {
1806                            let constraint_name = name.as_deref().unwrap_or("unnamed");
1807                            return Err(OperatorError::ConstraintViolation(format!(
1808                                "CHECK constraint '{constraint_name}' violated on :{edge_type}"
1809                            )));
1810                        }
1811                        Err(err) => {
1812                            return Err(OperatorError::ConstraintViolation(format!(
1813                                "CHECK constraint evaluation error: {err}"
1814                            )));
1815                        }
1816                    }
1817                }
1818            }
1819        }
1820        Ok(())
1821    }
1822
1823    fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
1824        let Some(ref graph_name) = self.graph_name else {
1825            return Ok(());
1826        };
1827        let Some(type_name) = self.catalog.get_graph_type_binding(graph_name) else {
1828            return Ok(());
1829        };
1830        let Some(gt) = self
1831            .catalog
1832            .schema()
1833            .and_then(|s| s.get_graph_type(&type_name))
1834        else {
1835            return Ok(());
1836        };
1837        if !gt.open && !gt.allowed_node_types.is_empty() {
1838            let allowed = labels
1839                .iter()
1840                .any(|l| gt.allowed_node_types.iter().any(|a| a == l));
1841            if !allowed {
1842                return Err(OperatorError::ConstraintViolation(format!(
1843                    "node labels {labels:?} are not allowed by graph type '{}'",
1844                    gt.name
1845                )));
1846            }
1847        }
1848        Ok(())
1849    }
1850
1851    fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
1852        let Some(ref graph_name) = self.graph_name else {
1853            return Ok(());
1854        };
1855        let Some(type_name) = self.catalog.get_graph_type_binding(graph_name) else {
1856            return Ok(());
1857        };
1858        let Some(gt) = self
1859            .catalog
1860            .schema()
1861            .and_then(|s| s.get_graph_type(&type_name))
1862        else {
1863            return Ok(());
1864        };
1865        if !gt.open && !gt.allowed_edge_types.is_empty() {
1866            let allowed = gt.allowed_edge_types.iter().any(|a| a == edge_type);
1867            if !allowed {
1868                return Err(OperatorError::ConstraintViolation(format!(
1869                    "edge type '{edge_type}' is not allowed by graph type '{}'",
1870                    gt.name
1871                )));
1872            }
1873        }
1874        Ok(())
1875    }
1876
1877    fn validate_edge_endpoints(
1878        &self,
1879        edge_type: &str,
1880        source_labels: &[String],
1881        target_labels: &[String],
1882    ) -> Result<(), OperatorError> {
1883        let Some(type_def) = self.catalog.get_edge_type_def(edge_type) else {
1884            return Ok(());
1885        };
1886        if !type_def.source_node_types.is_empty() {
1887            let source_ok = source_labels
1888                .iter()
1889                .any(|l| type_def.source_node_types.iter().any(|s| s == l));
1890            if !source_ok {
1891                return Err(OperatorError::ConstraintViolation(format!(
1892                    "source node labels {source_labels:?} are not allowed for edge type '{edge_type}', \
1893                     expected one of {:?}",
1894                    type_def.source_node_types
1895                )));
1896            }
1897        }
1898        if !type_def.target_node_types.is_empty() {
1899            let target_ok = target_labels
1900                .iter()
1901                .any(|l| type_def.target_node_types.iter().any(|t| t == l));
1902            if !target_ok {
1903                return Err(OperatorError::ConstraintViolation(format!(
1904                    "target node labels {target_labels:?} are not allowed for edge type '{edge_type}', \
1905                     expected one of {:?}",
1906                    type_def.target_node_types
1907                )));
1908            }
1909        }
1910        Ok(())
1911    }
1912
1913    fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
1914        for label in labels {
1915            if let Some(type_def) = self.catalog.resolved_node_type(label) {
1916                for typed_prop in &type_def.properties {
1917                    if let Some(ref default) = typed_prop.default_value {
1918                        let already_set = properties.iter().any(|(n, _)| n == &typed_prop.name);
1919                        if !already_set {
1920                            properties.push((typed_prop.name.clone(), default.clone()));
1921                        }
1922                    }
1923                }
1924            }
1925        }
1926    }
1927}
1928
1929#[cfg(test)]
1930mod tests {
1931    use super::*;
1932    use std::thread;
1933
1934    #[test]
1935    fn test_catalog_labels() {
1936        let catalog = Catalog::new();
1937
1938        // Get or create labels
1939        let person_id = catalog.get_or_create_label("Person");
1940        let company_id = catalog.get_or_create_label("Company");
1941
1942        // IDs should be different
1943        assert_ne!(person_id, company_id);
1944
1945        // Getting the same label should return the same ID
1946        assert_eq!(catalog.get_or_create_label("Person"), person_id);
1947
1948        // Should be able to look up by name
1949        assert_eq!(catalog.get_label_id("Person"), Some(person_id));
1950        assert_eq!(catalog.get_label_id("Company"), Some(company_id));
1951        assert_eq!(catalog.get_label_id("Unknown"), None);
1952
1953        // Should be able to look up by ID
1954        assert_eq!(catalog.get_label_name(person_id).as_deref(), Some("Person"));
1955        assert_eq!(
1956            catalog.get_label_name(company_id).as_deref(),
1957            Some("Company")
1958        );
1959
1960        // Count should be correct
1961        assert_eq!(catalog.label_count(), 2);
1962    }
1963
1964    #[test]
1965    fn test_catalog_property_keys() {
1966        let catalog = Catalog::new();
1967
1968        let name_id = catalog.get_or_create_property_key("name");
1969        let age_id = catalog.get_or_create_property_key("age");
1970
1971        assert_ne!(name_id, age_id);
1972        assert_eq!(catalog.get_or_create_property_key("name"), name_id);
1973        assert_eq!(catalog.get_property_key_id("name"), Some(name_id));
1974        assert_eq!(
1975            catalog.get_property_key_name(name_id).as_deref(),
1976            Some("name")
1977        );
1978        assert_eq!(catalog.property_key_count(), 2);
1979    }
1980
1981    #[test]
1982    fn test_catalog_edge_types() {
1983        let catalog = Catalog::new();
1984
1985        let knows_id = catalog.get_or_create_edge_type("KNOWS");
1986        let works_at_id = catalog.get_or_create_edge_type("WORKS_AT");
1987
1988        assert_ne!(knows_id, works_at_id);
1989        assert_eq!(catalog.get_or_create_edge_type("KNOWS"), knows_id);
1990        assert_eq!(catalog.get_edge_type_id("KNOWS"), Some(knows_id));
1991        assert_eq!(
1992            catalog.get_edge_type_name(knows_id).as_deref(),
1993            Some("KNOWS")
1994        );
1995        assert_eq!(catalog.edge_type_count(), 2);
1996    }
1997
1998    #[test]
1999    fn test_catalog_indexes() {
2000        let catalog = Catalog::new();
2001
2002        let person_id = catalog.get_or_create_label("Person");
2003        let name_id = catalog.get_or_create_property_key("name");
2004        let age_id = catalog.get_or_create_property_key("age");
2005
2006        // Create indexes
2007        let idx1 = catalog.create_index(person_id, name_id, IndexType::Hash);
2008        let idx2 = catalog.create_index(person_id, age_id, IndexType::BTree);
2009
2010        assert_ne!(idx1, idx2);
2011        assert_eq!(catalog.index_count(), 2);
2012
2013        // Look up by label
2014        let label_indexes = catalog.indexes_for_label(person_id);
2015        assert_eq!(label_indexes.len(), 2);
2016        assert!(label_indexes.contains(&idx1));
2017        assert!(label_indexes.contains(&idx2));
2018
2019        // Look up by label and property
2020        let name_indexes = catalog.indexes_for_label_property(person_id, name_id);
2021        assert_eq!(name_indexes.len(), 1);
2022        assert_eq!(name_indexes[0], idx1);
2023
2024        // Get definition
2025        let def = catalog.get_index(idx1).unwrap();
2026        assert_eq!(def.label, person_id);
2027        assert_eq!(def.property_key, name_id);
2028        assert_eq!(def.index_type, IndexType::Hash);
2029
2030        // Drop index
2031        assert!(catalog.drop_index(idx1));
2032        assert_eq!(catalog.index_count(), 1);
2033        assert!(catalog.get_index(idx1).is_none());
2034        assert_eq!(catalog.indexes_for_label(person_id).len(), 1);
2035    }
2036
2037    #[test]
2038    fn test_catalog_schema_constraints() {
2039        let catalog = Catalog::with_schema();
2040
2041        let person_id = catalog.get_or_create_label("Person");
2042        let email_id = catalog.get_or_create_property_key("email");
2043        let name_id = catalog.get_or_create_property_key("name");
2044
2045        // Add constraints
2046        assert!(catalog.add_unique_constraint(person_id, email_id).is_ok());
2047        assert!(catalog.add_required_property(person_id, name_id).is_ok());
2048
2049        // Check constraints
2050        assert!(catalog.is_property_unique(person_id, email_id));
2051        assert!(!catalog.is_property_unique(person_id, name_id));
2052        assert!(catalog.is_property_required(person_id, name_id));
2053        assert!(!catalog.is_property_required(person_id, email_id));
2054
2055        // Duplicate constraint should fail
2056        assert_eq!(
2057            catalog.add_unique_constraint(person_id, email_id),
2058            Err(CatalogError::ConstraintAlreadyExists)
2059        );
2060    }
2061
2062    #[test]
2063    fn test_catalog_schema_always_enabled() {
2064        // Catalog::new() always enables schema
2065        let catalog = Catalog::new();
2066        assert!(catalog.has_schema());
2067
2068        let person_id = catalog.get_or_create_label("Person");
2069        let email_id = catalog.get_or_create_property_key("email");
2070
2071        // Should succeed with schema enabled
2072        assert_eq!(catalog.add_unique_constraint(person_id, email_id), Ok(()));
2073    }
2074
2075    // === Additional tests for comprehensive coverage ===
2076
2077    #[test]
2078    fn test_catalog_default() {
2079        let catalog = Catalog::default();
2080        assert!(catalog.has_schema());
2081        assert_eq!(catalog.label_count(), 0);
2082        assert_eq!(catalog.property_key_count(), 0);
2083        assert_eq!(catalog.edge_type_count(), 0);
2084        assert_eq!(catalog.index_count(), 0);
2085    }
2086
2087    #[test]
2088    fn test_catalog_all_labels() {
2089        let catalog = Catalog::new();
2090
2091        catalog.get_or_create_label("Person");
2092        catalog.get_or_create_label("Company");
2093        catalog.get_or_create_label("Product");
2094
2095        let all = catalog.all_labels();
2096        assert_eq!(all.len(), 3);
2097        assert!(all.iter().any(|l| l.as_ref() == "Person"));
2098        assert!(all.iter().any(|l| l.as_ref() == "Company"));
2099        assert!(all.iter().any(|l| l.as_ref() == "Product"));
2100    }
2101
2102    #[test]
2103    fn test_catalog_all_property_keys() {
2104        let catalog = Catalog::new();
2105
2106        catalog.get_or_create_property_key("name");
2107        catalog.get_or_create_property_key("age");
2108        catalog.get_or_create_property_key("email");
2109
2110        let all = catalog.all_property_keys();
2111        assert_eq!(all.len(), 3);
2112        assert!(all.iter().any(|k| k.as_ref() == "name"));
2113        assert!(all.iter().any(|k| k.as_ref() == "age"));
2114        assert!(all.iter().any(|k| k.as_ref() == "email"));
2115    }
2116
2117    #[test]
2118    fn test_catalog_all_edge_types() {
2119        let catalog = Catalog::new();
2120
2121        catalog.get_or_create_edge_type("KNOWS");
2122        catalog.get_or_create_edge_type("WORKS_AT");
2123        catalog.get_or_create_edge_type("LIVES_IN");
2124
2125        let all = catalog.all_edge_types();
2126        assert_eq!(all.len(), 3);
2127        assert!(all.iter().any(|t| t.as_ref() == "KNOWS"));
2128        assert!(all.iter().any(|t| t.as_ref() == "WORKS_AT"));
2129        assert!(all.iter().any(|t| t.as_ref() == "LIVES_IN"));
2130    }
2131
2132    #[test]
2133    fn test_catalog_invalid_id_lookup() {
2134        let catalog = Catalog::new();
2135
2136        // Create one label to ensure IDs are allocated
2137        let _ = catalog.get_or_create_label("Person");
2138
2139        // Try to look up non-existent IDs
2140        let invalid_label = LabelId::new(999);
2141        let invalid_property = PropertyKeyId::new(999);
2142        let invalid_edge_type = EdgeTypeId::new(999);
2143        let invalid_index = IndexId::new(999);
2144
2145        assert!(catalog.get_label_name(invalid_label).is_none());
2146        assert!(catalog.get_property_key_name(invalid_property).is_none());
2147        assert!(catalog.get_edge_type_name(invalid_edge_type).is_none());
2148        assert!(catalog.get_index(invalid_index).is_none());
2149    }
2150
2151    #[test]
2152    fn test_catalog_drop_nonexistent_index() {
2153        let catalog = Catalog::new();
2154        let invalid_index = IndexId::new(999);
2155        assert!(!catalog.drop_index(invalid_index));
2156    }
2157
2158    #[test]
2159    fn test_catalog_indexes_for_nonexistent_label() {
2160        let catalog = Catalog::new();
2161        let invalid_label = LabelId::new(999);
2162        let invalid_property = PropertyKeyId::new(999);
2163
2164        assert!(catalog.indexes_for_label(invalid_label).is_empty());
2165        assert!(
2166            catalog
2167                .indexes_for_label_property(invalid_label, invalid_property)
2168                .is_empty()
2169        );
2170    }
2171
2172    #[test]
2173    fn test_catalog_multiple_indexes_same_property() {
2174        let catalog = Catalog::new();
2175
2176        let person_id = catalog.get_or_create_label("Person");
2177        let name_id = catalog.get_or_create_property_key("name");
2178
2179        // Create multiple indexes on the same property with different types
2180        let hash_idx = catalog.create_index(person_id, name_id, IndexType::Hash);
2181        let btree_idx = catalog.create_index(person_id, name_id, IndexType::BTree);
2182        let fulltext_idx = catalog.create_index(person_id, name_id, IndexType::FullText);
2183
2184        assert_eq!(catalog.index_count(), 3);
2185
2186        let indexes = catalog.indexes_for_label_property(person_id, name_id);
2187        assert_eq!(indexes.len(), 3);
2188        assert!(indexes.contains(&hash_idx));
2189        assert!(indexes.contains(&btree_idx));
2190        assert!(indexes.contains(&fulltext_idx));
2191
2192        // Verify each has the correct type
2193        assert_eq!(
2194            catalog.get_index(hash_idx).unwrap().index_type,
2195            IndexType::Hash
2196        );
2197        assert_eq!(
2198            catalog.get_index(btree_idx).unwrap().index_type,
2199            IndexType::BTree
2200        );
2201        assert_eq!(
2202            catalog.get_index(fulltext_idx).unwrap().index_type,
2203            IndexType::FullText
2204        );
2205    }
2206
2207    #[test]
2208    fn test_catalog_schema_required_property_duplicate() {
2209        let catalog = Catalog::with_schema();
2210
2211        let person_id = catalog.get_or_create_label("Person");
2212        let name_id = catalog.get_or_create_property_key("name");
2213
2214        // First should succeed
2215        assert!(catalog.add_required_property(person_id, name_id).is_ok());
2216
2217        // Duplicate should fail
2218        assert_eq!(
2219            catalog.add_required_property(person_id, name_id),
2220            Err(CatalogError::ConstraintAlreadyExists)
2221        );
2222    }
2223
2224    #[test]
2225    fn test_catalog_schema_check_without_constraints() {
2226        let catalog = Catalog::new();
2227
2228        let person_id = catalog.get_or_create_label("Person");
2229        let name_id = catalog.get_or_create_property_key("name");
2230
2231        // Without schema enabled, these should return false
2232        assert!(!catalog.is_property_unique(person_id, name_id));
2233        assert!(!catalog.is_property_required(person_id, name_id));
2234    }
2235
2236    #[test]
2237    fn test_catalog_has_schema() {
2238        // Both new() and with_schema() enable schema by default
2239        let catalog = Catalog::new();
2240        assert!(catalog.has_schema());
2241
2242        let with_schema = Catalog::with_schema();
2243        assert!(with_schema.has_schema());
2244    }
2245
2246    #[test]
2247    fn test_catalog_error_display() {
2248        assert_eq!(
2249            CatalogError::SchemaNotEnabled.to_string(),
2250            "Schema constraints are not enabled"
2251        );
2252        assert_eq!(
2253            CatalogError::ConstraintAlreadyExists.to_string(),
2254            "Constraint already exists"
2255        );
2256        assert_eq!(
2257            CatalogError::LabelNotFound("Person".to_string()).to_string(),
2258            "Label not found: Person"
2259        );
2260        assert_eq!(
2261            CatalogError::PropertyKeyNotFound("name".to_string()).to_string(),
2262            "Property key not found: name"
2263        );
2264        assert_eq!(
2265            CatalogError::EdgeTypeNotFound("KNOWS".to_string()).to_string(),
2266            "Edge type not found: KNOWS"
2267        );
2268        let idx = IndexId::new(42);
2269        assert!(CatalogError::IndexNotFound(idx).to_string().contains("42"));
2270    }
2271
2272    #[test]
2273    fn test_catalog_concurrent_label_creation() {
2274        use std::sync::Arc;
2275
2276        let catalog = Arc::new(Catalog::new());
2277        let mut handles = vec![];
2278
2279        // Spawn multiple threads trying to create the same labels
2280        for i in 0..10 {
2281            let catalog = Arc::clone(&catalog);
2282            handles.push(thread::spawn(move || {
2283                let label_name = format!("Label{}", i % 3); // Only 3 unique labels
2284                catalog.get_or_create_label(&label_name)
2285            }));
2286        }
2287
2288        let mut ids: Vec<LabelId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2289        ids.sort_by_key(|id| id.as_u32());
2290        ids.dedup();
2291
2292        // Should only have 3 unique label IDs
2293        assert_eq!(ids.len(), 3);
2294        assert_eq!(catalog.label_count(), 3);
2295    }
2296
2297    #[test]
2298    fn test_catalog_concurrent_property_key_creation() {
2299        use std::sync::Arc;
2300
2301        let catalog = Arc::new(Catalog::new());
2302        let mut handles = vec![];
2303
2304        for i in 0..10 {
2305            let catalog = Arc::clone(&catalog);
2306            handles.push(thread::spawn(move || {
2307                let key_name = format!("key{}", i % 4);
2308                catalog.get_or_create_property_key(&key_name)
2309            }));
2310        }
2311
2312        let mut ids: Vec<PropertyKeyId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2313        ids.sort_by_key(|id| id.as_u32());
2314        ids.dedup();
2315
2316        assert_eq!(ids.len(), 4);
2317        assert_eq!(catalog.property_key_count(), 4);
2318    }
2319
2320    #[test]
2321    fn test_catalog_concurrent_index_operations() {
2322        use std::sync::Arc;
2323
2324        let catalog = Arc::new(Catalog::new());
2325        let label = catalog.get_or_create_label("Node");
2326
2327        let mut handles = vec![];
2328
2329        // Create indexes concurrently
2330        for i in 0..5 {
2331            let catalog = Arc::clone(&catalog);
2332            handles.push(thread::spawn(move || {
2333                let prop = PropertyKeyId::new(i);
2334                catalog.create_index(label, prop, IndexType::Hash)
2335            }));
2336        }
2337
2338        let ids: Vec<IndexId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2339        assert_eq!(ids.len(), 5);
2340        assert_eq!(catalog.index_count(), 5);
2341    }
2342
2343    #[test]
2344    fn test_catalog_special_characters_in_names() {
2345        let catalog = Catalog::new();
2346
2347        // Test with various special characters
2348        let label1 = catalog.get_or_create_label("Label With Spaces");
2349        let label2 = catalog.get_or_create_label("Label-With-Dashes");
2350        let label3 = catalog.get_or_create_label("Label_With_Underscores");
2351        let label4 = catalog.get_or_create_label("LabelWithUnicode\u{00E9}");
2352
2353        assert_ne!(label1, label2);
2354        assert_ne!(label2, label3);
2355        assert_ne!(label3, label4);
2356
2357        assert_eq!(
2358            catalog.get_label_name(label1).as_deref(),
2359            Some("Label With Spaces")
2360        );
2361        assert_eq!(
2362            catalog.get_label_name(label4).as_deref(),
2363            Some("LabelWithUnicode\u{00E9}")
2364        );
2365    }
2366
2367    #[test]
2368    fn test_catalog_empty_names() {
2369        let catalog = Catalog::new();
2370
2371        // Empty names should be valid (edge case)
2372        let empty_label = catalog.get_or_create_label("");
2373        let empty_prop = catalog.get_or_create_property_key("");
2374        let empty_edge = catalog.get_or_create_edge_type("");
2375
2376        assert_eq!(catalog.get_label_name(empty_label).as_deref(), Some(""));
2377        assert_eq!(
2378            catalog.get_property_key_name(empty_prop).as_deref(),
2379            Some("")
2380        );
2381        assert_eq!(catalog.get_edge_type_name(empty_edge).as_deref(), Some(""));
2382
2383        // Calling again should return same ID
2384        assert_eq!(catalog.get_or_create_label(""), empty_label);
2385    }
2386
2387    #[test]
2388    fn test_catalog_large_number_of_entries() {
2389        let catalog = Catalog::new();
2390
2391        // Create many labels
2392        for i in 0..1000 {
2393            catalog.get_or_create_label(&format!("Label{}", i));
2394        }
2395
2396        assert_eq!(catalog.label_count(), 1000);
2397
2398        // Verify we can retrieve them all
2399        let all = catalog.all_labels();
2400        assert_eq!(all.len(), 1000);
2401
2402        // Verify a specific one
2403        let id = catalog.get_label_id("Label500").unwrap();
2404        assert_eq!(catalog.get_label_name(id).as_deref(), Some("Label500"));
2405    }
2406
2407    #[test]
2408    fn test_index_definition_debug() {
2409        let def = IndexDefinition {
2410            id: IndexId::new(1),
2411            label: LabelId::new(2),
2412            property_key: PropertyKeyId::new(3),
2413            index_type: IndexType::Hash,
2414        };
2415
2416        // Should be able to debug print
2417        let debug_str = format!("{:?}", def);
2418        assert!(debug_str.contains("IndexDefinition"));
2419        assert!(debug_str.contains("Hash"));
2420    }
2421
2422    #[test]
2423    fn test_index_type_equality() {
2424        assert_eq!(IndexType::Hash, IndexType::Hash);
2425        assert_ne!(IndexType::Hash, IndexType::BTree);
2426        assert_ne!(IndexType::BTree, IndexType::FullText);
2427
2428        // Clone
2429        let t = IndexType::Hash;
2430        let t2 = t;
2431        assert_eq!(t, t2);
2432    }
2433
2434    #[test]
2435    fn test_catalog_error_equality() {
2436        assert_eq!(
2437            CatalogError::SchemaNotEnabled,
2438            CatalogError::SchemaNotEnabled
2439        );
2440        assert_eq!(
2441            CatalogError::ConstraintAlreadyExists,
2442            CatalogError::ConstraintAlreadyExists
2443        );
2444        assert_eq!(
2445            CatalogError::LabelNotFound("X".to_string()),
2446            CatalogError::LabelNotFound("X".to_string())
2447        );
2448        assert_ne!(
2449            CatalogError::LabelNotFound("X".to_string()),
2450            CatalogError::LabelNotFound("Y".to_string())
2451        );
2452    }
2453}