llkv_table/catalog/
manager.rs

//! High-level service for creating and managing tables.
//!
//! Use `CatalogManager` to create tables. It coordinates metadata persistence,
//! catalog registration, and storage initialization.
5
6#![forbid(unsafe_code)]
7
8use std::convert::TryFrom;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use arrow::array::ArrayRef;
13use arrow::datatypes::{DataType, Schema};
14use arrow::record_batch::RecordBatch;
15use llkv_column_map::ColumnStore;
16use llkv_column_map::store::IndexKind;
17use llkv_plan::{ColumnSpec, ForeignKeySpec};
18use llkv_result::{Error, Result as LlkvResult};
19use llkv_storage::pager::Pager;
20use rustc_hash::{FxHashMap, FxHashSet};
21use simd_r_drive_entry_handle::EntryHandle;
22
23use crate::mvcc;
24
25use super::table_catalog::{FieldDefinition, TableCatalog};
26use crate::constraints::ConstraintKind;
27use crate::metadata::{MetadataManager, MultiColumnUniqueRegistration};
28use crate::table::Table;
29use crate::types::{FieldId, RowId, TableColumn, TableId};
30use crate::{
31    ForeignKeyColumn, ForeignKeyTableInfo, ForeignKeyView, TableConstraintSummaryView, TableView,
32    ValidatedForeignKey,
33};
34
35/// Result of creating a table. The caller is responsible for wiring executor
36/// caches and any higher-level state that depends on the table schema.
37pub struct CreateTableResult<P>
38where
39    P: Pager<Blob = EntryHandle> + Send + Sync,
40{
41    pub table_id: TableId,
42    pub table: Arc<Table<P>>,
43    pub table_columns: Vec<TableColumn>,
44    pub column_lookup: FxHashMap<String, usize>,
45}
46
47/// Service for creating tables.
48///
49/// Coordinates metadata persistence (`MetadataManager`), catalog registration
50/// (`TableCatalog`), and storage initialization (`ColumnStore`).
51#[derive(Clone)]
52pub struct CatalogManager<P>
53where
54    P: Pager<Blob = EntryHandle> + Send + Sync,
55{
56    metadata: Arc<MetadataManager<P>>,
57    catalog: Arc<TableCatalog>,
58    store: Arc<ColumnStore<P>>,
59}
60
61impl<P> CatalogManager<P>
62where
63    P: Pager<Blob = EntryHandle> + Send + Sync,
64{
65    /// Creates a new CatalogManager coordinating metadata, catalog, and storage layers.
66    pub fn new(
67        metadata: Arc<MetadataManager<P>>,
68        catalog: Arc<TableCatalog>,
69        store: Arc<ColumnStore<P>>,
70    ) -> Self {
71        Self {
72            metadata,
73            catalog,
74            store,
75        }
76    }
77
78    /// Create a new table using column specifications.
79    ///
80    /// Reserves table ID from metadata, validates columns, persists schema,
81    /// registers in catalog, and returns a Table handle for data operations.
82    pub(crate) fn create_table_from_columns(
83        &self,
84        display_name: &str,
85        canonical_name: &str,
86        columns: &[ColumnSpec],
87    ) -> LlkvResult<CreateTableResult<P>> {
88        if columns.is_empty() {
89            return Err(Error::InvalidArgumentError(
90                "CREATE TABLE requires at least one column".into(),
91            ));
92        }
93
94        let mut lookup: FxHashMap<String, usize> =
95            FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
96        let mut table_columns: Vec<TableColumn> = Vec::with_capacity(columns.len());
97
98        for (idx, column) in columns.iter().enumerate() {
99            let normalized = column.name.to_ascii_lowercase();
100            if lookup.insert(normalized.clone(), idx).is_some() {
101                return Err(Error::InvalidArgumentError(format!(
102                    "duplicate column name '{}' in table '{}'",
103                    column.name, display_name
104                )));
105            }
106
107            table_columns.push(TableColumn {
108                field_id: field_id_for_index(idx)?,
109                name: column.name.clone(),
110                data_type: column.data_type.clone(),
111                nullable: column.nullable,
112                primary_key: column.primary_key,
113                unique: column.unique,
114                check_expr: column.check_expr.clone(),
115            });
116        }
117
118        self.create_table_inner(display_name, canonical_name, table_columns, lookup)
119    }
120
121    /// Create a new table using an Arrow schema (used by CTAS flows).
122    pub fn create_table_from_schema(
123        &self,
124        display_name: &str,
125        canonical_name: &str,
126        schema: &Schema,
127    ) -> LlkvResult<CreateTableResult<P>> {
128        if schema.fields().is_empty() {
129            return Err(Error::InvalidArgumentError(
130                "CREATE TABLE AS SELECT requires at least one column".into(),
131            ));
132        }
133
134        let mut lookup: FxHashMap<String, usize> =
135            FxHashMap::with_capacity_and_hasher(schema.fields().len(), Default::default());
136        let mut table_columns: Vec<TableColumn> = Vec::with_capacity(schema.fields().len());
137
138        for (idx, field) in schema.fields().iter().enumerate() {
139            let data_type = match field.data_type() {
140                DataType::Int64
141                | DataType::Float64
142                | DataType::Utf8
143                | DataType::Date32
144                | DataType::Struct(_) => field.data_type().clone(),
145                other => {
146                    return Err(Error::InvalidArgumentError(format!(
147                        "unsupported column type in CTAS result: {other:?}"
148                    )));
149                }
150            };
151
152            let normalized = field.name().to_ascii_lowercase();
153            if lookup.insert(normalized.clone(), idx).is_some() {
154                return Err(Error::InvalidArgumentError(format!(
155                    "duplicate column name '{}' in CTAS result",
156                    field.name()
157                )));
158            }
159
160            table_columns.push(TableColumn {
161                field_id: field_id_for_index(idx)?,
162                name: field.name().to_string(),
163                data_type,
164                nullable: field.is_nullable(),
165                primary_key: false,
166                unique: false,
167                check_expr: None,
168            });
169        }
170
171        self.create_table_inner(display_name, canonical_name, table_columns, lookup)
172    }
173
174    fn create_table_inner(
175        &self,
176        display_name: &str,
177        _canonical_name: &str,
178        table_columns: Vec<TableColumn>,
179        column_lookup: FxHashMap<String, usize>,
180    ) -> LlkvResult<CreateTableResult<P>> {
181        let table_id = self.metadata.reserve_table_id()?;
182        let timestamp = current_time_micros();
183        let table_meta = crate::sys_catalog::TableMeta {
184            table_id,
185            name: Some(display_name.to_string()),
186            created_at_micros: timestamp,
187            flags: 0,
188            epoch: 0,
189        };
190
191        self.metadata.set_table_meta(table_id, table_meta)?;
192        self.metadata
193            .apply_column_definitions(table_id, &table_columns, timestamp)?;
194        self.metadata.flush_table(table_id)?;
195
196        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
197
198        // Register table in catalog using the table_id from metadata
199        if let Err(err) = self.catalog.register_table(display_name, table_id) {
200            self.metadata.remove_table_state(table_id);
201            return Err(err);
202        }
203
204        if let Some(field_resolver) = self.catalog.field_resolver(table_id) {
205            for column in &table_columns {
206                let definition = FieldDefinition::new(&column.name)
207                    .with_primary_key(column.primary_key)
208                    .with_unique(column.unique)
209                    .with_check_expr(column.check_expr.clone());
210                if let Err(err) = field_resolver.register_field(definition) {
211                    self.catalog.unregister_table(table_id);
212                    self.metadata.remove_table_state(table_id);
213                    return Err(err);
214                }
215            }
216        }
217
218        Ok(CreateTableResult {
219            table_id,
220            table: Arc::new(table),
221            table_columns,
222            column_lookup,
223        })
224    }
225
226    /// Prepare metadata state and unregister catalog entries for a dropped table.
227    pub fn drop_table(
228        &self,
229        canonical_name: &str,
230        table_id: TableId,
231        column_field_ids: &[FieldId],
232    ) -> LlkvResult<()> {
233        self.metadata
234            .prepare_table_drop(table_id, column_field_ids)?;
235        self.metadata.flush_table(table_id)?;
236        self.metadata.remove_table_state(table_id);
237        if let Some(table_id_from_catalog) = self.catalog.table_id(canonical_name) {
238            let _ = self.catalog.unregister_table(table_id_from_catalog);
239        } else {
240            let _ = self.catalog.unregister_table(table_id);
241        }
242        Ok(())
243    }
244
245    /// Register a single-column sort (B-tree) index. Optionally marks the field unique.
246    /// Returns `true` if the index was newly created, `false` if it already existed and `if_not_exists` was true.
247    #[allow(clippy::too_many_arguments)]
248    pub fn register_single_column_index(
249        &self,
250        display_name: &str,
251        canonical_name: &str,
252        table: &Table<P>,
253        field_id: FieldId,
254        column_name: &str,
255        mark_unique: bool,
256        if_not_exists: bool,
257    ) -> LlkvResult<bool> {
258        let existing_indexes = table.list_registered_indexes(field_id)?;
259        if existing_indexes.contains(&IndexKind::Sort) {
260            if if_not_exists {
261                return Ok(false);
262            }
263            return Err(Error::CatalogError(format!(
264                "Index already exists on column '{}' in table '{}'",
265                column_name, display_name
266            )));
267        }
268
269        let table_id = table.table_id();
270        self.metadata.register_sort_index(table_id, field_id)?;
271
272        if mark_unique {
273            let catalog_table_id = self.catalog.table_id(canonical_name).unwrap_or(table_id);
274            if let Some(resolver) = self.catalog.field_resolver(catalog_table_id) {
275                resolver.set_field_unique(column_name, true)?;
276            }
277        }
278
279        self.metadata.flush_table(table_id)?;
280        Ok(true)
281    }
282
283    /// Register a multi-column UNIQUE index.
284    pub fn register_multi_column_unique_index(
285        &self,
286        table_id: TableId,
287        field_ids: &[FieldId],
288        index_name: Option<String>,
289    ) -> LlkvResult<MultiColumnUniqueRegistration> {
290        let registration = self
291            .metadata
292            .register_multi_column_unique(table_id, field_ids, index_name)?;
293
294        if matches!(registration, MultiColumnUniqueRegistration::Created) {
295            self.metadata.flush_table(table_id)?;
296        }
297
298        Ok(registration)
299    }
300
301    /// Append RecordBatches to a freshly created table, injecting MVCC columns.
302    pub fn append_batches_with_mvcc(
303        &self,
304        table: &Table<P>,
305        table_columns: &[TableColumn],
306        batches: &[RecordBatch],
307        creator_txn_id: u64,
308        deleted_marker: u64,
309        starting_row_id: RowId,
310    ) -> LlkvResult<(RowId, u64)> {
311        let mut next_row_id = starting_row_id;
312        let mut total_rows: u64 = 0;
313
314        for batch in batches {
315            if batch.num_rows() == 0 {
316                continue;
317            }
318
319            if batch.num_columns() != table_columns.len() {
320                return Err(Error::InvalidArgumentError(format!(
321                    "CTAS query returned unexpected column count (expected {}, found {})",
322                    table_columns.len(),
323                    batch.num_columns()
324                )));
325            }
326
327            let row_count = batch.num_rows();
328
329            let (row_id_array, created_by_array, deleted_by_array) =
330                mvcc::build_insert_mvcc_columns(
331                    row_count,
332                    next_row_id,
333                    creator_txn_id,
334                    deleted_marker,
335                );
336
337            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(table_columns.len() + 3);
338            arrays.push(row_id_array);
339            arrays.push(created_by_array);
340            arrays.push(deleted_by_array);
341
342            let mut fields = mvcc::build_mvcc_fields();
343
344            for (idx, column) in table_columns.iter().enumerate() {
345                let array = batch.column(idx).clone();
346                let field = mvcc::build_field_with_metadata(
347                    &column.name,
348                    column.data_type.clone(),
349                    column.nullable,
350                    column.field_id,
351                );
352                arrays.push(array);
353                fields.push(field);
354            }
355
356            let append_schema = Arc::new(Schema::new(fields));
357            let append_batch = RecordBatch::try_new(append_schema, arrays).map_err(Error::Arrow)?;
358            table.append(&append_batch)?;
359
360            next_row_id = next_row_id.saturating_add(row_count as u64);
361            total_rows = total_rows.saturating_add(row_count as u64);
362        }
363
364        Ok((next_row_id, total_rows))
365    }
366
367    /// Validate and register foreign keys for a newly created table.
368    #[allow(clippy::too_many_arguments)]
369    pub fn register_foreign_keys_for_new_table<F>(
370        &self,
371        table_id: TableId,
372        display_name: &str,
373        canonical_name: &str,
374        table_columns: &[TableColumn],
375        specs: &[ForeignKeySpec],
376        lookup_table: F,
377        timestamp_micros: u64,
378    ) -> LlkvResult<Vec<ValidatedForeignKey>>
379    where
380        F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
381    {
382        if specs.is_empty() {
383            return Ok(Vec::new());
384        }
385
386        let referencing_columns: Vec<ForeignKeyColumn> = table_columns
387            .iter()
388            .map(|column| ForeignKeyColumn {
389                name: column.name.clone(),
390                data_type: column.data_type.clone(),
391                nullable: column.nullable,
392                primary_key: column.primary_key,
393                unique: column.unique,
394                field_id: column.field_id,
395            })
396            .collect();
397
398        let referencing_table = ForeignKeyTableInfo {
399            display_name: display_name.to_string(),
400            canonical_name: canonical_name.to_string(),
401            table_id,
402            columns: referencing_columns,
403        };
404
405        self.metadata.validate_and_register_foreign_keys(
406            &referencing_table,
407            specs,
408            lookup_table,
409            timestamp_micros,
410        )
411    }
412
413    /// Resolve referenced tables for the provided foreign key view definitions.
414    pub fn referenced_table_info(
415        &self,
416        views: &[ForeignKeyView],
417    ) -> LlkvResult<Vec<ForeignKeyTableInfo>> {
418        let mut results = Vec::with_capacity(views.len());
419        for view in views {
420            let Some(table_id) = self.catalog.table_id(&view.referenced_table_canonical) else {
421                return Err(Error::InvalidArgumentError(format!(
422                    "referenced table '{}' does not exist",
423                    view.referenced_table_display
424                )));
425            };
426
427            let Some(resolver) = self.catalog.field_resolver(table_id) else {
428                return Err(Error::Internal(format!(
429                    "catalog resolver missing for table '{}'",
430                    view.referenced_table_display
431                )));
432            };
433
434            let mut columns = Vec::with_capacity(view.referenced_field_ids.len());
435            for field_id in &view.referenced_field_ids {
436                let info = resolver.field_info(*field_id).ok_or_else(|| {
437                    Error::Internal(format!(
438                        "field metadata missing for id {} in table '{}'",
439                        field_id, view.referenced_table_display
440                    ))
441                })?;
442
443                let data_type = self.metadata.column_data_type(table_id, *field_id)?;
444
445                columns.push(ForeignKeyColumn {
446                    name: info.display_name.to_string(),
447                    data_type,
448                    nullable: !info.constraints.primary_key,
449                    primary_key: info.constraints.primary_key,
450                    unique: info.constraints.unique,
451                    field_id: *field_id,
452                });
453            }
454
455            results.push(ForeignKeyTableInfo {
456                display_name: view.referenced_table_display.clone(),
457                canonical_name: view.referenced_table_canonical.clone(),
458                table_id,
459                columns,
460            });
461        }
462
463        Ok(results)
464    }
465
466    /// Return the current metadata snapshot for a table, including column metadata and constraints.
467    pub fn table_view(&self, canonical_name: &str) -> LlkvResult<TableView> {
468        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
469            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
470        })?;
471
472        let (_, field_ids) = self.sorted_user_fields(table_id);
473        self.table_view_with_field_ids(table_id, &field_ids)
474    }
475
476    /// Produce a read-only view of a table's catalog, including column metadata and constraints.
477    pub fn table_column_specs(&self, canonical_name: &str) -> LlkvResult<Vec<ColumnSpec>> {
478        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
479            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
480        })?;
481
482        let resolver = self
483            .catalog
484            .field_resolver(table_id)
485            .ok_or_else(|| Error::Internal("missing field resolver for table".into()))?;
486
487        let (logical_fields, field_ids) = self.sorted_user_fields(table_id);
488
489        let table_view = self.table_view_with_field_ids(table_id, &field_ids)?;
490        let column_metas = table_view.column_metas;
491        let constraint_records = table_view.constraint_records;
492
493        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
494        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
495        let mut has_primary_key_records = false;
496        let mut has_single_unique_records = false;
497
498        for record in constraint_records
499            .iter()
500            .filter(|record| record.is_active())
501        {
502            match &record.kind {
503                ConstraintKind::PrimaryKey(pk) => {
504                    has_primary_key_records = true;
505                    for field_id in &pk.field_ids {
506                        metadata_primary_keys.insert(*field_id);
507                        metadata_unique_fields.insert(*field_id);
508                    }
509                }
510                ConstraintKind::Unique(unique) => {
511                    if unique.field_ids.len() == 1 {
512                        has_single_unique_records = true;
513                        if let Some(field_id) = unique.field_ids.first() {
514                            metadata_unique_fields.insert(*field_id);
515                        }
516                    }
517                }
518                _ => {}
519            }
520        }
521
522        let mut specs = Vec::with_capacity(field_ids.len());
523
524        for (idx, lfid) in logical_fields.iter().enumerate() {
525            let field_id = lfid.field_id();
526
527            let column_name = column_metas
528                .get(idx)
529                .and_then(|meta| meta.as_ref())
530                .and_then(|meta| meta.name.clone())
531                .unwrap_or_else(|| format!("col_{}", field_id));
532
533            let fallback_constraints = resolver
534                .field_constraints_by_name(&column_name)
535                .unwrap_or_default();
536
537            let metadata_primary = metadata_primary_keys.contains(&field_id);
538            let primary_key = if has_primary_key_records {
539                metadata_primary
540            } else {
541                fallback_constraints.primary_key
542            };
543
544            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
545            let unique = if has_primary_key_records || has_single_unique_records {
546                metadata_unique
547            } else {
548                fallback_constraints.primary_key || fallback_constraints.unique
549            };
550
551            let data_type = self.store.data_type(*lfid)?;
552            let nullable = !primary_key;
553
554            let mut spec = ColumnSpec::new(column_name.clone(), data_type, nullable)
555                .with_primary_key(primary_key)
556                .with_unique(unique);
557
558            if let Some(check_expr) = fallback_constraints.check_expr.clone() {
559                spec = spec.with_check(Some(check_expr));
560            }
561
562            specs.push(spec);
563        }
564
565        Ok(specs)
566    }
567
568    /// Return the foreign key metadata for the specified table.
569    pub fn foreign_key_views(&self, canonical_name: &str) -> LlkvResult<Vec<ForeignKeyView>> {
570        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
571            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
572        })?;
573
574        self.metadata.foreign_key_views(&self.catalog, table_id)
575    }
576
577    /// Return constraint-related catalog metadata for the specified table.
578    pub fn table_constraint_summary(
579        &self,
580        canonical_name: &str,
581    ) -> LlkvResult<TableConstraintSummaryView> {
582        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
583            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
584        })?;
585
586        let (_, field_ids) = self.sorted_user_fields(table_id);
587        let table_meta = self.metadata.table_meta(table_id)?;
588        let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
589        let constraint_records = self.metadata.constraint_records(table_id)?;
590        let multi_column_uniques = self.metadata.multi_column_uniques(table_id)?;
591
592        Ok(TableConstraintSummaryView {
593            table_meta,
594            column_metas,
595            constraint_records,
596            multi_column_uniques,
597        })
598    }
599
600    fn sorted_user_fields(
601        &self,
602        table_id: TableId,
603    ) -> (Vec<llkv_column_map::types::LogicalFieldId>, Vec<FieldId>) {
604        let mut logical_fields = self.store.user_field_ids_for_table(table_id);
605        logical_fields.sort_by_key(|lfid| lfid.field_id());
606        let field_ids = logical_fields
607            .iter()
608            .map(|lfid| lfid.field_id())
609            .collect::<Vec<_>>();
610
611        (logical_fields, field_ids)
612    }
613
614    fn table_view_with_field_ids(
615        &self,
616        table_id: TableId,
617        field_ids: &[FieldId],
618    ) -> LlkvResult<TableView> {
619        self.metadata.table_view(&self.catalog, table_id, field_ids)
620    }
621
622    // -------------------------------------------------------------------------
623    // Catalog read-only views
624    // -------------------------------------------------------------------------
625
626    /// Returns all table names in the catalog.
627    pub fn table_names(&self) -> Vec<String> {
628        self.catalog.table_names()
629    }
630
631    /// Returns the TableId for a canonical table name.
632    pub fn table_id(&self, canonical_name: &str) -> Option<TableId> {
633        self.catalog.table_id(canonical_name)
634    }
635
636    /// Returns a field resolver for the given table.
637    pub fn field_resolver(&self, table_id: TableId) -> Option<crate::catalog::FieldResolver> {
638        self.catalog.field_resolver(table_id)
639    }
640
641    /// Returns a snapshot of the catalog for read-only access.
642    pub fn catalog_snapshot(&self) -> crate::catalog::TableCatalogSnapshot {
643        self.catalog.snapshot()
644    }
645
646    /// Returns a reference to the internal catalog for services that need it.
647    /// Note: This is primarily for internal use by services like ConstraintService.
648    pub fn catalog(&self) -> &Arc<TableCatalog> {
649        &self.catalog
650    }
651}
652
653fn field_id_for_index(idx: usize) -> LlkvResult<FieldId> {
654    FieldId::try_from(idx + 1).map_err(|_| {
655        Error::Internal(format!(
656            "column index {} exceeded supported field id range",
657            idx + 1
658        ))
659    })
660}
661
/// Returns the current wall-clock time in microseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` instead of silently truncating if the microsecond
/// count does not fit in 64 bits.
fn current_time_micros() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX)
        })
}