llkv_table/catalog/
manager.rs

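//! High-level service for creating and managing tables.
//!
//! Use `CatalogManager` to create tables. It coordinates metadata persistence,
//! catalog registration, and storage initialization, and also exposes table
//! drops, index and foreign-key registration, and read-only catalog views.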
5
6#![forbid(unsafe_code)]
7
8use std::convert::TryFrom;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use arrow::array::ArrayRef;
13use arrow::datatypes::{DataType, Schema};
14use arrow::record_batch::RecordBatch;
15use llkv_column_map::ColumnStore;
16use llkv_column_map::store::IndexKind;
17use llkv_plan::{ColumnSpec, ForeignKeySpec};
18use llkv_result::{Error, Result as LlkvResult};
19use llkv_storage::pager::Pager;
20use rustc_hash::{FxHashMap, FxHashSet};
21use simd_r_drive_entry_handle::EntryHandle;
22
23use crate::mvcc;
24
25use super::table_catalog::{FieldDefinition, TableCatalog};
26use crate::constraints::ConstraintKind;
27use crate::metadata::{MetadataManager, MultiColumnUniqueRegistration};
28use crate::table::Table;
29use crate::types::{FieldId, RowId, TableColumn, TableId};
30use crate::{
31    ForeignKeyColumn, ForeignKeyTableInfo, ForeignKeyView, TableConstraintSummaryView, TableView,
32    ValidatedForeignKey,
33};
34
35/// Result of creating a table. The caller is responsible for wiring executor
36/// caches and any higher-level state that depends on the table schema.
37pub struct CreateTableResult<P>
38where
39    P: Pager<Blob = EntryHandle> + Send + Sync,
40{
41    pub table_id: TableId,
42    pub table: Arc<Table<P>>,
43    pub table_columns: Vec<TableColumn>,
44    pub column_lookup: FxHashMap<String, usize>,
45}
46
47/// Service for creating tables.
48///
49/// Coordinates metadata persistence (`MetadataManager`), catalog registration
50/// (`TableCatalog`), and storage initialization (`ColumnStore`).
51#[derive(Clone)]
52pub struct CatalogManager<P>
53where
54    P: Pager<Blob = EntryHandle> + Send + Sync,
55{
56    metadata: Arc<MetadataManager<P>>,
57    catalog: Arc<TableCatalog>,
58    store: Arc<ColumnStore<P>>,
59}
60
61impl<P> CatalogManager<P>
62where
63    P: Pager<Blob = EntryHandle> + Send + Sync,
64{
65    pub fn new(
66        metadata: Arc<MetadataManager<P>>,
67        catalog: Arc<TableCatalog>,
68        store: Arc<ColumnStore<P>>,
69    ) -> Self {
70        Self {
71            metadata,
72            catalog,
73            store,
74        }
75    }
76
77    /// Create a new table using column specifications.
78    ///
79    /// Reserves table ID from metadata, validates columns, persists schema,
80    /// registers in catalog, and returns a Table handle for data operations.
81    pub(crate) fn create_table_from_columns(
82        &self,
83        display_name: &str,
84        canonical_name: &str,
85        columns: &[ColumnSpec],
86    ) -> LlkvResult<CreateTableResult<P>> {
87        if columns.is_empty() {
88            return Err(Error::InvalidArgumentError(
89                "CREATE TABLE requires at least one column".into(),
90            ));
91        }
92
93        let mut lookup: FxHashMap<String, usize> =
94            FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
95        let mut table_columns: Vec<TableColumn> = Vec::with_capacity(columns.len());
96
97        for (idx, column) in columns.iter().enumerate() {
98            let normalized = column.name.to_ascii_lowercase();
99            if lookup.insert(normalized.clone(), idx).is_some() {
100                return Err(Error::InvalidArgumentError(format!(
101                    "duplicate column name '{}' in table '{}'",
102                    column.name, display_name
103                )));
104            }
105
106            table_columns.push(TableColumn {
107                field_id: field_id_for_index(idx)?,
108                name: column.name.clone(),
109                data_type: column.data_type.clone(),
110                nullable: column.nullable,
111                primary_key: column.primary_key,
112                unique: column.unique,
113                check_expr: column.check_expr.clone(),
114            });
115        }
116
117        self.create_table_inner(display_name, canonical_name, table_columns, lookup)
118    }
119
120    /// Create a new table using an Arrow schema (used by CTAS flows).
121    pub fn create_table_from_schema(
122        &self,
123        display_name: &str,
124        canonical_name: &str,
125        schema: &Schema,
126    ) -> LlkvResult<CreateTableResult<P>> {
127        if schema.fields().is_empty() {
128            return Err(Error::InvalidArgumentError(
129                "CREATE TABLE AS SELECT requires at least one column".into(),
130            ));
131        }
132
133        let mut lookup: FxHashMap<String, usize> =
134            FxHashMap::with_capacity_and_hasher(schema.fields().len(), Default::default());
135        let mut table_columns: Vec<TableColumn> = Vec::with_capacity(schema.fields().len());
136
137        for (idx, field) in schema.fields().iter().enumerate() {
138            let data_type = match field.data_type() {
139                DataType::Int64
140                | DataType::Float64
141                | DataType::Utf8
142                | DataType::Date32
143                | DataType::Struct(_) => field.data_type().clone(),
144                other => {
145                    return Err(Error::InvalidArgumentError(format!(
146                        "unsupported column type in CTAS result: {other:?}"
147                    )));
148                }
149            };
150
151            let normalized = field.name().to_ascii_lowercase();
152            if lookup.insert(normalized.clone(), idx).is_some() {
153                return Err(Error::InvalidArgumentError(format!(
154                    "duplicate column name '{}' in CTAS result",
155                    field.name()
156                )));
157            }
158
159            table_columns.push(TableColumn {
160                field_id: field_id_for_index(idx)?,
161                name: field.name().to_string(),
162                data_type,
163                nullable: field.is_nullable(),
164                primary_key: false,
165                unique: false,
166                check_expr: None,
167            });
168        }
169
170        self.create_table_inner(display_name, canonical_name, table_columns, lookup)
171    }
172
173    fn create_table_inner(
174        &self,
175        display_name: &str,
176        _canonical_name: &str,
177        table_columns: Vec<TableColumn>,
178        column_lookup: FxHashMap<String, usize>,
179    ) -> LlkvResult<CreateTableResult<P>> {
180        let table_id = self.metadata.reserve_table_id()?;
181        let timestamp = current_time_micros();
182        let table_meta = crate::sys_catalog::TableMeta {
183            table_id,
184            name: Some(display_name.to_string()),
185            created_at_micros: timestamp,
186            flags: 0,
187            epoch: 0,
188        };
189
190        self.metadata.set_table_meta(table_id, table_meta)?;
191        self.metadata
192            .apply_column_definitions(table_id, &table_columns, timestamp)?;
193        self.metadata.flush_table(table_id)?;
194
195        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
196
197        // Register table in catalog using the table_id from metadata
198        if let Err(err) = self.catalog.register_table(display_name, table_id) {
199            self.metadata.remove_table_state(table_id);
200            return Err(err);
201        }
202
203        if let Some(field_resolver) = self.catalog.field_resolver(table_id) {
204            for column in &table_columns {
205                let definition = FieldDefinition::new(&column.name)
206                    .with_primary_key(column.primary_key)
207                    .with_unique(column.unique)
208                    .with_check_expr(column.check_expr.clone());
209                if let Err(err) = field_resolver.register_field(definition) {
210                    self.catalog.unregister_table(table_id);
211                    self.metadata.remove_table_state(table_id);
212                    return Err(err);
213                }
214            }
215        }
216
217        Ok(CreateTableResult {
218            table_id,
219            table: Arc::new(table),
220            table_columns,
221            column_lookup,
222        })
223    }
224
225    /// Prepare metadata state and unregister catalog entries for a dropped table.
226    pub fn drop_table(
227        &self,
228        canonical_name: &str,
229        table_id: TableId,
230        column_field_ids: &[FieldId],
231    ) -> LlkvResult<()> {
232        self.metadata
233            .prepare_table_drop(table_id, column_field_ids)?;
234        self.metadata.flush_table(table_id)?;
235        self.metadata.remove_table_state(table_id);
236        if let Some(table_id_from_catalog) = self.catalog.table_id(canonical_name) {
237            let _ = self.catalog.unregister_table(table_id_from_catalog);
238        } else {
239            let _ = self.catalog.unregister_table(table_id);
240        }
241        Ok(())
242    }
243
244    /// Register a single-column sort (B-tree) index. Optionally marks the field unique.
245    /// Returns `true` if the index was newly created, `false` if it already existed and `if_not_exists` was true.
246    #[allow(clippy::too_many_arguments)]
247    pub fn register_single_column_index(
248        &self,
249        display_name: &str,
250        canonical_name: &str,
251        table: &Table<P>,
252        field_id: FieldId,
253        column_name: &str,
254        mark_unique: bool,
255        if_not_exists: bool,
256    ) -> LlkvResult<bool> {
257        let existing_indexes = table.list_registered_indexes(field_id)?;
258        if existing_indexes.contains(&IndexKind::Sort) {
259            if if_not_exists {
260                return Ok(false);
261            }
262            return Err(Error::CatalogError(format!(
263                "Index already exists on column '{}' in table '{}'",
264                column_name, display_name
265            )));
266        }
267
268        let table_id = table.table_id();
269        self.metadata.register_sort_index(table_id, field_id)?;
270
271        if mark_unique {
272            let catalog_table_id = self.catalog.table_id(canonical_name).unwrap_or(table_id);
273            if let Some(resolver) = self.catalog.field_resolver(catalog_table_id) {
274                resolver.set_field_unique(column_name, true)?;
275            }
276        }
277
278        self.metadata.flush_table(table_id)?;
279        Ok(true)
280    }
281
282    /// Register a multi-column UNIQUE index.
283    pub fn register_multi_column_unique_index(
284        &self,
285        table_id: TableId,
286        field_ids: &[FieldId],
287        index_name: Option<String>,
288    ) -> LlkvResult<MultiColumnUniqueRegistration> {
289        let registration = self
290            .metadata
291            .register_multi_column_unique(table_id, field_ids, index_name)?;
292
293        if matches!(registration, MultiColumnUniqueRegistration::Created) {
294            self.metadata.flush_table(table_id)?;
295        }
296
297        Ok(registration)
298    }
299
300    /// Append RecordBatches to a freshly created table, injecting MVCC columns.
301    pub fn append_batches_with_mvcc(
302        &self,
303        table: &Table<P>,
304        table_columns: &[TableColumn],
305        batches: &[RecordBatch],
306        creator_txn_id: u64,
307        deleted_marker: u64,
308        starting_row_id: RowId,
309    ) -> LlkvResult<(RowId, u64)> {
310        let mut next_row_id = starting_row_id;
311        let mut total_rows: u64 = 0;
312
313        for batch in batches {
314            if batch.num_rows() == 0 {
315                continue;
316            }
317
318            if batch.num_columns() != table_columns.len() {
319                return Err(Error::InvalidArgumentError(format!(
320                    "CTAS query returned unexpected column count (expected {}, found {})",
321                    table_columns.len(),
322                    batch.num_columns()
323                )));
324            }
325
326            let row_count = batch.num_rows();
327
328            let (row_id_array, created_by_array, deleted_by_array) =
329                mvcc::build_insert_mvcc_columns(
330                    row_count,
331                    next_row_id,
332                    creator_txn_id,
333                    deleted_marker,
334                );
335
336            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(table_columns.len() + 3);
337            arrays.push(row_id_array);
338            arrays.push(created_by_array);
339            arrays.push(deleted_by_array);
340
341            let mut fields = mvcc::build_mvcc_fields();
342
343            for (idx, column) in table_columns.iter().enumerate() {
344                let array = batch.column(idx).clone();
345                let field = mvcc::build_field_with_metadata(
346                    &column.name,
347                    column.data_type.clone(),
348                    column.nullable,
349                    column.field_id,
350                );
351                arrays.push(array);
352                fields.push(field);
353            }
354
355            let append_schema = Arc::new(Schema::new(fields));
356            let append_batch = RecordBatch::try_new(append_schema, arrays).map_err(Error::Arrow)?;
357            table.append(&append_batch)?;
358
359            next_row_id = next_row_id.saturating_add(row_count as u64);
360            total_rows = total_rows.saturating_add(row_count as u64);
361        }
362
363        Ok((next_row_id, total_rows))
364    }
365
366    /// Validate and register foreign keys for a newly created table.
367    #[allow(clippy::too_many_arguments)]
368    pub fn register_foreign_keys_for_new_table<F>(
369        &self,
370        table_id: TableId,
371        display_name: &str,
372        canonical_name: &str,
373        table_columns: &[TableColumn],
374        specs: &[ForeignKeySpec],
375        lookup_table: F,
376        timestamp_micros: u64,
377    ) -> LlkvResult<Vec<ValidatedForeignKey>>
378    where
379        F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
380    {
381        if specs.is_empty() {
382            return Ok(Vec::new());
383        }
384
385        let referencing_columns: Vec<ForeignKeyColumn> = table_columns
386            .iter()
387            .map(|column| ForeignKeyColumn {
388                name: column.name.clone(),
389                data_type: column.data_type.clone(),
390                nullable: column.nullable,
391                primary_key: column.primary_key,
392                unique: column.unique,
393                field_id: column.field_id,
394            })
395            .collect();
396
397        let referencing_table = ForeignKeyTableInfo {
398            display_name: display_name.to_string(),
399            canonical_name: canonical_name.to_string(),
400            table_id,
401            columns: referencing_columns,
402        };
403
404        self.metadata.validate_and_register_foreign_keys(
405            &referencing_table,
406            specs,
407            lookup_table,
408            timestamp_micros,
409        )
410    }
411
412    /// Resolve referenced tables for the provided foreign key view definitions.
413    pub fn referenced_table_info(
414        &self,
415        views: &[ForeignKeyView],
416    ) -> LlkvResult<Vec<ForeignKeyTableInfo>> {
417        let mut results = Vec::with_capacity(views.len());
418        for view in views {
419            let Some(table_id) = self.catalog.table_id(&view.referenced_table_canonical) else {
420                return Err(Error::InvalidArgumentError(format!(
421                    "referenced table '{}' does not exist",
422                    view.referenced_table_display
423                )));
424            };
425
426            let Some(resolver) = self.catalog.field_resolver(table_id) else {
427                return Err(Error::Internal(format!(
428                    "catalog resolver missing for table '{}'",
429                    view.referenced_table_display
430                )));
431            };
432
433            let mut columns = Vec::with_capacity(view.referenced_field_ids.len());
434            for field_id in &view.referenced_field_ids {
435                let info = resolver.field_info(*field_id).ok_or_else(|| {
436                    Error::Internal(format!(
437                        "field metadata missing for id {} in table '{}'",
438                        field_id, view.referenced_table_display
439                    ))
440                })?;
441
442                let data_type = self.metadata.column_data_type(table_id, *field_id)?;
443
444                columns.push(ForeignKeyColumn {
445                    name: info.display_name.to_string(),
446                    data_type,
447                    nullable: !info.constraints.primary_key,
448                    primary_key: info.constraints.primary_key,
449                    unique: info.constraints.unique,
450                    field_id: *field_id,
451                });
452            }
453
454            results.push(ForeignKeyTableInfo {
455                display_name: view.referenced_table_display.clone(),
456                canonical_name: view.referenced_table_canonical.clone(),
457                table_id,
458                columns,
459            });
460        }
461
462        Ok(results)
463    }
464
465    /// Return the current metadata snapshot for a table, including column metadata and constraints.
466    pub fn table_view(&self, canonical_name: &str) -> LlkvResult<TableView> {
467        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
468            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
469        })?;
470
471        let (_, field_ids) = self.sorted_user_fields(table_id);
472        self.table_view_with_field_ids(table_id, &field_ids)
473    }
474
475    /// Produce a read-only view of a table's catalog, including column metadata and constraints.
476    pub fn table_column_specs(&self, canonical_name: &str) -> LlkvResult<Vec<ColumnSpec>> {
477        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
478            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
479        })?;
480
481        let resolver = self
482            .catalog
483            .field_resolver(table_id)
484            .ok_or_else(|| Error::Internal("missing field resolver for table".into()))?;
485
486        let (logical_fields, field_ids) = self.sorted_user_fields(table_id);
487
488        let table_view = self.table_view_with_field_ids(table_id, &field_ids)?;
489        let column_metas = table_view.column_metas;
490        let constraint_records = table_view.constraint_records;
491
492        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
493        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
494        let mut has_primary_key_records = false;
495        let mut has_single_unique_records = false;
496
497        for record in constraint_records
498            .iter()
499            .filter(|record| record.is_active())
500        {
501            match &record.kind {
502                ConstraintKind::PrimaryKey(pk) => {
503                    has_primary_key_records = true;
504                    for field_id in &pk.field_ids {
505                        metadata_primary_keys.insert(*field_id);
506                        metadata_unique_fields.insert(*field_id);
507                    }
508                }
509                ConstraintKind::Unique(unique) => {
510                    if unique.field_ids.len() == 1 {
511                        has_single_unique_records = true;
512                        if let Some(field_id) = unique.field_ids.first() {
513                            metadata_unique_fields.insert(*field_id);
514                        }
515                    }
516                }
517                _ => {}
518            }
519        }
520
521        let mut specs = Vec::with_capacity(field_ids.len());
522
523        for (idx, lfid) in logical_fields.iter().enumerate() {
524            let field_id = lfid.field_id();
525
526            let column_name = column_metas
527                .get(idx)
528                .and_then(|meta| meta.as_ref())
529                .and_then(|meta| meta.name.clone())
530                .unwrap_or_else(|| format!("col_{}", field_id));
531
532            let fallback_constraints = resolver
533                .field_constraints_by_name(&column_name)
534                .unwrap_or_default();
535
536            let metadata_primary = metadata_primary_keys.contains(&field_id);
537            let primary_key = if has_primary_key_records {
538                metadata_primary
539            } else {
540                fallback_constraints.primary_key
541            };
542
543            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
544            let unique = if has_primary_key_records || has_single_unique_records {
545                metadata_unique
546            } else {
547                fallback_constraints.primary_key || fallback_constraints.unique
548            };
549
550            let data_type = self.store.data_type(*lfid)?;
551            let nullable = !primary_key;
552
553            let mut spec = ColumnSpec::new(column_name.clone(), data_type, nullable)
554                .with_primary_key(primary_key)
555                .with_unique(unique);
556
557            if let Some(check_expr) = fallback_constraints.check_expr.clone() {
558                spec = spec.with_check(Some(check_expr));
559            }
560
561            specs.push(spec);
562        }
563
564        Ok(specs)
565    }
566
567    /// Return the foreign key metadata for the specified table.
568    pub fn foreign_key_views(&self, canonical_name: &str) -> LlkvResult<Vec<ForeignKeyView>> {
569        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
570            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
571        })?;
572
573        self.metadata.foreign_key_views(&self.catalog, table_id)
574    }
575
576    /// Return constraint-related catalog metadata for the specified table.
577    pub fn table_constraint_summary(
578        &self,
579        canonical_name: &str,
580    ) -> LlkvResult<TableConstraintSummaryView> {
581        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
582            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
583        })?;
584
585        let (_, field_ids) = self.sorted_user_fields(table_id);
586        let table_meta = self.metadata.table_meta(table_id)?;
587        let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
588        let constraint_records = self.metadata.constraint_records(table_id)?;
589        let multi_column_uniques = self.metadata.multi_column_uniques(table_id)?;
590
591        Ok(TableConstraintSummaryView {
592            table_meta,
593            column_metas,
594            constraint_records,
595            multi_column_uniques,
596        })
597    }
598
599    fn sorted_user_fields(
600        &self,
601        table_id: TableId,
602    ) -> (Vec<llkv_column_map::types::LogicalFieldId>, Vec<FieldId>) {
603        let mut logical_fields = self.store.user_field_ids_for_table(table_id);
604        logical_fields.sort_by_key(|lfid| lfid.field_id());
605        let field_ids = logical_fields
606            .iter()
607            .map(|lfid| lfid.field_id())
608            .collect::<Vec<_>>();
609
610        (logical_fields, field_ids)
611    }
612
613    fn table_view_with_field_ids(
614        &self,
615        table_id: TableId,
616        field_ids: &[FieldId],
617    ) -> LlkvResult<TableView> {
618        self.metadata.table_view(&self.catalog, table_id, field_ids)
619    }
620
621    // -------------------------------------------------------------------------
622    // Catalog read-only views
623    // -------------------------------------------------------------------------
624
625    /// Returns all table names in the catalog.
626    pub fn table_names(&self) -> Vec<String> {
627        self.catalog.table_names()
628    }
629
630    /// Returns the TableId for a canonical table name.
631    pub fn table_id(&self, canonical_name: &str) -> Option<TableId> {
632        self.catalog.table_id(canonical_name)
633    }
634
635    /// Returns a field resolver for the given table.
636    pub fn field_resolver(&self, table_id: TableId) -> Option<crate::catalog::FieldResolver> {
637        self.catalog.field_resolver(table_id)
638    }
639
640    /// Returns a snapshot of the catalog for read-only access.
641    pub fn catalog_snapshot(&self) -> crate::catalog::TableCatalogSnapshot {
642        self.catalog.snapshot()
643    }
644
645    /// Returns a reference to the internal catalog for services that need it.
646    /// Note: This is primarily for internal use by services like ConstraintService.
647    pub fn catalog(&self) -> &Arc<TableCatalog> {
648        &self.catalog
649    }
650}
651
652fn field_id_for_index(idx: usize) -> LlkvResult<FieldId> {
653    FieldId::try_from(idx + 1).map_err(|_| {
654        Error::Internal(format!(
655            "column index {} exceeded supported field id range",
656            idx + 1
657        ))
658    })
659}
660
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. The
/// `u128` microsecond count saturates at `u64::MAX` instead of being silently
/// truncated by an `as` cast.
fn current_time_micros() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| u64::try_from(duration.as_micros()).unwrap_or(u64::MAX))
        .unwrap_or(0)
}