llkv_table/catalog/
manager.rs

1//! High-level service for creating tables.
2//!
3//! Use `CatalogManager` to create tables. It coordinates metadata persistence,
4//! catalog registration, and storage initialization.
5
6#![forbid(unsafe_code)]
7
8use std::convert::TryFrom;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use arrow::array::ArrayRef;
13use arrow::datatypes::{DataType, Field, Schema};
14use arrow::record_batch::RecordBatch;
15use llkv_column_map::ColumnStore;
16use llkv_column_map::store::IndexKind;
17use llkv_plan::{DropIndexPlan, ForeignKeySpec, PlanColumnSpec};
18use llkv_result::{Error, Result as LlkvResult};
19use llkv_storage::pager::Pager;
20use rustc_hash::{FxHashMap, FxHashSet};
21use simd_r_drive_entry_handle::EntryHandle;
22
23use super::table_catalog::{FieldDefinition, TableCatalog};
24use crate::constraints::{ConstraintId, ConstraintKind};
25use crate::metadata::{MetadataManager, MultiColumnUniqueRegistration, SingleColumnIndexEntry};
26use crate::sys_catalog::{ColMeta, MultiColumnIndexEntryMeta, SysCatalog};
27use crate::table::Table;
28use crate::types::{FieldId, RowId, TableColumn, TableId};
29use crate::{
30    ForeignKeyColumn, ForeignKeyTableInfo, ForeignKeyView, TableConstraintSummaryView, TableView,
31    ValidatedForeignKey,
32};
33
34/// Result of creating a table. The caller is responsible for wiring executor
35/// caches and any higher-level state that depends on the table schema.
36pub struct CreateTableResult<P>
37where
38    P: Pager<Blob = EntryHandle> + Send + Sync,
39{
40    pub table_id: TableId,
41    pub table: Arc<Table<P>>,
42    pub table_columns: Vec<TableColumn>,
43    pub column_lookup: FxHashMap<String, usize>,
44}
45
/// Result of attempting to register a single-column index definition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SingleColumnIndexRegistration {
    /// A new index was registered; `index_name` is the name it was stored under.
    Created { index_name: String },
    /// An index was already present and the caller tolerated that; `index_name`
    /// is the name reported for the existing index.
    AlreadyExists { index_name: String },
}
52
/// Descriptor for a single-column index resolved from catalog metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SingleColumnIndexDescriptor {
    /// Name of the index as recorded in its metadata entry.
    pub index_name: String,
    /// Identifier of the table that owns the index.
    pub table_id: TableId,
    /// Table name as listed by the catalog.
    pub canonical_table_name: String,
    /// Table name for user-facing output.
    pub display_table_name: String,
    /// Field identifier of the indexed column.
    pub field_id: FieldId,
    /// Name of the indexed column.
    pub column_name: String,
    /// Whether the index entry was flagged unique.
    pub was_unique: bool,
}
64
/// Trait for constructing MVCC columns and Arrow metadata during batch ingestion.
///
/// Implementations can delegate to `llkv_transaction::mvcc` or provide custom logic
/// for synthetic data sources. This indirection avoids coupling the table crate to the
/// transaction crate while still centralizing MVCC helpers.
pub trait MvccColumnBuilder: Send + Sync {
    /// Build MVCC columns (row_id, created_by, deleted_by) for INSERT/CTAS operations.
    ///
    /// The returned tuple is ordered `(row_id, created_by, deleted_by)`.
    ///
    /// # Arguments
    /// * `row_count` - Number of rows the generated columns must cover
    /// * `start_row_id` - Row id to start from
    /// * `creator_txn_id` - Transaction id to record as the creator
    /// * `deleted_marker` - Value for the deleted_by column
    ///   (NOTE(review): presumably a "not deleted" sentinel; confirm with implementors)
    fn build_insert_columns(
        &self,
        row_count: usize,
        start_row_id: RowId,
        creator_txn_id: u64,
        deleted_marker: u64,
    ) -> (ArrayRef, ArrayRef, ArrayRef);

    /// Return the Arrow field definitions for MVCC columns.
    fn mvcc_fields(&self) -> Vec<Field>;

