Skip to main content

grafeo_engine/catalog/
mod.rs

1//! Schema metadata - what labels, properties, and indexes exist.
2//!
3//! The catalog is the "dictionary" of your database. When you write `(:Person)`,
4//! the catalog maps "Person" to an internal LabelId. This indirection keeps
5//! storage compact while names stay readable.
6//!
7//! | What it tracks | Why it matters |
8//! | -------------- | -------------- |
9//! | Labels | Maps "Person" → LabelId for efficient storage |
10//! | Property keys | Maps "name" → PropertyKeyId |
11//! | Edge types | Maps "KNOWS" → EdgeTypeId |
12//! | Indexes | Which properties are indexed for fast lookups |
13
14mod check_eval;
15
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicU32, Ordering};
19
20use parking_lot::{Mutex, RwLock};
21
22use grafeo_common::collections::{GrafeoConcurrentMap, grafeo_concurrent_map};
23use grafeo_common::types::{EdgeTypeId, IndexId, LabelId, PropertyKeyId, Value};
24
25/// The database's schema dictionary - maps names to compact internal IDs.
26///
27/// You rarely interact with this directly. The query processor uses it to
28/// resolve names like "Person" and "name" to internal IDs.
29pub struct Catalog {
30    /// Label name-to-ID mappings.
31    labels: LabelCatalog,
32    /// Property key name-to-ID mappings.
33    property_keys: PropertyCatalog,
34    /// Edge type name-to-ID mappings.
35    edge_types: EdgeTypeCatalog,
36    /// Index definitions.
37    indexes: IndexCatalog,
38    /// Optional schema constraints.
39    schema: Option<SchemaCatalog>,
40}
41
42impl Catalog {
43    /// Creates a new empty catalog with schema support enabled.
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            labels: LabelCatalog::new(),
48            property_keys: PropertyCatalog::new(),
49            edge_types: EdgeTypeCatalog::new(),
50            indexes: IndexCatalog::new(),
51            schema: Some(SchemaCatalog::new()),
52        }
53    }
54
55    /// Creates a new catalog with schema constraints enabled.
56    ///
57    /// This is now equivalent to `new()` since schema is always enabled.
58    #[must_use]
59    pub fn with_schema() -> Self {
60        Self::new()
61    }
62
63    // === Label Operations ===
64
65    /// Gets or creates a label ID for the given label name.
66    pub fn get_or_create_label(&self, name: &str) -> LabelId {
67        self.labels.get_or_create(name)
68    }
69
70    /// Gets the label ID for a label name, if it exists.
71    #[must_use]
72    pub fn get_label_id(&self, name: &str) -> Option<LabelId> {
73        self.labels.get_id(name)
74    }
75
76    /// Gets the label name for a label ID, if it exists.
77    #[must_use]
78    pub fn get_label_name(&self, id: LabelId) -> Option<Arc<str>> {
79        self.labels.get_name(id)
80    }
81
82    /// Returns the number of distinct labels.
83    #[must_use]
84    pub fn label_count(&self) -> usize {
85        self.labels.count()
86    }
87
88    /// Returns all label names.
89    #[must_use]
90    pub fn all_labels(&self) -> Vec<Arc<str>> {
91        self.labels.all_names()
92    }
93
94    // === Property Key Operations ===
95
96    /// Gets or creates a property key ID for the given property key name.
97    pub fn get_or_create_property_key(&self, name: &str) -> PropertyKeyId {
98        self.property_keys.get_or_create(name)
99    }
100
101    /// Gets the property key ID for a property key name, if it exists.
102    #[must_use]
103    pub fn get_property_key_id(&self, name: &str) -> Option<PropertyKeyId> {
104        self.property_keys.get_id(name)
105    }
106
107    /// Gets the property key name for a property key ID, if it exists.
108    #[must_use]
109    pub fn get_property_key_name(&self, id: PropertyKeyId) -> Option<Arc<str>> {
110        self.property_keys.get_name(id)
111    }
112
113    /// Returns the number of distinct property keys.
114    #[must_use]
115    pub fn property_key_count(&self) -> usize {
116        self.property_keys.count()
117    }
118
119    /// Returns all property key names.
120    #[must_use]
121    pub fn all_property_keys(&self) -> Vec<Arc<str>> {
122        self.property_keys.all_names()
123    }
124
125    // === Edge Type Operations ===
126
127    /// Gets or creates an edge type ID for the given edge type name.
128    pub fn get_or_create_edge_type(&self, name: &str) -> EdgeTypeId {
129        self.edge_types.get_or_create(name)
130    }
131
132    /// Gets the edge type ID for an edge type name, if it exists.
133    #[must_use]
134    pub fn get_edge_type_id(&self, name: &str) -> Option<EdgeTypeId> {
135        self.edge_types.get_id(name)
136    }
137
138    /// Gets the edge type name for an edge type ID, if it exists.
139    #[must_use]
140    pub fn get_edge_type_name(&self, id: EdgeTypeId) -> Option<Arc<str>> {
141        self.edge_types.get_name(id)
142    }
143
144    /// Returns the number of distinct edge types.
145    #[must_use]
146    pub fn edge_type_count(&self) -> usize {
147        self.edge_types.count()
148    }
149
150    /// Returns all edge type names.
151    #[must_use]
152    pub fn all_edge_types(&self) -> Vec<Arc<str>> {
153        self.edge_types.all_names()
154    }
155
156    // === Index Operations ===
157
158    /// Creates a new index on a label and property key.
159    pub fn create_index(
160        &self,
161        label: LabelId,
162        property_key: PropertyKeyId,
163        index_type: IndexType,
164    ) -> IndexId {
165        self.indexes.create(label, property_key, index_type)
166    }
167
168    /// Drops an index by ID.
169    pub fn drop_index(&self, id: IndexId) -> bool {
170        self.indexes.drop(id)
171    }
172
173    /// Gets the index definition for an index ID.
174    #[must_use]
175    pub fn get_index(&self, id: IndexId) -> Option<IndexDefinition> {
176        self.indexes.get(id)
177    }
178
179    /// Finds indexes for a given label.
180    #[must_use]
181    pub fn indexes_for_label(&self, label: LabelId) -> Vec<IndexId> {
182        self.indexes.for_label(label)
183    }
184
185    /// Finds indexes for a given label and property key.
186    #[must_use]
187    pub fn indexes_for_label_property(
188        &self,
189        label: LabelId,
190        property_key: PropertyKeyId,
191    ) -> Vec<IndexId> {
192        self.indexes.for_label_property(label, property_key)
193    }
194
195    /// Returns all index definitions.
196    #[must_use]
197    pub fn all_indexes(&self) -> Vec<IndexDefinition> {
198        self.indexes.all()
199    }
200
201    /// Returns the number of indexes.
202    #[must_use]
203    pub fn index_count(&self) -> usize {
204        self.indexes.count()
205    }
206
207    // === Schema Operations ===
208
209    /// Returns whether schema constraints are enabled.
210    #[must_use]
211    pub fn has_schema(&self) -> bool {
212        self.schema.is_some()
213    }
214
215    /// Adds a uniqueness constraint.
216    ///
217    /// Returns an error if schema is not enabled or constraint already exists.
218    pub fn add_unique_constraint(
219        &self,
220        label: LabelId,
221        property_key: PropertyKeyId,
222    ) -> Result<(), CatalogError> {
223        match &self.schema {
224            Some(schema) => schema.add_unique_constraint(label, property_key),
225            None => Err(CatalogError::SchemaNotEnabled),
226        }
227    }
228
229    /// Adds a required property constraint (NOT NULL).
230    ///
231    /// Returns an error if schema is not enabled or constraint already exists.
232    pub fn add_required_property(
233        &self,
234        label: LabelId,
235        property_key: PropertyKeyId,
236    ) -> Result<(), CatalogError> {
237        match &self.schema {
238            Some(schema) => schema.add_required_property(label, property_key),
239            None => Err(CatalogError::SchemaNotEnabled),
240        }
241    }
242
243    /// Checks if a property is required for a label.
244    #[must_use]
245    pub fn is_property_required(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
246        self.schema
247            .as_ref()
248            .is_some_and(|s| s.is_property_required(label, property_key))
249    }
250
251    /// Checks if a property must be unique for a label.
252    #[must_use]
253    pub fn is_property_unique(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
254        self.schema
255            .as_ref()
256            .is_some_and(|s| s.is_property_unique(label, property_key))
257    }
258
259    // === Type Definition Operations ===
260
261    /// Returns a reference to the schema catalog.
262    #[must_use]
263    pub fn schema(&self) -> Option<&SchemaCatalog> {
264        self.schema.as_ref()
265    }
266
267    /// Registers a node type definition.
268    pub fn register_node_type(&self, def: NodeTypeDefinition) -> Result<(), CatalogError> {
269        match &self.schema {
270            Some(schema) => schema.register_node_type(def),
271            None => Err(CatalogError::SchemaNotEnabled),
272        }
273    }
274
275    /// Registers or replaces a node type definition.
276    pub fn register_or_replace_node_type(&self, def: NodeTypeDefinition) {
277        if let Some(schema) = &self.schema {
278            schema.register_or_replace_node_type(def);
279        }
280    }
281
282    /// Drops a node type definition.
283    pub fn drop_node_type(&self, name: &str) -> Result<(), CatalogError> {
284        match &self.schema {
285            Some(schema) => schema.drop_node_type(name),
286            None => Err(CatalogError::SchemaNotEnabled),
287        }
288    }
289
290    /// Gets a node type definition by name.
291    #[must_use]
292    pub fn get_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
293        self.schema.as_ref().and_then(|s| s.get_node_type(name))
294    }
295
296    /// Gets a resolved node type with inherited properties from parents.
297    #[must_use]
298    pub fn resolved_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
299        self.schema
300            .as_ref()
301            .and_then(|s| s.resolved_node_type(name))
302    }
303
304    /// Returns all registered node type names.
305    #[must_use]
306    pub fn all_node_type_names(&self) -> Vec<String> {
307        self.schema
308            .as_ref()
309            .map(SchemaCatalog::all_node_types)
310            .unwrap_or_default()
311    }
312
313    /// Returns all registered edge type definition names.
314    #[must_use]
315    pub fn all_edge_type_names(&self) -> Vec<String> {
316        self.schema
317            .as_ref()
318            .map(SchemaCatalog::all_edge_types)
319            .unwrap_or_default()
320    }
321
322    /// Registers an edge type definition.
323    pub fn register_edge_type_def(&self, def: EdgeTypeDefinition) -> Result<(), CatalogError> {
324        match &self.schema {
325            Some(schema) => schema.register_edge_type(def),
326            None => Err(CatalogError::SchemaNotEnabled),
327        }
328    }
329
330    /// Registers or replaces an edge type definition.
331    pub fn register_or_replace_edge_type_def(&self, def: EdgeTypeDefinition) {
332        if let Some(schema) = &self.schema {
333            schema.register_or_replace_edge_type(def);
334        }
335    }
336
337    /// Drops an edge type definition.
338    pub fn drop_edge_type_def(&self, name: &str) -> Result<(), CatalogError> {
339        match &self.schema {
340            Some(schema) => schema.drop_edge_type(name),
341            None => Err(CatalogError::SchemaNotEnabled),
342        }
343    }
344
345    /// Gets an edge type definition by name.
346    #[must_use]
347    pub fn get_edge_type_def(&self, name: &str) -> Option<EdgeTypeDefinition> {
348        self.schema.as_ref().and_then(|s| s.get_edge_type(name))
349    }
350
351    /// Registers a graph type definition.
352    pub fn register_graph_type(&self, def: GraphTypeDefinition) -> Result<(), CatalogError> {
353        match &self.schema {
354            Some(schema) => schema.register_graph_type(def),
355            None => Err(CatalogError::SchemaNotEnabled),
356        }
357    }
358
359    /// Drops a graph type definition.
360    pub fn drop_graph_type(&self, name: &str) -> Result<(), CatalogError> {
361        match &self.schema {
362            Some(schema) => schema.drop_graph_type(name),
363            None => Err(CatalogError::SchemaNotEnabled),
364        }
365    }
366
367    /// Returns all registered graph type names.
368    #[must_use]
369    pub fn all_graph_type_names(&self) -> Vec<String> {
370        self.schema
371            .as_ref()
372            .map(SchemaCatalog::all_graph_types)
373            .unwrap_or_default()
374    }
375
376    /// Gets a graph type definition by name.
377    #[must_use]
378    pub fn get_graph_type_def(&self, name: &str) -> Option<GraphTypeDefinition> {
379        self.schema.as_ref().and_then(|s| s.get_graph_type(name))
380    }
381
382    /// Registers a schema namespace.
383    pub fn register_schema_namespace(&self, name: String) -> Result<(), CatalogError> {
384        match &self.schema {
385            Some(schema) => schema.register_schema(name),
386            None => Err(CatalogError::SchemaNotEnabled),
387        }
388    }
389
390    /// Drops a schema namespace.
391    pub fn drop_schema_namespace(&self, name: &str) -> Result<(), CatalogError> {
392        match &self.schema {
393            Some(schema) => schema.drop_schema(name),
394            None => Err(CatalogError::SchemaNotEnabled),
395        }
396    }
397
398    /// Checks whether a schema namespace exists.
399    #[must_use]
400    pub fn schema_exists(&self, name: &str) -> bool {
401        self.schema.as_ref().is_some_and(|s| s.schema_exists(name))
402    }
403
404    /// Returns all registered schema namespace names.
405    #[must_use]
406    pub fn schema_names(&self) -> Vec<String> {
407        self.schema
408            .as_ref()
409            .map(|s| s.schema_names())
410            .unwrap_or_default()
411    }
412
413    /// Adds a constraint to an existing node type, creating a minimal type if needed.
414    pub fn add_constraint_to_type(
415        &self,
416        label: &str,
417        constraint: TypeConstraint,
418    ) -> Result<(), CatalogError> {
419        match &self.schema {
420            Some(schema) => schema.add_constraint_to_type(label, constraint),
421            None => Err(CatalogError::SchemaNotEnabled),
422        }
423    }
424
425    /// Adds a property to a node type.
426    pub fn alter_node_type_add_property(
427        &self,
428        type_name: &str,
429        property: TypedProperty,
430    ) -> Result<(), CatalogError> {
431        match &self.schema {
432            Some(schema) => schema.alter_node_type_add_property(type_name, property),
433            None => Err(CatalogError::SchemaNotEnabled),
434        }
435    }
436
437    /// Drops a property from a node type.
438    pub fn alter_node_type_drop_property(
439        &self,
440        type_name: &str,
441        property_name: &str,
442    ) -> Result<(), CatalogError> {
443        match &self.schema {
444            Some(schema) => schema.alter_node_type_drop_property(type_name, property_name),
445            None => Err(CatalogError::SchemaNotEnabled),
446        }
447    }
448
449    /// Adds a property to an edge type.
450    pub fn alter_edge_type_add_property(
451        &self,
452        type_name: &str,
453        property: TypedProperty,
454    ) -> Result<(), CatalogError> {
455        match &self.schema {
456            Some(schema) => schema.alter_edge_type_add_property(type_name, property),
457            None => Err(CatalogError::SchemaNotEnabled),
458        }
459    }
460
461    /// Drops a property from an edge type.
462    pub fn alter_edge_type_drop_property(
463        &self,
464        type_name: &str,
465        property_name: &str,
466    ) -> Result<(), CatalogError> {
467        match &self.schema {
468            Some(schema) => schema.alter_edge_type_drop_property(type_name, property_name),
469            None => Err(CatalogError::SchemaNotEnabled),
470        }
471    }
472
473    /// Adds a node type to a graph type.
474    pub fn alter_graph_type_add_node_type(
475        &self,
476        graph_type_name: &str,
477        node_type: String,
478    ) -> Result<(), CatalogError> {
479        match &self.schema {
480            Some(schema) => schema.alter_graph_type_add_node_type(graph_type_name, node_type),
481            None => Err(CatalogError::SchemaNotEnabled),
482        }
483    }
484
485    /// Drops a node type from a graph type.
486    pub fn alter_graph_type_drop_node_type(
487        &self,
488        graph_type_name: &str,
489        node_type: &str,
490    ) -> Result<(), CatalogError> {
491        match &self.schema {
492            Some(schema) => schema.alter_graph_type_drop_node_type(graph_type_name, node_type),
493            None => Err(CatalogError::SchemaNotEnabled),
494        }
495    }
496
497    /// Adds an edge type to a graph type.
498    pub fn alter_graph_type_add_edge_type(
499        &self,
500        graph_type_name: &str,
501        edge_type: String,
502    ) -> Result<(), CatalogError> {
503        match &self.schema {
504            Some(schema) => schema.alter_graph_type_add_edge_type(graph_type_name, edge_type),
505            None => Err(CatalogError::SchemaNotEnabled),
506        }
507    }
508
509    /// Drops an edge type from a graph type.
510    pub fn alter_graph_type_drop_edge_type(
511        &self,
512        graph_type_name: &str,
513        edge_type: &str,
514    ) -> Result<(), CatalogError> {
515        match &self.schema {
516            Some(schema) => schema.alter_graph_type_drop_edge_type(graph_type_name, edge_type),
517            None => Err(CatalogError::SchemaNotEnabled),
518        }
519    }
520
521    /// Binds a graph instance to a graph type.
522    pub fn bind_graph_type(
523        &self,
524        graph_name: &str,
525        graph_type: String,
526    ) -> Result<(), CatalogError> {
527        match &self.schema {
528            Some(schema) => {
529                // Verify the graph type exists
530                if schema.get_graph_type(&graph_type).is_none() {
531                    return Err(CatalogError::TypeNotFound(graph_type));
532                }
533                schema
534                    .graph_type_bindings
535                    .write()
536                    .insert(graph_name.to_string(), graph_type);
537                Ok(())
538            }
539            None => Err(CatalogError::SchemaNotEnabled),
540        }
541    }
542
543    /// Gets the graph type binding for a graph instance.
544    pub fn get_graph_type_binding(&self, graph_name: &str) -> Option<String> {
545        self.schema
546            .as_ref()?
547            .graph_type_bindings
548            .read()
549            .get(graph_name)
550            .cloned()
551    }
552
553    /// Registers a stored procedure.
554    pub fn register_procedure(&self, def: ProcedureDefinition) -> Result<(), CatalogError> {
555        match &self.schema {
556            Some(schema) => schema.register_procedure(def),
557            None => Err(CatalogError::SchemaNotEnabled),
558        }
559    }
560
561    /// Replaces or creates a stored procedure.
562    pub fn replace_procedure(&self, def: ProcedureDefinition) -> Result<(), CatalogError> {
563        match &self.schema {
564            Some(schema) => {
565                schema.replace_procedure(def);
566                Ok(())
567            }
568            None => Err(CatalogError::SchemaNotEnabled),
569        }
570    }
571
572    /// Drops a stored procedure.
573    pub fn drop_procedure(&self, name: &str) -> Result<(), CatalogError> {
574        match &self.schema {
575            Some(schema) => schema.drop_procedure(name),
576            None => Err(CatalogError::SchemaNotEnabled),
577        }
578    }
579
580    /// Gets a stored procedure by name.
581    pub fn get_procedure(&self, name: &str) -> Option<ProcedureDefinition> {
582        self.schema.as_ref()?.get_procedure(name)
583    }
584
585    /// Returns all registered node type definitions.
586    #[must_use]
587    pub fn all_node_type_defs(&self) -> Vec<NodeTypeDefinition> {
588        self.schema
589            .as_ref()
590            .map(SchemaCatalog::all_node_type_defs)
591            .unwrap_or_default()
592    }
593
594    /// Returns all registered edge type definitions.
595    #[must_use]
596    pub fn all_edge_type_defs(&self) -> Vec<EdgeTypeDefinition> {
597        self.schema
598            .as_ref()
599            .map(SchemaCatalog::all_edge_type_defs)
600            .unwrap_or_default()
601    }
602
603    /// Returns all registered graph type definitions.
604    #[must_use]
605    pub fn all_graph_type_defs(&self) -> Vec<GraphTypeDefinition> {
606        self.schema
607            .as_ref()
608            .map(SchemaCatalog::all_graph_type_defs)
609            .unwrap_or_default()
610    }
611
612    /// Returns all registered procedure definitions.
613    #[must_use]
614    pub fn all_procedure_defs(&self) -> Vec<ProcedureDefinition> {
615        self.schema
616            .as_ref()
617            .map(SchemaCatalog::all_procedure_defs)
618            .unwrap_or_default()
619    }
620
621    /// Returns all graph type bindings (graph_name, type_name).
622    #[must_use]
623    pub fn all_graph_type_bindings(&self) -> Vec<(String, String)> {
624        self.schema
625            .as_ref()
626            .map(SchemaCatalog::all_graph_type_bindings)
627            .unwrap_or_default()
628    }
629}
630
631impl Default for Catalog {
632    fn default() -> Self {
633        Self::new()
634    }
635}
636
637// === Label Catalog ===
638
639/// Bidirectional mapping between label names and IDs.
640///
641/// Uses `DashMap` (shard-level locking) for `name_to_id` so concurrent
642/// readers never block each other. A separate `Mutex` serializes the rare
643/// create path to keep `id_to_name` consistent.
644struct LabelCatalog {
645    name_to_id: GrafeoConcurrentMap<Arc<str>, LabelId>,
646    id_to_name: RwLock<Vec<Arc<str>>>,
647    next_id: AtomicU32,
648    create_lock: Mutex<()>,
649}
650
651impl LabelCatalog {
652    fn new() -> Self {
653        Self {
654            name_to_id: grafeo_concurrent_map(),
655            id_to_name: RwLock::new(Vec::new()),
656            next_id: AtomicU32::new(0),
657            create_lock: Mutex::new(()),
658        }
659    }
660
661    fn get_or_create(&self, name: &str) -> LabelId {
662        // Fast path: shard-level read (no global lock)
663        if let Some(id) = self.name_to_id.get(name) {
664            return *id;
665        }
666
667        // Slow path: serialize creates to keep id_to_name consistent
668        let _guard = self.create_lock.lock();
669        if let Some(id) = self.name_to_id.get(name) {
670            return *id;
671        }
672
673        let id = LabelId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
674        let name: Arc<str> = name.into();
675        self.id_to_name.write().push(Arc::clone(&name));
676        self.name_to_id.insert(name, id);
677        id
678    }
679
680    fn get_id(&self, name: &str) -> Option<LabelId> {
681        self.name_to_id.get(name).map(|r| *r)
682    }
683
684    fn get_name(&self, id: LabelId) -> Option<Arc<str>> {
685        self.id_to_name.read().get(id.as_u32() as usize).cloned()
686    }
687
688    fn count(&self) -> usize {
689        self.id_to_name.read().len()
690    }
691
692    fn all_names(&self) -> Vec<Arc<str>> {
693        self.id_to_name.read().clone()
694    }
695}
696
697// === Property Catalog ===
698
699/// Bidirectional mapping between property key names and IDs.
700struct PropertyCatalog {
701    name_to_id: GrafeoConcurrentMap<Arc<str>, PropertyKeyId>,
702    id_to_name: RwLock<Vec<Arc<str>>>,
703    next_id: AtomicU32,
704    create_lock: Mutex<()>,
705}
706
707impl PropertyCatalog {
708    fn new() -> Self {
709        Self {
710            name_to_id: grafeo_concurrent_map(),
711            id_to_name: RwLock::new(Vec::new()),
712            next_id: AtomicU32::new(0),
713            create_lock: Mutex::new(()),
714        }
715    }
716
717    fn get_or_create(&self, name: &str) -> PropertyKeyId {
718        // Fast path: shard-level read (no global lock)
719        if let Some(id) = self.name_to_id.get(name) {
720            return *id;
721        }
722
723        // Slow path: serialize creates to keep id_to_name consistent
724        let _guard = self.create_lock.lock();
725        if let Some(id) = self.name_to_id.get(name) {
726            return *id;
727        }
728
729        let id = PropertyKeyId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
730        let name: Arc<str> = name.into();
731        self.id_to_name.write().push(Arc::clone(&name));
732        self.name_to_id.insert(name, id);
733        id
734    }
735
736    fn get_id(&self, name: &str) -> Option<PropertyKeyId> {
737        self.name_to_id.get(name).map(|r| *r)
738    }
739
740    fn get_name(&self, id: PropertyKeyId) -> Option<Arc<str>> {
741        self.id_to_name.read().get(id.as_u32() as usize).cloned()
742    }
743
744    fn count(&self) -> usize {
745        self.id_to_name.read().len()
746    }
747
748    fn all_names(&self) -> Vec<Arc<str>> {
749        self.id_to_name.read().clone()
750    }
751}
752
753// === Edge Type Catalog ===
754
755/// Bidirectional mapping between edge type names and IDs.
756struct EdgeTypeCatalog {
757    name_to_id: GrafeoConcurrentMap<Arc<str>, EdgeTypeId>,
758    id_to_name: RwLock<Vec<Arc<str>>>,
759    next_id: AtomicU32,
760    create_lock: Mutex<()>,
761}
762
763impl EdgeTypeCatalog {
764    fn new() -> Self {
765        Self {
766            name_to_id: grafeo_concurrent_map(),
767            id_to_name: RwLock::new(Vec::new()),
768            next_id: AtomicU32::new(0),
769            create_lock: Mutex::new(()),
770        }
771    }
772
773    fn get_or_create(&self, name: &str) -> EdgeTypeId {
774        // Fast path: shard-level read (no global lock)
775        if let Some(id) = self.name_to_id.get(name) {
776            return *id;
777        }
778
779        // Slow path: serialize creates to keep id_to_name consistent
780        let _guard = self.create_lock.lock();
781        if let Some(id) = self.name_to_id.get(name) {
782            return *id;
783        }
784
785        let id = EdgeTypeId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
786        let name: Arc<str> = name.into();
787        self.id_to_name.write().push(Arc::clone(&name));
788        self.name_to_id.insert(name, id);
789        id
790    }
791
792    fn get_id(&self, name: &str) -> Option<EdgeTypeId> {
793        self.name_to_id.get(name).map(|r| *r)
794    }
795
796    fn get_name(&self, id: EdgeTypeId) -> Option<Arc<str>> {
797        self.id_to_name.read().get(id.as_u32() as usize).cloned()
798    }
799
800    fn count(&self) -> usize {
801        self.id_to_name.read().len()
802    }
803
804    fn all_names(&self) -> Vec<Arc<str>> {
805        self.id_to_name.read().clone()
806    }
807}
808
809// === Index Catalog ===
810
811/// Type of index.
812#[derive(Debug, Clone, Copy, PartialEq, Eq)]
813pub enum IndexType {
814    /// Hash index for equality lookups.
815    Hash,
816    /// BTree index for range queries.
817    BTree,
818    /// Full-text index for text search.
819    FullText,
820}
821
822/// Index definition.
823#[derive(Debug, Clone)]
824pub struct IndexDefinition {
825    /// The index ID.
826    pub id: IndexId,
827    /// The label this index applies to.
828    pub label: LabelId,
829    /// The property key being indexed.
830    pub property_key: PropertyKeyId,
831    /// The type of index.
832    pub index_type: IndexType,
833}
834
835/// Manages index definitions.
836struct IndexCatalog {
837    indexes: RwLock<HashMap<IndexId, IndexDefinition>>,
838    label_indexes: RwLock<HashMap<LabelId, Vec<IndexId>>>,
839    label_property_indexes: RwLock<HashMap<(LabelId, PropertyKeyId), Vec<IndexId>>>,
840    next_id: AtomicU32,
841}
842
843impl IndexCatalog {
844    fn new() -> Self {
845        Self {
846            indexes: RwLock::new(HashMap::new()),
847            label_indexes: RwLock::new(HashMap::new()),
848            label_property_indexes: RwLock::new(HashMap::new()),
849            next_id: AtomicU32::new(0),
850        }
851    }
852
853    fn create(
854        &self,
855        label: LabelId,
856        property_key: PropertyKeyId,
857        index_type: IndexType,
858    ) -> IndexId {
859        let id = IndexId::new(self.next_id.fetch_add(1, Ordering::Relaxed));
860        let definition = IndexDefinition {
861            id,
862            label,
863            property_key,
864            index_type,
865        };
866
867        let mut indexes = self.indexes.write();
868        let mut label_indexes = self.label_indexes.write();
869        let mut label_property_indexes = self.label_property_indexes.write();
870
871        indexes.insert(id, definition);
872        label_indexes.entry(label).or_default().push(id);
873        label_property_indexes
874            .entry((label, property_key))
875            .or_default()
876            .push(id);
877
878        id
879    }
880
881    fn drop(&self, id: IndexId) -> bool {
882        let mut indexes = self.indexes.write();
883        let mut label_indexes = self.label_indexes.write();
884        let mut label_property_indexes = self.label_property_indexes.write();
885
886        if let Some(definition) = indexes.remove(&id) {
887            // Remove from label index
888            if let Some(ids) = label_indexes.get_mut(&definition.label) {
889                ids.retain(|&i| i != id);
890            }
891            // Remove from label-property index
892            if let Some(ids) =
893                label_property_indexes.get_mut(&(definition.label, definition.property_key))
894            {
895                ids.retain(|&i| i != id);
896            }
897            true
898        } else {
899            false
900        }
901    }
902
903    fn get(&self, id: IndexId) -> Option<IndexDefinition> {
904        self.indexes.read().get(&id).cloned()
905    }
906
907    fn for_label(&self, label: LabelId) -> Vec<IndexId> {
908        self.label_indexes
909            .read()
910            .get(&label)
911            .cloned()
912            .unwrap_or_default()
913    }
914
915    fn for_label_property(&self, label: LabelId, property_key: PropertyKeyId) -> Vec<IndexId> {
916        self.label_property_indexes
917            .read()
918            .get(&(label, property_key))
919            .cloned()
920            .unwrap_or_default()
921    }
922
923    fn count(&self) -> usize {
924        self.indexes.read().len()
925    }
926
927    fn all(&self) -> Vec<IndexDefinition> {
928        self.indexes.read().values().cloned().collect()
929    }
930}
931
932// === Type Definitions ===
933
934/// Data type for a typed property in a node or edge type definition.
935#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
936pub enum PropertyDataType {
937    /// UTF-8 string.
938    String,
939    /// 64-bit signed integer.
940    Int64,
941    /// 64-bit floating point.
942    Float64,
943    /// Boolean.
944    Bool,
945    /// Calendar date.
946    Date,
947    /// Time of day.
948    Time,
949    /// Timestamp (date + time).
950    Timestamp,
951    /// Duration / interval.
952    Duration,
953    /// Ordered list of values (untyped).
954    List,
955    /// Typed list: `LIST<element_type>` (ISO sec 4.16.9).
956    ListTyped(Box<PropertyDataType>),
957    /// Key-value map.
958    Map,
959    /// Raw bytes.
960    Bytes,
961    /// Node reference type (ISO sec 4.15.1).
962    Node,
963    /// Edge reference type (ISO sec 4.15.1).
964    Edge,
965    /// Any type (no enforcement).
966    Any,
967}
968
969impl PropertyDataType {
970    /// Parses a type name string (case-insensitive) into a `PropertyDataType`.
971    #[must_use]
972    pub fn from_type_name(name: &str) -> Self {
973        let upper = name.to_uppercase();
974        // Handle parameterized LIST<element_type>
975        if let Some(inner) = upper
976            .strip_prefix("LIST<")
977            .and_then(|s| s.strip_suffix('>'))
978        {
979            return Self::ListTyped(Box::new(Self::from_type_name(inner)));
980        }
981        match upper.as_str() {
982            "STRING" | "VARCHAR" | "TEXT" => Self::String,
983            "INT" | "INT64" | "INTEGER" | "BIGINT" => Self::Int64,
984            "FLOAT" | "FLOAT64" | "DOUBLE" | "REAL" => Self::Float64,
985            "BOOL" | "BOOLEAN" => Self::Bool,
986            "DATE" => Self::Date,
987            "TIME" => Self::Time,
988            "TIMESTAMP" | "DATETIME" => Self::Timestamp,
989            "DURATION" | "INTERVAL" => Self::Duration,
990            "LIST" | "ARRAY" => Self::List,
991            "MAP" | "RECORD" => Self::Map,
992            "BYTES" | "BINARY" | "BLOB" => Self::Bytes,
993            "NODE" => Self::Node,
994            "EDGE" | "RELATIONSHIP" => Self::Edge,
995            _ => Self::Any,
996        }
997    }
998
999    /// Checks whether a value conforms to this type.
1000    #[must_use]
1001    pub fn matches(&self, value: &Value) -> bool {
1002        match (self, value) {
1003            (Self::Any, _) | (_, Value::Null) => true,
1004            (Self::String, Value::String(_)) => true,
1005            (Self::Int64, Value::Int64(_)) => true,
1006            (Self::Float64, Value::Float64(_)) => true,
1007            (Self::Bool, Value::Bool(_)) => true,
1008            (Self::Date, Value::Date(_)) => true,
1009            (Self::Time, Value::Time(_)) => true,
1010            (Self::Timestamp, Value::Timestamp(_)) => true,
1011            (Self::Duration, Value::Duration(_)) => true,
1012            (Self::List, Value::List(_)) => true,
1013            (Self::ListTyped(elem_type), Value::List(items)) => {
1014                items.iter().all(|item| elem_type.matches(item))
1015            }
1016            (Self::Bytes, Value::Bytes(_)) => true,
1017            // Node/Edge reference types match Map values (graph elements are
1018            // represented as maps with _id, _labels/_type, and properties)
1019            (Self::Node | Self::Edge, Value::Map(_)) => true,
1020            _ => false,
1021        }
1022    }
1023}
1024
1025impl std::fmt::Display for PropertyDataType {
1026    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1027        match self {
1028            Self::String => write!(f, "STRING"),
1029            Self::Int64 => write!(f, "INT64"),
1030            Self::Float64 => write!(f, "FLOAT64"),
1031            Self::Bool => write!(f, "BOOLEAN"),
1032            Self::Date => write!(f, "DATE"),
1033            Self::Time => write!(f, "TIME"),
1034            Self::Timestamp => write!(f, "TIMESTAMP"),
1035            Self::Duration => write!(f, "DURATION"),
1036            Self::List => write!(f, "LIST"),
1037            Self::ListTyped(elem) => write!(f, "LIST<{elem}>"),
1038            Self::Map => write!(f, "MAP"),
1039            Self::Bytes => write!(f, "BYTES"),
1040            Self::Node => write!(f, "NODE"),
1041            Self::Edge => write!(f, "EDGE"),
1042            Self::Any => write!(f, "ANY"),
1043        }
1044    }
1045}
1046
1047/// A typed property within a node or edge type definition.
1048#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1049pub struct TypedProperty {
1050    /// Property name.
1051    pub name: String,
1052    /// Expected data type.
1053    pub data_type: PropertyDataType,
1054    /// Whether NULL values are allowed.
1055    pub nullable: bool,
1056    /// Default value (used when property is not explicitly set).
1057    pub default_value: Option<Value>,
1058}
1059
1060/// A constraint on a node or edge type.
1061#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1062pub enum TypeConstraint {
1063    /// Primary key (implies UNIQUE + NOT NULL).
1064    PrimaryKey(Vec<String>),
1065    /// Uniqueness constraint on one or more properties.
1066    Unique(Vec<String>),
1067    /// NOT NULL constraint on a single property.
1068    NotNull(String),
1069    /// CHECK constraint with a named expression string.
1070    Check {
1071        /// Optional constraint name.
1072        name: Option<String>,
1073        /// Expression (stored as string for now).
1074        expression: String,
1075    },
1076}
1077
1078/// Definition of a node type (label schema).
1079#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1080pub struct NodeTypeDefinition {
1081    /// Type name (corresponds to a label).
1082    pub name: String,
1083    /// Typed property definitions.
1084    pub properties: Vec<TypedProperty>,
1085    /// Type-level constraints.
1086    pub constraints: Vec<TypeConstraint>,
1087    /// Parent type names for inheritance (GQL `EXTENDS`).
1088    pub parent_types: Vec<String>,
1089}
1090
1091/// Definition of an edge type (relationship type schema).
1092#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1093pub struct EdgeTypeDefinition {
1094    /// Type name (corresponds to an edge type / relationship type).
1095    pub name: String,
1096    /// Typed property definitions.
1097    pub properties: Vec<TypedProperty>,
1098    /// Type-level constraints.
1099    pub constraints: Vec<TypeConstraint>,
1100    /// Allowed source node types (empty = any).
1101    pub source_node_types: Vec<String>,
1102    /// Allowed target node types (empty = any).
1103    pub target_node_types: Vec<String>,
1104}
1105
1106/// Definition of a graph type (constrains which node/edge types a graph allows).
1107#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1108pub struct GraphTypeDefinition {
1109    /// Graph type name.
1110    pub name: String,
1111    /// Allowed node types (empty = open).
1112    pub allowed_node_types: Vec<String>,
1113    /// Allowed edge types (empty = open).
1114    pub allowed_edge_types: Vec<String>,
1115    /// Whether unlisted types are permitted.
1116    pub open: bool,
1117}
1118
1119/// Definition of a stored procedure.
1120#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1121pub struct ProcedureDefinition {
1122    /// Procedure name.
1123    pub name: String,
1124    /// Parameter definitions: (name, type).
1125    pub params: Vec<(String, String)>,
1126    /// Return column definitions: (name, type).
1127    pub returns: Vec<(String, String)>,
1128    /// Raw GQL query body.
1129    pub body: String,
1130}
1131
1132// === Schema Catalog ===
1133
1134/// Schema constraints and type definitions.
1135pub struct SchemaCatalog {
1136    /// Properties that must be unique for a given label.
1137    unique_constraints: RwLock<HashSet<(LabelId, PropertyKeyId)>>,
1138    /// Properties that are required (NOT NULL) for a given label.
1139    required_properties: RwLock<HashSet<(LabelId, PropertyKeyId)>>,
1140    /// Registered node type definitions.
1141    node_types: RwLock<HashMap<String, NodeTypeDefinition>>,
1142    /// Registered edge type definitions.
1143    edge_types: RwLock<HashMap<String, EdgeTypeDefinition>>,
1144    /// Registered graph type definitions.
1145    graph_types: RwLock<HashMap<String, GraphTypeDefinition>>,
1146    /// Schema namespaces.
1147    schemas: RwLock<Vec<String>>,
1148    /// Graph instance to graph type bindings.
1149    graph_type_bindings: RwLock<HashMap<String, String>>,
1150    /// Stored procedure definitions.
1151    procedures: RwLock<HashMap<String, ProcedureDefinition>>,
1152}
1153
1154impl SchemaCatalog {
1155    fn new() -> Self {
1156        Self {
1157            unique_constraints: RwLock::new(HashSet::new()),
1158            required_properties: RwLock::new(HashSet::new()),
1159            node_types: RwLock::new(HashMap::new()),
1160            edge_types: RwLock::new(HashMap::new()),
1161            graph_types: RwLock::new(HashMap::new()),
1162            schemas: RwLock::new(Vec::new()),
1163            graph_type_bindings: RwLock::new(HashMap::new()),
1164            procedures: RwLock::new(HashMap::new()),
1165        }
1166    }
1167
1168    // --- Node type operations ---
1169
1170    /// Registers a new node type definition.
1171    pub fn register_node_type(&self, def: NodeTypeDefinition) -> Result<(), CatalogError> {
1172        let mut types = self.node_types.write();
1173        if types.contains_key(&def.name) {
1174            return Err(CatalogError::TypeAlreadyExists(def.name));
1175        }
1176        types.insert(def.name.clone(), def);
1177        Ok(())
1178    }
1179
1180    /// Registers or replaces a node type definition.
1181    pub fn register_or_replace_node_type(&self, def: NodeTypeDefinition) {
1182        self.node_types.write().insert(def.name.clone(), def);
1183    }
1184
1185    /// Drops a node type definition by name.
1186    pub fn drop_node_type(&self, name: &str) -> Result<(), CatalogError> {
1187        let mut types = self.node_types.write();
1188        if types.remove(name).is_none() {
1189            return Err(CatalogError::TypeNotFound(name.to_string()));
1190        }
1191        Ok(())
1192    }
1193
1194    /// Gets a node type definition by name.
1195    #[must_use]
1196    pub fn get_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
1197        self.node_types.read().get(name).cloned()
1198    }
1199
1200    /// Gets a resolved node type with inherited properties and constraints from parents.
1201    ///
1202    /// Walks the parent chain depth-first, collecting properties and constraints.
1203    /// Detects cycles via a visited set. Child properties override parent ones
1204    /// with the same name.
1205    #[must_use]
1206    pub fn resolved_node_type(&self, name: &str) -> Option<NodeTypeDefinition> {
1207        let types = self.node_types.read();
1208        let base = types.get(name)?;
1209        if base.parent_types.is_empty() {
1210            return Some(base.clone());
1211        }
1212        let mut visited = HashSet::new();
1213        visited.insert(name.to_string());
1214        let mut all_properties = Vec::new();
1215        let mut all_constraints = Vec::new();
1216        Self::collect_inherited(
1217            &types,
1218            name,
1219            &mut visited,
1220            &mut all_properties,
1221            &mut all_constraints,
1222        );
1223        Some(NodeTypeDefinition {
1224            name: base.name.clone(),
1225            properties: all_properties,
1226            constraints: all_constraints,
1227            parent_types: base.parent_types.clone(),
1228        })
1229    }
1230
1231    /// Recursively collects properties and constraints from a type and its parents.
1232    fn collect_inherited(
1233        types: &HashMap<String, NodeTypeDefinition>,
1234        name: &str,
1235        visited: &mut HashSet<String>,
1236        properties: &mut Vec<TypedProperty>,
1237        constraints: &mut Vec<TypeConstraint>,
1238    ) {
1239        let Some(def) = types.get(name) else { return };
1240        // Walk parents first (depth-first) so child properties override
1241        for parent in &def.parent_types {
1242            if visited.insert(parent.clone()) {
1243                Self::collect_inherited(types, parent, visited, properties, constraints);
1244            }
1245        }
1246        // Add own properties, overriding parent ones with same name
1247        for prop in &def.properties {
1248            if let Some(pos) = properties.iter().position(|p| p.name == prop.name) {
1249                properties[pos] = prop.clone();
1250            } else {
1251                properties.push(prop.clone());
1252            }
1253        }
1254        // Append own constraints (no dedup, constraints are additive)
1255        constraints.extend(def.constraints.iter().cloned());
1256    }
1257
1258    /// Returns all registered node type names.
1259    #[must_use]
1260    pub fn all_node_types(&self) -> Vec<String> {
1261        self.node_types.read().keys().cloned().collect()
1262    }
1263
1264    /// Returns all registered node type definitions.
1265    #[must_use]
1266    pub fn all_node_type_defs(&self) -> Vec<NodeTypeDefinition> {
1267        self.node_types.read().values().cloned().collect()
1268    }
1269
1270    // --- Edge type operations ---
1271
1272    /// Registers a new edge type definition.
1273    pub fn register_edge_type(&self, def: EdgeTypeDefinition) -> Result<(), CatalogError> {
1274        let mut types = self.edge_types.write();
1275        if types.contains_key(&def.name) {
1276            return Err(CatalogError::TypeAlreadyExists(def.name));
1277        }
1278        types.insert(def.name.clone(), def);
1279        Ok(())
1280    }
1281
1282    /// Registers or replaces an edge type definition.
1283    pub fn register_or_replace_edge_type(&self, def: EdgeTypeDefinition) {
1284        self.edge_types.write().insert(def.name.clone(), def);
1285    }
1286
1287    /// Drops an edge type definition by name.
1288    pub fn drop_edge_type(&self, name: &str) -> Result<(), CatalogError> {
1289        let mut types = self.edge_types.write();
1290        if types.remove(name).is_none() {
1291            return Err(CatalogError::TypeNotFound(name.to_string()));
1292        }
1293        Ok(())
1294    }
1295
1296    /// Gets an edge type definition by name.
1297    #[must_use]
1298    pub fn get_edge_type(&self, name: &str) -> Option<EdgeTypeDefinition> {
1299        self.edge_types.read().get(name).cloned()
1300    }
1301
1302    /// Returns all registered edge type names.
1303    #[must_use]
1304    pub fn all_edge_types(&self) -> Vec<String> {
1305        self.edge_types.read().keys().cloned().collect()
1306    }
1307
1308    /// Returns all registered edge type definitions.
1309    #[must_use]
1310    pub fn all_edge_type_defs(&self) -> Vec<EdgeTypeDefinition> {
1311        self.edge_types.read().values().cloned().collect()
1312    }
1313
1314    // --- Graph type operations ---
1315
1316    /// Registers a new graph type definition.
1317    pub fn register_graph_type(&self, def: GraphTypeDefinition) -> Result<(), CatalogError> {
1318        let mut types = self.graph_types.write();
1319        if types.contains_key(&def.name) {
1320            return Err(CatalogError::TypeAlreadyExists(def.name));
1321        }
1322        types.insert(def.name.clone(), def);
1323        Ok(())
1324    }
1325
1326    /// Drops a graph type definition by name.
1327    pub fn drop_graph_type(&self, name: &str) -> Result<(), CatalogError> {
1328        let mut types = self.graph_types.write();
1329        if types.remove(name).is_none() {
1330            return Err(CatalogError::TypeNotFound(name.to_string()));
1331        }
1332        Ok(())
1333    }
1334
1335    /// Gets a graph type definition by name.
1336    #[must_use]
1337    pub fn get_graph_type(&self, name: &str) -> Option<GraphTypeDefinition> {
1338        self.graph_types.read().get(name).cloned()
1339    }
1340
1341    /// Returns all registered graph type names.
1342    #[must_use]
1343    pub fn all_graph_types(&self) -> Vec<String> {
1344        self.graph_types.read().keys().cloned().collect()
1345    }
1346
1347    /// Returns all registered graph type definitions.
1348    #[must_use]
1349    pub fn all_graph_type_defs(&self) -> Vec<GraphTypeDefinition> {
1350        self.graph_types.read().values().cloned().collect()
1351    }
1352
1353    // --- Schema namespace operations ---
1354
1355    /// Registers a schema namespace.
1356    pub fn register_schema(&self, name: String) -> Result<(), CatalogError> {
1357        let mut schemas = self.schemas.write();
1358        if schemas.contains(&name) {
1359            return Err(CatalogError::SchemaAlreadyExists(name));
1360        }
1361        schemas.push(name);
1362        Ok(())
1363    }
1364
1365    /// Drops a schema namespace.
1366    pub fn drop_schema(&self, name: &str) -> Result<(), CatalogError> {
1367        let mut schemas = self.schemas.write();
1368        if let Some(pos) = schemas.iter().position(|s| s == name) {
1369            schemas.remove(pos);
1370            Ok(())
1371        } else {
1372            Err(CatalogError::SchemaNotFound(name.to_string()))
1373        }
1374    }
1375
1376    /// Checks whether a schema namespace exists.
1377    #[must_use]
1378    pub fn schema_exists(&self, name: &str) -> bool {
1379        self.schemas
1380            .read()
1381            .iter()
1382            .any(|s| s.eq_ignore_ascii_case(name))
1383    }
1384
1385    /// Returns all registered schema namespace names.
1386    #[must_use]
1387    pub fn schema_names(&self) -> Vec<String> {
1388        self.schemas.read().clone()
1389    }
1390
1391    // --- ALTER operations ---
1392
1393    /// Adds a constraint to an existing node type, creating a minimal type if needed.
1394    pub fn add_constraint_to_type(
1395        &self,
1396        label: &str,
1397        constraint: TypeConstraint,
1398    ) -> Result<(), CatalogError> {
1399        let mut types = self.node_types.write();
1400        if let Some(def) = types.get_mut(label) {
1401            def.constraints.push(constraint);
1402        } else {
1403            // Auto-create a minimal type definition for the label
1404            types.insert(
1405                label.to_string(),
1406                NodeTypeDefinition {
1407                    name: label.to_string(),
1408                    properties: Vec::new(),
1409                    constraints: vec![constraint],
1410                    parent_types: Vec::new(),
1411                },
1412            );
1413        }
1414        Ok(())
1415    }
1416
1417    /// Adds a property to an existing node type.
1418    pub fn alter_node_type_add_property(
1419        &self,
1420        type_name: &str,
1421        property: TypedProperty,
1422    ) -> Result<(), CatalogError> {
1423        let mut types = self.node_types.write();
1424        let def = types
1425            .get_mut(type_name)
1426            .ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
1427        if def.properties.iter().any(|p| p.name == property.name) {
1428            return Err(CatalogError::TypeAlreadyExists(format!(
1429                "property {} on {}",
1430                property.name, type_name
1431            )));
1432        }
1433        def.properties.push(property);
1434        Ok(())
1435    }
1436
1437    /// Drops a property from an existing node type.
1438    pub fn alter_node_type_drop_property(
1439        &self,
1440        type_name: &str,
1441        property_name: &str,
1442    ) -> Result<(), CatalogError> {
1443        let mut types = self.node_types.write();
1444        let def = types
1445            .get_mut(type_name)
1446            .ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
1447        let len_before = def.properties.len();
1448        def.properties.retain(|p| p.name != property_name);
1449        if def.properties.len() == len_before {
1450            return Err(CatalogError::TypeNotFound(format!(
1451                "property {} on {}",
1452                property_name, type_name
1453            )));
1454        }
1455        Ok(())
1456    }
1457
1458    /// Adds a property to an existing edge type.
1459    pub fn alter_edge_type_add_property(
1460        &self,
1461        type_name: &str,
1462        property: TypedProperty,
1463    ) -> Result<(), CatalogError> {
1464        let mut types = self.edge_types.write();
1465        let def = types
1466            .get_mut(type_name)
1467            .ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
1468        if def.properties.iter().any(|p| p.name == property.name) {
1469            return Err(CatalogError::TypeAlreadyExists(format!(
1470                "property {} on {}",
1471                property.name, type_name
1472            )));
1473        }
1474        def.properties.push(property);
1475        Ok(())
1476    }
1477
1478    /// Drops a property from an existing edge type.
1479    pub fn alter_edge_type_drop_property(
1480        &self,
1481        type_name: &str,
1482        property_name: &str,
1483    ) -> Result<(), CatalogError> {
1484        let mut types = self.edge_types.write();
1485        let def = types
1486            .get_mut(type_name)
1487            .ok_or_else(|| CatalogError::TypeNotFound(type_name.to_string()))?;
1488        let len_before = def.properties.len();
1489        def.properties.retain(|p| p.name != property_name);
1490        if def.properties.len() == len_before {
1491            return Err(CatalogError::TypeNotFound(format!(
1492                "property {} on {}",
1493                property_name, type_name
1494            )));
1495        }
1496        Ok(())
1497    }
1498
1499    /// Adds a node type to a graph type.
1500    pub fn alter_graph_type_add_node_type(
1501        &self,
1502        graph_type_name: &str,
1503        node_type: String,
1504    ) -> Result<(), CatalogError> {
1505        let mut types = self.graph_types.write();
1506        let def = types
1507            .get_mut(graph_type_name)
1508            .ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
1509        if !def.allowed_node_types.contains(&node_type) {
1510            def.allowed_node_types.push(node_type);
1511        }
1512        Ok(())
1513    }
1514
1515    /// Drops a node type from a graph type.
1516    pub fn alter_graph_type_drop_node_type(
1517        &self,
1518        graph_type_name: &str,
1519        node_type: &str,
1520    ) -> Result<(), CatalogError> {
1521        let mut types = self.graph_types.write();
1522        let def = types
1523            .get_mut(graph_type_name)
1524            .ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
1525        def.allowed_node_types.retain(|t| t != node_type);
1526        Ok(())
1527    }
1528
1529    /// Adds an edge type to a graph type.
1530    pub fn alter_graph_type_add_edge_type(
1531        &self,
1532        graph_type_name: &str,
1533        edge_type: String,
1534    ) -> Result<(), CatalogError> {
1535        let mut types = self.graph_types.write();
1536        let def = types
1537            .get_mut(graph_type_name)
1538            .ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
1539        if !def.allowed_edge_types.contains(&edge_type) {
1540            def.allowed_edge_types.push(edge_type);
1541        }
1542        Ok(())
1543    }
1544
1545    /// Drops an edge type from a graph type.
1546    pub fn alter_graph_type_drop_edge_type(
1547        &self,
1548        graph_type_name: &str,
1549        edge_type: &str,
1550    ) -> Result<(), CatalogError> {
1551        let mut types = self.graph_types.write();
1552        let def = types
1553            .get_mut(graph_type_name)
1554            .ok_or_else(|| CatalogError::TypeNotFound(graph_type_name.to_string()))?;
1555        def.allowed_edge_types.retain(|t| t != edge_type);
1556        Ok(())
1557    }
1558
1559    // --- Procedure operations ---
1560
1561    /// Registers a stored procedure.
1562    pub fn register_procedure(&self, def: ProcedureDefinition) -> Result<(), CatalogError> {
1563        let mut procs = self.procedures.write();
1564        if procs.contains_key(&def.name) {
1565            return Err(CatalogError::TypeAlreadyExists(def.name.clone()));
1566        }
1567        procs.insert(def.name.clone(), def);
1568        Ok(())
1569    }
1570
1571    /// Replaces or creates a stored procedure.
1572    pub fn replace_procedure(&self, def: ProcedureDefinition) {
1573        self.procedures.write().insert(def.name.clone(), def);
1574    }
1575
1576    /// Drops a stored procedure.
1577    pub fn drop_procedure(&self, name: &str) -> Result<(), CatalogError> {
1578        let mut procs = self.procedures.write();
1579        if procs.remove(name).is_none() {
1580            return Err(CatalogError::TypeNotFound(name.to_string()));
1581        }
1582        Ok(())
1583    }
1584
1585    /// Gets a stored procedure by name.
1586    pub fn get_procedure(&self, name: &str) -> Option<ProcedureDefinition> {
1587        self.procedures.read().get(name).cloned()
1588    }
1589
1590    /// Returns all registered procedure definitions.
1591    #[must_use]
1592    pub fn all_procedure_defs(&self) -> Vec<ProcedureDefinition> {
1593        self.procedures.read().values().cloned().collect()
1594    }
1595
1596    /// Returns all graph type bindings (graph_name, type_name).
1597    #[must_use]
1598    pub fn all_graph_type_bindings(&self) -> Vec<(String, String)> {
1599        self.graph_type_bindings
1600            .read()
1601            .iter()
1602            .map(|(k, v)| (k.clone(), v.clone()))
1603            .collect()
1604    }
1605
1606    fn add_unique_constraint(
1607        &self,
1608        label: LabelId,
1609        property_key: PropertyKeyId,
1610    ) -> Result<(), CatalogError> {
1611        let mut constraints = self.unique_constraints.write();
1612        let key = (label, property_key);
1613        if !constraints.insert(key) {
1614            return Err(CatalogError::ConstraintAlreadyExists);
1615        }
1616        Ok(())
1617    }
1618
1619    fn add_required_property(
1620        &self,
1621        label: LabelId,
1622        property_key: PropertyKeyId,
1623    ) -> Result<(), CatalogError> {
1624        let mut required = self.required_properties.write();
1625        let key = (label, property_key);
1626        if !required.insert(key) {
1627            return Err(CatalogError::ConstraintAlreadyExists);
1628        }
1629        Ok(())
1630    }
1631
1632    fn is_property_required(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
1633        self.required_properties
1634            .read()
1635            .contains(&(label, property_key))
1636    }
1637
1638    fn is_property_unique(&self, label: LabelId, property_key: PropertyKeyId) -> bool {
1639        self.unique_constraints
1640            .read()
1641            .contains(&(label, property_key))
1642    }
1643}
1644
1645// === Errors ===
1646
1647/// Catalog-related errors.
1648#[derive(Debug, Clone, PartialEq, Eq)]
1649pub enum CatalogError {
1650    /// Schema constraints are not enabled.
1651    SchemaNotEnabled,
1652    /// The constraint already exists.
1653    ConstraintAlreadyExists,
1654    /// The label does not exist.
1655    LabelNotFound(String),
1656    /// The property key does not exist.
1657    PropertyKeyNotFound(String),
1658    /// The edge type does not exist.
1659    EdgeTypeNotFound(String),
1660    /// The index does not exist.
1661    IndexNotFound(IndexId),
1662    /// A type with this name already exists.
1663    TypeAlreadyExists(String),
1664    /// No type with this name exists.
1665    TypeNotFound(String),
1666    /// A schema with this name already exists.
1667    SchemaAlreadyExists(String),
1668    /// No schema with this name exists.
1669    SchemaNotFound(String),
1670}
1671
1672impl std::fmt::Display for CatalogError {
1673    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1674        match self {
1675            Self::SchemaNotEnabled => write!(f, "Schema constraints are not enabled"),
1676            Self::ConstraintAlreadyExists => write!(f, "Constraint already exists"),
1677            Self::LabelNotFound(name) => write!(f, "Label not found: {name}"),
1678            Self::PropertyKeyNotFound(name) => write!(f, "Property key not found: {name}"),
1679            Self::EdgeTypeNotFound(name) => write!(f, "Edge type not found: {name}"),
1680            Self::IndexNotFound(id) => write!(f, "Index not found: {id}"),
1681            Self::TypeAlreadyExists(name) => write!(f, "Type already exists: {name}"),
1682            Self::TypeNotFound(name) => write!(f, "Type not found: {name}"),
1683            Self::SchemaAlreadyExists(name) => write!(f, "Schema already exists: {name}"),
1684            Self::SchemaNotFound(name) => write!(f, "Schema not found: {name}"),
1685        }
1686    }
1687}
1688
1689impl std::error::Error for CatalogError {}
1690
1691// === Constraint Validator ===
1692
1693use grafeo_core::execution::operators::ConstraintValidator;
1694use grafeo_core::execution::operators::OperatorError;
1695
1696/// Validates schema constraints during mutation operations using the Catalog.
1697///
1698/// Checks type definitions, NOT NULL constraints, and UNIQUE constraints
1699/// against registered node/edge type definitions.
1700pub struct CatalogConstraintValidator {
1701    catalog: Arc<Catalog>,
1702    /// Optional graph name for graph-type-bound validation.
1703    graph_name: Option<String>,
1704    /// Optional graph store for UNIQUE constraint enforcement via index lookup.
1705    store: Option<Arc<dyn grafeo_core::graph::GraphStoreMut>>,
1706}
1707
1708impl CatalogConstraintValidator {
1709    /// Creates a new validator wrapping the given catalog.
1710    pub fn new(catalog: Arc<Catalog>) -> Self {
1711        Self {
1712            catalog,
1713            graph_name: None,
1714            store: None,
1715        }
1716    }
1717
1718    /// Sets the graph name for graph-type-bound validation.
1719    pub fn with_graph_name(mut self, name: String) -> Self {
1720        self.graph_name = Some(name);
1721        self
1722    }
1723
1724    /// Attaches a graph store for UNIQUE constraint enforcement.
1725    pub fn with_store(mut self, store: Arc<dyn grafeo_core::graph::GraphStoreMut>) -> Self {
1726        self.store = Some(store);
1727        self
1728    }
1729}
1730
1731impl ConstraintValidator for CatalogConstraintValidator {
1732    fn validate_node_property(
1733        &self,
1734        labels: &[String],
1735        key: &str,
1736        value: &Value,
1737    ) -> Result<(), OperatorError> {
1738        for label in labels {
1739            if let Some(type_def) = self.catalog.resolved_node_type(label)
1740                && let Some(typed_prop) = type_def.properties.iter().find(|p| p.name == key)
1741            {
1742                // Check NOT NULL
1743                if !typed_prop.nullable && *value == Value::Null {
1744                    return Err(OperatorError::ConstraintViolation(format!(
1745                        "property '{key}' on :{label} is NOT NULL, cannot set to null"
1746                    )));
1747                }
1748                // Check type compatibility
1749                if *value != Value::Null && !typed_prop.data_type.matches(value) {
1750                    return Err(OperatorError::ConstraintViolation(format!(
1751                        "property '{key}' on :{label} expects {:?}, got {:?}",
1752                        typed_prop.data_type, value
1753                    )));
1754                }
1755            }
1756        }
1757        Ok(())
1758    }
1759
1760    fn validate_node_complete(
1761        &self,
1762        labels: &[String],
1763        properties: &[(String, Value)],
1764    ) -> Result<(), OperatorError> {
1765        let prop_names: std::collections::HashSet<&str> =
1766            properties.iter().map(|(n, _)| n.as_str()).collect();
1767
1768        for label in labels {
1769            if let Some(type_def) = self.catalog.resolved_node_type(label) {
1770                // Check that all NOT NULL properties are present
1771                for typed_prop in &type_def.properties {
1772                    if !typed_prop.nullable
1773                        && typed_prop.default_value.is_none()
1774                        && !prop_names.contains(typed_prop.name.as_str())
1775                    {
1776                        return Err(OperatorError::ConstraintViolation(format!(
1777                            "missing required property '{}' on :{label}",
1778                            typed_prop.name
1779                        )));
1780                    }
1781                }
1782                // Check type-level constraints
1783                for constraint in &type_def.constraints {
1784                    match constraint {
1785                        TypeConstraint::NotNull(prop_name) => {
1786                            if !prop_names.contains(prop_name.as_str()) {
1787                                return Err(OperatorError::ConstraintViolation(format!(
1788                                    "missing required property '{prop_name}' on :{label} (NOT NULL constraint)"
1789                                )));
1790                            }
1791                        }
1792                        TypeConstraint::PrimaryKey(key_props) => {
1793                            for pk in key_props {
1794                                if !prop_names.contains(pk.as_str()) {
1795                                    return Err(OperatorError::ConstraintViolation(format!(
1796                                        "missing primary key property '{pk}' on :{label}"
1797                                    )));
1798                                }
1799                            }
1800                        }
1801                        TypeConstraint::Check { name, expression } => {
1802                            match check_eval::evaluate_check(expression, properties) {
1803                                Ok(true) => {}
1804                                Ok(false) => {
1805                                    let constraint_name = name.as_deref().unwrap_or("unnamed");
1806                                    return Err(OperatorError::ConstraintViolation(format!(
1807                                        "CHECK constraint '{constraint_name}' violated on :{label}"
1808                                    )));
1809                                }
1810                                Err(err) => {
1811                                    return Err(OperatorError::ConstraintViolation(format!(
1812                                        "CHECK constraint evaluation error: {err}"
1813                                    )));
1814                                }
1815                            }
1816                        }
1817                        TypeConstraint::Unique(_) => {}
1818                    }
1819                }
1820            }
1821        }
1822        Ok(())
1823    }
1824
1825    fn check_unique_node_property(
1826        &self,
1827        labels: &[String],
1828        key: &str,
1829        value: &Value,
1830    ) -> Result<(), OperatorError> {
1831        // Skip uniqueness check for NULL values (NULLs are never duplicates)
1832        if *value == Value::Null {
1833            return Ok(());
1834        }
1835        for label in labels {
1836            if let Some(type_def) = self.catalog.resolved_node_type(label) {
1837                for constraint in &type_def.constraints {
1838                    let is_unique = match constraint {
1839                        TypeConstraint::Unique(props) => props.iter().any(|p| p == key),
1840                        TypeConstraint::PrimaryKey(props) => props.iter().any(|p| p == key),
1841                        _ => false,
1842                    };
1843                    if is_unique && let Some(ref store) = self.store {
1844                        let existing = store.find_nodes_by_property(key, value);
1845                        for node_id in existing {
1846                            if let Some(node) = store.get_node(node_id) {
1847                                let has_label = node.labels.iter().any(|l| l.as_str() == label);
1848                                if has_label {
1849                                    return Err(OperatorError::ConstraintViolation(format!(
1850                                        "UNIQUE constraint violation: property '{key}' \
1851                                             with value {value:?} already exists on :{label}"
1852                                    )));
1853                                }
1854                            }
1855                        }
1856                    }
1857                }
1858            }
1859        }
1860        Ok(())
1861    }
1862
1863    fn validate_edge_property(
1864        &self,
1865        edge_type: &str,
1866        key: &str,
1867        value: &Value,
1868    ) -> Result<(), OperatorError> {
1869        if let Some(type_def) = self.catalog.get_edge_type_def(edge_type)
1870            && let Some(typed_prop) = type_def.properties.iter().find(|p| p.name == key)
1871        {
1872            // Check NOT NULL
1873            if !typed_prop.nullable && *value == Value::Null {
1874                return Err(OperatorError::ConstraintViolation(format!(
1875                    "property '{key}' on :{edge_type} is NOT NULL, cannot set to null"
1876                )));
1877            }
1878            // Check type compatibility
1879            if *value != Value::Null && !typed_prop.data_type.matches(value) {
1880                return Err(OperatorError::ConstraintViolation(format!(
1881                    "property '{key}' on :{edge_type} expects {:?}, got {:?}",
1882                    typed_prop.data_type, value
1883                )));
1884            }
1885        }
1886        Ok(())
1887    }
1888
1889    fn validate_edge_complete(
1890        &self,
1891        edge_type: &str,
1892        properties: &[(String, Value)],
1893    ) -> Result<(), OperatorError> {
1894        if let Some(type_def) = self.catalog.get_edge_type_def(edge_type) {
1895            let prop_names: std::collections::HashSet<&str> =
1896                properties.iter().map(|(n, _)| n.as_str()).collect();
1897
1898            for typed_prop in &type_def.properties {
1899                if !typed_prop.nullable
1900                    && typed_prop.default_value.is_none()
1901                    && !prop_names.contains(typed_prop.name.as_str())
1902                {
1903                    return Err(OperatorError::ConstraintViolation(format!(
1904                        "missing required property '{}' on :{edge_type}",
1905                        typed_prop.name
1906                    )));
1907                }
1908            }
1909
1910            for constraint in &type_def.constraints {
1911                if let TypeConstraint::Check { name, expression } = constraint {
1912                    match check_eval::evaluate_check(expression, properties) {
1913                        Ok(true) => {}
1914                        Ok(false) => {
1915                            let constraint_name = name.as_deref().unwrap_or("unnamed");
1916                            return Err(OperatorError::ConstraintViolation(format!(
1917                                "CHECK constraint '{constraint_name}' violated on :{edge_type}"
1918                            )));
1919                        }
1920                        Err(err) => {
1921                            return Err(OperatorError::ConstraintViolation(format!(
1922                                "CHECK constraint evaluation error: {err}"
1923                            )));
1924                        }
1925                    }
1926                }
1927            }
1928        }
1929        Ok(())
1930    }
1931
1932    fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
1933        let Some(ref graph_name) = self.graph_name else {
1934            return Ok(());
1935        };
1936        let Some(type_name) = self.catalog.get_graph_type_binding(graph_name) else {
1937            return Ok(());
1938        };
1939        let Some(gt) = self
1940            .catalog
1941            .schema()
1942            .and_then(|s| s.get_graph_type(&type_name))
1943        else {
1944            return Ok(());
1945        };
1946        if !gt.open && !gt.allowed_node_types.is_empty() {
1947            let allowed = labels
1948                .iter()
1949                .any(|l| gt.allowed_node_types.iter().any(|a| a == l));
1950            if !allowed {
1951                return Err(OperatorError::ConstraintViolation(format!(
1952                    "node labels {labels:?} are not allowed by graph type '{}'",
1953                    gt.name
1954                )));
1955            }
1956        }
1957        Ok(())
1958    }
1959
1960    fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
1961        let Some(ref graph_name) = self.graph_name else {
1962            return Ok(());
1963        };
1964        let Some(type_name) = self.catalog.get_graph_type_binding(graph_name) else {
1965            return Ok(());
1966        };
1967        let Some(gt) = self
1968            .catalog
1969            .schema()
1970            .and_then(|s| s.get_graph_type(&type_name))
1971        else {
1972            return Ok(());
1973        };
1974        if !gt.open && !gt.allowed_edge_types.is_empty() {
1975            let allowed = gt.allowed_edge_types.iter().any(|a| a == edge_type);
1976            if !allowed {
1977                return Err(OperatorError::ConstraintViolation(format!(
1978                    "edge type '{edge_type}' is not allowed by graph type '{}'",
1979                    gt.name
1980                )));
1981            }
1982        }
1983        Ok(())
1984    }
1985
1986    fn validate_edge_endpoints(
1987        &self,
1988        edge_type: &str,
1989        source_labels: &[String],
1990        target_labels: &[String],
1991    ) -> Result<(), OperatorError> {
1992        let Some(type_def) = self.catalog.get_edge_type_def(edge_type) else {
1993            return Ok(());
1994        };
1995        if !type_def.source_node_types.is_empty() {
1996            let source_ok = source_labels
1997                .iter()
1998                .any(|l| type_def.source_node_types.iter().any(|s| s == l));
1999            if !source_ok {
2000                return Err(OperatorError::ConstraintViolation(format!(
2001                    "source node labels {source_labels:?} are not allowed for edge type '{edge_type}', \
2002                     expected one of {:?}",
2003                    type_def.source_node_types
2004                )));
2005            }
2006        }
2007        if !type_def.target_node_types.is_empty() {
2008            let target_ok = target_labels
2009                .iter()
2010                .any(|l| type_def.target_node_types.iter().any(|t| t == l));
2011            if !target_ok {
2012                return Err(OperatorError::ConstraintViolation(format!(
2013                    "target node labels {target_labels:?} are not allowed for edge type '{edge_type}', \
2014                     expected one of {:?}",
2015                    type_def.target_node_types
2016                )));
2017            }
2018        }
2019        Ok(())
2020    }
2021
2022    fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
2023        for label in labels {
2024            if let Some(type_def) = self.catalog.resolved_node_type(label) {
2025                for typed_prop in &type_def.properties {
2026                    if let Some(ref default) = typed_prop.default_value {
2027                        let already_set = properties.iter().any(|(n, _)| n == &typed_prop.name);
2028                        if !already_set {
2029                            properties.push((typed_prop.name.clone(), default.clone()));
2030                        }
2031                    }
2032                }
2033            }
2034        }
2035    }
2036}
2037
2038#[cfg(test)]
2039mod tests {
2040    use super::*;
2041    use std::thread;
2042
2043    #[test]
2044    fn test_catalog_labels() {
2045        let catalog = Catalog::new();
2046
2047        // Get or create labels
2048        let person_id = catalog.get_or_create_label("Person");
2049        let company_id = catalog.get_or_create_label("Company");
2050
2051        // IDs should be different
2052        assert_ne!(person_id, company_id);
2053
2054        // Getting the same label should return the same ID
2055        assert_eq!(catalog.get_or_create_label("Person"), person_id);
2056
2057        // Should be able to look up by name
2058        assert_eq!(catalog.get_label_id("Person"), Some(person_id));
2059        assert_eq!(catalog.get_label_id("Company"), Some(company_id));
2060        assert_eq!(catalog.get_label_id("Unknown"), None);
2061
2062        // Should be able to look up by ID
2063        assert_eq!(catalog.get_label_name(person_id).as_deref(), Some("Person"));
2064        assert_eq!(
2065            catalog.get_label_name(company_id).as_deref(),
2066            Some("Company")
2067        );
2068
2069        // Count should be correct
2070        assert_eq!(catalog.label_count(), 2);
2071    }
2072
2073    #[test]
2074    fn test_catalog_property_keys() {
2075        let catalog = Catalog::new();
2076
2077        let name_id = catalog.get_or_create_property_key("name");
2078        let age_id = catalog.get_or_create_property_key("age");
2079
2080        assert_ne!(name_id, age_id);
2081        assert_eq!(catalog.get_or_create_property_key("name"), name_id);
2082        assert_eq!(catalog.get_property_key_id("name"), Some(name_id));
2083        assert_eq!(
2084            catalog.get_property_key_name(name_id).as_deref(),
2085            Some("name")
2086        );
2087        assert_eq!(catalog.property_key_count(), 2);
2088    }
2089
2090    #[test]
2091    fn test_catalog_edge_types() {
2092        let catalog = Catalog::new();
2093
2094        let knows_id = catalog.get_or_create_edge_type("KNOWS");
2095        let works_at_id = catalog.get_or_create_edge_type("WORKS_AT");
2096
2097        assert_ne!(knows_id, works_at_id);
2098        assert_eq!(catalog.get_or_create_edge_type("KNOWS"), knows_id);
2099        assert_eq!(catalog.get_edge_type_id("KNOWS"), Some(knows_id));
2100        assert_eq!(
2101            catalog.get_edge_type_name(knows_id).as_deref(),
2102            Some("KNOWS")
2103        );
2104        assert_eq!(catalog.edge_type_count(), 2);
2105    }
2106
2107    #[test]
2108    fn test_catalog_indexes() {
2109        let catalog = Catalog::new();
2110
2111        let person_id = catalog.get_or_create_label("Person");
2112        let name_id = catalog.get_or_create_property_key("name");
2113        let age_id = catalog.get_or_create_property_key("age");
2114
2115        // Create indexes
2116        let idx1 = catalog.create_index(person_id, name_id, IndexType::Hash);
2117        let idx2 = catalog.create_index(person_id, age_id, IndexType::BTree);
2118
2119        assert_ne!(idx1, idx2);
2120        assert_eq!(catalog.index_count(), 2);
2121
2122        // Look up by label
2123        let label_indexes = catalog.indexes_for_label(person_id);
2124        assert_eq!(label_indexes.len(), 2);
2125        assert!(label_indexes.contains(&idx1));
2126        assert!(label_indexes.contains(&idx2));
2127
2128        // Look up by label and property
2129        let name_indexes = catalog.indexes_for_label_property(person_id, name_id);
2130        assert_eq!(name_indexes.len(), 1);
2131        assert_eq!(name_indexes[0], idx1);
2132
2133        // Get definition
2134        let def = catalog.get_index(idx1).unwrap();
2135        assert_eq!(def.label, person_id);
2136        assert_eq!(def.property_key, name_id);
2137        assert_eq!(def.index_type, IndexType::Hash);
2138
2139        // Drop index
2140        assert!(catalog.drop_index(idx1));
2141        assert_eq!(catalog.index_count(), 1);
2142        assert!(catalog.get_index(idx1).is_none());
2143        assert_eq!(catalog.indexes_for_label(person_id).len(), 1);
2144    }
2145
2146    #[test]
2147    fn test_catalog_schema_constraints() {
2148        let catalog = Catalog::with_schema();
2149
2150        let person_id = catalog.get_or_create_label("Person");
2151        let email_id = catalog.get_or_create_property_key("email");
2152        let name_id = catalog.get_or_create_property_key("name");
2153
2154        // Add constraints
2155        assert!(catalog.add_unique_constraint(person_id, email_id).is_ok());
2156        assert!(catalog.add_required_property(person_id, name_id).is_ok());
2157
2158        // Check constraints
2159        assert!(catalog.is_property_unique(person_id, email_id));
2160        assert!(!catalog.is_property_unique(person_id, name_id));
2161        assert!(catalog.is_property_required(person_id, name_id));
2162        assert!(!catalog.is_property_required(person_id, email_id));
2163
2164        // Duplicate constraint should fail
2165        assert_eq!(
2166            catalog.add_unique_constraint(person_id, email_id),
2167            Err(CatalogError::ConstraintAlreadyExists)
2168        );
2169    }
2170
2171    #[test]
2172    fn test_catalog_schema_always_enabled() {
2173        // Catalog::new() always enables schema
2174        let catalog = Catalog::new();
2175        assert!(catalog.has_schema());
2176
2177        let person_id = catalog.get_or_create_label("Person");
2178        let email_id = catalog.get_or_create_property_key("email");
2179
2180        // Should succeed with schema enabled
2181        assert_eq!(catalog.add_unique_constraint(person_id, email_id), Ok(()));
2182    }
2183
2184    // === Additional tests for comprehensive coverage ===
2185
2186    #[test]
2187    fn test_catalog_default() {
2188        let catalog = Catalog::default();
2189        assert!(catalog.has_schema());
2190        assert_eq!(catalog.label_count(), 0);
2191        assert_eq!(catalog.property_key_count(), 0);
2192        assert_eq!(catalog.edge_type_count(), 0);
2193        assert_eq!(catalog.index_count(), 0);
2194    }
2195
2196    #[test]
2197    fn test_catalog_all_labels() {
2198        let catalog = Catalog::new();
2199
2200        catalog.get_or_create_label("Person");
2201        catalog.get_or_create_label("Company");
2202        catalog.get_or_create_label("Product");
2203
2204        let all = catalog.all_labels();
2205        assert_eq!(all.len(), 3);
2206        assert!(all.iter().any(|l| l.as_ref() == "Person"));
2207        assert!(all.iter().any(|l| l.as_ref() == "Company"));
2208        assert!(all.iter().any(|l| l.as_ref() == "Product"));
2209    }
2210
2211    #[test]
2212    fn test_catalog_all_property_keys() {
2213        let catalog = Catalog::new();
2214
2215        catalog.get_or_create_property_key("name");
2216        catalog.get_or_create_property_key("age");
2217        catalog.get_or_create_property_key("email");
2218
2219        let all = catalog.all_property_keys();
2220        assert_eq!(all.len(), 3);
2221        assert!(all.iter().any(|k| k.as_ref() == "name"));
2222        assert!(all.iter().any(|k| k.as_ref() == "age"));
2223        assert!(all.iter().any(|k| k.as_ref() == "email"));
2224    }
2225
2226    #[test]
2227    fn test_catalog_all_edge_types() {
2228        let catalog = Catalog::new();
2229
2230        catalog.get_or_create_edge_type("KNOWS");
2231        catalog.get_or_create_edge_type("WORKS_AT");
2232        catalog.get_or_create_edge_type("LIVES_IN");
2233
2234        let all = catalog.all_edge_types();
2235        assert_eq!(all.len(), 3);
2236        assert!(all.iter().any(|t| t.as_ref() == "KNOWS"));
2237        assert!(all.iter().any(|t| t.as_ref() == "WORKS_AT"));
2238        assert!(all.iter().any(|t| t.as_ref() == "LIVES_IN"));
2239    }
2240
2241    #[test]
2242    fn test_catalog_invalid_id_lookup() {
2243        let catalog = Catalog::new();
2244
2245        // Create one label to ensure IDs are allocated
2246        let _ = catalog.get_or_create_label("Person");
2247
2248        // Try to look up non-existent IDs
2249        let invalid_label = LabelId::new(999);
2250        let invalid_property = PropertyKeyId::new(999);
2251        let invalid_edge_type = EdgeTypeId::new(999);
2252        let invalid_index = IndexId::new(999);
2253
2254        assert!(catalog.get_label_name(invalid_label).is_none());
2255        assert!(catalog.get_property_key_name(invalid_property).is_none());
2256        assert!(catalog.get_edge_type_name(invalid_edge_type).is_none());
2257        assert!(catalog.get_index(invalid_index).is_none());
2258    }
2259
2260    #[test]
2261    fn test_catalog_drop_nonexistent_index() {
2262        let catalog = Catalog::new();
2263        let invalid_index = IndexId::new(999);
2264        assert!(!catalog.drop_index(invalid_index));
2265    }
2266
2267    #[test]
2268    fn test_catalog_indexes_for_nonexistent_label() {
2269        let catalog = Catalog::new();
2270        let invalid_label = LabelId::new(999);
2271        let invalid_property = PropertyKeyId::new(999);
2272
2273        assert!(catalog.indexes_for_label(invalid_label).is_empty());
2274        assert!(
2275            catalog
2276                .indexes_for_label_property(invalid_label, invalid_property)
2277                .is_empty()
2278        );
2279    }
2280
2281    #[test]
2282    fn test_catalog_multiple_indexes_same_property() {
2283        let catalog = Catalog::new();
2284
2285        let person_id = catalog.get_or_create_label("Person");
2286        let name_id = catalog.get_or_create_property_key("name");
2287
2288        // Create multiple indexes on the same property with different types
2289        let hash_idx = catalog.create_index(person_id, name_id, IndexType::Hash);
2290        let btree_idx = catalog.create_index(person_id, name_id, IndexType::BTree);
2291        let fulltext_idx = catalog.create_index(person_id, name_id, IndexType::FullText);
2292
2293        assert_eq!(catalog.index_count(), 3);
2294
2295        let indexes = catalog.indexes_for_label_property(person_id, name_id);
2296        assert_eq!(indexes.len(), 3);
2297        assert!(indexes.contains(&hash_idx));
2298        assert!(indexes.contains(&btree_idx));
2299        assert!(indexes.contains(&fulltext_idx));
2300
2301        // Verify each has the correct type
2302        assert_eq!(
2303            catalog.get_index(hash_idx).unwrap().index_type,
2304            IndexType::Hash
2305        );
2306        assert_eq!(
2307            catalog.get_index(btree_idx).unwrap().index_type,
2308            IndexType::BTree
2309        );
2310        assert_eq!(
2311            catalog.get_index(fulltext_idx).unwrap().index_type,
2312            IndexType::FullText
2313        );
2314    }
2315
2316    #[test]
2317    fn test_catalog_schema_required_property_duplicate() {
2318        let catalog = Catalog::with_schema();
2319
2320        let person_id = catalog.get_or_create_label("Person");
2321        let name_id = catalog.get_or_create_property_key("name");
2322
2323        // First should succeed
2324        assert!(catalog.add_required_property(person_id, name_id).is_ok());
2325
2326        // Duplicate should fail
2327        assert_eq!(
2328            catalog.add_required_property(person_id, name_id),
2329            Err(CatalogError::ConstraintAlreadyExists)
2330        );
2331    }
2332
2333    #[test]
2334    fn test_catalog_schema_check_without_constraints() {
2335        let catalog = Catalog::new();
2336
2337        let person_id = catalog.get_or_create_label("Person");
2338        let name_id = catalog.get_or_create_property_key("name");
2339
2340        // Without schema enabled, these should return false
2341        assert!(!catalog.is_property_unique(person_id, name_id));
2342        assert!(!catalog.is_property_required(person_id, name_id));
2343    }
2344
2345    #[test]
2346    fn test_catalog_has_schema() {
2347        // Both new() and with_schema() enable schema by default
2348        let catalog = Catalog::new();
2349        assert!(catalog.has_schema());
2350
2351        let with_schema = Catalog::with_schema();
2352        assert!(with_schema.has_schema());
2353    }
2354
2355    #[test]
2356    fn test_catalog_error_display() {
2357        assert_eq!(
2358            CatalogError::SchemaNotEnabled.to_string(),
2359            "Schema constraints are not enabled"
2360        );
2361        assert_eq!(
2362            CatalogError::ConstraintAlreadyExists.to_string(),
2363            "Constraint already exists"
2364        );
2365        assert_eq!(
2366            CatalogError::LabelNotFound("Person".to_string()).to_string(),
2367            "Label not found: Person"
2368        );
2369        assert_eq!(
2370            CatalogError::PropertyKeyNotFound("name".to_string()).to_string(),
2371            "Property key not found: name"
2372        );
2373        assert_eq!(
2374            CatalogError::EdgeTypeNotFound("KNOWS".to_string()).to_string(),
2375            "Edge type not found: KNOWS"
2376        );
2377        let idx = IndexId::new(42);
2378        assert!(CatalogError::IndexNotFound(idx).to_string().contains("42"));
2379    }
2380
2381    #[test]
2382    fn test_catalog_concurrent_label_creation() {
2383        use std::sync::Arc;
2384
2385        let catalog = Arc::new(Catalog::new());
2386        let mut handles = vec![];
2387
2388        // Spawn multiple threads trying to create the same labels
2389        for i in 0..10 {
2390            let catalog = Arc::clone(&catalog);
2391            handles.push(thread::spawn(move || {
2392                let label_name = format!("Label{}", i % 3); // Only 3 unique labels
2393                catalog.get_or_create_label(&label_name)
2394            }));
2395        }
2396
2397        let mut ids: Vec<LabelId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2398        ids.sort_by_key(|id| id.as_u32());
2399        ids.dedup();
2400
2401        // Should only have 3 unique label IDs
2402        assert_eq!(ids.len(), 3);
2403        assert_eq!(catalog.label_count(), 3);
2404    }
2405
2406    #[test]
2407    fn test_catalog_concurrent_property_key_creation() {
2408        use std::sync::Arc;
2409
2410        let catalog = Arc::new(Catalog::new());
2411        let mut handles = vec![];
2412
2413        for i in 0..10 {
2414            let catalog = Arc::clone(&catalog);
2415            handles.push(thread::spawn(move || {
2416                let key_name = format!("key{}", i % 4);
2417                catalog.get_or_create_property_key(&key_name)
2418            }));
2419        }
2420
2421        let mut ids: Vec<PropertyKeyId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2422        ids.sort_by_key(|id| id.as_u32());
2423        ids.dedup();
2424
2425        assert_eq!(ids.len(), 4);
2426        assert_eq!(catalog.property_key_count(), 4);
2427    }
2428
2429    #[test]
2430    fn test_catalog_concurrent_index_operations() {
2431        use std::sync::Arc;
2432
2433        let catalog = Arc::new(Catalog::new());
2434        let label = catalog.get_or_create_label("Node");
2435
2436        let mut handles = vec![];
2437
2438        // Create indexes concurrently
2439        for i in 0..5 {
2440            let catalog = Arc::clone(&catalog);
2441            handles.push(thread::spawn(move || {
2442                let prop = PropertyKeyId::new(i);
2443                catalog.create_index(label, prop, IndexType::Hash)
2444            }));
2445        }
2446
2447        let ids: Vec<IndexId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
2448        assert_eq!(ids.len(), 5);
2449        assert_eq!(catalog.index_count(), 5);
2450    }
2451
2452    #[test]
2453    fn test_catalog_special_characters_in_names() {
2454        let catalog = Catalog::new();
2455
2456        // Test with various special characters
2457        let label1 = catalog.get_or_create_label("Label With Spaces");
2458        let label2 = catalog.get_or_create_label("Label-With-Dashes");
2459        let label3 = catalog.get_or_create_label("Label_With_Underscores");
2460        let label4 = catalog.get_or_create_label("LabelWithUnicode\u{00E9}");
2461
2462        assert_ne!(label1, label2);
2463        assert_ne!(label2, label3);
2464        assert_ne!(label3, label4);
2465
2466        assert_eq!(
2467            catalog.get_label_name(label1).as_deref(),
2468            Some("Label With Spaces")
2469        );
2470        assert_eq!(
2471            catalog.get_label_name(label4).as_deref(),
2472            Some("LabelWithUnicode\u{00E9}")
2473        );
2474    }
2475
2476    #[test]
2477    fn test_catalog_empty_names() {
2478        let catalog = Catalog::new();
2479
2480        // Empty names should be valid (edge case)
2481        let empty_label = catalog.get_or_create_label("");
2482        let empty_prop = catalog.get_or_create_property_key("");
2483        let empty_edge = catalog.get_or_create_edge_type("");
2484
2485        assert_eq!(catalog.get_label_name(empty_label).as_deref(), Some(""));
2486        assert_eq!(
2487            catalog.get_property_key_name(empty_prop).as_deref(),
2488            Some("")
2489        );
2490        assert_eq!(catalog.get_edge_type_name(empty_edge).as_deref(), Some(""));
2491
2492        // Calling again should return same ID
2493        assert_eq!(catalog.get_or_create_label(""), empty_label);
2494    }
2495
2496    #[test]
2497    fn test_catalog_large_number_of_entries() {
2498        let catalog = Catalog::new();
2499
2500        // Create many labels
2501        for i in 0..1000 {
2502            catalog.get_or_create_label(&format!("Label{}", i));
2503        }
2504
2505        assert_eq!(catalog.label_count(), 1000);
2506
2507        // Verify we can retrieve them all
2508        let all = catalog.all_labels();
2509        assert_eq!(all.len(), 1000);
2510
2511        // Verify a specific one
2512        let id = catalog.get_label_id("Label500").unwrap();
2513        assert_eq!(catalog.get_label_name(id).as_deref(), Some("Label500"));
2514    }
2515
2516    #[test]
2517    fn test_index_definition_debug() {
2518        let def = IndexDefinition {
2519            id: IndexId::new(1),
2520            label: LabelId::new(2),
2521            property_key: PropertyKeyId::new(3),
2522            index_type: IndexType::Hash,
2523        };
2524
2525        // Should be able to debug print
2526        let debug_str = format!("{:?}", def);
2527        assert!(debug_str.contains("IndexDefinition"));
2528        assert!(debug_str.contains("Hash"));
2529    }
2530
2531    #[test]
2532    fn test_index_type_equality() {
2533        assert_eq!(IndexType::Hash, IndexType::Hash);
2534        assert_ne!(IndexType::Hash, IndexType::BTree);
2535        assert_ne!(IndexType::BTree, IndexType::FullText);
2536
2537        // Clone
2538        let t = IndexType::Hash;
2539        let t2 = t;
2540        assert_eq!(t, t2);
2541    }
2542
2543    #[test]
2544    fn test_catalog_error_equality() {
2545        assert_eq!(
2546            CatalogError::SchemaNotEnabled,
2547            CatalogError::SchemaNotEnabled
2548        );
2549        assert_eq!(
2550            CatalogError::ConstraintAlreadyExists,
2551            CatalogError::ConstraintAlreadyExists
2552        );
2553        assert_eq!(
2554            CatalogError::LabelNotFound("X".to_string()),
2555            CatalogError::LabelNotFound("X".to_string())
2556        );
2557        assert_ne!(
2558            CatalogError::LabelNotFound("X".to_string()),
2559            CatalogError::LabelNotFound("Y".to_string())
2560        );
2561    }
2562}