llkv_table/catalog/
manager.rs

1//! High-level service for creating tables.
2//!
3//! Use `CatalogManager` to create tables. It coordinates metadata persistence,
4//! catalog registration, and storage initialization.
5
6#![forbid(unsafe_code)]
7
8use std::convert::TryFrom;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use arrow::array::ArrayRef;
13use arrow::datatypes::{DataType, Field, Schema};
14use arrow::record_batch::RecordBatch;
15use llkv_column_map::ColumnStore;
16use llkv_column_map::store::IndexKind;
17use llkv_compute::time::current_time_micros;
18use llkv_plan::{DropIndexPlan, ForeignKeySpec, PlanColumnSpec};
19use llkv_result::{Error, Result as LlkvResult};
20use llkv_storage::pager::Pager;
21use rustc_hash::{FxHashMap, FxHashSet};
22use simd_r_drive_entry_handle::EntryHandle;
23
24use super::table_catalog::{FieldDefinition, TableCatalog};
25use crate::constraints::{ConstraintId, ConstraintKind};
26use crate::metadata::{MetadataManager, MultiColumnUniqueRegistration, SingleColumnIndexEntry};
27use crate::sys_catalog::{
28    ColMeta, MultiColumnIndexEntryMeta, SysCatalog, TriggerEntryMeta, TriggerEventMeta,
29    TriggerTimingMeta,
30};
31use crate::table::Table;
32use crate::types::{FieldId, RowId, TableColumn, TableId};
33use crate::{
34    ForeignKeyColumn, ForeignKeyTableInfo, ForeignKeyView, TableConstraintSummaryView, TableView,
35    ValidatedForeignKey,
36};
37
38/// Result of creating a table. The caller is responsible for wiring executor
39/// caches and any higher-level state that depends on the table schema.
40pub struct CreateTableResult<P>
41where
42    P: Pager<Blob = EntryHandle> + Send + Sync,
43{
44    pub table_id: TableId,
45    pub table: Arc<Table<P>>,
46    pub table_columns: Vec<TableColumn>,
47    pub column_lookup: FxHashMap<String, usize>,
48}
49
50/// Result of attempting to register a single-column index definition.
51#[derive(Debug, Clone, PartialEq, Eq)]
52pub enum SingleColumnIndexRegistration {
53    Created { index_name: String },
54    AlreadyExists { index_name: String },
55}
56
57/// Descriptor for a single-column index resolved from catalog metadata.
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct SingleColumnIndexDescriptor {
60    pub index_name: String,
61    pub table_id: TableId,
62    pub canonical_table_name: String,
63    pub display_table_name: String,
64    pub field_id: FieldId,
65    pub column_name: String,
66    pub was_unique: bool,
67}
68
69/// Trait for constructing MVCC columns and Arrow metadata during batch ingestion.
70///
71/// Implementations can delegate to `llkv_transaction::mvcc` or provide custom logic
72/// for synthetic data sources. This indirection avoids coupling the table crate to the
73/// transaction crate while still centralizing MVCC helpers.
74pub trait MvccColumnBuilder: Send + Sync {
75    /// Build MVCC columns (row_id, created_by, deleted_by) for INSERT/CTAS operations.
76    fn build_insert_columns(
77        &self,
78        row_count: usize,
79        start_row_id: RowId,
80        creator_txn_id: u64,
81        deleted_marker: u64,
82    ) -> (ArrayRef, ArrayRef, ArrayRef);
83
84    /// Return the Arrow field definitions for MVCC columns.
85    fn mvcc_fields(&self) -> Vec<Field>;
86
87    /// Construct a user column field with the required metadata assigned.
88    fn field_with_metadata(
89        &self,
90        name: &str,
91        data_type: DataType,
92        nullable: bool,
93        field_id: FieldId,
94    ) -> Field;
95}
96
97/// Service for creating tables.
98///
99/// Coordinates metadata persistence (`MetadataManager`), catalog registration
100/// (`TableCatalog`), and storage initialization (`ColumnStore`).
101#[derive(Clone)]
102pub struct CatalogManager<P>
103where
104    P: Pager<Blob = EntryHandle> + Send + Sync,
105{
106    metadata: Arc<MetadataManager<P>>,
107    catalog: Arc<TableCatalog>,
108    store: Arc<ColumnStore<P>>,
109    type_registry: Arc<std::sync::RwLock<FxHashMap<String, sqlparser::ast::DataType>>>,
110}
111
112impl<P> CatalogManager<P>
113where
114    P: Pager<Blob = EntryHandle> + Send + Sync,
115{
116    /// Creates a new CatalogManager coordinating metadata, catalog, and storage layers.
117    pub fn new(
118        metadata: Arc<MetadataManager<P>>,
119        catalog: Arc<TableCatalog>,
120        store: Arc<ColumnStore<P>>,
121    ) -> Self {
122        Self {
123            metadata,
124            catalog,
125            store,
126            type_registry: Arc::new(std::sync::RwLock::new(FxHashMap::default())),
127        }
128    }
129
130    // ============================================================================
131    // Type Registry Management
132    // ============================================================================
133
134    /// Load custom types from system catalog.
135    /// Should be called during initialization to restore persisted types.
136    pub fn load_types_from_catalog(&self) -> LlkvResult<()> {
137        use crate::sys_catalog::SysCatalog;
138
139        let sys_catalog = SysCatalog::new(&self.store);
140        match sys_catalog.all_custom_type_metas() {
141            Ok(type_metas) => {
142                tracing::debug!(
143                    "[CATALOG] Loaded {} custom type(s) from catalog",
144                    type_metas.len()
145                );
146
147                let mut registry = self.type_registry.write().unwrap();
148                for type_meta in type_metas {
149                    // Parse the base_type_sql back to a DataType
150                    if let Ok(parsed_type) = parse_data_type_from_sql(&type_meta.base_type_sql) {
151                        registry.insert(type_meta.name.to_lowercase(), parsed_type);
152                    } else {
153                        tracing::warn!(
154                            "[CATALOG] Failed to parse base type SQL for type '{}': {}",
155                            type_meta.name,
156                            type_meta.base_type_sql
157                        );
158                    }
159                }
160
161                tracing::debug!(
162                    "[CATALOG] Type registry initialized with {} type(s)",
163                    registry.len()
164                );
165                Ok(())
166            }
167            Err(e) => {
168                tracing::warn!(
169                    "[CATALOG] Failed to load custom types: {}, starting with empty type registry",
170                    e
171                );
172                Ok(()) // Non-fatal, start with empty registry
173            }
174        }
175    }
176
177    /// Register a custom type alias (CREATE TYPE/DOMAIN).
178    pub fn register_type(&self, name: String, data_type: sqlparser::ast::DataType) {
179        let mut registry = self.type_registry.write().unwrap();
180        registry.insert(name.to_lowercase(), data_type);
181    }
182
183    /// Drop a custom type alias (DROP TYPE/DOMAIN).
184    pub fn drop_type(&self, name: &str) -> LlkvResult<()> {
185        let mut registry = self.type_registry.write().unwrap();
186        if registry.remove(&name.to_lowercase()).is_none() {
187            return Err(Error::InvalidArgumentError(format!(
188                "Type '{}' does not exist",
189                name
190            )));
191        }
192        Ok(())
193    }
194
195    /// Resolve a type name to its base DataType, recursively following aliases.
196    pub fn resolve_type(&self, data_type: &sqlparser::ast::DataType) -> sqlparser::ast::DataType {
197        use sqlparser::ast::DataType;
198
199        match data_type {
200            DataType::Custom(obj_name, _) => {
201                let name = obj_name.to_string().to_lowercase();
202                let registry = self.type_registry.read().unwrap();
203                if let Some(base_type) = registry.get(&name) {
204                    // Recursively resolve in case the base type is also an alias
205                    self.resolve_type(base_type)
206                } else {
207                    // Not a custom type, return as-is
208                    data_type.clone()
209                }
210            }
211            // For non-custom types, return as-is
212            _ => data_type.clone(),
213        }
214    }
215
216    // ============================================================================
217    // View Management
218    // ============================================================================
219
220    /// Create a view by storing its SQL definition in the catalog.
221    /// The view will be registered as a table with a view_definition.
222    pub fn create_view(
223        &self,
224        display_name: &str,
225        view_definition: String,
226        column_specs: Vec<PlanColumnSpec>,
227    ) -> LlkvResult<crate::types::TableId> {
228        if column_specs.is_empty() {
229            return Err(Error::InvalidArgumentError(
230                "CREATE VIEW requires at least one column".into(),
231            ));
232        }
233
234        use crate::sys_catalog::TableMeta;
235
236        // Reserve a new table ID for the view
237        let table_id = self.metadata.reserve_table_id()?;
238
239        let created_at_micros = SystemTime::now()
240            .duration_since(UNIX_EPOCH)
241            .unwrap_or_default()
242            .as_micros() as u64;
243
244        // Create the table metadata with view_definition set
245        let table_meta = TableMeta {
246            table_id,
247            name: Some(display_name.to_string()),
248            created_at_micros,
249            flags: 0,
250            epoch: 0,
251            view_definition: Some(view_definition),
252        };
253
254        let mut table_columns = Vec::with_capacity(column_specs.len());
255        for (idx, spec) in column_specs.iter().enumerate() {
256            let field_id = field_id_for_index(idx)?;
257            table_columns.push(TableColumn {
258                field_id,
259                name: spec.name.clone(),
260                data_type: spec.data_type.clone(),
261                nullable: spec.nullable,
262                primary_key: spec.primary_key,
263                unique: spec.unique,
264                check_expr: spec.check_expr.clone(),
265            });
266        }
267
268        // Store the metadata and flush to disk
269        self.metadata.set_table_meta(table_id, table_meta)?;
270        self.metadata
271            .apply_column_definitions(table_id, &table_columns, created_at_micros)?;
272        self.metadata.flush_table(table_id)?;
273
274        // Register the view in the catalog (no namespace prefix - namespacing handled at runtime session layer)
275        self.catalog.register_table(display_name, table_id)?;
276
277        if let Some(field_resolver) = self.catalog.field_resolver(table_id) {
278            for column in &table_columns {
279                let definition = FieldDefinition::new(&column.name)
280                    .with_primary_key(column.primary_key)
281                    .with_unique(column.unique)
282                    .with_check_expr(column.check_expr.clone());
283                if let Err(err) = field_resolver.register_field(definition) {
284                    self.catalog.unregister_table(table_id);
285                    self.metadata.remove_table_state(table_id);
286                    return Err(err);
287                }
288            }
289        }
290
291        tracing::debug!("Created view '{}' with table_id={}", display_name, table_id);
292        Ok(table_id)
293    }
294
295    /// Check if a table is actually a view by looking at its metadata.
296    /// Returns true if the table exists and has a view_definition.
297    pub fn is_view(&self, table_id: crate::types::TableId) -> LlkvResult<bool> {
298        match self.metadata.table_meta(table_id)? {
299            Some(meta) => Ok(meta.view_definition.is_some()),
300            None => Ok(false),
301        }
302    }
303
304    /// Drop a view by removing its metadata and catalog entry.
305    pub fn drop_view(&self, canonical_name: &str, table_id: TableId) -> LlkvResult<()> {
306        let (_, field_ids) = self.sorted_user_fields(table_id);
307        self.metadata.prepare_table_drop(table_id, &field_ids)?;
308        self.metadata.flush_table(table_id)?;
309        self.metadata.remove_table_state(table_id);
310
311        if let Some(table_id_from_catalog) = self.catalog.table_id(canonical_name) {
312            let _ = self.catalog.unregister_table(table_id_from_catalog);
313        } else {
314            let _ = self.catalog.unregister_table(table_id);
315        }
316
317        Ok(())
318    }
319
320    // ============================================================================
321    // Table Creation
322    // ============================================================================
323
324    /// Create a new table using column specifications.
325    ///
326    /// Reserves table ID from metadata, validates columns, persists schema,
327    /// registers in catalog, and returns a Table handle for data operations.
328    pub(crate) fn create_table_from_columns(
329        &self,
330        display_name: &str,
331        canonical_name: &str,
332        columns: &[PlanColumnSpec],
333    ) -> LlkvResult<CreateTableResult<P>> {
334        if columns.is_empty() {
335            return Err(Error::InvalidArgumentError(
336                "CREATE TABLE requires at least one column".into(),
337            ));
338        }
339
340        let mut lookup: FxHashMap<String, usize> =
341            FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
342        let mut table_columns: Vec<TableColumn> = Vec::with_capacity(columns.len());
343
344        for (idx, column) in columns.iter().enumerate() {
345            let normalized = column.name.to_ascii_lowercase();
346            if lookup.insert(normalized.clone(), idx).is_some() {
347                return Err(Error::InvalidArgumentError(format!(
348                    "duplicate column name '{}' in table '{}'",
349                    column.name, display_name
350                )));
351            }
352
353            table_columns.push(TableColumn {
354                field_id: field_id_for_index(idx)?,
355                name: column.name.clone(),
356                data_type: column.data_type.clone(),
357                nullable: column.nullable,
358                primary_key: column.primary_key,
359                unique: column.unique,
360                check_expr: column.check_expr.clone(),
361            });
362        }
363
364        self.create_table_inner(display_name, canonical_name, table_columns, lookup)
365    }
366
367    /// Create a new table using an Arrow schema (used by CTAS flows).
368    pub fn create_table_from_schema(
369        &self,
370        display_name: &str,
371        canonical_name: &str,
372        schema: &Schema,
373    ) -> LlkvResult<CreateTableResult<P>> {
374        if schema.fields().is_empty() {
375            return Err(Error::InvalidArgumentError(
376                "CREATE TABLE AS SELECT requires at least one column".into(),
377            ));
378        }
379
380        let mut lookup: FxHashMap<String, usize> =
381            FxHashMap::with_capacity_and_hasher(schema.fields().len(), Default::default());
382        let mut table_columns: Vec<TableColumn> = Vec::with_capacity(schema.fields().len());
383
384        for (idx, field) in schema.fields().iter().enumerate() {
385            // TODO: Extend with other supported types
386            let data_type = match field.data_type() {
387                DataType::Int64
388                | DataType::Int32
389                | DataType::Float64
390                | DataType::Utf8
391                | DataType::Date32
392                | DataType::Boolean
393                | DataType::Struct(_)
394                | DataType::Decimal128(_, _) => field.data_type().clone(),
395                other => {
396                    return Err(Error::InvalidArgumentError(format!(
397                        "unsupported column type in CTAS result: {other:?}"
398                    )));
399                }
400            };
401
402            let normalized = field.name().to_ascii_lowercase();
403            if lookup.insert(normalized.clone(), idx).is_some() {
404                return Err(Error::InvalidArgumentError(format!(
405                    "duplicate column name '{}' in CTAS result",
406                    field.name()
407                )));
408            }
409
410            table_columns.push(TableColumn {
411                field_id: field_id_for_index(idx)?,
412                name: field.name().to_string(),
413                data_type,
414                nullable: field.is_nullable(),
415                primary_key: false,
416                unique: false,
417                check_expr: None,
418            });
419        }
420
421        self.create_table_inner(display_name, canonical_name, table_columns, lookup)
422    }
423
424    fn create_table_inner(
425        &self,
426        display_name: &str,
427        _canonical_name: &str,
428        table_columns: Vec<TableColumn>,
429        column_lookup: FxHashMap<String, usize>,
430    ) -> LlkvResult<CreateTableResult<P>> {
431        let table_id = self.metadata.reserve_table_id()?;
432        let timestamp = current_time_micros();
433        let table_meta = crate::sys_catalog::TableMeta {
434            table_id,
435            name: Some(display_name.to_string()),
436            created_at_micros: timestamp,
437            flags: 0,
438            epoch: 0,
439            view_definition: None, // Regular table, not a view
440        };
441
442        self.metadata.set_table_meta(table_id, table_meta)?;
443        self.metadata
444            .apply_column_definitions(table_id, &table_columns, timestamp)?;
445        self.metadata.flush_table(table_id)?;
446
447        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
448
449        // Register table in catalog using the table_id from metadata
450        tracing::debug!(
451            "[CATALOG_REGISTER] Registering table '{}' (id={}) in catalog @ {:p}",
452            display_name,
453            table_id,
454            &*self.catalog
455        );
456        if let Err(err) = self.catalog.register_table(display_name, table_id) {
457            self.metadata.remove_table_state(table_id);
458            return Err(err);
459        }
460
461        if let Some(field_resolver) = self.catalog.field_resolver(table_id) {
462            for column in &table_columns {
463                let definition = FieldDefinition::new(&column.name)
464                    .with_primary_key(column.primary_key)
465                    .with_unique(column.unique)
466                    .with_check_expr(column.check_expr.clone());
467                if let Err(err) = field_resolver.register_field(definition) {
468                    self.catalog.unregister_table(table_id);
469                    self.metadata.remove_table_state(table_id);
470                    return Err(err);
471                }
472            }
473        }
474
475        Ok(CreateTableResult {
476            table_id,
477            table: Arc::new(table),
478            table_columns,
479            column_lookup,
480        })
481    }
482
483    /// Prepare metadata state and unregister catalog entries for a dropped table.
484    pub fn drop_table(
485        &self,
486        canonical_name: &str,
487        table_id: TableId,
488        column_field_ids: &[FieldId],
489    ) -> LlkvResult<()> {
490        use llkv_types::LogicalFieldId;
491
492        self.metadata
493            .prepare_table_drop(table_id, column_field_ids)?;
494        self.metadata.flush_table(table_id)?;
495        self.metadata.remove_table_state(table_id);
496
497        for field_id in column_field_ids {
498            let logical_field_id = LogicalFieldId::for_user(table_id, *field_id);
499            self.store.remove_column(logical_field_id)?;
500        }
501
502        self.store
503            .remove_column(LogicalFieldId::for_mvcc_created_by(table_id))?;
504        self.store
505            .remove_column(LogicalFieldId::for_mvcc_deleted_by(table_id))?;
506
507        if let Some(table_id_from_catalog) = self.catalog.table_id(canonical_name) {
508            let _ = self.catalog.unregister_table(table_id_from_catalog);
509        } else {
510            let _ = self.catalog.unregister_table(table_id);
511        }
512        Ok(())
513    }
514
515    /// Rename a table across metadata and catalog layers.
516    pub fn rename_table(
517        &self,
518        table_id: TableId,
519        current_name: &str,
520        new_name: &str,
521    ) -> LlkvResult<()> {
522        if !current_name.eq_ignore_ascii_case(new_name) && self.catalog.table_id(new_name).is_some()
523        {
524            return Err(Error::CatalogError(format!(
525                "Table '{}' already exists",
526                new_name
527            )));
528        }
529
530        let previous_meta = self.metadata.table_meta(table_id)?;
531        let mut prior_snapshot = None;
532        if let Some(mut meta) = previous_meta.clone() {
533            prior_snapshot = Some(meta.clone());
534            meta.name = Some(new_name.to_string());
535            self.metadata.set_table_meta(table_id, meta)?;
536        }
537
538        if let Err(err) = self.catalog.rename_registered_table(current_name, new_name) {
539            if let Some(prior) = prior_snapshot {
540                let _ = self.metadata.set_table_meta(table_id, prior);
541            }
542            return Err(err);
543        }
544
545        if let Some(prior) = prior_snapshot.clone()
546            && let Err(err) = self.metadata.flush_table(table_id)
547        {
548            let _ = self.metadata.set_table_meta(table_id, prior);
549            let _ = self.catalog.rename_registered_table(new_name, current_name);
550            let _ = self.metadata.flush_table(table_id);
551            return Err(err);
552        }
553
554        Ok(())
555    }
556
557    /// Rename a column in a table by updating its metadata.
558    pub fn rename_column(
559        &self,
560        table_id: TableId,
561        old_column_name: &str,
562        new_column_name: &str,
563    ) -> LlkvResult<()> {
564        // Get all column metas for this table
565        let (_, field_ids) = self.sorted_user_fields(table_id);
566        let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
567
568        // Find the column by old name
569        let mut found_col: Option<(u32, ColMeta)> = None;
570        for (idx, meta_opt) in column_metas.iter().enumerate() {
571            if let Some(meta) = meta_opt
572                && let Some(name) = &meta.name
573                && name.eq_ignore_ascii_case(old_column_name)
574            {
575                found_col = Some((field_ids[idx], meta.clone()));
576                break;
577            }
578        }
579
580        let (_field_id, mut col_meta) = found_col.ok_or_else(|| {
581            Error::InvalidArgumentError(format!("column '{}' not found in table", old_column_name))
582        })?;
583
584        // Update the column name
585        col_meta.name = Some(new_column_name.to_string());
586
587        // Save to catalog
588        let catalog = SysCatalog::new(&self.store);
589        catalog.put_col_meta(table_id, &col_meta);
590
591        // Update metadata manager cache
592        self.metadata.set_column_meta(table_id, col_meta)?;
593
594        // Update field resolver mapping for this column name change.
595        if let Some(resolver) = self.catalog.field_resolver(table_id) {
596            resolver.rename_field(old_column_name, new_column_name)?;
597        }
598
599        self.metadata.flush_table(table_id)?;
600
601        Ok(())
602    }
603
604    /// Alter the data type of a column.
605    ///
606    /// This updates both the column metadata and the storage layer's data type fingerprint.
607    /// Note that actual data conversion is NOT performed - the caller must ensure that
608    /// existing data is compatible with the new type (or that no data exists).
609    ///
610    /// # Arguments
611    /// * `table_id` - The table containing the column
612    /// * `column_name` - Name of the column to alter
613    /// * `new_data_type` - Arrow DataType to set for this column
614    pub fn alter_column_type(
615        &self,
616        table_id: TableId,
617        column_name: &str,
618        new_data_type: &DataType,
619    ) -> LlkvResult<()> {
620        // Get all column metas for this table
621        let (logical_fields, field_ids) = self.sorted_user_fields(table_id);
622        let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
623
624        // Find the column by name
625        let mut found_col: Option<(usize, u32, ColMeta)> = None;
626        for (idx, meta_opt) in column_metas.iter().enumerate() {
627            if let Some(meta) = meta_opt
628                && let Some(name) = &meta.name
629                && name.eq_ignore_ascii_case(column_name)
630            {
631                found_col = Some((idx, field_ids[idx], meta.clone()));
632                break;
633            }
634        }
635
636        let (col_idx, _field_id, col_meta) = found_col.ok_or_else(|| {
637            Error::InvalidArgumentError(format!("column '{}' not found in table", column_name))
638        })?;
639
640        // Update the data type in the storage layer
641        let lfid = logical_fields[col_idx];
642        self.store.update_data_type(lfid, new_data_type)?;
643
644        // Save metadata to catalog
645        let catalog = SysCatalog::new(&self.store);
646        catalog.put_col_meta(table_id, &col_meta);
647
648        // Update metadata manager cache
649        self.metadata.set_column_meta(table_id, col_meta)?;
650
651        Ok(())
652    }
653
654    /// Drop a column from a table by removing its metadata.
655    pub fn drop_column(&self, table_id: TableId, column_name: &str) -> LlkvResult<()> {
656        // Get all column metas for this table
657        let (_, field_ids) = self.sorted_user_fields(table_id);
658        let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
659
660        // Find the column by name
661        let mut found_col_id: Option<u32> = None;
662        for (idx, meta_opt) in column_metas.iter().enumerate() {
663            if let Some(meta) = meta_opt
664                && let Some(name) = &meta.name
665                && name.eq_ignore_ascii_case(column_name)
666            {
667                found_col_id = Some(field_ids[idx]);
668                break;
669            }
670        }
671
672        let col_id = found_col_id.ok_or_else(|| {
673            Error::InvalidArgumentError(format!("column '{}' not found in table", column_name))
674        })?;
675
676        // Remove column from the column store
677        use llkv_types::{LogicalFieldId, LogicalStorageNamespace};
678        let logical_field_id =
679            LogicalFieldId::from_parts(LogicalStorageNamespace::UserData, table_id, col_id);
680        self.store.remove_column(logical_field_id)?;
681
682        // Delete from catalog
683        let catalog = SysCatalog::new(&self.store);
684        catalog.delete_col_meta(table_id, &[col_id])?;
685
686        Ok(())
687    }
688
689    /// Register a single-column sort (B-tree) index. Optionally marks the field unique.
690    /// Returns `true` if the index was newly created, `false` if it already existed and `if_not_exists` was true.
691    #[allow(clippy::too_many_arguments)]
692    pub fn register_single_column_index(
693        &self,
694        display_name: &str,
695        canonical_name: &str,
696        table: &Table<P>,
697        field_id: FieldId,
698        column_name: &str,
699        index_name: Option<String>,
700        mark_unique: bool,
701        ascending: bool,
702        nulls_first: bool,
703        if_not_exists: bool,
704    ) -> LlkvResult<SingleColumnIndexRegistration> {
705        let table_id = table.table_id();
706        let existing_indexes = table.list_registered_indexes(field_id)?;
707        if existing_indexes.contains(&IndexKind::Sort) {
708            let existing_name = self
709                .metadata
710                .single_column_indexes(table_id)?
711                .into_iter()
712                .find(|entry| entry.column_id == field_id)
713                .map(|entry| entry.index_name)
714                .unwrap_or_else(|| column_name.to_string());
715
716            if if_not_exists {
717                return Ok(SingleColumnIndexRegistration::AlreadyExists {
718                    index_name: existing_name,
719                });
720            }
721
722            return Err(Error::CatalogError(format!(
723                "Index already exists on column '{}' in table '{}'",
724                column_name, display_name
725            )));
726        }
727
728        let index_display_name = match index_name {
729            Some(name) => name,
730            None => {
731                self.generate_single_column_index_name(table_id, canonical_name, column_name)?
732            }
733        };
734        if index_display_name.is_empty() {
735            return Err(Error::InvalidArgumentError(
736                "Index name must not be empty".into(),
737            ));
738        }
739        let canonical_index_name = index_display_name.to_ascii_lowercase();
740
741        if let Some(existing) = self
742            .metadata
743            .single_column_index(table_id, &canonical_index_name)?
744        {
745            if if_not_exists {
746                return Ok(SingleColumnIndexRegistration::AlreadyExists {
747                    index_name: existing.index_name,
748                });
749            }
750
751            return Err(Error::CatalogError(format!(
752                "Index '{}' already exists on table '{}'",
753                existing.index_name, display_name
754            )));
755        }
756
757        let entry = SingleColumnIndexEntry {
758            index_name: index_display_name.clone(),
759            canonical_name: canonical_index_name,
760            column_id: field_id,
761            column_name: column_name.to_string(),
762            unique: mark_unique,
763            ascending,
764            nulls_first,
765        };
766
767        self.metadata.put_single_column_index(table_id, entry)?;
768        self.metadata.register_sort_index(table_id, field_id)?;
769
770        if mark_unique {
771            let catalog_table_id = self.catalog.table_id(canonical_name).unwrap_or(table_id);
772            if let Some(resolver) = self.catalog.field_resolver(catalog_table_id) {
773                resolver.set_field_unique(column_name, true)?;
774            }
775        }
776
777        self.metadata.flush_table(table_id)?;
778
779        Ok(SingleColumnIndexRegistration::Created {
780            index_name: index_display_name,
781        })
782    }
783
784    pub fn drop_single_column_index(
785        &self,
786        plan: DropIndexPlan,
787    ) -> LlkvResult<Option<SingleColumnIndexDescriptor>> {
788        let canonical_index = plan.canonical_name.to_ascii_lowercase();
789        let snapshot = self.catalog.snapshot();
790
791        for canonical_table_name in snapshot.table_names() {
792            let Some(table_id) = snapshot.table_id(&canonical_table_name) else {
793                continue;
794            };
795
796            if let Some(entry) = self
797                .metadata
798                .single_column_index(table_id, &canonical_index)?
799            {
800                self.metadata
801                    .remove_single_column_index(table_id, &canonical_index)?;
802
803                if entry.unique
804                    && let Some(resolver) = self.catalog.field_resolver(table_id)
805                {
806                    resolver.set_field_unique(&entry.column_name, false)?;
807                }
808
809                self.metadata.flush_table(table_id)?;
810
811                let display_table_name = self
812                    .metadata
813                    .table_meta(table_id)?
814                    .and_then(|meta| meta.name)
815                    .unwrap_or_else(|| canonical_table_name.clone());
816
817                return Ok(Some(SingleColumnIndexDescriptor {
818                    index_name: entry.index_name,
819                    table_id,
820                    canonical_table_name,
821                    display_table_name,
822                    field_id: entry.column_id,
823                    column_name: entry.column_name,
824                    was_unique: entry.unique,
825                }));
826            }
827        }
828
829        if plan.if_exists {
830            Ok(None)
831        } else {
832            Err(Error::CatalogError(format!(
833                "Index '{}' does not exist",
834                plan.name
835            )))
836        }
837    }
838
839    /// Register a multi-column UNIQUE index.
840    pub fn register_multi_column_unique_index(
841        &self,
842        table_id: TableId,
843        field_ids: &[FieldId],
844        index_name: Option<String>,
845    ) -> LlkvResult<MultiColumnUniqueRegistration> {
846        let registration = self
847            .metadata
848            .register_multi_column_unique(table_id, field_ids, index_name)?;
849
850        if matches!(registration, MultiColumnUniqueRegistration::Created) {
851            self.metadata.flush_table(table_id)?;
852        }
853
854        Ok(registration)
855    }
856
857    #[allow(clippy::too_many_arguments)]
858    pub fn create_trigger(
859        &self,
860        trigger_display_name: &str,
861        canonical_trigger_name: &str,
862        table_display_name: &str,
863        canonical_table_name: &str,
864        timing: TriggerTimingMeta,
865        event: TriggerEventMeta,
866        for_each_row: bool,
867        condition: Option<String>,
868        body_sql: String,
869        if_not_exists: bool,
870    ) -> LlkvResult<bool> {
871        let Some(table_id) = self.catalog.table_id(canonical_table_name) else {
872            return Err(Error::CatalogError(format!(
873                "Table '{}' does not exist",
874                table_display_name
875            )));
876        };
877
878        let table_meta = self.metadata.table_meta(table_id)?;
879        let is_view = table_meta
880            .as_ref()
881            .and_then(|meta| meta.view_definition.as_ref())
882            .is_some();
883
884        match timing {
885            TriggerTimingMeta::InsteadOf => {
886                if !is_view {
887                    return Err(Error::InvalidArgumentError(format!(
888                        "INSTEAD OF trigger '{}' requires a view target",
889                        trigger_display_name
890                    )));
891                }
892            }
893            _ => {
894                if is_view {
895                    return Err(Error::InvalidArgumentError(format!(
896                        "Trigger '{}' must use INSTEAD OF when targeting a view",
897                        trigger_display_name
898                    )));
899                }
900            }
901        }
902
903        if self
904            .metadata
905            .trigger(table_id, canonical_trigger_name)?
906            .is_some()
907        {
908            if if_not_exists {
909                return Ok(false);
910            }
911            return Err(Error::CatalogError(format!(
912                "Trigger '{}' already exists",
913                trigger_display_name
914            )));
915        }
916
917        let entry = TriggerEntryMeta {
918            name: trigger_display_name.to_string(),
919            canonical_name: canonical_trigger_name.to_string(),
920            timing,
921            event,
922            for_each_row,
923            condition,
924            body_sql,
925        };
926
927        self.metadata.insert_trigger(table_id, entry)?;
928        self.metadata.flush_table(table_id)?;
929        Ok(true)
930    }
931
932    pub fn drop_trigger(
933        &self,
934        trigger_display_name: &str,
935        canonical_trigger_name: &str,
936        table_hint_display: Option<&str>,
937        table_hint_canonical: Option<&str>,
938        if_exists: bool,
939    ) -> LlkvResult<bool> {
940        let mut candidate_tables: Vec<(TableId, String)> = Vec::new();
941
942        if let Some(canonical_table) = table_hint_canonical {
943            match self.catalog.table_id(canonical_table) {
944                Some(table_id) => candidate_tables.push((table_id, canonical_table.to_string())),
945                None => {
946                    if if_exists {
947                        return Ok(false);
948                    }
949                    let display = table_hint_display.unwrap_or(canonical_table);
950                    return Err(Error::CatalogError(format!(
951                        "Table '{}' does not exist",
952                        display
953                    )));
954                }
955            }
956        } else {
957            let snapshot = self.catalog.snapshot();
958            for canonical_table in snapshot.table_names() {
959                if let Some(table_id) = snapshot.table_id(&canonical_table) {
960                    candidate_tables.push((table_id, canonical_table));
961                }
962            }
963        }
964
965        for (table_id, canonical_table) in candidate_tables {
966            if self
967                .metadata
968                .remove_trigger(table_id, canonical_trigger_name)?
969            {
970                self.metadata.flush_table(table_id)?;
971                return Ok(true);
972            } else if table_hint_canonical.is_some()
973                && table_hint_canonical
974                    .unwrap()
975                    .eq_ignore_ascii_case(&canonical_table)
976            {
977                break;
978            }
979        }
980
981        if if_exists {
982            Ok(false)
983        } else {
984            Err(Error::CatalogError(format!(
985                "Trigger '{}' does not exist",
986                trigger_display_name
987            )))
988        }
989    }
990
991    /// Register a multi-column index (unique or non-unique).
992    ///
993    /// Returns true if the index was created, false if it already exists.
994    pub fn register_multi_column_index(
995        &self,
996        table_id: TableId,
997        field_ids: &[FieldId],
998        index_name: String,
999        unique: bool,
1000    ) -> LlkvResult<bool> {
1001        let canonical_name = index_name.to_lowercase();
1002
1003        // Check if index already exists
1004        if let Some(_existing) = self
1005            .metadata
1006            .get_multi_column_index(table_id, &canonical_name)?
1007        {
1008            return Ok(false);
1009        }
1010
1011        // Create new index entry
1012        let entry = MultiColumnIndexEntryMeta {
1013            index_name: Some(index_name),
1014            canonical_name,
1015            column_ids: field_ids.to_vec(),
1016            unique,
1017        };
1018
1019        self.metadata.put_multi_column_index(table_id, entry)?;
1020        self.metadata.flush_table(table_id)?;
1021
1022        Ok(true)
1023    }
1024
1025    fn generate_single_column_index_name(
1026        &self,
1027        table_id: TableId,
1028        canonical_table_name: &str,
1029        column_name: &str,
1030    ) -> LlkvResult<String> {
1031        let table_token = if canonical_table_name.is_empty() {
1032            "table".to_string()
1033        } else {
1034            canonical_table_name.replace('.', "_")
1035        };
1036        let column_token = column_name.to_ascii_lowercase();
1037
1038        let mut candidate = format!("{}_{}_idx", table_token, column_token);
1039        let mut suffix: u32 = 1;
1040        loop {
1041            let canonical = candidate.to_ascii_lowercase();
1042            if self
1043                .metadata
1044                .single_column_index(table_id, &canonical)?
1045                .is_none()
1046            {
1047                return Ok(candidate);
1048            }
1049
1050            candidate = format!("{}_{}_idx{}", table_token, column_token, suffix);
1051            suffix = suffix.checked_add(1).ok_or_else(|| {
1052                Error::InvalidArgumentError("exhausted unique index name generation space".into())
1053            })?;
1054        }
1055    }
1056
1057    /// Append RecordBatches to a freshly created table, injecting MVCC columns.
1058    #[allow(clippy::too_many_arguments)]
1059    pub fn append_batches_with_mvcc(
1060        &self,
1061        table: &Table<P>,
1062        table_columns: &[TableColumn],
1063        batches: &[RecordBatch],
1064        creator_txn_id: u64,
1065        deleted_marker: u64,
1066        starting_row_id: RowId,
1067        mvcc_builder: &dyn MvccColumnBuilder,
1068    ) -> LlkvResult<(RowId, u64)> {
1069        let mut next_row_id = starting_row_id;
1070        let mut total_rows: u64 = 0;
1071
1072        for batch in batches {
1073            if batch.num_rows() == 0 {
1074                continue;
1075            }
1076
1077            if batch.num_columns() != table_columns.len() {
1078                return Err(Error::InvalidArgumentError(format!(
1079                    "CTAS query returned unexpected column count (expected {}, found {})",
1080                    table_columns.len(),
1081                    batch.num_columns()
1082                )));
1083            }
1084
1085            let row_count = batch.num_rows();
1086
1087            let (row_id_array, created_by_array, deleted_by_array) = mvcc_builder
1088                .build_insert_columns(row_count, next_row_id, creator_txn_id, deleted_marker);
1089
1090            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(table_columns.len() + 3);
1091            arrays.push(row_id_array);
1092            arrays.push(created_by_array);
1093            arrays.push(deleted_by_array);
1094
1095            let mut fields = mvcc_builder.mvcc_fields();
1096
1097            for (idx, column) in table_columns.iter().enumerate() {
1098                let array = batch.column(idx).clone();
1099                let field = mvcc_builder.field_with_metadata(
1100                    &column.name,
1101                    column.data_type.clone(),
1102                    column.nullable,
1103                    column.field_id,
1104                );
1105                arrays.push(array);
1106                fields.push(field);
1107            }
1108
1109            let append_schema = Arc::new(Schema::new(fields));
1110            let append_batch = RecordBatch::try_new(append_schema, arrays).map_err(Error::Arrow)?;
1111            table.append(&append_batch)?;
1112
1113            next_row_id = next_row_id.saturating_add(row_count as u64);
1114            total_rows = total_rows.saturating_add(row_count as u64);
1115        }
1116
1117        Ok((next_row_id, total_rows))
1118    }
1119
1120    /// Validate and register foreign keys for a newly created table.
1121    #[allow(clippy::too_many_arguments)]
1122    pub fn register_foreign_keys_for_new_table<F>(
1123        &self,
1124        table_id: TableId,
1125        display_name: &str,
1126        canonical_name: &str,
1127        table_columns: &[TableColumn],
1128        specs: &[ForeignKeySpec],
1129        lookup_table: F,
1130        timestamp_micros: u64,
1131    ) -> LlkvResult<Vec<ValidatedForeignKey>>
1132    where
1133        F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
1134    {
1135        if specs.is_empty() {
1136            return Ok(Vec::new());
1137        }
1138
1139        let referencing_columns: Vec<ForeignKeyColumn> = table_columns
1140            .iter()
1141            .map(|column| ForeignKeyColumn {
1142                name: column.name.clone(),
1143                data_type: column.data_type.clone(),
1144                nullable: column.nullable,
1145                primary_key: column.primary_key,
1146                unique: column.unique,
1147                field_id: column.field_id,
1148            })
1149            .collect();
1150
1151        let multi_column_uniques = {
1152            let catalog = SysCatalog::new(&self.store);
1153            let all_indexes = catalog.get_multi_column_indexes(table_id)?;
1154            all_indexes.into_iter().filter(|idx| idx.unique).collect()
1155        };
1156
1157        let referencing_table = ForeignKeyTableInfo {
1158            display_name: display_name.to_string(),
1159            canonical_name: canonical_name.to_string(),
1160            table_id,
1161            columns: referencing_columns,
1162            multi_column_uniques,
1163        };
1164
1165        self.metadata.validate_and_register_foreign_keys(
1166            &referencing_table,
1167            specs,
1168            lookup_table,
1169            timestamp_micros,
1170        )
1171    }
1172
1173    /// Resolve referenced tables for the provided foreign key view definitions.
1174    pub fn referenced_table_info(
1175        &self,
1176        views: &[ForeignKeyView],
1177    ) -> LlkvResult<Vec<ForeignKeyTableInfo>> {
1178        let mut results = Vec::with_capacity(views.len());
1179        for view in views {
1180            let Some(table_id) = self.catalog.table_id(&view.referenced_table_canonical) else {
1181                return Err(Error::CatalogError(format!(
1182                    "Catalog Error: referenced table '{}' does not exist",
1183                    view.referenced_table_display
1184                )));
1185            };
1186
1187            let Some(resolver) = self.catalog.field_resolver(table_id) else {
1188                return Err(Error::Internal(format!(
1189                    "catalog resolver missing for table '{}'",
1190                    view.referenced_table_display
1191                )));
1192            };
1193
1194            let mut columns = Vec::with_capacity(view.referenced_field_ids.len());
1195            for field_id in &view.referenced_field_ids {
1196                let info = resolver.field_info(*field_id).ok_or_else(|| {
1197                    Error::Internal(format!(
1198                        "field metadata missing for id {} in table '{}'",
1199                        field_id, view.referenced_table_display
1200                    ))
1201                })?;
1202
1203                let data_type = self.metadata.column_data_type(table_id, *field_id)?;
1204
1205                columns.push(ForeignKeyColumn {
1206                    name: info.display_name.to_string(),
1207                    data_type,
1208                    nullable: !info.constraints.primary_key,
1209                    primary_key: info.constraints.primary_key,
1210                    unique: info.constraints.unique,
1211                    field_id: *field_id,
1212                });
1213            }
1214
1215            let multi_column_uniques = {
1216                let catalog = SysCatalog::new(&self.store);
1217                let all_indexes = catalog.get_multi_column_indexes(table_id)?;
1218                all_indexes.into_iter().filter(|idx| idx.unique).collect()
1219            };
1220
1221            results.push(ForeignKeyTableInfo {
1222                display_name: view.referenced_table_display.clone(),
1223                canonical_name: view.referenced_table_canonical.clone(),
1224                table_id,
1225                columns,
1226                multi_column_uniques,
1227            });
1228        }
1229
1230        Ok(results)
1231    }
1232
1233    /// Return the current metadata snapshot for a table, including column metadata and constraints.
1234    pub fn table_view(&self, canonical_name: &str) -> LlkvResult<TableView> {
1235        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1236            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1237        })?;
1238
1239        let (_, field_ids) = self.sorted_user_fields(table_id);
1240        self.table_view_with_field_ids(table_id, &field_ids)
1241    }
1242
1243    /// Produce a read-only view of a table's catalog, including column metadata and constraints.
1244    pub fn table_column_specs(&self, canonical_name: &str) -> LlkvResult<Vec<PlanColumnSpec>> {
1245        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1246            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1247        })?;
1248
1249        let resolver = self
1250            .catalog
1251            .field_resolver(table_id)
1252            .ok_or_else(|| Error::Internal("missing field resolver for table".into()))?;
1253
1254        let (logical_fields, field_ids) = self.sorted_user_fields(table_id);
1255
1256        let table_view = self.table_view_with_field_ids(table_id, &field_ids)?;
1257        let column_metas = table_view.column_metas;
1258        let constraint_records = table_view.constraint_records;
1259
1260        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
1261        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
1262        let mut has_primary_key_records = false;
1263        let mut has_single_unique_records = false;
1264
1265        for record in constraint_records
1266            .iter()
1267            .filter(|record| record.is_active())
1268        {
1269            match &record.kind {
1270                ConstraintKind::PrimaryKey(pk) => {
1271                    has_primary_key_records = true;
1272                    for field_id in &pk.field_ids {
1273                        metadata_primary_keys.insert(*field_id);
1274                        metadata_unique_fields.insert(*field_id);
1275                    }
1276                }
1277                ConstraintKind::Unique(unique) => {
1278                    if unique.field_ids.len() == 1 {
1279                        has_single_unique_records = true;
1280                        if let Some(field_id) = unique.field_ids.first() {
1281                            metadata_unique_fields.insert(*field_id);
1282                        }
1283                    }
1284                }
1285                _ => {}
1286            }
1287        }
1288
1289        let mut specs = Vec::with_capacity(field_ids.len());
1290
1291        for (idx, lfid) in logical_fields.iter().enumerate() {
1292            let field_id = lfid.field_id();
1293
1294            let column_name = column_metas
1295                .get(idx)
1296                .and_then(|meta| meta.as_ref())
1297                .and_then(|meta| meta.name.clone())
1298                .unwrap_or_else(|| format!("col_{}", field_id));
1299
1300            let fallback_constraints = resolver
1301                .field_constraints_by_name(&column_name)
1302                .unwrap_or_default();
1303
1304            let metadata_primary = metadata_primary_keys.contains(&field_id);
1305            let primary_key = if has_primary_key_records {
1306                metadata_primary
1307            } else {
1308                fallback_constraints.primary_key
1309            };
1310
1311            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
1312            let unique = if has_primary_key_records || has_single_unique_records {
1313                metadata_unique
1314            } else {
1315                fallback_constraints.primary_key || fallback_constraints.unique
1316            };
1317
1318            let data_type = self.store.data_type(*lfid)?;
1319            let nullable = !primary_key;
1320
1321            let mut spec = PlanColumnSpec::new(column_name.clone(), data_type, nullable)
1322                .with_primary_key(primary_key)
1323                .with_unique(unique);
1324
1325            if let Some(check_expr) = fallback_constraints.check_expr.clone() {
1326                spec = spec.with_check(Some(check_expr));
1327            }
1328
1329            specs.push(spec);
1330        }
1331
1332        Ok(specs)
1333    }
1334
1335    /// Return the foreign key metadata for the specified table.
1336    pub fn foreign_key_views(&self, canonical_name: &str) -> LlkvResult<Vec<ForeignKeyView>> {
1337        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1338            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1339        })?;
1340
1341        self.metadata.foreign_key_views(&self.catalog, table_id)
1342    }
1343
1344    /// Return constraint-related catalog metadata for the specified table.
1345    pub fn table_constraint_summary(
1346        &self,
1347        canonical_name: &str,
1348    ) -> LlkvResult<TableConstraintSummaryView> {
1349        tracing::trace!(
1350            "[TABLE_CONSTRAINT_SUMMARY] Looking up table '{}' in catalog @ {:p}",
1351            canonical_name,
1352            &*self.catalog
1353        );
1354        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1355            tracing::error!(
1356                "[TABLE_CONSTRAINT_SUMMARY] Table '{}' NOT FOUND in catalog @ {:p}",
1357                canonical_name,
1358                &*self.catalog
1359            );
1360            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1361        })?;
1362        tracing::trace!(
1363            "[TABLE_CONSTRAINT_SUMMARY] Found table '{}' with id={} in catalog",
1364            canonical_name,
1365            table_id
1366        );
1367
1368        let (_, field_ids) = self.sorted_user_fields(table_id);
1369        let table_meta = self.metadata.table_meta(table_id)?;
1370        let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
1371        let constraint_records = self.metadata.constraint_records(table_id)?;
1372        let multi_column_uniques = self.metadata.multi_column_uniques(table_id)?;
1373        let constraint_names = self.metadata.constraint_names(table_id)?;
1374
1375        Ok(TableConstraintSummaryView {
1376            table_meta,
1377            column_metas,
1378            constraint_records,
1379            multi_column_uniques,
1380            constraint_names,
1381        })
1382    }
1383
1384    fn sorted_user_fields(
1385        &self,
1386        table_id: TableId,
1387    ) -> (Vec<llkv_types::LogicalFieldId>, Vec<FieldId>) {
1388        let mut logical_fields = self.store.user_field_ids_for_table(table_id);
1389        logical_fields.sort_by_key(|lfid| lfid.field_id());
1390        let field_ids = logical_fields
1391            .iter()
1392            .map(|lfid| lfid.field_id())
1393            .collect::<Vec<_>>();
1394
1395        (logical_fields, field_ids)
1396    }
1397
1398    fn table_view_with_field_ids(
1399        &self,
1400        table_id: TableId,
1401        field_ids: &[FieldId],
1402    ) -> LlkvResult<TableView> {
1403        self.metadata.table_view(&self.catalog, table_id, field_ids)
1404    }
1405
1406    // -------------------------------------------------------------------------
1407    // Catalog read-only views
1408    // -------------------------------------------------------------------------
1409
1410    /// Returns all table names in the catalog.
1411    pub fn table_names(&self) -> Vec<String> {
1412        self.catalog.table_names()
1413    }
1414
1415    /// Returns the TableId for a canonical table name.
1416    pub fn table_id(&self, canonical_name: &str) -> Option<TableId> {
1417        self.catalog.table_id(canonical_name)
1418    }
1419
1420    /// Returns a field resolver for the given table.
1421    pub fn field_resolver(&self, table_id: TableId) -> Option<crate::catalog::FieldResolver> {
1422        self.catalog.field_resolver(table_id)
1423    }
1424
1425    /// Returns a snapshot of the catalog for read-only access.
1426    pub fn catalog_snapshot(&self) -> crate::catalog::TableCatalogSnapshot {
1427        self.catalog.snapshot()
1428    }
1429
1430    /// Returns a reference to the internal catalog for services that need it.
1431    /// Note: This is primarily for internal use by services like ConstraintService.
1432    pub fn catalog(&self) -> &Arc<TableCatalog> {
1433        &self.catalog
1434    }
1435
1436    /// Returns all foreign keys that reference the specified table.
1437    /// Returns a vector of (referencing_table_id, constraint_id) pairs.
1438    pub fn foreign_keys_referencing(
1439        &self,
1440        referenced_table_id: TableId,
1441    ) -> LlkvResult<Vec<(TableId, ConstraintId)>> {
1442        self.metadata.foreign_keys_referencing(referenced_table_id)
1443    }
1444
1445    /// Returns detailed foreign key views for a specific table.
1446    /// This includes foreign keys where the specified table is the referencing table.
1447    pub fn foreign_key_views_for_table(
1448        &self,
1449        table_id: TableId,
1450    ) -> LlkvResult<Vec<ForeignKeyView>> {
1451        self.metadata.foreign_key_views(&self.catalog, table_id)
1452    }
1453}
1454
1455fn field_id_for_index(idx: usize) -> LlkvResult<FieldId> {
1456    FieldId::try_from(idx + 1).map_err(|_| {
1457        Error::Internal(format!(
1458            "column index {} exceeded supported field id range",
1459            idx + 1
1460        ))
1461    })
1462}
1463
1464/// Parse a SQL type string (e.g., "INTEGER") back into a DataType.
1465fn parse_data_type_from_sql(sql: &str) -> LlkvResult<sqlparser::ast::DataType> {
1466    use sqlparser::dialect::GenericDialect;
1467    use sqlparser::parser::Parser;
1468
1469    // Try to parse as a simple CREATE DOMAIN statement
1470    let create_sql = format!("CREATE DOMAIN dummy AS {}", sql);
1471    let dialect = GenericDialect {};
1472
1473    match Parser::parse_sql(&dialect, &create_sql) {
1474        Ok(stmts) if !stmts.is_empty() => {
1475            if let sqlparser::ast::Statement::CreateDomain(create_domain) = &stmts[0] {
1476                Ok(create_domain.data_type.clone())
1477            } else {
1478                Err(Error::InvalidArgumentError(format!(
1479                    "Failed to parse type from SQL: {}",
1480                    sql
1481                )))
1482            }
1483        }
1484        _ => Err(Error::InvalidArgumentError(format!(
1485            "Failed to parse type from SQL: {}",
1486            sql
1487        ))),
1488    }
1489}