    /// Construct a user column field with the required metadata assigned.
    ///
    /// # Arguments
    /// * `name` - Column name for the field
    /// * `data_type` - Arrow data type of the column
    /// * `nullable` - Whether the field accepts nulls
    /// * `field_id` - Field identifier to attach to the field
    fn field_with_metadata(
        &self,
        name: &str,
        data_type: DataType,
        nullable: bool,
        field_id: FieldId,
    ) -> Field;
}
92
93/// Service for creating tables.
94///
95/// Coordinates metadata persistence (`MetadataManager`), catalog registration
96/// (`TableCatalog`), and storage initialization (`ColumnStore`).
97#[derive(Clone)]
98pub struct CatalogManager<P>
99where
100    P: Pager<Blob = EntryHandle> + Send + Sync,
101{
102    metadata: Arc<MetadataManager<P>>,
103    catalog: Arc<TableCatalog>,
104    store: Arc<ColumnStore<P>>,
105    type_registry: Arc<std::sync::RwLock<FxHashMap<String, sqlparser::ast::DataType>>>,
106}
107
108impl<P> CatalogManager<P>
109where
110    P: Pager<Blob = EntryHandle> + Send + Sync,
111{
112    /// Creates a new CatalogManager coordinating metadata, catalog, and storage layers.
113    pub fn new(
114        metadata: Arc<MetadataManager<P>>,
115        catalog: Arc<TableCatalog>,
116        store: Arc<ColumnStore<P>>,
117    ) -> Self {
118        Self {
119            metadata,
120            catalog,
121            store,
122            type_registry: Arc::new(std::sync::RwLock::new(FxHashMap::default())),
123        }
124    }
125
126    // ============================================================================
127    // Type Registry Management
128    // ============================================================================
129
130    /// Load custom types from system catalog.
131    /// Should be called during initialization to restore persisted types.
132    pub fn load_types_from_catalog(&self) -> LlkvResult<()> {
133        use crate::sys_catalog::SysCatalog;
134
135        let sys_catalog = SysCatalog::new(&self.store);
136        match sys_catalog.all_custom_type_metas() {
137            Ok(type_metas) => {
138                tracing::debug!(
139                    "[CATALOG] Loaded {} custom type(s) from catalog",
140                    type_metas.len()
141                );
142
143                let mut registry = self.type_registry.write().unwrap();
144                for type_meta in type_metas {
145                    // Parse the base_type_sql back to a DataType
146                    if let Ok(parsed_type) = parse_data_type_from_sql(&type_meta.base_type_sql) {
147                        registry.insert(type_meta.name.to_lowercase(), parsed_type);
148                    } else {
149                        tracing::warn!(
150                            "[CATALOG] Failed to parse base type SQL for type '{}': {}",
151                            type_meta.name,
152                            type_meta.base_type_sql
153                        );
154                    }
155                }
156
157                tracing::debug!(
158                    "[CATALOG] Type registry initialized with {} type(s)",
159                    registry.len()
160                );
161                Ok(())
162            }
163            Err(e) => {
164                tracing::warn!(
165                    "[CATALOG] Failed to load custom types: {}, starting with empty type registry",
166                    e
167                );
168                Ok(()) // Non-fatal, start with empty registry
169            }
170        }
171    }
172
173    /// Register a custom type alias (CREATE TYPE/DOMAIN).
174    pub fn register_type(&self, name: String, data_type: sqlparser::ast::DataType) {
175        let mut registry = self.type_registry.write().unwrap();
176        registry.insert(name.to_lowercase(), data_type);
177    }
178
179    /// Drop a custom type alias (DROP TYPE/DOMAIN).
180    pub fn drop_type(&self, name: &str) -> LlkvResult<()> {
181        let mut registry = self.type_registry.write().unwrap();
182        if registry.remove(&name.to_lowercase()).is_none() {
183            return Err(Error::InvalidArgumentError(format!(
184                "Type '{}' does not exist",
185                name
186            )));
187        }
188        Ok(())
189    }
190
191    /// Resolve a type name to its base DataType, recursively following aliases.
192    pub fn resolve_type(&self, data_type: &sqlparser::ast::DataType) -> sqlparser::ast::DataType {
193        use sqlparser::ast::DataType;
194
195        match data_type {
196            DataType::Custom(obj_name, _) => {
197                let name = obj_name.to_string().to_lowercase();
198                let registry = self.type_registry.read().unwrap();
199                if let Some(base_type) = registry.get(&name) {
200                    // Recursively resolve in case the base type is also an alias
201                    self.resolve_type(base_type)
202                } else {
203                    // Not a custom type, return as-is
204                    data_type.clone()
205                }
206            }
207            // For non-custom types, return as-is
208            _ => data_type.clone(),
209        }
210    }
211
212    // ============================================================================
213    // View Management
214    // ============================================================================
215
216    /// Create a view by storing its SQL definition in the catalog.
217    /// The view will be registered as a table with a view_definition.
218    pub fn create_view(
219        &self,
220        display_name: &str,
221        view_definition: String,
222    ) -> LlkvResult<crate::types::TableId> {
223        use crate::sys_catalog::TableMeta;
224
225        // Reserve a new table ID for the view
226        let table_id = self.metadata.reserve_table_id()?;
227
228        let created_at_micros = SystemTime::now()
229            .duration_since(UNIX_EPOCH)
230            .unwrap_or_default()
231            .as_micros() as u64;
232
233        // Create the table metadata with view_definition set
234        let table_meta = TableMeta {
235            table_id,
236            name: Some(display_name.to_string()),
237            created_at_micros,
238            flags: 0,
239            epoch: 0,
240            view_definition: Some(view_definition),
241        };
242
243        // Store the metadata and flush to disk
244        self.metadata.set_table_meta(table_id, table_meta)?;
245        self.metadata.flush_table(table_id)?;
246
247        // Register the view in the catalog so it can be looked up by name
248        self.catalog.register_table(display_name, table_id)?;
249
250        tracing::debug!("Created view '{}' with table_id={}", display_name, table_id);
251        Ok(table_id)
252    }
253
254    /// Check if a table is actually a view by looking at its metadata.
255    /// Returns true if the table exists and has a view_definition.
256    pub fn is_view(&self, table_id: crate::types::TableId) -> LlkvResult<bool> {
257        match self.metadata.table_meta(table_id)? {
258            Some(meta) => Ok(meta.view_definition.is_some()),
259            None => Ok(false),
260        }
261    }
262
263    // ============================================================================
264    // Table Creation
265    // ============================================================================
266
267    /// Create a new table using column specifications.
268    ///
269    /// Reserves table ID from metadata, validates columns, persists schema,
270    /// registers in catalog, and returns a Table handle for data operations.
271    pub(crate) fn create_table_from_columns(
272        &self,
273        display_name: &str,
274        canonical_name: &str,
275        columns: &[PlanColumnSpec],
276    ) -> LlkvResult<CreateTableResult<P>> {
277        if columns.is_empty() {
278            return Err(Error::InvalidArgumentError(
279                "CREATE TABLE requires at least one column".into(),
280            ));
281        }
282
283        let mut lookup: FxHashMap<String, usize> =
284            FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
285        let mut table_columns: Vec<TableColumn> = Vec::with_capacity(columns.len());
286
287        for (idx, column) in columns.iter().enumerate() {
288            let normalized = column.name.to_ascii_lowercase();
289            if lookup.insert(normalized.clone(), idx).is_some() {
290                return Err(Error::InvalidArgumentError(format!(
291                    "duplicate column name '{}' in table '{}'",
292                    column.name, display_name
293                )));
294            }
295
296            table_columns.push(TableColumn {
297                field_id: field_id_for_index(idx)?,
298                name: column.name.clone(),
299                data_type: column.data_type.clone(),
300                nullable: column.nullable,
301                primary_key: column.primary_key,
302                unique: column.unique,
303                check_expr: column.check_expr.clone(),
304            });
305        }
306
307        self.create_table_inner(display_name, canonical_name, table_columns, lookup)
308    }
309
310    /// Create a new table using an Arrow schema (used by CTAS flows).
311    pub fn create_table_from_schema(
312        &self,
313        display_name: &str,
314        canonical_name: &str,
315        schema: &Schema,
316    ) -> LlkvResult<CreateTableResult<P>> {
317        if schema.fields().is_empty() {
318            return Err(Error::InvalidArgumentError(
319                "CREATE TABLE AS SELECT requires at least one column".into(),
320            ));
321        }
322
323        let mut lookup: FxHashMap<String, usize> =
324            FxHashMap::with_capacity_and_hasher(schema.fields().len(), Default::default());
325        let mut table_columns: Vec<TableColumn> = Vec::with_capacity(schema.fields().len());
326
327        for (idx, field) in schema.fields().iter().enumerate() {
328            let data_type = match field.data_type() {
329                DataType::Int64
330                | DataType::Float64
331                | DataType::Utf8
332                | DataType::Date32
333                | DataType::Struct(_) => field.data_type().clone(),
334                other => {
335                    return Err(Error::InvalidArgumentError(format!(
336                        "unsupported column type in CTAS result: {other:?}"
337                    )));
338                }
339            };
340
341            let normalized = field.name().to_ascii_lowercase();
342            if lookup.insert(normalized.clone(), idx).is_some() {
343                return Err(Error::InvalidArgumentError(format!(
344                    "duplicate column name '{}' in CTAS result",
345                    field.name()
346                )));
347            }
348
349            table_columns.push(TableColumn {
350                field_id: field_id_for_index(idx)?,
351                name: field.name().to_string(),
352                data_type,
353                nullable: field.is_nullable(),
354                primary_key: false,
355                unique: false,
356                check_expr: None,
357            });
358        }
359
360        self.create_table_inner(display_name, canonical_name, table_columns, lookup)
361    }
362
363    fn create_table_inner(
364        &self,
365        display_name: &str,
366        _canonical_name: &str,
367        table_columns: Vec<TableColumn>,
368        column_lookup: FxHashMap<String, usize>,
369    ) -> LlkvResult<CreateTableResult<P>> {
370        let table_id = self.metadata.reserve_table_id()?;
371        let timestamp = current_time_micros();
372        let table_meta = crate::sys_catalog::TableMeta {
373            table_id,
374            name: Some(display_name.to_string()),
375            created_at_micros: timestamp,
376            flags: 0,
377            epoch: 0,
378            view_definition: None, // Regular table, not a view
379        };
380
381        self.metadata.set_table_meta(table_id, table_meta)?;
382        self.metadata
383            .apply_column_definitions(table_id, &table_columns, timestamp)?;
384        self.metadata.flush_table(table_id)?;
385
386        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
387
388        // Register table in catalog using the table_id from metadata
389        if let Err(err) = self.catalog.register_table(display_name, table_id) {
390            self.metadata.remove_table_state(table_id);
391            return Err(err);
392        }
393
394        if let Some(field_resolver) = self.catalog.field_resolver(table_id) {
395            for column in &table_columns {
396                let definition = FieldDefinition::new(&column.name)
397                    .with_primary_key(column.primary_key)
398                    .with_unique(column.unique)
399                    .with_check_expr(column.check_expr.clone());
400                if let Err(err) = field_resolver.register_field(definition) {
401                    self.catalog.unregister_table(table_id);
402                    self.metadata.remove_table_state(table_id);
403                    return Err(err);
404                }
405            }
406        }
407
408        Ok(CreateTableResult {
409            table_id,
410            table: Arc::new(table),
411            table_columns,
412            column_lookup,
413        })
414    }
415
416    /// Prepare metadata state and unregister catalog entries for a dropped table.
417    pub fn drop_table(
418        &self,
419        canonical_name: &str,
420        table_id: TableId,
421        column_field_ids: &[FieldId],
422    ) -> LlkvResult<()> {
423        self.metadata
424            .prepare_table_drop(table_id, column_field_ids)?;
425        self.metadata.flush_table(table_id)?;
426        self.metadata.remove_table_state(table_id);
427        if let Some(table_id_from_catalog) = self.catalog.table_id(canonical_name) {
428            let _ = self.catalog.unregister_table(table_id_from_catalog);
429        } else {
430            let _ = self.catalog.unregister_table(table_id);
431        }
432        Ok(())
433    }
434
435    /// Rename a table across metadata and catalog layers.
436    pub fn rename_table(
437        &self,
438        table_id: TableId,
439        current_name: &str,
440        new_name: &str,
441    ) -> LlkvResult<()> {
442        if !current_name.eq_ignore_ascii_case(new_name) && self.catalog.table_id(new_name).is_some()
443        {
444            return Err(Error::CatalogError(format!(
445                "Table '{}' already exists",
446                new_name
447            )));
448        }
449
450        let previous_meta = self.metadata.table_meta(table_id)?;
451        let mut prior_snapshot = None;
452        if let Some(mut meta) = previous_meta.clone() {
453            prior_snapshot = Some(meta.clone());
454            meta.name = Some(new_name.to_string());
455            self.metadata.set_table_meta(table_id, meta)?;
456        }
457
458        if let Err(err) = self.catalog.rename_registered_table(current_name, new_name) {
459            if let Some(prior) = prior_snapshot {
460                let _ = self.metadata.set_table_meta(table_id, prior);
461            }
462            return Err(err);
463        }
464
465        if let Some(prior) = prior_snapshot.clone()
466            && let Err(err) = self.metadata.flush_table(table_id)
467        {
468            let _ = self.metadata.set_table_meta(table_id, prior);
469            let _ = self.catalog.rename_registered_table(new_name, current_name);
470            let _ = self.metadata.flush_table(table_id);
471            return Err(err);
472        }
473
474        Ok(())
475    }
476
477    /// Rename a column in a table by updating its metadata.
478    pub fn rename_column(
479        &self,
480        table_id: TableId,
481        old_column_name: &str,
482        new_column_name: &str,
483    ) -> LlkvResult<()> {
484        // Get all column metas for this table
485        let (_, field_ids) = self.sorted_user_fields(table_id);
486        let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
487
488        // Find the column by old name
489        let mut found_col: Option<(u32, ColMeta)> = None;
490        for (idx, meta_opt) in column_metas.iter().enumerate() {
491            if let Some(meta) = meta_opt
492                && let Some(name) = &meta.name
493                && name.eq_ignore_ascii_case(old_column_name)
494            {
495                found_col = Some((field_ids[idx], meta.clone()));
496                break;
497            }
498        }
499
500        let (_field_id, mut col_meta) = found_col.ok_or_else(|| {
501            Error::InvalidArgumentError(format!("column '{}' not found in table", old_column_name))
502        })?;
503
504        // Update the column name
505        col_meta.name = Some(new_column_name.to_string());
506
507        // Save to catalog
508        let catalog = SysCatalog::new(&self.store);
509        catalog.put_col_meta(table_id, &col_meta);
510
511        // Update metadata manager cache
512        self.metadata.set_column_meta(table_id, col_meta)?;
513
514        // Update field resolver mapping for this column name change.
515        if let Some(resolver) = self.catalog.field_resolver(table_id) {
516            resolver.rename_field(old_column_name, new_column_name)?;
517        }
518
519        self.metadata.flush_table(table_id)?;
520
521        Ok(())
522    }
523
524    /// Alter the data type of a column.
525    ///
526    /// This updates both the column metadata and the storage layer's data type fingerprint.
527    /// Note that actual data conversion is NOT performed - the caller must ensure that
528    /// existing data is compatible with the new type (or that no data exists).
529    ///
530    /// # Arguments
531    /// * `table_id` - The table containing the column
532    /// * `column_name` - Name of the column to alter
533    /// * `new_data_type` - Arrow DataType to set for this column
534    pub fn alter_column_type(
535        &self,
536        table_id: TableId,
537        column_name: &str,
538        new_data_type: &DataType,
539    ) -> LlkvResult<()> {
540        // Get all column metas for this table
541        let (logical_fields, field_ids) = self.sorted_user_fields(table_id);
542        let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
543
544        // Find the column by name
545        let mut found_col: Option<(usize, u32, ColMeta)> = None;
546        for (idx, meta_opt) in column_metas.iter().enumerate() {
547            if let Some(meta) = meta_opt
548                && let Some(name) = &meta.name
549                && name.eq_ignore_ascii_case(column_name)
550            {
551                found_col = Some((idx, field_ids[idx], meta.clone()));
552                break;
553            }
554        }
555
556        let (col_idx, _field_id, col_meta) = found_col.ok_or_else(|| {
557            Error::InvalidArgumentError(format!("column '{}' not found in table", column_name))
558        })?;
559
560        // Update the data type in the storage layer
561        let lfid = logical_fields[col_idx];
562        self.store.update_data_type(lfid, new_data_type)?;
563
564        // Save metadata to catalog
565        let catalog = SysCatalog::new(&self.store);
566        catalog.put_col_meta(table_id, &col_meta);
567
568        // Update metadata manager cache
569        self.metadata.set_column_meta(table_id, col_meta)?;
570
571        Ok(())
572    }
573
574    /// Drop a column from a table by removing its metadata.
575    pub fn drop_column(&self, table_id: TableId, column_name: &str) -> LlkvResult<()> {
576        // Get all column metas for this table
577        let (_, field_ids) = self.sorted_user_fields(table_id);
578        let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
579
580        // Find the column by name
581        let mut found_col_id: Option<u32> = None;
582        for (idx, meta_opt) in column_metas.iter().enumerate() {
583            if let Some(meta) = meta_opt
584                && let Some(name) = &meta.name
585                && name.eq_ignore_ascii_case(column_name)
586            {
587                found_col_id = Some(field_ids[idx]);
588                break;
589            }
590        }
591
592        let col_id = found_col_id.ok_or_else(|| {
593            Error::InvalidArgumentError(format!("column '{}' not found in table", column_name))
594        })?;
595
596        // Delete from catalog
597        let catalog = SysCatalog::new(&self.store);
598        catalog.delete_col_meta(table_id, &[col_id])?;
599
600        Ok(())
601    }
602
603    /// Register a single-column sort (B-tree) index. Optionally marks the field unique.
604    /// Returns `true` if the index was newly created, `false` if it already existed and `if_not_exists` was true.
605    #[allow(clippy::too_many_arguments)]
606    pub fn register_single_column_index(
607        &self,
608        display_name: &str,
609        canonical_name: &str,
610        table: &Table<P>,
611        field_id: FieldId,
612        column_name: &str,
613        index_name: Option<String>,
614        mark_unique: bool,
615        ascending: bool,
616        nulls_first: bool,
617        if_not_exists: bool,
618    ) -> LlkvResult<SingleColumnIndexRegistration> {
619        let table_id = table.table_id();
620        let existing_indexes = table.list_registered_indexes(field_id)?;
621        if existing_indexes.contains(&IndexKind::Sort) {
622            let existing_name = self
623                .metadata
624                .single_column_indexes(table_id)?
625                .into_iter()
626                .find(|entry| entry.column_id == field_id)
627                .map(|entry| entry.index_name)
628                .unwrap_or_else(|| column_name.to_string());
629
630            if if_not_exists {
631                return Ok(SingleColumnIndexRegistration::AlreadyExists {
632                    index_name: existing_name,
633                });
634            }
635
636            return Err(Error::CatalogError(format!(
637                "Index already exists on column '{}' in table '{}'",
638                column_name, display_name
639            )));
640        }
641
642        let index_display_name = match index_name {
643            Some(name) => name,
644            None => {
645                self.generate_single_column_index_name(table_id, canonical_name, column_name)?
646            }
647        };
648        if index_display_name.is_empty() {
649            return Err(Error::InvalidArgumentError(
650                "Index name must not be empty".into(),
651            ));
652        }
653        let canonical_index_name = index_display_name.to_ascii_lowercase();
654
655        if let Some(existing) = self
656            .metadata
657            .single_column_index(table_id, &canonical_index_name)?
658        {
659            if if_not_exists {
660                return Ok(SingleColumnIndexRegistration::AlreadyExists {
661                    index_name: existing.index_name,
662                });
663            }
664
665            return Err(Error::CatalogError(format!(
666                "Index '{}' already exists on table '{}'",
667                existing.index_name, display_name
668            )));
669        }
670
671        let entry = SingleColumnIndexEntry {
672            index_name: index_display_name.clone(),
673            canonical_name: canonical_index_name,
674            column_id: field_id,
675            column_name: column_name.to_string(),
676            unique: mark_unique,
677            ascending,
678            nulls_first,
679        };
680
681        self.metadata.put_single_column_index(table_id, entry)?;
682        self.metadata.register_sort_index(table_id, field_id)?;
683
684        if mark_unique {
685            let catalog_table_id = self.catalog.table_id(canonical_name).unwrap_or(table_id);
686            if let Some(resolver) = self.catalog.field_resolver(catalog_table_id) {
687                resolver.set_field_unique(column_name, true)?;
688            }
689        }
690
691        self.metadata.flush_table(table_id)?;
692
693        Ok(SingleColumnIndexRegistration::Created {
694            index_name: index_display_name,
695        })
696    }
697
698    pub fn drop_single_column_index(
699        &self,
700        plan: DropIndexPlan,
701    ) -> LlkvResult<Option<SingleColumnIndexDescriptor>> {
702        let canonical_index = plan.canonical_name.to_ascii_lowercase();
703        let snapshot = self.catalog.snapshot();
704
705        for canonical_table_name in snapshot.table_names() {
706            let Some(table_id) = snapshot.table_id(&canonical_table_name) else {
707                continue;
708            };
709
710            if let Some(entry) = self
711                .metadata
712                .single_column_index(table_id, &canonical_index)?
713            {
714                self.metadata
715                    .remove_single_column_index(table_id, &canonical_index)?;
716
717                if entry.unique
718                    && let Some(resolver) = self.catalog.field_resolver(table_id)
719                {
720                    resolver.set_field_unique(&entry.column_name, false)?;
721                }
722
723                self.metadata.flush_table(table_id)?;
724
725                let display_table_name = self
726                    .metadata
727                    .table_meta(table_id)?
728                    .and_then(|meta| meta.name)
729                    .unwrap_or_else(|| canonical_table_name.clone());
730
731                return Ok(Some(SingleColumnIndexDescriptor {
732                    index_name: entry.index_name,
733                    table_id,
734                    canonical_table_name,
735                    display_table_name,
736                    field_id: entry.column_id,
737                    column_name: entry.column_name,
738                    was_unique: entry.unique,
739                }));
740            }
741        }
742
743        if plan.if_exists {
744            Ok(None)
745        } else {
746            Err(Error::CatalogError(format!(
747                "Index '{}' does not exist",
748                plan.name
749            )))
750        }
751    }
752
753    /// Register a multi-column UNIQUE index.
754    pub fn register_multi_column_unique_index(
755        &self,
756        table_id: TableId,
757        field_ids: &[FieldId],
758        index_name: Option<String>,
759    ) -> LlkvResult<MultiColumnUniqueRegistration> {
760        let registration = self
761            .metadata
762            .register_multi_column_unique(table_id, field_ids, index_name)?;
763
764        if matches!(registration, MultiColumnUniqueRegistration::Created) {
765            self.metadata.flush_table(table_id)?;
766        }
767
768        Ok(registration)
769    }
770
771    /// Register a multi-column index (unique or non-unique).
772    ///
773    /// Returns true if the index was created, false if it already exists.
774    pub fn register_multi_column_index(
775        &self,
776        table_id: TableId,
777        field_ids: &[FieldId],
778        index_name: String,
779        unique: bool,
780    ) -> LlkvResult<bool> {
781        let canonical_name = index_name.to_lowercase();
782
783        // Check if index already exists
784        if let Some(_existing) = self
785            .metadata
786            .get_multi_column_index(table_id, &canonical_name)?
787        {
788            return Ok(false);
789        }
790
791        // Create new index entry
792        let entry = MultiColumnIndexEntryMeta {
793            index_name: Some(index_name),
794            canonical_name,
795            column_ids: field_ids.to_vec(),
796            unique,
797        };
798
799        self.metadata.put_multi_column_index(table_id, entry)?;
800        self.metadata.flush_table(table_id)?;
801
802        Ok(true)
803    }
804
805    fn generate_single_column_index_name(
806        &self,
807        table_id: TableId,
808        canonical_table_name: &str,
809        column_name: &str,
810    ) -> LlkvResult<String> {
811        let table_token = if canonical_table_name.is_empty() {
812            "table".to_string()
813        } else {
814            canonical_table_name.replace('.', "_")
815        };
816        let column_token = column_name.to_ascii_lowercase();
817
818        let mut candidate = format!("{}_{}_idx", table_token, column_token);
819        let mut suffix: u32 = 1;
820        loop {
821            let canonical = candidate.to_ascii_lowercase();
822            if self
823                .metadata
824                .single_column_index(table_id, &canonical)?
825                .is_none()
826            {
827                return Ok(candidate);
828            }
829
830            candidate = format!("{}_{}_idx{}", table_token, column_token, suffix);
831            suffix = suffix.checked_add(1).ok_or_else(|| {
832                Error::InvalidArgumentError("exhausted unique index name generation space".into())
833            })?;
834        }
835    }
836
837    /// Append RecordBatches to a freshly created table, injecting MVCC columns.
838    #[allow(clippy::too_many_arguments)]
839    pub fn append_batches_with_mvcc(
840        &self,
841        table: &Table<P>,
842        table_columns: &[TableColumn],
843        batches: &[RecordBatch],
844        creator_txn_id: u64,
845        deleted_marker: u64,
846        starting_row_id: RowId,
847        mvcc_builder: &dyn MvccColumnBuilder,
848    ) -> LlkvResult<(RowId, u64)> {
849        let mut next_row_id = starting_row_id;
850        let mut total_rows: u64 = 0;
851
852        for batch in batches {
853            if batch.num_rows() == 0 {
854                continue;
855            }
856
857            if batch.num_columns() != table_columns.len() {
858                return Err(Error::InvalidArgumentError(format!(
859                    "CTAS query returned unexpected column count (expected {}, found {})",
860                    table_columns.len(),
861                    batch.num_columns()
862                )));
863            }
864
865            let row_count = batch.num_rows();
866
867            let (row_id_array, created_by_array, deleted_by_array) = mvcc_builder
868                .build_insert_columns(row_count, next_row_id, creator_txn_id, deleted_marker);
869
870            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(table_columns.len() + 3);
871            arrays.push(row_id_array);
872            arrays.push(created_by_array);
873            arrays.push(deleted_by_array);
874
875            let mut fields = mvcc_builder.mvcc_fields();
876
877            for (idx, column) in table_columns.iter().enumerate() {
878                let array = batch.column(idx).clone();
879                let field = mvcc_builder.field_with_metadata(
880                    &column.name,
881                    column.data_type.clone(),
882                    column.nullable,
883                    column.field_id,
884                );
885                arrays.push(array);
886                fields.push(field);
887            }
888
889            let append_schema = Arc::new(Schema::new(fields));
890            let append_batch = RecordBatch::try_new(append_schema, arrays).map_err(Error::Arrow)?;
891            table.append(&append_batch)?;
892
893            next_row_id = next_row_id.saturating_add(row_count as u64);
894            total_rows = total_rows.saturating_add(row_count as u64);
895        }
896
897        Ok((next_row_id, total_rows))
898    }
899
900    /// Validate and register foreign keys for a newly created table.
901    #[allow(clippy::too_many_arguments)]
902    pub fn register_foreign_keys_for_new_table<F>(
903        &self,
904        table_id: TableId,
905        display_name: &str,
906        canonical_name: &str,
907        table_columns: &[TableColumn],
908        specs: &[ForeignKeySpec],
909        lookup_table: F,
910        timestamp_micros: u64,
911    ) -> LlkvResult<Vec<ValidatedForeignKey>>
912    where
913        F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
914    {
915        if specs.is_empty() {
916            return Ok(Vec::new());
917        }
918
919        let referencing_columns: Vec<ForeignKeyColumn> = table_columns
920            .iter()
921            .map(|column| ForeignKeyColumn {
922                name: column.name.clone(),
923                data_type: column.data_type.clone(),
924                nullable: column.nullable,
925                primary_key: column.primary_key,
926                unique: column.unique,
927                field_id: column.field_id,
928            })
929            .collect();
930
931        let multi_column_uniques = {
932            let catalog = SysCatalog::new(&self.store);
933            let all_indexes = catalog.get_multi_column_indexes(table_id)?;
934            all_indexes.into_iter().filter(|idx| idx.unique).collect()
935        };
936
937        let referencing_table = ForeignKeyTableInfo {
938            display_name: display_name.to_string(),
939            canonical_name: canonical_name.to_string(),
940            table_id,
941            columns: referencing_columns,
942            multi_column_uniques,
943        };
944
945        self.metadata.validate_and_register_foreign_keys(
946            &referencing_table,
947            specs,
948            lookup_table,
949            timestamp_micros,
950        )
951    }
952
953    /// Resolve referenced tables for the provided foreign key view definitions.
954    pub fn referenced_table_info(
955        &self,
956        views: &[ForeignKeyView],
957    ) -> LlkvResult<Vec<ForeignKeyTableInfo>> {
958        let mut results = Vec::with_capacity(views.len());
959        for view in views {
960            let Some(table_id) = self.catalog.table_id(&view.referenced_table_canonical) else {
961                return Err(Error::CatalogError(format!(
962                    "Catalog Error: referenced table '{}' does not exist",
963                    view.referenced_table_display
964                )));
965            };
966
967            let Some(resolver) = self.catalog.field_resolver(table_id) else {
968                return Err(Error::Internal(format!(
969                    "catalog resolver missing for table '{}'",
970                    view.referenced_table_display
971                )));
972            };
973
974            let mut columns = Vec::with_capacity(view.referenced_field_ids.len());
975            for field_id in &view.referenced_field_ids {
976                let info = resolver.field_info(*field_id).ok_or_else(|| {
977                    Error::Internal(format!(
978                        "field metadata missing for id {} in table '{}'",
979                        field_id, view.referenced_table_display
980                    ))
981                })?;
982
983                let data_type = self.metadata.column_data_type(table_id, *field_id)?;
984
985                columns.push(ForeignKeyColumn {
986                    name: info.display_name.to_string(),
987                    data_type,
988                    nullable: !info.constraints.primary_key,
989                    primary_key: info.constraints.primary_key,
990                    unique: info.constraints.unique,
991                    field_id: *field_id,
992                });
993            }
994
995            let multi_column_uniques = {
996                let catalog = SysCatalog::new(&self.store);
997                let all_indexes = catalog.get_multi_column_indexes(table_id)?;
998                all_indexes.into_iter().filter(|idx| idx.unique).collect()
999            };
1000
1001            results.push(ForeignKeyTableInfo {
1002                display_name: view.referenced_table_display.clone(),
1003                canonical_name: view.referenced_table_canonical.clone(),
1004                table_id,
1005                columns,
1006                multi_column_uniques,
1007            });
1008        }
1009
1010        Ok(results)
1011    }
1012
1013    /// Return the current metadata snapshot for a table, including column metadata and constraints.
1014    pub fn table_view(&self, canonical_name: &str) -> LlkvResult<TableView> {
1015        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1016            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1017        })?;
1018
1019        let (_, field_ids) = self.sorted_user_fields(table_id);
1020        self.table_view_with_field_ids(table_id, &field_ids)
1021    }
1022
1023    /// Produce a read-only view of a table's catalog, including column metadata and constraints.
1024    pub fn table_column_specs(&self, canonical_name: &str) -> LlkvResult<Vec<PlanColumnSpec>> {
1025        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1026            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1027        })?;
1028
1029        let resolver = self
1030            .catalog
1031            .field_resolver(table_id)
1032            .ok_or_else(|| Error::Internal("missing field resolver for table".into()))?;
1033
1034        let (logical_fields, field_ids) = self.sorted_user_fields(table_id);
1035
1036        let table_view = self.table_view_with_field_ids(table_id, &field_ids)?;
1037        let column_metas = table_view.column_metas;
1038        let constraint_records = table_view.constraint_records;
1039
1040        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
1041        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
1042        let mut has_primary_key_records = false;
1043        let mut has_single_unique_records = false;
1044
1045        for record in constraint_records
1046            .iter()
1047            .filter(|record| record.is_active())
1048        {
1049            match &record.kind {
1050                ConstraintKind::PrimaryKey(pk) => {
1051                    has_primary_key_records = true;
1052                    for field_id in &pk.field_ids {
1053                        metadata_primary_keys.insert(*field_id);
1054                        metadata_unique_fields.insert(*field_id);
1055                    }
1056                }
1057                ConstraintKind::Unique(unique) => {
1058                    if unique.field_ids.len() == 1 {
1059                        has_single_unique_records = true;
1060                        if let Some(field_id) = unique.field_ids.first() {
1061                            metadata_unique_fields.insert(*field_id);
1062                        }
1063                    }
1064                }
1065                _ => {}
1066            }
1067        }
1068
1069        let mut specs = Vec::with_capacity(field_ids.len());
1070
1071        for (idx, lfid) in logical_fields.iter().enumerate() {
1072            let field_id = lfid.field_id();
1073
1074            let column_name = column_metas
1075                .get(idx)
1076                .and_then(|meta| meta.as_ref())
1077                .and_then(|meta| meta.name.clone())
1078                .unwrap_or_else(|| format!("col_{}", field_id));
1079
1080            let fallback_constraints = resolver
1081                .field_constraints_by_name(&column_name)
1082                .unwrap_or_default();
1083
1084            let metadata_primary = metadata_primary_keys.contains(&field_id);
1085            let primary_key = if has_primary_key_records {
1086                metadata_primary
1087            } else {
1088                fallback_constraints.primary_key
1089            };
1090
1091            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
1092            let unique = if has_primary_key_records || has_single_unique_records {
1093                metadata_unique
1094            } else {
1095                fallback_constraints.primary_key || fallback_constraints.unique
1096            };
1097
1098            let data_type = self.store.data_type(*lfid)?;
1099            let nullable = !primary_key;
1100
1101            let mut spec = PlanColumnSpec::new(column_name.clone(), data_type, nullable)
1102                .with_primary_key(primary_key)
1103                .with_unique(unique);
1104
1105            if let Some(check_expr) = fallback_constraints.check_expr.clone() {
1106                spec = spec.with_check(Some(check_expr));
1107            }
1108
1109            specs.push(spec);
1110        }
1111
1112        Ok(specs)
1113    }
1114
1115    /// Return the foreign key metadata for the specified table.
1116    pub fn foreign_key_views(&self, canonical_name: &str) -> LlkvResult<Vec<ForeignKeyView>> {
1117        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1118            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1119        })?;
1120
1121        self.metadata.foreign_key_views(&self.catalog, table_id)
1122    }
1123
1124    /// Return constraint-related catalog metadata for the specified table.
1125    pub fn table_constraint_summary(
1126        &self,
1127        canonical_name: &str,
1128    ) -> LlkvResult<TableConstraintSummaryView> {
1129        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1130            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1131        })?;
1132
1133        let (_, field_ids) = self.sorted_user_fields(table_id);
1134        let table_meta = self.metadata.table_meta(table_id)?;
1135        let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
1136        let constraint_records = self.metadata.constraint_records(table_id)?;
1137        let multi_column_uniques = self.metadata.multi_column_uniques(table_id)?;
1138
1139        Ok(TableConstraintSummaryView {
1140            table_meta,
1141            column_metas,
1142            constraint_records,
1143            multi_column_uniques,
1144        })
1145    }
1146
1147    fn sorted_user_fields(
1148        &self,
1149        table_id: TableId,
1150    ) -> (Vec<llkv_column_map::types::LogicalFieldId>, Vec<FieldId>) {
1151        let mut logical_fields = self.store.user_field_ids_for_table(table_id);
1152        logical_fields.sort_by_key(|lfid| lfid.field_id());
1153        let field_ids = logical_fields
1154            .iter()
1155            .map(|lfid| lfid.field_id())
1156            .collect::<Vec<_>>();
1157
1158        (logical_fields, field_ids)
1159    }
1160
1161    fn table_view_with_field_ids(
1162        &self,
1163        table_id: TableId,
1164        field_ids: &[FieldId],
1165    ) -> LlkvResult<TableView> {
1166        self.metadata.table_view(&self.catalog, table_id, field_ids)
1167    }
1168
1169    // -------------------------------------------------------------------------
1170    // Catalog read-only views
1171    // -------------------------------------------------------------------------
1172
1173    /// Returns all table names in the catalog.
1174    pub fn table_names(&self) -> Vec<String> {
1175        self.catalog.table_names()
1176    }
1177
1178    /// Returns the TableId for a canonical table name.
1179    pub fn table_id(&self, canonical_name: &str) -> Option<TableId> {
1180        self.catalog.table_id(canonical_name)
1181    }
1182
1183    /// Returns a field resolver for the given table.
1184    pub fn field_resolver(&self, table_id: TableId) -> Option<crate::catalog::FieldResolver> {
1185        self.catalog.field_resolver(table_id)
1186    }
1187
1188    /// Returns a snapshot of the catalog for read-only access.
1189    pub fn catalog_snapshot(&self) -> crate::catalog::TableCatalogSnapshot {
1190        self.catalog.snapshot()
1191    }
1192
1193    /// Returns a reference to the internal catalog for services that need it.
1194    /// Note: This is primarily for internal use by services like ConstraintService.
1195    pub fn catalog(&self) -> &Arc<TableCatalog> {
1196        &self.catalog
1197    }
1198
1199    /// Returns all foreign keys that reference the specified table.
1200    /// Returns a vector of (referencing_table_id, constraint_id) pairs.
1201    pub fn foreign_keys_referencing(
1202        &self,
1203        referenced_table_id: TableId,
1204    ) -> LlkvResult<Vec<(TableId, ConstraintId)>> {
1205        self.metadata.foreign_keys_referencing(referenced_table_id)
1206    }
1207
1208    /// Returns detailed foreign key views for a specific table.
1209    /// This includes foreign keys where the specified table is the referencing table.
1210    pub fn foreign_key_views_for_table(
1211        &self,
1212        table_id: TableId,
1213    ) -> LlkvResult<Vec<ForeignKeyView>> {
1214        self.metadata.foreign_key_views(&self.catalog, table_id)
1215    }
1216}
1217
1218fn field_id_for_index(idx: usize) -> LlkvResult<FieldId> {
1219    FieldId::try_from(idx + 1).map_err(|_| {
1220        Error::Internal(format!(
1221            "column index {} exceeded supported field id range",
1222            idx + 1
1223        ))
1224    })
1225}
1226
// TODO: Dedupe (another instance exists in llkv-executor)
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` instead of silently truncating if the microsecond
/// count ever exceeds 64 bits.
fn current_time_micros() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX))
        .unwrap_or(0)
}
1235
1236/// Parse a SQL type string (e.g., "INTEGER") back into a DataType.
1237fn parse_data_type_from_sql(sql: &str) -> LlkvResult<sqlparser::ast::DataType> {
1238    use sqlparser::dialect::GenericDialect;
1239    use sqlparser::parser::Parser;
1240
1241    // Try to parse as a simple CREATE DOMAIN statement
1242    let create_sql = format!("CREATE DOMAIN dummy AS {}", sql);
1243    let dialect = GenericDialect {};
1244
1245    match Parser::parse_sql(&dialect, &create_sql) {
1246        Ok(stmts) if !stmts.is_empty() => {
1247            if let sqlparser::ast::Statement::CreateDomain(create_domain) = &stmts[0] {
1248                Ok(create_domain.data_type.clone())
1249            } else {
1250                Err(Error::InvalidArgumentError(format!(
1251                    "Failed to parse type from SQL: {}",
1252                    sql
1253                )))
1254            }
1255        }
1256        _ => Err(Error::InvalidArgumentError(format!(
1257            "Failed to parse type from SQL: {}",
1258            sql
1259        ))),
1260    }
1261}