Skip to main content

grafeo_engine/catalog/
mod.rs

1//! Schema metadata - what labels, properties, and indexes exist.
2//!
3//! The catalog is the "dictionary" of your database. When you write `(:Person)`,
4//! the catalog maps "Person" to an internal LabelId. This indirection keeps
5//! storage compact while names stay readable.
6//!
7//! | What it tracks | Why it matters |
8//! | -------------- | -------------- |
9//! | Labels | Maps "Person" → LabelId for efficient storage |
10//! | Property keys | Maps "name" → PropertyKeyId |
11//! | Edge types | Maps "KNOWS" → EdgeTypeId |
12//! | Indexes | Which properties are indexed for fast lookups |
13
14mod check_eval;
15
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicU32, Ordering};
19
20use parking_lot::{Mutex, RwLock};
21
22use grafeo_common::collections::{GrafeoConcurrentMap, grafeo_concurrent_map};
23use grafeo_common::types::{EdgeTypeId, IndexId, LabelId, PropertyKeyId, Value};
24
25/// The database's schema dictionary - maps names to compact internal IDs.
26///
27/// You rarely interact with this directly. The query processor uses it to
28/// resolve names like "Person" and "name" to internal IDs.
29pub struct Catalog {
30    /// Label name-to-ID mappings.
31    labels: LabelCatalog,
32    /// Property key name-to-ID mappings.
33    property_keys: PropertyCatalog,
34    /// Edge type name-to-ID mappings.
35    edge_types: EdgeTypeCatalog,
36    /// Index definitions.
37    indexes: IndexCatalog,
38    /// Optional schema constraints.
39    schema: Option<SchemaCatalog>,
40}
41
42impl Catalog {
43    /// Creates a new empty catalog with schema support enabled.
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            labels: LabelCatalog::new(),
48            property_keys: PropertyCatalog::new(),
49            edge_types: EdgeTypeCatalog::new(),
50            indexes: IndexCatalog::new(),
51            schema: Some(SchemaCatalog::new()),
52        }
53    }
54
55    /// Creates a new catalog with schema constraints enabled.
56    ///
57    /// This is now equivalent to `new()` since schema is always enabled.
58    #[must_use]
59    pub fn with_schema() -> Self {
60        Self::new()
61    }
62
63    // === Label Operations ===
64
65    /// Gets or creates a label ID for the given label name.
66    pub fn get_or_create_label(&self, name: &str) -> LabelId {
67        self.labels.get_or_create(name)
68    }
69
70    /// Gets the label ID for a label name, if it exists.
71    #[must_use]
72    pub fn get_label_id(&self, name: &str) -> Option<LabelId> {
73        self.labels.get_id(name)
74    }
75
76    /// Gets the label name for a label ID, if it exists.
77    #[must_use]
78    pub fn get_label_name(&self, id: LabelId) -> Option<Arc<str>> {
79        self.labels.get_name(id)
80    }
81
82    /// Returns the number of distinct labels.
83    #[must_use]
84    pub fn label_count(&self) -> usize {
85        self.labels.count()
86    }
87
88    /// Returns all label names.
89    #[must_use]
90    pub fn all_labels(&self) -> Vec<Arc<str>> {
91        self.labels.all_names()
92    }
93
94    // === Property Key Operations ===
95
96    /// Gets or creates a property key ID for the given property key name.
97    pub fn get_or_create_property_key(&self, name: &str) -> PropertyKeyId {
98        self.property_keys.get_or_create(name)
99    }
100
101    /// Gets the property key ID for a property key name, if it exists.
102    #[must_use]
103    pub fn get_property_key_id(&self, name: &str) -> Option<PropertyKeyId> {
104        self.property_keys.get_id(name)
105    }
106
107    /// Gets the property key name for a property key ID, if it exists.
108    #[must_use]
109    pub fn get_property_key_name(&self, id: PropertyKeyId) -> Option<Arc<str>> {
110        self.property_keys.get_name(id)
111    }
112
113    /// Returns the number of distinct property keys.
114    #[must_use]
115    pub fn property_key_count(&self) -> usize {
116        self.property_keys.count()
117    }
118
119    /// Returns all property key names.
120    #[must_use]
121    pub fn all_property_keys(&self) -> Vec<Arc<str>> {
122        self.property_keys.all_names()
123    }
124
125    // === Edge Type Operations ===
126
127    /// Gets or creates an edge type ID for the given edge type name.
128    pub fn get_or_create_edge_type(&self, name: &str) -> EdgeTypeId {
129        self.edge_types.get_or_create(name)
130    }
131
132    /// Gets the edge type ID for an edge type name, if it exists.
133    #[must_use]
134    pub fn get_edge_type_id(&self, name: &str) -> Option<EdgeTypeId> {
135        self.edge_types.get_id(name)
136    }
137
138    /// Gets the edge type name for an edge type ID, if it exists.
139    #[must_use]
140    pub fn get_edge_type_name(&self, id: EdgeTypeId) -> Option<Arc<str>> {
141        self.edge_types.get_name(id)
142    }
143
144    /// Returns the number of distinct edge types.
145    #[must_use]
146    pub fn edge_type_count(&self) -> usize {
147        self.edge_types.count()
148    }
149
150    /// Returns all edge type names.
151    #[must_use]
152    pub fn all_edge_types(&self) -> Vec<Arc<str>> {
153        self.edge_types.all_names()
154    }
155
156    // === Index Operations ===
157
158    /// Creates a new index on a label and property key.
159    pub fn create_index(
160        &self,
161        name: &str,
162        label: LabelId,
163        property_key: PropertyKeyId,
164        index_type: IndexType,
165    ) -> IndexId {
166        self.indexes.create(name, label, property_key, index_type)
167    }
168
169    /// Drops an index by ID.
170    pub fn drop_index(&self, id: IndexId) -> bool {
171        self.indexes.drop(id)
172    }
173
174    /// Finds an index by its user-defined name.
175    #[must_use]
176    pub fn find_index_by_name(&self, name: &str) -> Option<IndexId> {
177        self.indexes.find_by_name(name)
178    }
179
180    /// Gets the index definition for an index ID.
181    #[must_use]
182    pub fn get_index(&self, id: IndexId) -> Option<IndexDefinition> {
183        self.indexes.get(id)
184    }
185
186    /// Finds indexes for a given label.
187    #[must_use]
188    pub fn indexes_for_label(&self, label: LabelId) -> Vec<IndexId> {
189        self.indexes.for_label(label)
190    }
191
192    /// Finds indexes for a given label and property key.
193    #[must_use]
194    pub fn indexes_for_label_property(
195        &self,
196        label: LabelId,
197        property_key: PropertyKeyId,
198    ) -> Vec<IndexId> {
199        self.indexes.for_label_property(label, property_key)
200    }
201
202    /// Returns all index definitions.
203    #[must_use]
204    pub fn all_indexes(&self) -> Vec<IndexDefinition> {
205        self.indexes.all()
206    }
207
208    /// Returns the number of indexes.
209    #[must_use]
210    pub fn index_count(&self) -> usize {
211        self.indexes.count()
212    }
213
214    // === Schema Operations ===
215
216    /// Returns whether schema constraints are enabled.
217    #[must_use]
218    pub fn has_schema(&self) -> bool {
219        self.schema.is_some()
220    }
221
222    /// Adds a uniqueness constraint.
223    ///
224    /// Returns an error if schema is not enabled or constraint already exists.
225    pub fn add_unique_constraint(
226        &self,
227        label: LabelId,
228        property_key: PropertyKeyId,
229    ) -> Result<(), CatalogError> {
230        match &self.schema {
231            Some(schema) => schema.add_unique_constraint(label, property_key),
232            None => Err(CatalogError::SchemaNotEnabled),
233        }
234    }
235
236    /// Adds a required property constraint (NOT NULL).
237    ///
238    /// Returns an error if schema is not enabled or constraint already exists.
239    pub fn add_required_property(
240        &self,
241        label: LabelId,
242        property_key: PropertyKeyId,
243    ) -> Result<(), CatalogError> {
244        match &self.schema {
245            Some(schema) => schema.add_required_property(label, property_key),
246            None => Err(CatalogError::SchemaNotEnabled),
247        }
248    }
249
250    /// Checks if a property is required for a label.
251    #[must_use]
252    pub fn is_property_required(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
253        self.schema
254            .as_ref()
255            .is_some_and(|s| s.is_property_required(label, property_key))
256    }
257
258    /// Checks if a property must be unique for a label.
259    #[must_use]
260    pub fn is_property_unique(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
261        self.schema
262            .as_ref()
263            .is_some_and(|s| s.is_property_unique(label, property_key))
264    }
265
266    // === Type Definition Operations ===
267
268    /// Returns a reference to the schema catalog.
269    #[must_use]
270    pub fn schema(&self) -> Option<&SchemaCatalog> {
271        self.schema.as_ref()
272    }
273
274    /// Registers a node type definition.
275    pub fn register_node_type(&self, def: NodeTypeDefinition) -> Result<(), CatalogError> {
276        match &self.schema {
277            Some(schema) => schema.register_node_type(def),
278            None => Err(CatalogError::SchemaNotEnabled),
279        }
280    }
281
282    /// Registers or replaces a node type definition.
283    pub fn register_or_replace_node_type(&self, def: NodeTypeDefinition) {
284        if let Some(schema) = &self.schema {
285            schema.register_or_replace_node_type(def);
286        }
287    }
288
289    /// Drops a node type definition.
290    pub fn drop_node_type(&self, name: &str) -> Result<(), CatalogError> {
291        match &self.schema {
292            Some(schema) => schema.drop_node_type(name),
293            None => Err(CatalogError::SchemaNotEnabled),
294        }
295    }
296
297    /// Gets a node type definition by name.
298    #[must_use]
299    pub fn get_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
300        self.schema.as_ref().and_then(|s| s.get_node_type(name))
301    }
302
303    /// Gets a resolved node type with inherited properties from parents.
304    #[must_use]
305    pub fn resolved_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
306        self.schema
307            .as_ref()
308            .and_then(|s| s.resolved_node_type(name))
309    }
310
311    /// Returns all registered node type names.
312    #[must_use]
313    pub fn all_node_type_names(&self) -> Vec<String> {
314        self.schema
315            .as_ref()
316            .map(SchemaCatalog::all_node_types)
317            .unwrap_or_default()
318    }
319
320    /// Returns all registered edge type definition names.
321    #[must_use]
322    pub fn all_edge_type_names(&self) -> Vec<String> {
323        self.schema
324            .as_ref()
325            .map(SchemaCatalog::all_edge_types)
326            .unwrap_or_default()
327    }
328
329    /// Registers an edge type definition.
330    pub fn register_edge_type_def(&self, def: EdgeTypeDefinition) -> Result<(), CatalogError> {
331        match &self.schema {
332            Some(schema) => schema.register_edge_type(def),
333            None => Err(CatalogError::SchemaNotEnabled),
334        }
335    }
336
337    /// Registers or replaces an edge type definition.
338    pub fn register_or_replace_edge_type_def(&self, def: EdgeTypeDefinition) {
339        if let Some(schema) = &self.schema {
340            schema.register_or_replace_edge_type(def);
341        }
342    }
343
344    /// Drops an edge type definition.
345    pub fn drop_edge_type_def(&self, name: &str) -> Result<(), CatalogError> {
346        match &self.schema {
347            Some(schema) => schema.drop_edge_type(name),
348            None => Err(CatalogError::SchemaNotEnabled),
349        }
350    }
351
352    /// Gets an edge type definition by name.
353    #[must_use]
354    pub fn get_edge_type_def(&self, name: &str) -> Option<EdgeTypeDefinition> {
355        self.schema.as_ref().and_then(|s| s.get_edge_type(name))
356    }
357
358    /// Registers a graph type definition.
359    pub fn register_graph_type(&self, def: GraphTypeDefinition) -> Result<(), CatalogError> {
360        match &self.schema {
361            Some(schema) => schema.register_graph_type(def),
362            None => Err(CatalogError::SchemaNotEnabled),
363        }
364    }
365
366    /// Drops a graph type definition.
367    pub fn drop_graph_type(&self, name: &str) -> Result<(), CatalogError> {
368        match &self.schema {
369            Some(schema) => schema.drop_graph_type(name),
370            None => Err(CatalogError::SchemaNotEnabled),
371        }
372    }
373
374    /// Returns all registered graph type names.
375    #[must_use]
376    pub fn all_graph_type_names(&self) -> Vec<String> {
377        self.schema
378            .as_ref()
379            .map(SchemaCatalog::all_graph_types)
380            .unwrap_or_default()
381    }
382
383    /// Gets a graph type definition by name.
384    #[must_use]
385    pub fn get_graph_type_def(&self, name: &str) -> Option<GraphTypeDefinition> {
386        self.schema.as_ref().and_then(|s| s.get_graph_type(name))
387    }
388
389    /// Registers a schema namespace.
390    pub fn register_schema_namespace(&self, name: String) -> Result<(), CatalogError> {
391        match &self.schema {
392            Some(schema) => schema.register_schema(name),
393            None => Err(CatalogError::SchemaNotEnabled),
394        }
395    }
396
397    /// Drops a schema namespace.
398    pub fn drop_schema_namespace(&self, name: &str) -> Result<(), CatalogError> {
399        match &self.schema {
400            Some(schema) => schema.drop_schema(name),
401            None => Err(CatalogError::SchemaNotEnabled),
402        }
403    }
404
405    /// Checks whether a schema namespace exists.
406    #[must_use]
407    pub fn schema_exists(&self, name: &str) -> bool {
408        self.schema.as_ref().is_some_and(|s| s.schema_exists(name))
409    }
410
411    /// Returns all registered schema namespace names.
412    #[must_use]
413    pub fn schema_names(&self) -> Vec<String> {
414        self.schema
415            .as_ref()
416            .map(|s| s.schema_names())
417            .unwrap_or_default()
418    }
419
420    /// Adds a constraint to an existing node type, creating a minimal type if needed.
421    pub fn add_constraint_to_type(
422        &self,
423        label: &str,
424        constraint: TypeConstraint,
425    ) -> Result<(), CatalogError> {
426        match &self.schema {
427            Some(schema) => schema.add_constraint_to_type(label, constraint),
428            None => Err(CatalogError::SchemaNotEnabled),
429        }
430    }
431
432    /// Adds a property to a node type.
433    pub fn alter_node_type_add_property(
434        &self,
435        type_name: &str,
436        property: TypedProperty,
437    ) -> Result<(), CatalogError> {
438        match &self.schema {
439            Some(schema) => schema.alter_node_type_add_property(type_name, property),
440            None => Err(CatalogError::SchemaNotEnabled),
441        }
442    }
443
444    /// Drops a property from a node type.
445    pub fn alter_node_type_drop_property(
446        &self,
447        type_name: &str,
448        property_name: &str,
449    ) -> Result<(), CatalogError> {
450        match &self.schema {
451            Some(schema) => schema.alter_node_type_drop_property(type_name, property_name),
452            None => Err(CatalogError::SchemaNotEnabled),
453        }
454    }
455
456    /// Adds a property to an edge type.
457    pub fn alter_edge_type_add_property(
458        &self,
459        type_name: &str,
460        property: TypedProperty,
461    ) -> Result<(), CatalogError> {
462        match &self.schema {
463            Some(schema) => schema.alter_edge_type_add_property(type_name, property),
464            None => Err(CatalogError::SchemaNotEnabled),
465        }
466    }
467
468    /// Drops a property from an edge type.
469    pub fn alter_edge_type_drop_property(
470        &self,
471        type_name: &str,
472        property_name: &str,
473    ) -> Result<(), CatalogError> {
474        match &self.schema {
475            Some(schema) => schema.alter_edge_type_drop_property(type_name, property_name),
476            None => Err(CatalogError::SchemaNotEnabled),
477        }
478    }
479
480    /// Adds a node type to a graph type.
481    pub fn alter_graph_type_add_node_type(
482        &self,
483        graph_type_name: &str,
484        node_type: String,
485    ) -> Result<(), CatalogError> {
486        match &self.schema {
487            Some(schema) => schema.alter_graph_type_add_node_type(graph_type_name, node_type),
488            None => Err(CatalogError::SchemaNotEnabled),
489        }
490    }
491
492    /// Drops a node type from a graph type.
493    pub fn alter_graph_type_drop_node_type(
494        &self,
495        graph_type_name: &str,
496        node_type: &str,
497    ) -> Result<(), CatalogError> {
498        match &self.schema {
499            Some(schema) => schema.alter_graph_type_drop_node_type(graph_type_name, node_type),
500            None => Err(CatalogError::SchemaNotEnabled),
501        }
502    }
503
504    /// Adds an edge type to a graph type.
505    pub fn alter_graph_type_add_edge_type(
506        &self,
507        graph_type_name: &str,
508        edge_type: String,
509    ) -> Result<(), CatalogError> {
510        match &self.schema {
511            Some(schema) => schema.alter_graph_type_add_edge_type(graph_type_name, edge_type),
512            None => Err(CatalogError::SchemaNotEnabled),
513        }
514    }
515
516    /// Drops an edge type from a graph type.
517    pub fn alter_graph_type_drop_edge_type(
518        &self,
519        graph_type_name: &str,
520        edge_type: &str,
521    ) -> Result<(), CatalogError> {
522        match &self.schema {
523            Some(schema) => schema.alter_graph_type_drop_edge_type(graph_type_name, edge_type),
524            None => Err(CatalogError::SchemaNotEnabled),
525        }
526    }
527
528    /// Binds a graph instance to a graph type.
529    pub fn bind_graph_type(
530        &self,
531        graph_name: &str,
532        graph_type: String,
533    ) -> Result<(), CatalogError> {
534        match &self.schema {
535            Some(schema) => {
536                // Verify the graph type exists
537                if schema.get_graph_type(&graph_type).is_none() {
538                    return Err(CatalogError::TypeNotFound(graph_type));
539                }
540                schema
541                    .graph_type_bindings
542                    .write()
543                    .insert(graph_name.to_string(), graph_type);
544                Ok(())
545            }
546            None => Err(CatalogError::SchemaNotEnabled),
547        }
548    }
549
550    /// Gets the graph type binding for a graph instance.
551    pub fn get_graph_type_binding(&self, graph_name: &str) -> Option<String> {
552        self.schema
553            .as_ref()?
554            .graph_type_bindings
555            .read()
556            .get(graph_name)
557            .cloned()
558    }
559
560    /// Registers a stored procedure.
561    pub fn register_procedure(&self, def: ProcedureDefinition) -> Result<(), CatalogError> {
562        match &self.schema {
563            Some(schema) => schema.register_procedure(def),
564            None => Err(CatalogError::SchemaNotEnabled),
565        }
566    }
567
568    /// Replaces or creates a stored procedure.
569    pub fn replace_procedure(&self, def: ProcedureDefinition) -> Result<(), CatalogError> {
570        match &self.schema {
571            Some(schema) => {
572                schema.replace_procedure(def);
573                Ok(())
574            }
575            None => Err(CatalogError::SchemaNotEnabled),
576        }
577    }
578
579    /// Drops a stored procedure.
580    pub fn drop_procedure(&self, name: &str) -> Result<(), CatalogError> {
581        match &self.schema {
582            Some(schema) => schema.drop_procedure(name),
583            None => Err(CatalogError::SchemaNotEnabled),
584        }
585    }
586
587    /// Gets a stored procedure by name.
588    pub fn get_procedure(&self, name: &str) -> Option<ProcedureDefinition> {
589        self.schema.as_ref()?.get_procedure(name)
590    }
591
592    /// Returns all registered node type definitions.
593    #[must_use]
594    pub fn all_node_type_defs(&self) -> Vec<NodeTypeDefinition> {
595        self.schema
596            .as_ref()
597            .map(SchemaCatalog::all_node_type_defs)
598            .unwrap_or_default()
599    }
600
601    /// Returns all registered edge type definitions.
602    #[must_use]
603    pub fn all_edge_type_defs(&self) -> Vec<EdgeTypeDefinition> {
604        self.schema
605            .as_ref()
606            .map(SchemaCatalog::all_edge_type_defs)
607            .unwrap_or_default()
608    }
609
610    /// Returns all registered graph type definitions.
611    #[must_use]
612    pub fn all_graph_type_defs(&self) -> Vec<GraphTypeDefinition> {
613        self.schema
614            .as_ref()
615            .map(SchemaCatalog::all_graph_type_defs)
616            .unwrap_or_default()
617    }
618
619    /// Returns all registered procedure definitions.
620    #[must_use]
621    pub fn all_procedure_defs(&self) -> Vec<ProcedureDefinition> {
622        self.schema
623            .as_ref()
624            .map(SchemaCatalog::all_procedure_defs)
625            .unwrap_or_default()
626    }
627
628    /// Returns all graph type bindings (graph_name, type_name).
629    #[must_use]
630    pub fn all_graph_type_bindings(&self) -> Vec<(String, String)> {
631        self.schema
632            .as_ref()
633            .map(SchemaCatalog::all_graph_type_bindings)
634            .unwrap_or_default()
635    }
636}
637
638impl Default for Catalog {
639    fn default() -> Self {
640        Self::new()
641    }
642}
643
644// === Label Catalog ===
645
646/// Bidirectional mapping between label names and IDs.
647///
648/// Uses `DashMap` (shard-level locking) for `name_to_id` so concurrent
649/// readers never block each other. A separate `Mutex` serializes the rare
650/// create path to keep `id_to_name` consistent.
651struct LabelCatalog {
652    name_to_id: GrafeoConcurrentMap<Arc<str>, LabelId>,
653    id_to_name: RwLock<Vec<Arc<str>>>,
654    next_id: AtomicU32,
655    create_lock: Mutex<()>,
656}
657
658impl LabelCatalog {
659    fn new() -> Self {
660        Self {
661            name_to_id: grafeo_concurrent_map(),
662            id_to_name: RwLock::new(Vec::new()),
663            next_id: AtomicU32::new(0),
664            create_lock: Mutex::new(()),
665        }
666    }
667
668    fn get_or_create(&self, name: &str) -> LabelId {
669        // Fast path: shard-level read (no global lock)
670        if let Some(id) = self.name_to_id.get(name) {
671            return *id;
672        }
673
674        // Slow path: serialize creates to keep id_to_name consistent
675        let _guard = self.create_lock.lock();
676        if let Some(id) = self.name_to_id.get(name) {
677            return *id;
678        }
679
680        let id = LabelId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
681        let name: Arc<str> = name.into();
682        self.id_to_name.write().push(Arc::clone(&name));
683        self.name_to_id.insert(name, id);
684        id
685    }
686
687    fn get_id(&self, name: &str) -> Option<LabelId> {
688        self.name_to_id.get(name).map(|r| *r)
689    }
690
691    fn get_name(&self, id: LabelId) -> Option<Arc<str>> {
692        self.id_to_name.read().get(id.as_u32() as usize).cloned()
693    }
694
695    fn count(&self) -> usize {
696        self.id_to_name.read().len()
697    }
698
699    fn all_names(&self) -> Vec<Arc<str>> {
700        self.id_to_name.read().clone()
701    }
702}
703
704// === Property Catalog ===
705
706/// Bidirectional mapping between property key names and IDs.
707struct PropertyCatalog {
708    name_to_id: GrafeoConcurrentMap<Arc<str>, PropertyKeyId>,
709    id_to_name: RwLock<Vec<Arc<str>>>,
710    next_id: AtomicU32,
711    create_lock: Mutex<()>,
712}
713
714impl PropertyCatalog {
715    fn new() -> Self {
716        Self {
717            name_to_id: grafeo_concurrent_map(),
718            id_to_name: RwLock::new(Vec::new()),
719            next_id: AtomicU32::new(0),
720            create_lock: Mutex::new(()),
721        }
722    }
723
724    fn get_or_create(&self, name: &str) -> PropertyKeyId {
725        // Fast path: shard-level read (no global lock)
726        if let Some(id) = self.name_to_id.get(name) {
727            return *id;
728        }
729
730        // Slow path: serialize creates to keep id_to_name consistent
731        let _guard = self.create_lock.lock();
732        if let Some(id) = self.name_to_id.get(name) {
733            return *id;
734        }
735
736        let id = PropertyKeyId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
737        let name: Arc<str> = name.into();
738        self.id_to_name.write().push(Arc::clone(&name));
739        self.name_to_id.insert(name, id);
740        id
741    }
742
743    fn get_id(&self, name: &str) -> Option<PropertyKeyId> {
744        self.name_to_id.get(name).map(|r| *r)
745    }
746
747    fn get_name(&self, id: PropertyKeyId) -> Option<Arc<str>> {
748        self.id_to_name.read().get(id.as_u32() as usize).cloned()
749    }
750
751    fn count(&self) -> usize {
752        self.id_to_name.read().len()
753    }
754
755    fn all_names(&self) -> Vec<Arc<str>> {
756        self.id_to_name.read().clone()
757    }
758}
759
760// === Edge Type Catalog ===
761
762/// Bidirectional mapping between edge type names and IDs.
763struct EdgeTypeCatalog {
764    name_to_id: GrafeoConcurrentMap<Arc<str>, EdgeTypeId>,
765    id_to_name: RwLock<Vec<Arc<str>>>,
766    next_id: AtomicU32,
767    create_lock: Mutex<()>,
768}
769
770impl EdgeTypeCatalog {
771    fn new() -> Self {
772        Self {
773            name_to_id: grafeo_concurrent_map(),
774            id_to_name: RwLock::new(Vec::new()),
775            next_id: AtomicU32::new(0),
776            create_lock: Mutex::new(()),
777        }
778    }
779
780    fn get_or_create(&self, name: &str) -> EdgeTypeId {
781        // Fast path: shard-level read (no global lock)
782        if let Some(id) = self.name_to_id.get(name) {
783            return *id;
784        }
785
786        // Slow path: serialize creates to keep id_to_name consistent
787        let _guard = self.create_lock.lock();
788        if let Some(id) = self.name_to_id.get(name) {
789            return *id;
790        }
791
792        let id = EdgeTypeId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
793        let name: Arc<str> = name.into();
794        self.id_to_name.write().push(Arc::clone(&name));
795        self.name_to_id.insert(name, id);
796        id
797    }
798
799    fn get_id(&self, name: &str) -> Option<EdgeTypeId> {
800        self.name_to_id.get(name).map(|r| *r)
801    }
802
803    fn get_name(&self, id: EdgeTypeId) -> Option<Arc<str>> {
804        self.id_to_name.read().get(id.as_u32() as usize).cloned()
805    }
806
807    fn count(&self) -> usize {
808        self.id_to_name.read().len()
809    }
810
811    fn all_names(&self) -> Vec<Arc<str>> {
812        self.id_to_name.read().clone()
813    }
814}
815
816// === Index Catalog ===
817
818/// Type of index.
819#[derive(Debug, Clone, Copy, PartialEq, Eq)]
820pub enum IndexType {
821    /// Hash index for equality lookups.
822    Hash,
823    /// BTree index for range queries.
824    BTree,
825    /// Full-text index for text search.
826    FullText,
827}
828
829/// Index definition.
830#[derive(Debug, Clone)]
831pub struct IndexDefinition {
832    /// The index ID.
833    pub id: IndexId,
834    /// User-defined index name (e.g., `idx_person_name`).
835    pub name: String,
836    /// The label this index applies to.
837    pub label: LabelId,
838    /// The property key being indexed.
839    pub property_key: PropertyKeyId,
840    /// The type of index.
841    pub index_type: IndexType,
842}
843
844/// Manages index definitions.
845struct IndexCatalog {
846    indexes: RwLock<HashMap<IndexId, IndexDefinition>>,
847    label_indexes: RwLock<HashMap<LabelId, Vec<IndexId>>>,
848    label_property_indexes: RwLock<HashMap<(LabelId, PropertyKeyId), Vec<IndexId>>>,
849    name_index: RwLock<HashMap<String, IndexId>>,
850    next_id: AtomicU32,
851}
852
853impl IndexCatalog {
854    fn new() -> Self {
855        Self {
856            indexes: RwLock::new(HashMap::new()),
857            label_indexes: RwLock::new(HashMap::new()),
858            label_property_indexes: RwLock::new(HashMap::new()),
859            name_index: RwLock::new(HashMap::new()),
860            next_id: AtomicU32::new(0),
861        }
862    }
863
864    fn create(
865        &self,
866        name: &str,
867        label: LabelId,
868        property_key: PropertyKeyId,
869        index_type: IndexType,
870    ) -> IndexId {
871        let id = IndexId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
872        let definition = IndexDefinition {
873            id,
874            name: name.to_string(),
875            label,
876            property_key,
877            index_type,
878        };
879
880        let mut indexes = self.indexes.write();
881        let mut label_indexes = self.label_indexes.write();
882        let mut label_property_indexes = self.label_property_indexes.write();
883
884        indexes.insert(id, definition);
885        label_indexes.entry(label).or_default().push(id);
886        label_property_indexes
887            .entry((label, property_key))
888            .or_default()
889            .push(id);
890        self.name_index.write().insert(name.to_string(), id);
891
892        id
893    }
894
895    fn drop(&self, id: IndexId) -> bool {
896        let mut indexes = self.indexes.write();
897        let mut label_indexes = self.label_indexes.write();
898        let mut label_property_indexes = self.label_property_indexes.write();
899
900        if let Some(definition) = indexes.remove(&id) {
901            // Remove from label index
902            if let Some(ids) = label_indexes.get_mut(&definition.label) {
903                ids.retain(|&i| i != id);
904            }
905            // Remove from label-property index
906            if let Some(ids) =
907                label_property_indexes.get_mut(&(definition.label, definition.property_key))
908            {
909                ids.retain(|&i| i != id);
910            }
911            // Remove from name index
912            self.name_index.write().remove(&definition.name);
913            true
914        } else {
915            false
916        }
917    }
918
919    fn find_by_name(&self, name: &str) -> Option<IndexId> {
920        self.name_index.read().get(name).copied()
921    }
922
923    fn get(&self, id: IndexId) -> Option<IndexDefinition> {
924        self.indexes.read().get(&id).cloned()
925    }
926
927    fn for_label(&self, label: LabelId) -> Vec<IndexId> {
928        self.label_indexes
929            .read()
930            .get(&label)
931            .cloned()
932            .unwrap_or_default()
933    }
934
935    fn for_label_property(&self, label: LabelId, property_key: PropertyKeyId) -> Vec<IndexId> {
936        self.label_property_indexes
937            .read()
938            .get(&(label, property_key))
939            .cloned()
940            .unwrap_or_default()
941    }
942
943    fn count(&self) -> usize {
944        self.indexes.read().len()
945    }
946
947    fn all(&self) -> Vec<IndexDefinition> {
948        self.indexes.read().values().cloned().collect()
949    }
950}
951
952// === Type Definitions ===
953
954/// Data type for a typed property in a node or edge type definition.
955#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
956pub enum PropertyDataType {
957    /// UTF-8 string.
958    String,
959    /// 64-bit signed integer.
960    Int64,
961    /// 64-bit floating point.
962    Float64,
963    /// Boolean.
964    Bool,
965    /// Calendar date.
966    Date,
967    /// Time of day.
968    Time,
969    /// Timestamp (date + time).
970    Timestamp,
971    /// Duration / interval.
972    Duration,
973    /// Ordered list of values (untyped).
974    List,
975    /// Typed list: `LIST<element_type>` (ISO sec 4.16.9).
976    ListTyped(Box<PropertyDataType>),
977    /// Key-value map.
978    Map,
979    /// Raw bytes.
980    Bytes,
981    /// Node reference type (ISO sec 4.15.1).
982    Node,
983    /// Edge reference type (ISO sec 4.15.1).
984    Edge,
985    /// Any type (no enforcement).
986    Any,
987}
988
989impl PropertyDataType {
990    /// Parses a type name string (case-insensitive) into a `PropertyDataType`.
991    #[must_use]
992    pub fn from_type_name(name: &str) -> Self {
993        let upper = name.to_uppercase();
994        // Handle parameterized LIST<element_type>
995        if let Some(inner) = upper
996            .strip_prefix("LIST<")
997            .and_then(|s| s.strip_suffix('>'))
998        {
999            return Self::ListTyped(Box::new(Self::from_type_name(inner)));
1000        }
1001        match upper.as_str() {
1002            "STRING" | "VARCHAR" | "TEXT" => Self::String,
1003            "INT" | "INT64" | "INTEGER" | "BIGINT" => Self::Int64,
1004            "FLOAT" | "FLOAT64" | "DOUBLE" | "REAL" => Self::Float64,
1005            "BOOL" | "BOOLEAN" => Self::Bool,
1006            "DATE" => Self::Date,
1007            "TIME" => Self::Time,
1008            "TIMESTAMP" | "DATETIME" => Self::Timestamp,
1009            "DURATION" | "INTERVAL" => Self::Duration,
1010            "LIST" | "ARRAY" => Self::List,
1011            "MAP" | "RECORD" => Self::Map,
1012            "BYTES" | "BINARY" | "BLOB" => Self::Bytes,
1013            "NODE" => Self::Node,
1014            "EDGE" | "RELATIONSHIP" => Self::Edge,
1015            _ => Self::Any,
1016        }
1017    }
1018
1019    /// Checks whether a value conforms to this type.
1020    #[must_use]
1021    pub fn matches(&self, value: &Value) -> bool {
1022        match (self, value) {
1023            (Self::Any, _) | (_, Value::Null) => true,
1024            (Self::String, Value::String(_)) => true,
1025            (Self::Int64, Value::Int64(_)) => true,
1026            (Self::Float64, Value::Float64(_)) => true,
1027            (Self::Bool, Value::Bool(_)) => true,
1028            (Self::Date, Value::Date(_)) => true,
1029            (Self::Time, Value::Time(_)) => true,
1030            (Self::Timestamp, Value::Timestamp(_)) => true,
1031            (Self::Duration, Value::Duration(_)) => true,
1032            (Self::List, Value::List(_)) => true,
1033            (Self::ListTyped(elem_type), Value::List(items)) => {
1034                items.iter().all(|item| elem_type.matches(item))
1035            }
1036            (Self::Bytes, Value::Bytes(_)) => true,
1037            // Node/Edge reference types match Map values (graph elements are
1038            // represented as maps with _id, _labels/_type, and properties)
1039            (Self::Node | Self::Edge, Value::Map(_)) => true,
1040            _ => false,
1041        }
1042    }
1043}
1044
1045impl std::fmt::Display for PropertyDataType {
1046    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1047        match self {
1048            Self::String => write!(f, "STRING"),
1049            Self::Int64 => write!(f, "INT64"),
1050            Self::Float64 => write!(f, "FLOAT64"),
1051            Self::Bool => write!(f, "BOOLEAN"),
1052            Self::Date => write!(f, "DATE"),
1053            Self::Time => write!(f, "TIME"),
1054            Self::Timestamp => write!(f, "TIMESTAMP"),
1055            Self::Duration => write!(f, "DURATION"),
1056            Self::List => write!(f, "LIST"),
1057            Self::ListTyped(elem) => write!(f, "LIST<{elem}>"),
1058            Self::Map => write!(f, "MAP"),
1059            Self::Bytes => write!(f, "BYTES"),
1060            Self::Node => write!(f, "NODE"),
1061            Self::Edge => write!(f, "EDGE"),
1062            Self::Any => write!(f, "ANY"),
1063        }
1064    }
1065}
1066
1067/// A typed property within a node or edge type definition.
1068#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1069pub struct TypedProperty {
1070    /// Property name.
1071    pub name: String,
1072    /// Expected data type.
1073    pub data_type: PropertyDataType,
1074    /// Whether NULL values are allowed.
1075    pub nullable: bool,
1076    /// Default value (used when property is not explicitly set).
1077    pub default_value: Option<Value>,
1078}
1079
1080/// A constraint on a node or edge type.
1081#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1082pub enum TypeConstraint {
1083    /// Primary key (implies UNIQUE + NOT NULL).
1084    PrimaryKey(Vec<String>),
1085    /// Uniqueness constraint on one or more properties.
1086    Unique(Vec<String>),
1087    /// NOT NULL constraint on a single property.
1088    NotNull(String),
1089    /// CHECK constraint with a named expression string.
1090    Check {
1091        /// Optional constraint name.
1092        name: Option<String>,
1093        /// Expression (stored as string for now).
1094        expression: String,
1095    },
1096}
1097
1098/// Definition of a node type (label schema).
1099#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1100pub struct NodeTypeDefinition {
1101    /// Type name (corresponds to a label).
1102    pub name: String,
1103    /// Typed property definitions.
1104    pub properties: Vec<TypedProperty>,
1105    /// Type-level constraints.
1106    pub constraints: Vec<TypeConstraint>,
1107    /// Parent type names for inheritance (GQL `EXTENDS`).
1108    pub parent_types: Vec<String>,
1109}
1110
1111/// Definition of an edge type (relationship type schema).
1112#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1113pub struct EdgeTypeDefinition {
1114    /// Type name (corresponds to an edge type / relationship type).
1115    pub name: String,
1116    /// Typed property definitions.
1117    pub properties: Vec<TypedProperty>,
1118    /// Type-level constraints.
1119    pub constraints: Vec<TypeConstraint>,
1120    /// Allowed source node types (empty = any).
1121    pub source_node_types: Vec<String>,
1122    /// Allowed target node types (empty = any).
1123    pub target_node_types: Vec<String>,
1124}
1125
1126/// Definition of a graph type (constrains which node/edge types a graph allows).
1127#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1128pub struct GraphTypeDefinition {
1129    /// Graph type name.
1130    pub name: String,
1131    /// Allowed node types (empty = open).
1132    pub allowed_node_types: Vec<String>,
1133    /// Allowed edge types (empty = open).
1134    pub allowed_edge_types: Vec<String>,
1135    /// Whether unlisted types are permitted.
1136    pub open: bool,
1137}
1138
1139/// Definition of a stored procedure.
1140#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1141pub struct ProcedureDefinition {
1142    /// Procedure name.
1143    pub name: String,
1144    /// Parameter definitions: (name, type).
1145    pub params: Vec<(String, String)>,
1146    /// Return column definitions: (name, type).
1147    pub returns: Vec<(String, String)>,
1148    /// Raw GQL query body.
1149    pub body: String,
1150}
1151
1152// === Schema Catalog ===
1153
1154/// Schema constraints and type definitions.
1155pub struct SchemaCatalog {
1156    /// Properties that must be unique for a given label.
1157    unique_constraints: RwLock<HashSet<(LabelId, PropertyKeyId)>>,
1158    /// Properties that are required (NOT NULL) for a given label.
1159    required_properties: RwLock<HashSet<(LabelId, PropertyKeyId)>>,
1160    /// Registered node type definitions.
1161    node_types: RwLock<HashMap<String, NodeTypeDefinition>>,
1162    /// Registered edge type definitions.
1163    edge_types: RwLock<HashMap<String, EdgeTypeDefinition>>,
1164    /// Registered graph type definitions.
1165    graph_types: RwLock<HashMap<String, GraphTypeDefinition>>,
1166    /// Schema namespaces.
1167    schemas: RwLock<Vec<String>>,
1168    /// Graph instance to graph type bindings.
1169    graph_type_bindings: RwLock<HashMap<String, String>>,
1170    /// Stored procedure definitions.
1171    procedures: RwLock<HashMap<String, ProcedureDefinition>>,
1172}
1173
1174impl SchemaCatalog {
1175    fn new() -> Self {
1176        Self {
1177            unique_constraints: RwLock::new(HashSet::new()),
1178            required_properties: RwLock::new(HashSet::new()),
1179            node_types: RwLock::new(HashMap::new()),
1180            edge_types: RwLock::new(HashMap::new()),
1181            graph_types: RwLock::new(HashMap::new()),
1182            schemas: RwLock::new(Vec::new()),
1183            graph_type_bindings: RwLock::new(HashMap::new()),
1184            procedures: RwLock::new(HashMap::new()),
1185        }
1186    }
1187
1188    // --- Node type operations ---
1189
1190    /// Registers a new node type definition.
1191    pub fn register_node_type(&self, def: NodeTypeDefinition) -> Result<(), CatalogError> {
1192        let mut types = self.node_types.write();
1193        if types.contains_key(&def.name) {
1194            return Err(CatalogError::TypeAlreadyExists(def.name));
1195        }
1196        types.insert(def.name.clone(), def);
1197        Ok(())
1198    }
1199
1200    /// Registers or replaces a node type definition.
1201    pub fn register_or_replace_node_type(&self, def: NodeTypeDefinition) {
1202        self.node_types.write().insert(def.name.clone(), def);
1203    }
1204
1205    /// Drops a node type definition by name.
1206    pub fn drop_node_type(&self, name: &str) -> Result<(), CatalogError> {
1207        let mut types = self.node_types.write();
1208        if types.remove(name).is_none() {
1209            return Err(CatalogError::TypeNotFound(name.to_string()));
1210        }
1211        Ok(())
1212    }
1213
1214    /// Gets a node type definition by name.
1215    #[must_use]
1216    pub fn get_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
1217        self.node_types.read().get(name).cloned()
1218    }
1219
1220    /// Gets a resolved node type with inherited properties and constraints from parents.
1221    ///
1222    /// Walks the parent chain depth-first, collecting properties and constraints.
1223    /// Detects cycles via a visited set. Child properties override parent ones
1224    /// with the same name.
1225    #[must_use]
1226    pub fn resolved_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
1227        let types = self.node_types.read();
1228        let base = types.get(name)?;
1229        if base.parent_types.is_empty() {
1230            return Some(base.clone());
1231        }
1232        let mut visited = HashSet::new();
1233        visited.insert(name.to_string());
1234        let mut all_properties = Vec::new();
1235        let mut all_constraints = Vec::new();
1236        Self::collect_inherited(
1237            &types,
1238            name,
1239            &mut visited,
1240            &mut all_properties,
1241            &mut all_constraints,
1242        );
1243        Some(NodeTypeDefinition {
1244            name: base.name.clone(),
1245            properties: all_properties,
1246            constraints: all_constraints,
1247            parent_types: base.parent_types.clone(),
1248        })
1249    }
1250
1251    /// Recursively collects properties and constraints from a type and its parents.
1252    fn collect_inherited(
1253        types: &HashMap<String, NodeTypeDefinition>,
1254        name: &str,
1255        visited: &mut HashSet<String>,
1256        properties: &mut Vec<TypedProperty>,
1257        constraints: &mut Vec<TypeConstraint>,
1258    ) {
1259        let Some(def) = types.get(name) else { return };
1260        // Walk parents first (depth-first) so child properties override
1261        for parent in &def.parent_types {
1262            if visited.insert(parent.clone()) {
1263                Self::collect_inherited(types, parent, visited, properties, constraints);
1264            }
1265        }
1266        // Add own properties, overriding parent ones with same name
1267        for prop in &def.properties {
1268            if let Some(pos) = properties.iter().position(|p| p.name == prop.name) {
1269                properties[pos] = prop.clone();
1270            } else {
1271                properties.push(prop.clone());
1272            }
1273        }
1274        // Append own constraints (no dedup, constraints are additive)
1275        constraints.extend(def.constraints.iter().cloned());
1276    }
1277
1278    /// Returns all registered node type names.
1279    #[must_use]
1280    pub fn all_node_types(&self) -> Vec<String> {
1281        self.node_types.read().keys().cloned().collect()
1282    }
1283
1284    /// Returns all registered node type definitions.
1285    #[must_use]
1286    pub fn all_node_type_defs(&self) -> Vec<NodeTypeDefinition> {
1287        self.node_types.read().values().cloned().collect()
1288    }
1289
1290    // --- Edge type operations ---
1291
1292    /// Registers a new edge type definition.
1293    pub fn register_edge_type(&self, def: EdgeTypeDefinition) -> Result<(), CatalogError> {
1294        let mut types = self.edge_types.write();
1295        if types.contains_key(&def.name) {
1296            return Err(CatalogError::TypeAlreadyExists(def.name));
1297        }
1298        types.insert(def.name.clone(), def);
1299        Ok(())
1300    }
1301
1302    /// Registers or replaces an edge type definition.
1303    pub fn register_or_replace_edge_type(&self, def: EdgeTypeDefinition) {
1304        self.edge_types.write().insert(def.name.clone(), def);
1305    }
1306
1307    /// Drops an edge type definition by name.
1308    pub fn drop_edge_type(&self, name: &str) -> Result<(), CatalogError> {
1309        let mut types = self.edge_types.write();
1310        if types.remove(name).is_none() {
1311            return Err(CatalogError::TypeNotFound(name.to_string()));
1312        }
1313        Ok(())
1314    }
1315
1316    /// Gets an edge type definition by name.
1317    #[must_use]
1318    pub fn get_edge_type(&self, name: &str) -> Option<EdgeTypeDefinition> {
1319        self.edge_types.read().get(name).cloned()
1320    }
1321
1322    /// Returns all registered edge type names.
1323    #[must_use]
1324    pub fn all_edge_types(&self) -> Vec<String> {
1325        self.edge_types.read().keys().cloned().collect()
1326    }
1327
1328    /// Returns all registered edge type definitions.
1329    #[must_use]
1330    pub fn all_edge_type_defs(&self) -> Vec<EdgeTypeDefinition> {
1331        self.edge_types.read().values().cloned().collect()
1332    }
1333
1334    // --- Graph type operations ---
1335
1336    /// Registers a new graph type definition.
1337    pub fn register_graph_type(&self, def: GraphTypeDefinition) -> Result<(), CatalogError> {
1338        let mut types = self.graph_types.write();
1339        if types.contains_key(&def.name) {
1340            return Err(CatalogError::TypeAlreadyExists(def.name));
1341        }
1342        types.insert(def.name.clone(), def);
1343        Ok(())
1344    }
1345
1346    /// Drops a graph type definition by name.
1347    pub fn drop_graph_type(&self, name: &str) -> Result<(), CatalogError> {
1348        let mut types = self.graph_types.write();
1349        if types.remove(name).is_none() {
1350            return Err(CatalogError::TypeNotFound(name.to_string()));
1351        }
1352        Ok(())
1353    }
1354
1355    /// Gets a graph type definition by name.
1356    #[must_use]
1357    pub fn get_graph_type(&self, name: &str) -> Option<GraphTypeDefinition> {
1358        self.graph_types.read().get(name).cloned()
1359    }
1360
1361    /// Returns all registered graph type names.
1362    #[must_use]
1363    pub fn all_graph_types(&self) -> Vec<String> {
1364        self.graph_types.read().keys().cloned().collect()
1365    }
1366
1367    /// Returns all registered graph type definitions.
1368    #[must_use]
1369    pub fn all_graph_type_defs(&self) -> Vec<GraphTypeDefinition> {
1370        self.graph_types.read().values().cloned().collect()
1371    }
1372
1373    // --- Schema namespace operations ---
1374
1375    /// Registers a schema namespace.
1376    pub fn register_schema(&self, name: String) -> Result<(), CatalogError> {
1377        let mut schemas = self.schemas.write();
1378        if schemas.contains(&name) {
1379            return Err(CatalogError::SchemaAlreadyExists(name));
1380        }
1381        schemas.push(name);
1382        Ok(())
1383    }
1384
1385    /// Drops a schema namespace.
1386    pub fn drop_schema(&self, name: &str) -> Result<(), CatalogError> {
1387        let mut schemas = self.schemas.write();
1388        if let Some(pos) = schemas.iter().position(|s| s == name) {
1389            schemas.remove(pos);
1390            Ok(())
1391        } else {
1392            Err(CatalogError::SchemaNotFound(name.to_string()))
1393        }
1394    }
1395
1396    /// Checks whether a schema namespace exists.
1397    #[must_use]
1398    pub fn schema_exists(&self, name: &str) -> bool {
1399        self.schemas
1400            .read()
1401            .iter()
1402            .any(|s| s.eq_ignore_ascii_case(name))
1403    }
1404
1405    /// Returns all registered schema namespace names.
1406    #[must_use]
1407    pub fn schema_names(&self) -> Vec<String> {
1408        self.schemas.read().clone()
1409    }
1410
1411    // --- ALTER operations ---
1412
1413    /// Adds a constraint to an existing node type, creating a minimal type if needed.
1414    pub fn add_constraint_to_type(
1415        &self,
1416        label: &str,
1417        constraint: TypeConstraint,
1418    ) -> Result<(), CatalogError> {
1419        let mut types = self.node_types.write();
1420        if let Some(def) = types.get_mut(label) {
1421            def.constraints.push(constraint);
1422        } else {
1423            // Auto-create a minimal type definition for the label
1424            types.insert(
1425                label.to_string(),
1426                NodeTypeDefinition {
1427                    name: label.to_string(),
1428                    properties: Vec::new(),
1429                    constraints: vec![constraint],
1430                    parent_types: Vec::new(),
1431                },
1432            );
1433        }
1434        Ok(())
1435    }
1436
1437    /// Adds a property to an existing node type.
1438    pub fn alter_node_type_add_property(
1439        &self,
1440        type_name: &str,
1441        property: TypedProperty,
1442    ) -> Result<(), CatalogError> {
1443        let mut types = self.node_types.write();
1444        let def = types
1445            .get_mut(type_name)
1446            .ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
1447        if def.properties.iter().any(|p| p.name == property.name) {
1448            return Err(CatalogError::TypeAlreadyExists(format!(
1449                "property {} on {}",
1450                property.name, type_name
1451            )));
1452        }
1453        def.properties.push(property);
1454        Ok(())
1455    }
1456
1457    /// Drops a property from an existing node type.
1458    pub fn alter_node_type_drop_property(
1459        &self,
1460        type_name: &str,
1461        property_name: &str,
1462    ) -> Result<(), CatalogError> {
1463        let mut types = self.node_types.write();
1464        let def = types
1465            .get_mut(type_name)
1466            .ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
1467        let len_before = def.properties.len();
1468        def.properties.retain(|p| p.name != property_name);
1469        if def.properties.len() == len_before {
1470            return Err(CatalogError::TypeNotFound(format!(
1471                "property {} on {}",
1472                property_name, type_name
1473            )));
1474        }
1475        Ok(())
1476    }
1477
1478    /// Adds a property to an existing edge type.
1479    pub fn alter_edge_type_add_property(
1480        &self,
1481        type_name: &str,
1482        property: TypedProperty,
1483    ) -> Result<(), CatalogError> {
1484        let mut types = self.edge_types.write();
1485        let def = types
1486            .get_mut(type_name)
1487            .ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
1488        if def.properties.iter().any(|p| p.name == property.name) {
1489            return Err(CatalogError::TypeAlreadyExists(format!(
1490                "property {} on {}",
1491                property.name, type_name
1492            )));
1493        }
1494        def.properties.push(property);
1495        Ok(())
1496    }
1497
1498    /// Drops a property from an existing edge type.
1499    pub fn alter_edge_type_drop_property(
1500        &self,
1501        type_name: &str,
1502        property_name: &str,
1503    ) -> Result<(), CatalogError> {
1504        let mut types = self.edge_types.write();
1505        let def = types
1506            .get_mut(type_name)
1507            .ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
1508        let len_before = def.properties.len();
1509        def.properties.retain(|p| p.name != property_name);
1510        if def.properties.len() == len_before {
1511            return Err(CatalogError::TypeNotFound(format!(
1512                "property {} on {}",
1513                property_name, type_name
1514            )));
1515        }
1516        Ok(())
1517    }
1518
1519    /// Adds a node type to a graph type.
1520    pub fn alter_graph_type_add_node_type(
1521        &self,
1522        graph_type_name: &str,
1523        node_type: String,
1524    ) -> Result<(), CatalogError> {
1525        let mut types = self.graph_types.write();
1526        let def = types
1527            .get_mut(graph_type_name)
1528            .ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
1529        if !def.allowed_node_types.contains(&node_type) {
1530            def.allowed_node_types.push(node_type);
1531        }
1532        Ok(())
1533    }
1534
1535    /// Drops a node type from a graph type.
1536    pub fn alter_graph_type_drop_node_type(
1537        &self,
1538        graph_type_name: &str,
1539        node_type: &str,
1540    ) -> Result<(), CatalogError> {
1541        let mut types = self.graph_types.write();
1542        let def = types
1543            .get_mut(graph_type_name)
1544            .ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
1545        def.allowed_node_types.retain(|t| t != node_type);
1546        Ok(())
1547    }
1548
1549    /// Adds an edge type to a graph type.
1550    pub fn alter_graph_type_add_edge_type(
1551        &self,
1552        graph_type_name: &str,
1553        edge_type: String,
1554    ) -> Result<(), CatalogError> {
1555        let mut types = self.graph_types.write();
1556        let def = types
1557            .get_mut(graph_type_name)
1558            .ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
1559        if !def.allowed_edge_types.contains(&edge_type) {
1560            def.allowed_edge_types.push(edge_type);
1561        }
1562        Ok(())
1563    }
1564
1565    /// Drops an edge type from a graph type.
1566    pub fn alter_graph_type_drop_edge_type(
1567        &self,
1568        graph_type_name: &str,
1569        edge_type: &str,
1570    ) -> Result<(), CatalogError> {
1571        let mut types = self.graph_types.write();
1572        let def = types
1573            .get_mut(graph_type_name)
1574            .ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
1575        def.allowed_edge_types.retain(|t| t != edge_type);
1576        Ok(())
1577    }
1578
1579    // --- Procedure operations ---
1580
1581    /// Registers a stored procedure.
1582    pub fn register_procedure(&self, def: ProcedureDefinition) -> Result<(), CatalogError> {
1583        let mut procs = self.procedures.write();
1584        if procs.contains_key(&def.name) {
1585            return Err(CatalogError::TypeAlreadyExists(def.name.clone()));
1586        }
1587        procs.insert(def.name.clone(), def);
1588        Ok(())
1589    }
1590
1591    /// Replaces or creates a stored procedure.
1592    pub fn replace_procedure(&self, def: ProcedureDefinition) {
1593        self.procedures.write().insert(def.name.clone(), def);
1594    }
1595
1596    /// Drops a stored procedure.
1597    pub fn drop_procedure(&self, name: &str) -> Result<(), CatalogError> {
1598        let mut procs = self.procedures.write();
1599        if procs.remove(name).is_none() {
1600            return Err(CatalogError::TypeNotFound(name.to_string()));
1601        }
1602        Ok(())
1603    }
1604
1605    /// Gets a stored procedure by name.
1606    pub fn get_procedure(&self, name: &str) -> Option<ProcedureDefinition> {
1607        self.procedures.read().get(name).cloned()
1608    }
1609
1610    /// Returns all registered procedure definitions.
1611    #[must_use]
1612    pub fn all_procedure_defs(&self) -> Vec<ProcedureDefinition> {
1613        self.procedures.read().values().cloned().collect()
1614    }
1615
1616    /// Returns all graph type bindings (graph_name, type_name).
1617    #[must_use]
1618    pub fn all_graph_type_bindings(&self) -> Vec<(String, String)> {
1619        self.graph_type_bindings
1620            .read()
1621            .iter()
1622            .map(|(k, v)| (k.clone(), v.clone()))
1623            .collect()
1624    }
1625
1626    fn add_unique_constraint(
1627        &self,
1628        label: LabelId,
1629        property_key: PropertyKeyId,
1630    ) -> Result<(), CatalogError> {
1631        let mut constraints = self.unique_constraints.write();
1632        let key = (label, property_key);
1633        if !constraints.insert(key) {
1634            return Err(CatalogError::ConstraintAlreadyExists);
1635        }
1636        Ok(())
1637    }
1638
1639    fn add_required_property(
1640        &self,
1641        label: LabelId,
1642        property_key: PropertyKeyId,
1643    ) -> Result<(), CatalogError> {
1644        let mut required = self.required_properties.write();
1645        let key = (label, property_key);
1646        if !required.insert(key) {
1647            return Err(CatalogError::ConstraintAlreadyExists);
1648        }
1649        Ok(())
1650    }
1651
1652    fn is_property_required(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
1653        self.required_properties
1654            .read()
1655            .contains(&(label, property_key))
1656    }
1657
1658    fn is_property_unique(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
1659        self.unique_constraints
1660            .read()
1661            .contains(&(label, property_key))
1662    }
1663}
1664
1665// === Errors ===
1666
1667/// Catalog-related errors.
1668#[derive(Debug, Clone, PartialEq, Eq)]
1669pub enum CatalogError {
1670    /// Schema constraints are not enabled.
1671    SchemaNotEnabled,
1672    /// The constraint already exists.
1673    ConstraintAlreadyExists,
1674    /// The label does not exist.
1675    LabelNotFound(String),
1676    /// The property key does not exist.
1677    PropertyKeyNotFound(String),
1678    /// The edge type does not exist.
1679    EdgeTypeNotFound(String),
1680    /// The index does not exist.
1681    IndexNotFound(IndexId),
1682    /// A type with this name already exists.
1683    TypeAlreadyExists(String),
1684    /// No type with this name exists.
1685    TypeNotFound(String),
1686    /// A schema with this name already exists.
1687    SchemaAlreadyExists(String),
1688    /// No schema with this name exists.
1689    SchemaNotFound(String),
1690}
1691
1692impl std::fmt::Display for CatalogError {
1693    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1694        match self {
1695            Self::SchemaNotEnabled => write!(f, "Schema constraints are not enabled"),
1696            Self::ConstraintAlreadyExists => write!(f, "Constraint already exists"),
1697            Self::LabelNotFound(name) => write!(f, "Label not found: {name}"),
1698            Self::PropertyKeyNotFound(name) => write!(f, "Property key not found: {name}"),
1699            Self::EdgeTypeNotFound(name) => write!(f, "Edge type not found: {name}"),
1700            Self::IndexNotFound(id) => write!(f, "Index not found: {id}"),
1701            Self::TypeAlreadyExists(name) => write!(f, "Type already exists: {name}"),
1702            Self::TypeNotFound(name) => write!(f, "Type not found: {name}"),
1703            Self::SchemaAlreadyExists(name) => write!(f, "Schema already exists: {name}"),
1704            Self::SchemaNotFound(name) => write!(f, "Schema not found: {name}"),
1705        }
1706    }
1707}
1708
1709impl std::error::Error for CatalogError {}
1710
1711// === Constraint Validator ===
1712
1713use grafeo_core::execution::operators::ConstraintValidator;
1714use grafeo_core::execution::operators::OperatorError;
1715
1716/// Validates schema constraints during mutation operations using the Catalog.
1717///
1718/// Checks type definitions, NOT NULL constraints, and UNIQUE constraints
1719/// against registered node/edge type definitions.
1720pub struct CatalogConstraintValidator {
1721    catalog: Arc<Catalog>,
1722    /// Optional graph name for graph-type-bound validation.
1723    graph_name: Option<String>,
1724    /// Optional graph store for UNIQUE constraint enforcement via index lookup.
1725    store: Option<Arc<dyn grafeo_core::graph::GraphStore>>,
1726}
1727
1728impl CatalogConstraintValidator {
1729    /// Creates a new validator wrapping the given catalog.
1730    pub fn new(catalog: Arc<Catalog>) -> Self {
1731        Self {
1732            catalog,
1733            graph_name: None,
1734            store: None,
1735        }
1736    }
1737
1738    /// Sets the graph name for graph-type-bound validation.
1739    pub fn with_graph_name(mut self, name: String) -> Self {
1740        self.graph_name = Some(name);
1741        self
1742    }
1743
1744    /// Attaches a graph store for UNIQUE constraint enforcement.
1745    pub fn with_store(mut self, store: Arc<dyn grafeo_core::graph::GraphStore>) -> Self {
1746        self.store = Some(store);
1747        self
1748    }
1749}
1750
1751impl ConstraintValidator for CatalogConstraintValidator {
1752    fn validate_node_property(
1753        &self,
1754        labels: &[String],
1755        key: &str,
1756        value: &Value,
1757    ) -> Result<(), OperatorError> {
1758        for label in labels {
1759            if let Some(type_def) = self.catalog.resolved_node_type(label)
1760                && let Some(typed_prop) = type_def.properties.iter().find(|p| p.name == key)
1761            {
1762                // Check NOT NULL
1763                if !typed_prop.nullable && *value == Value::Null {
1764                    return Err(OperatorError::ConstraintViolation(format!(
1765                        "property '{key}' on :{label} is NOT NULL, cannot set to null"
1766                    )));
1767                }
1768                // Check type compatibility
1769                if *value != Value::Null && !typed_prop.data_type.matches(value) {
1770                    return Err(OperatorError::ConstraintViolation(format!(
1771                        "property '{key}' on :{label} expects {:?}, got {:?}",
1772                        typed_prop.data_type, value
1773                    )));
1774                }
1775            }
1776        }
1777        Ok(())
1778    }
1779
1780    fn validate_node_complete(
1781        &self,
1782        labels: &[String],
1783        properties: &[(String, Value)],
1784    ) -> Result<(), OperatorError> {
1785        let prop_names: std::collections::HashSet<&str> =
1786            properties.iter().map(|(n, _)| n.as_str()).collect();
1787
1788        for label in labels {
1789            if let Some(type_def) = self.catalog.resolved_node_type(label) {
1790                // Check that all NOT NULL properties are present
1791                for typed_prop in &type_def.properties {
1792                    if !typed_prop.nullable
1793                        && typed_prop.default_value.is_none()
1794                        && !prop_names.contains(typed_prop.name.as_str())
1795                    {
1796                        return Err(OperatorError::ConstraintViolation(format!(
1797                            "missing required property '{}' on :{label}",
1798                            typed_prop.name
1799                        )));
1800                    }
1801                }
1802                // Check type-level constraints
1803                for constraint in &type_def.constraints {
1804                    match constraint {
1805                        TypeConstraint::NotNull(prop_name) => {
1806                            if !prop_names.contains(prop_name.as_str()) {
1807                                return Err(OperatorError::ConstraintViolation(format!(
1808                                    "missing required property '{prop_name}' on :{label} (NOT NULL constraint)"
1809                                )));
1810                            }
1811                        }
1812                        TypeConstraint::PrimaryKey(key_props) => {
1813                            for pk in key_props {
1814                                if !prop_names.contains(pk.as_str()) {
1815                                    return Err(OperatorError::ConstraintViolation(format!(
1816                                        "missing primary key property '{pk}' on :{label}"
1817                                    )));
1818                                }
1819                            }
1820                        }
1821                        TypeConstraint::Check { name, expression } => {
1822                            match check_eval::evaluate_check(expression, properties) {
1823                                Ok(true) => {}
1824                                Ok(false) => {
1825                                    let constraint_name = name.as_deref().unwrap_or("unnamed");
1826                                    return Err(OperatorError::ConstraintViolation(format!(
1827                                        "CHECK constraint '{constraint_name}' violated on :{label}"
1828                                    )));
1829                                }
1830                                Err(err) => {
1831                                    return Err(OperatorError::ConstraintViolation(format!(
1832                                        "CHECK constraint evaluation error: {err}"
1833                                    )));
1834                                }
1835                            }
1836                        }
1837                        TypeConstraint::Unique(_) => {}
1838                    }
1839                }
1840            }
1841        }
1842        Ok(())
1843    }
1844
1845    fn check_unique_node_property(
1846        &self,
1847        labels: &[String],
1848        key: &str,
1849        value: &Value,
1850    ) -> Result<(), OperatorError> {
1851        // Skip uniqueness check for NULL values (NULLs are never duplicates)
1852        if *value == Value::Null {
1853            return Ok(());
1854        }
1855        for label in labels {
1856            if let Some(type_def) = self.catalog.resolved_node_type(label) {
1857                for constraint in &type_def.constraints {
1858                    let is_unique = match constraint {
1859                        TypeConstraint::Unique(props) => props.iter().any(|p| p == key),
1860                        TypeConstraint::PrimaryKey(props) => props.iter().any(|p| p == key),
1861                        _ => false,
1862                    };
1863                    if is_unique && let Some(ref store) = self.store {
1864                        let existing = store.find_nodes_by_property(key, value);
1865                        for node_id in existing {
1866                            if let Some(node) = store.get_node(node_id) {
1867                                let has_label = node.labels.iter().any(|l| l.as_str() == label);
1868                                if has_label {
1869                                    return Err(OperatorError::ConstraintViolation(format!(
1870                                        "UNIQUE constraint violation: property '{key}' \
1871                                             with value {value:?} already exists on :{label}"
1872                                    )));
1873                                }
1874                            }
1875                        }
1876                    }
1877                }
1878            }
1879        }
1880        Ok(())
1881    }
1882
1883    fn validate_edge_property(
1884        &self,
1885        edge_type: &str,
1886        key: &str,
1887        value: &Value,
1888    ) -> Result<(), OperatorError> {
1889        if let Some(type_def) = self.catalog.get_edge_type_def(edge_type)
1890            && let Some(typed_prop) = type_def.properties.iter().find(|p| p.name == key)
1891        {
1892            // Check NOT NULL
1893            if !typed_prop.nullable && *value == Value::Null {
1894                return Err(OperatorError::ConstraintViolation(format!(
1895                    "property '{key}' on :{edge_type} is NOT NULL, cannot set to null"
1896                )));
1897            }
1898            // Check type compatibility
1899            if *value != Value::Null && !typed_prop.data_type.matches(value) {
1900                return Err(OperatorError::ConstraintViolation(format!(
1901                    "property '{key}' on :{edge_type} expects {:?}, got {:?}",
1902                    typed_prop.data_type, value
1903                )));
1904            }
1905        }
1906        Ok(())
1907    }
1908
1909    fn validate_edge_complete(
1910        &self,
1911        edge_type: &str,
1912        properties: &[(String, Value)],
1913    ) -> Result<(), OperatorError> {
1914        if let Some(type_def) = self.catalog.get_edge_type_def(edge_type) {
1915            let prop_names: std::collections::HashSet<&str> =
1916                properties.iter().map(|(n, _)| n.as_str()).collect();
1917
1918            for typed_prop in &type_def.properties {
1919                if !typed_prop.nullable
1920                    && typed_prop.default_value.is_none()
1921                    && !prop_names.contains(typed_prop.name.as_str())
1922                {
1923                    return Err(OperatorError::ConstraintViolation(format!(
1924                        "missing required property '{}' on :{edge_type}",
1925                        typed_prop.name
1926                    )));
1927                }
1928            }
1929
1930            for constraint in &type_def.constraints {
1931                if let TypeConstraint::Check { name, expression } = constraint {
1932                    match check_eval::evaluate_check(expression, properties) {
1933                        Ok(true) => {}
1934                        Ok(false) => {
1935                            let constraint_name = name.as_deref().unwrap_or("unnamed");
1936                            return Err(OperatorError::ConstraintViolation(format!(
1937                                "CHECK constraint '{constraint_name}' violated on :{edge_type}"
1938                            )));
1939                        }
1940                        Err(err) => {
1941                            return Err(OperatorError::ConstraintViolation(format!(
1942                                "CHECK constraint evaluation error: {err}"
1943                            )));
1944                        }
1945                    }
1946                }
1947            }
1948        }
1949        Ok(())
1950    }
1951
1952    fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
1953        let Some(ref graph_name) = self.graph_name else {
1954            return Ok(());
1955        };
1956        let Some(type_name) = self.catalog.get_graph_type_binding(graph_name) else {
1957            return Ok(());
1958        };
1959        let Some(gt) = self
1960            .catalog
1961            .schema()
1962            .and_then(|s| s.get_graph_type(&type_name))
1963        else {
1964            return Ok(());
1965        };
1966        if !gt.open && !gt.allowed_node_types.is_empty() {
1967            let allowed = labels
1968                .iter()
1969                .any(|l| gt.allowed_node_types.iter().any(|a| a == l));
1970            if !allowed {
1971                return Err(OperatorError::ConstraintViolation(format!(
1972                    "node labels {labels:?} are not allowed by graph type '{}'",
1973                    gt.name
1974                )));
1975            }
1976        }
1977        Ok(())
1978    }
1979
1980    fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
1981        let Some(ref graph_name) = self.graph_name else {
1982            return Ok(());
1983        };
1984        let Some(type_name) = self.catalog.get_graph_type_binding(graph_name) else {
1985            return Ok(());
1986        };
1987        let Some(gt) = self
1988            .catalog
1989            .schema()
1990            .and_then(|s| s.get_graph_type(&type_name))
1991        else {
1992            return Ok(());
1993        };
1994        if !gt.open && !gt.allowed_edge_types.is_empty() {
1995            let allowed = gt.allowed_edge_types.iter().any(|a| a == edge_type);
1996            if !allowed {
1997                return Err(OperatorError::ConstraintViolation(format!(
1998                    "edge type '{edge_type}' is not allowed by graph type '{}'",
1999                    gt.name
2000                )));
2001            }
2002        }
2003        Ok(())
2004    }
2005
2006    fn validate_edge_endpoints(
2007        &self,
2008        edge_type: &str,
2009        source_labels: &[String],
2010        target_labels: &[String],
2011    ) -> Result<(), OperatorError> {
2012        let Some(type_def) = self.catalog.get_edge_type_def(edge_type) else {
2013            return Ok(());
2014        };
2015        if !type_def.source_node_types.is_empty() {
2016            let source_ok = source_labels
2017                .iter()
2018                .any(|l| type_def.source_node_types.iter().any(|s| s == l));
2019            if !source_ok {
2020                return Err(OperatorError::ConstraintViolation(format!(
2021                    "source node labels {source_labels:?} are not allowed for edge type '{edge_type}', \
2022                     expected one of {:?}",
2023                    type_def.source_node_types
2024                )));
2025            }
2026        }
2027        if !type_def.target_node_types.is_empty() {
2028            let target_ok = target_labels
2029                .iter()
2030                .any(|l| type_def.target_node_types.iter().any(|t| t == l));
2031            if !target_ok {
2032                return Err(OperatorError::ConstraintViolation(format!(
2033                    "target node labels {target_labels:?} are not allowed for edge type '{edge_type}', \
2034                     expected one of {:?}",
2035                    type_def.target_node_types
2036                )));
2037            }
2038        }
2039        Ok(())
2040    }
2041
2042    fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
2043        for label in labels {
2044            if let Some(type_def) = self.catalog.resolved_node_type(label) {
2045                for typed_prop in &type_def.properties {
2046                    if let Some(ref default) = typed_prop.default_value {
2047                        let already_set = properties.iter().any(|(n, _)| n == &typed_prop.name);
2048                        if !already_set {
2049                            properties.push((typed_prop.name.clone(), default.clone()));
2050                        }
2051                    }
2052                }
2053            }
2054        }
2055    }
2056}
2057
2058#[cfg(test)]
2059mod tests {
2060    use super::*;
2061    use std::thread;
2062
2063    #[test]
2064    fn test_catalog_labels() {
2065        let catalog = Catalog::new();
2066
2067        // Get or create labels
2068        let person_id = catalog.get_or_create_label("Person");
2069        let company_id = catalog.get_or_create_label("Company");
2070
2071        // IDs should be different
2072        assert_ne!(person_id, company_id);
2073
2074        // Getting the same label should return the same ID
2075        assert_eq!(catalog.get_or_create_label("Person"), person_id);
2076
2077        // Should be able to look up by name
2078        assert_eq!(catalog.get_label_id("Person"), Some(person_id));
2079        assert_eq!(catalog.get_label_id("Company"), Some(company_id));
2080        assert_eq!(catalog.get_label_id("Unknown"), None);
2081
2082        // Should be able to look up by ID
2083        assert_eq!(catalog.get_label_name(person_id).as_deref(), Some("Person"));
2084        assert_eq!(
2085            catalog.get_label_name(company_id).as_deref(),
2086            Some("Company")
2087        );
2088
2089        // Count should be correct
2090        assert_eq!(catalog.label_count(), 2);
2091    }
2092
2093    #[test]
2094    fn test_catalog_property_keys() {
2095        let catalog = Catalog::new();
2096
2097        let name_id = catalog.get_or_create_property_key("name");
2098        let age_id = catalog.get_or_create_property_key("age");
2099
2100        assert_ne!(name_id, age_id);
2101        assert_eq!(catalog.get_or_create_property_key("name"), name_id);
2102        assert_eq!(catalog.get_property_key_id("name"), Some(name_id));
2103        assert_eq!(
2104            catalog.get_property_key_name(name_id).as_deref(),
2105            Some("name")
2106        );
2107        assert_eq!(catalog.property_key_count(), 2);
2108    }
2109
2110    #[test]
2111    fn test_catalog_edge_types() {
2112        let catalog = Catalog::new();
2113
2114        let knows_id = catalog.get_or_create_edge_type("KNOWS");
2115        let works_at_id = catalog.get_or_create_edge_type("WORKS_AT");
2116
2117        assert_ne!(knows_id, works_at_id);
2118        assert_eq!(catalog.get_or_create_edge_type("KNOWS"), knows_id);
2119        assert_eq!(catalog.get_edge_type_id("KNOWS"), Some(knows_id));
2120        assert_eq!(
2121            catalog.get_edge_type_name(knows_id).as_deref(),
2122            Some("KNOWS")
2123        );
2124        assert_eq!(catalog.edge_type_count(), 2);
2125    }
2126
2127    #[test]
2128    fn test_catalog_indexes() {
2129        let catalog = Catalog::new();
2130
2131        let person_id = catalog.get_or_create_label("Person");
2132        let name_id = catalog.get_or_create_property_key("name");
2133        let age_id = catalog.get_or_create_property_key("age");
2134
2135        // Create indexes
2136        let idx1 = catalog.create_index("idx_person_name", person_id, name_id, IndexType::Hash);
2137        let idx2 = catalog.create_index("idx_person_age", person_id, age_id, IndexType::BTree);
2138
2139        assert_ne!(idx1, idx2);
2140        assert_eq!(catalog.index_count(), 2);
2141
2142        // Look up by label
2143        let label_indexes = catalog.indexes_for_label(person_id);
2144        assert_eq!(label_indexes.len(), 2);
2145        assert!(label_indexes.contains(&idx1));
2146        assert!(label_indexes.contains(&idx2));
2147
2148        // Look up by label and property
2149        let name_indexes = catalog.indexes_for_label_property(person_id, name_id);
2150        assert_eq!(name_indexes.len(), 1);
2151        assert_eq!(name_indexes[0], idx1);
2152
2153        // Get definition
2154        let def = catalog.get_index(idx1).unwrap();
2155        assert_eq!(def.label, person_id);
2156        assert_eq!(def.property_key, name_id);
2157        assert_eq!(def.index_type, IndexType::Hash);
2158
2159        // Drop index
2160        assert!(catalog.drop_index(idx1));
2161        assert_eq!(catalog.index_count(), 1);
2162        assert!(catalog.get_index(idx1).is_none());
2163        assert_eq!(catalog.indexes_for_label(person_id).len(), 1);
2164    }
2165
2166    #[test]
2167    fn test_catalog_schema_constraints() {
2168        let catalog = Catalog::with_schema();
2169
2170        let person_id = catalog.get_or_create_label("Person");
2171        let email_id = catalog.get_or_create_property_key("email");
2172        let name_id = catalog.get_or_create_property_key("name");
2173
2174        // Add constraints
2175        assert!(catalog.add_unique_constraint(person_id, email_id).is_ok());
2176        assert!(catalog.add_required_property(person_id, name_id).is_ok());
2177
2178        // Check constraints
2179        assert!(catalog.is_property_unique(person_id, email_id));
2180        assert!(!catalog.is_property_unique(person_id, name_id));
2181        assert!(catalog.is_property_required(person_id, name_id));
2182        assert!(!catalog.is_property_required(person_id, email_id));
2183
2184        // Duplicate constraint should fail
2185        assert_eq!(
2186            catalog.add_unique_constraint(person_id, email_id),
2187            Err(CatalogError::ConstraintAlreadyExists)
2188        );
2189    }
2190
2191    #[test]
2192    fn test_catalog_schema_always_enabled() {
2193        // Catalog::new() always enables schema
2194        let catalog = Catalog::new();
2195        assert!(catalog.has_schema());
2196
2197        let person_id = catalog.get_or_create_label("Person");
2198        let email_id = catalog.get_or_create_property_key("email");
2199
2200        // Should succeed with schema enabled
2201        assert_eq!(catalog.add_unique_constraint(person_id, email_id), Ok(()));
2202    }
2203
2204    // === Additional tests for comprehensive coverage ===
2205
2206    #[test]
2207    fn test_catalog_default() {
2208        let catalog = Catalog::default();
2209        assert!(catalog.has_schema());
2210        assert_eq!(catalog.label_count(), 0);
2211        assert_eq!(catalog.property_key_count(), 0);
2212        assert_eq!(catalog.edge_type_count(), 0);
2213        assert_eq!(catalog.index_count(), 0);
2214    }
2215
2216    #[test]
2217    fn test_catalog_all_labels() {
2218        let catalog = Catalog::new();
2219
2220        catalog.get_or_create_label("Person");
2221        catalog.get_or_create_label("Company");
2222        catalog.get_or_create_label("Product");
2223
2224        let all = catalog.all_labels();
2225        assert_eq!(all.len(), 3);
2226        assert!(all.iter().any(|l| l.as_ref() == "Person"));
2227        assert!(all.iter().any(|l| l.as_ref() == "Company"));
2228        assert!(all.iter().any(|l| l.as_ref() == "Product"));
2229    }
2230
2231    #[test]
2232    fn test_catalog_all_property_keys() {
2233        let catalog = Catalog::new();
2234
2235        catalog.get_or_create_property_key("name");
2236        catalog.get_or_create_property_key("age");
2237        catalog.get_or_create_property_key("email");
2238
2239        let all = catalog.all_property_keys();
2240        assert_eq!(all.len(), 3);
2241        assert!(all.iter().any(|k| k.as_ref() == "name"));
2242        assert!(all.iter().any(|k| k.as_ref() == "age"));
2243        assert!(all.iter().any(|k| k.as_ref() == "email"));
2244    }
2245
2246    #[test]
2247    fn test_catalog_all_edge_types() {
2248        let catalog = Catalog::new();
2249
2250        catalog.get_or_create_edge_type("KNOWS");
2251        catalog.get_or_create_edge_type("WORKS_AT");
2252        catalog.get_or_create_edge_type("LIVES_IN");
2253
2254        let all = catalog.all_edge_types();
2255        assert_eq!(all.len(), 3);
2256        assert!(all.iter().any(|t| t.as_ref() == "KNOWS"));
2257        assert!(all.iter().any(|t| t.as_ref() == "WORKS_AT"));
2258        assert!(all.iter().any(|t| t.as_ref() == "LIVES_IN"));
2259    }
2260
2261    #[test]
2262    fn test_catalog_invalid_id_lookup() {
2263        let catalog = Catalog::new();
2264
2265        // Create one label to ensure IDs are allocated
2266        let _ = catalog.get_or_create_label("Person");
2267
2268        // Try to look up non-existent IDs
2269        let invalid_label = LabelId::new(999);
2270        let invalid_property = PropertyKeyId::new(999);
2271        let invalid_edge_type = EdgeTypeId::new(999);
2272        let invalid_index = IndexId::new(999);
2273
2274        assert!(catalog.get_label_name(invalid_label).is_none());
2275        assert!(catalog.get_property_key_name(invalid_property).is_none());
2276        assert!(catalog.get_edge_type_name(invalid_edge_type).is_none());
2277        assert!(catalog.get_index(invalid_index).is_none());
2278    }
2279
2280    #[test]
2281    fn test_catalog_drop_nonexistent_index() {
2282        let catalog = Catalog::new();
2283        let invalid_index = IndexId::new(999);
2284        assert!(!catalog.drop_index(invalid_index));
2285    }
2286
2287    #[test]
2288    fn test_catalog_indexes_for_nonexistent_label() {
2289        let catalog = Catalog::new();
2290        let invalid_label = LabelId::new(999);
2291        let invalid_property = PropertyKeyId::new(999);
2292
2293        assert!(catalog.indexes_for_label(invalid_label).is_empty());
2294        assert!(
2295            catalog
2296                .indexes_for_label_property(invalid_label, invalid_property)
2297                .is_empty()
2298        );
2299    }
2300
2301    #[test]
2302    fn test_catalog_multiple_indexes_same_property() {
2303        let catalog = Catalog::new();
2304
2305        let person_id = catalog.get_or_create_label("Person");
2306        let name_id = catalog.get_or_create_property_key("name");
2307
2308        // Create multiple indexes on the same property with different types
2309        let hash_idx = catalog.create_index("idx_hash", person_id, name_id, IndexType::Hash);
2310        let btree_idx = catalog.create_index("idx_btree", person_id, name_id, IndexType::BTree);
2311        let fulltext_idx =
2312            catalog.create_index("idx_fulltext", person_id, name_id, IndexType::FullText);
2313
2314        assert_eq!(catalog.index_count(), 3);
2315
2316        let indexes = catalog.indexes_for_label_property(person_id, name_id);
2317        assert_eq!(indexes.len(), 3);
2318        assert!(indexes.contains(&hash_idx));
2319        assert!(indexes.contains(&btree_idx));
2320        assert!(indexes.contains(&fulltext_idx));
2321
2322        // Verify each has the correct type
2323        assert_eq!(
2324            catalog.get_index(hash_idx).unwrap().index_type,
2325            IndexType::Hash
2326        );
2327        assert_eq!(
2328            catalog.get_index(btree_idx).unwrap().index_type,
2329            IndexType::BTree
2330        );
2331        assert_eq!(
2332            catalog.get_index(fulltext_idx).unwrap().index_type,
2333            IndexType::FullText
2334        );
2335    }
2336
2337    #[test]
2338    fn test_catalog_schema_required_property_duplicate() {
2339        let catalog = Catalog::with_schema();
2340
2341        let person_id = catalog.get_or_create_label("Person");
2342        let name_id = catalog.get_or_create_property_key("name");
2343
2344        // First should succeed
2345        assert!(catalog.add_required_property(person_id, name_id).is_ok());
2346
2347        // Duplicate should fail
2348        assert_eq!(
2349            catalog.add_required_property(person_id, name_id),
2350            Err(CatalogError::ConstraintAlreadyExists)
2351        );
2352    }
2353
2354    #[test]
2355    fn test_catalog_schema_check_without_constraints() {
2356        let catalog = Catalog::new();
2357
2358        let person_id = catalog.get_or_create_label("Person");
2359        let name_id = catalog.get_or_create_property_key("name");
2360
2361        // Without schema enabled, these should return false
2362        assert!(!catalog.is_property_unique(person_id, name_id));
2363        assert!(!catalog.is_property_required(person_id, name_id));
2364    }
2365
2366    #[test]
2367    fn test_catalog_has_schema() {
2368        // Both new() and with_schema() enable schema by default
2369        let catalog = Catalog::new();
2370        assert!(catalog.has_schema());
2371
2372        let with_schema = Catalog::with_schema();
2373        assert!(with_schema.has_schema());
2374    }
2375
2376    #[test]
2377    fn test_catalog_error_display() {
2378        assert_eq!(
2379            CatalogError::SchemaNotEnabled.to_string(),
2380            "Schema constraints are not enabled"
2381        );
2382        assert_eq!(
2383            CatalogError::ConstraintAlreadyExists.to_string(),
2384            "Constraint already exists"
2385        );
2386        assert_eq!(
2387            CatalogError::LabelNotFound("Person".to_string()).to_string(),
2388            "Label not found: Person"
2389        );
2390        assert_eq!(
2391            CatalogError::PropertyKeyNotFound("name".to_string()).to_string(),
2392            "Property key not found: name"
2393        );
2394        assert_eq!(
2395            CatalogError::EdgeTypeNotFound("KNOWS".to_string()).to_string(),
2396            "Edge type not found: KNOWS"
2397        );
2398        let idx = IndexId::new(42);
2399        assert!(CatalogError::IndexNotFound(idx).to_string().contains("42"));
2400    }
2401
2402    #[test]
2403    fn test_catalog_concurrent_label_creation() {
2404        use std::sync::Arc;
2405
2406        let catalog = Arc::new(Catalog::new());
2407        let mut handles = vec![];
2408
2409        // Spawn multiple threads trying to create the same labels
2410        for i in 0..10 {
2411            let catalog = Arc::clone(&catalog);
2412            handles.push(thread::spawn(move || {
2413                let label_name = format!("Label{}", i % 3); // Only 3 unique labels
2414                catalog.get_or_create_label(&label_name)
2415            }));
2416        }
2417
2418        let mut ids: Vec<LabelId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2419        ids.sort_by_key(|id| id.as_u32());
2420        ids.dedup();
2421
2422        // Should only have 3 unique label IDs
2423        assert_eq!(ids.len(), 3);
2424        assert_eq!(catalog.label_count(), 3);
2425    }
2426
2427    #[test]
2428    fn test_catalog_concurrent_property_key_creation() {
2429        use std::sync::Arc;
2430
2431        let catalog = Arc::new(Catalog::new());
2432        let mut handles = vec![];
2433
2434        for i in 0..10 {
2435            let catalog = Arc::clone(&catalog);
2436            handles.push(thread::spawn(move || {
2437                let key_name = format!("key{}", i % 4);
2438                catalog.get_or_create_property_key(&key_name)
2439            }));
2440        }
2441
2442        let mut ids: Vec<PropertyKeyId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2443        ids.sort_by_key(|id| id.as_u32());
2444        ids.dedup();
2445
2446        assert_eq!(ids.len(), 4);
2447        assert_eq!(catalog.property_key_count(), 4);
2448    }
2449
2450    #[test]
2451    fn test_catalog_concurrent_index_operations() {
2452        use std::sync::Arc;
2453
2454        let catalog = Arc::new(Catalog::new());
2455        let label = catalog.get_or_create_label("Node");
2456
2457        let mut handles = vec![];
2458
2459        // Create indexes concurrently
2460        for i in 0..5 {
2461            let catalog = Arc::clone(&catalog);
2462            handles.push(thread::spawn(move || {
2463                let prop = PropertyKeyId::new(i);
2464                catalog.create_index(&format!("idx_{i}"), label, prop, IndexType::Hash)
2465            }));
2466        }
2467
2468        let ids: Vec<IndexId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2469        assert_eq!(ids.len(), 5);
2470        assert_eq!(catalog.index_count(), 5);
2471    }
2472
2473    #[test]
2474    fn test_catalog_special_characters_in_names() {
2475        let catalog = Catalog::new();
2476
2477        // Test with various special characters
2478        let label1 = catalog.get_or_create_label("Label With Spaces");
2479        let label2 = catalog.get_or_create_label("Label-With-Dashes");
2480        let label3 = catalog.get_or_create_label("Label_With_Underscores");
2481        let label4 = catalog.get_or_create_label("LabelWithUnicode\u{00E9}");
2482
2483        assert_ne!(label1, label2);
2484        assert_ne!(label2, label3);
2485        assert_ne!(label3, label4);
2486
2487        assert_eq!(
2488            catalog.get_label_name(label1).as_deref(),
2489            Some("Label With Spaces")
2490        );
2491        assert_eq!(
2492            catalog.get_label_name(label4).as_deref(),
2493            Some("LabelWithUnicode\u{00E9}")
2494        );
2495    }
2496
2497    #[test]
2498    fn test_catalog_empty_names() {
2499        let catalog = Catalog::new();
2500
2501        // Empty names should be valid (edge case)
2502        let empty_label = catalog.get_or_create_label("");
2503        let empty_prop = catalog.get_or_create_property_key("");
2504        let empty_edge = catalog.get_or_create_edge_type("");
2505
2506        assert_eq!(catalog.get_label_name(empty_label).as_deref(), Some(""));
2507        assert_eq!(
2508            catalog.get_property_key_name(empty_prop).as_deref(),
2509            Some("")
2510        );
2511        assert_eq!(catalog.get_edge_type_name(empty_edge).as_deref(), Some(""));
2512
2513        // Calling again should return same ID
2514        assert_eq!(catalog.get_or_create_label(""), empty_label);
2515    }
2516
2517    #[test]
2518    fn test_catalog_large_number_of_entries() {
2519        let catalog = Catalog::new();
2520
2521        // Create many labels
2522        for i in 0..1000 {
2523            catalog.get_or_create_label(&format!("Label{}", i));
2524        }
2525
2526        assert_eq!(catalog.label_count(), 1000);
2527
2528        // Verify we can retrieve them all
2529        let all = catalog.all_labels();
2530        assert_eq!(all.len(), 1000);
2531
2532        // Verify a specific one
2533        let id = catalog.get_label_id("Label500").unwrap();
2534        assert_eq!(catalog.get_label_name(id).as_deref(), Some("Label500"));
2535    }
2536
2537    #[test]
2538    fn test_index_definition_debug() {
2539        let def = IndexDefinition {
2540            id: IndexId::new(1),
2541            name: "test_index".to_string(),
2542            label: LabelId::new(2),
2543            property_key: PropertyKeyId::new(3),
2544            index_type: IndexType::Hash,
2545        };
2546
2547        // Should be able to debug print
2548        let debug_str = format!("{:?}", def);
2549        assert!(debug_str.contains("IndexDefinition"));
2550        assert!(debug_str.contains("Hash"));
2551    }
2552
2553    #[test]
2554    fn test_index_type_equality() {
2555        assert_eq!(IndexType::Hash, IndexType::Hash);
2556        assert_ne!(IndexType::Hash, IndexType::BTree);
2557        assert_ne!(IndexType::BTree, IndexType::FullText);
2558
2559        // Clone
2560        let t = IndexType::Hash;
2561        let t2 = t;
2562        assert_eq!(t, t2);
2563    }
2564
2565    #[test]
2566    fn test_catalog_error_equality() {
2567        assert_eq!(
2568            CatalogError::SchemaNotEnabled,
2569            CatalogError::SchemaNotEnabled
2570        );
2571        assert_eq!(
2572            CatalogError::ConstraintAlreadyExists,
2573            CatalogError::ConstraintAlreadyExists
2574        );
2575        assert_eq!(
2576            CatalogError::LabelNotFound("X".to_string()),
2577            CatalogError::LabelNotFound("X".to_string())
2578        );
2579        assert_ne!(
2580            CatalogError::LabelNotFound("X".to_string()),
2581            CatalogError::LabelNotFound("Y".to_string())
2582        );
2583    }
2584}