llkv_runtime/runtime_context/
table_creation.rs

//! Table creation operations for RuntimeContext.
//!
//! This module contains the logic for creating new tables:
//! - Public API methods (create_table, create_table_if_not_exists, create_table_builder)
//! - CREATE TABLE from column specifications
//! - CREATE TABLE AS SELECT (from batches)
7
8use crate::{
9    RuntimeCreateTableBuilder, RuntimeStatementResult, RuntimeTableHandle, TXN_ID_NONE,
10    canonical_table_name,
11};
12use arrow::datatypes::Schema;
13use arrow::record_batch::RecordBatch;
14use llkv_executor::{
15    ExecutorColumn, ExecutorSchema, ExecutorTable, TableStorageAdapter, current_time_micros,
16};
17use llkv_plan::{
18    CreateTablePlan, ForeignKeySpec, IntoPlanColumnSpec, MultiColumnUniqueSpec, PlanColumnSpec,
19};
20use llkv_result::{Error, Result};
21use llkv_storage::pager::Pager;
22use llkv_table::{
23    CatalogDdl, CreateTableResult, FieldId, ForeignKeyColumn, ForeignKeyTableInfo, Table,
24};
25use llkv_transaction::TransactionMvccBuilder;
26use rustc_hash::FxHashMap;
27use simd_r_drive_entry_handle::EntryHandle;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::{Arc, RwLock};
30
31use super::RuntimeContext;
32
33impl<P> RuntimeContext<P>
34where
35    P: Pager<Blob = EntryHandle> + Send + Sync,
36{
37    /// Create a table from column specifications.
38    pub(super) fn create_table_from_columns(
39        &self,
40        display_name: String,
41        canonical_name: String,
42        columns: Vec<PlanColumnSpec>,
43        foreign_keys: Vec<ForeignKeySpec>,
44        multi_column_uniques: Vec<MultiColumnUniqueSpec>,
45        if_not_exists: bool,
46    ) -> Result<RuntimeStatementResult<P>> {
47        tracing::trace!(
48            "\n=== CREATE_TABLE_FROM_COLUMNS: table='{}' columns={} ===",
49            display_name,
50            columns.len()
51        );
52        for (idx, col) in columns.iter().enumerate() {
53            tracing::trace!(
54                "  input column[{}]: name='{}' primary_key={}",
55                idx,
56                col.name,
57                col.primary_key
58            );
59        }
60        if columns.is_empty() {
61            return Err(Error::InvalidArgumentError(
62                "CREATE TABLE requires at least one column".into(),
63            ));
64        }
65
66        // Avoid repeating catalog work if the table already exists.
67        {
68            let tables = self.tables.read().unwrap();
69            if tables.contains_key(&canonical_name) {
70                if if_not_exists {
71                    return Ok(RuntimeStatementResult::CreateTable {
72                        table_name: display_name,
73                    });
74                }
75
76                return Err(Error::CatalogError(format!(
77                    "Catalog Error: Table '{}' already exists",
78                    display_name
79                )));
80            }
81        }
82
83        let CreateTableResult {
84            table_id,
85            table,
86            table_columns,
87            column_lookup,
88        } = Table::create_from_columns(
89            &display_name,
90            &canonical_name,
91            &columns,
92            self.metadata.clone(),
93            self.catalog.clone(),
94            self.store.clone(),
95        )?;
96
97        tracing::trace!(
98            "=== TABLE '{}' CREATED WITH table_id={} pager={:p} ===",
99            display_name,
100            table_id,
101            &*self.pager
102        );
103
104        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
105        for (idx, column) in table_columns.iter().enumerate() {
106            tracing::trace!(
107                "DEBUG create_table_from_columns[{}]: name='{}' data_type={:?} nullable={} primary_key={} unique={}",
108                idx,
109                column.name,
110                column.data_type,
111                column.nullable,
112                column.primary_key,
113                column.unique
114            );
115            column_defs.push(ExecutorColumn {
116                name: column.name.clone(),
117                data_type: column.data_type.clone(),
118                nullable: column.nullable,
119                primary_key: column.primary_key,
120                unique: column.unique,
121                field_id: column.field_id,
122                check_expr: column.check_expr.clone(),
123            });
124        }
125
126        let schema = Arc::new(ExecutorSchema {
127            columns: column_defs.clone(),
128            lookup: column_lookup,
129        });
130        let table_entry = Arc::new(ExecutorTable {
131            storage: Arc::new(TableStorageAdapter::new(Arc::clone(&table))),
132            table: Arc::clone(&table),
133            schema,
134            next_row_id: AtomicU64::new(0),
135            total_rows: AtomicU64::new(0),
136            multi_column_uniques: RwLock::new(Vec::new()),
137        });
138
139        let mut tables = self.tables.write().unwrap();
140        if tables.contains_key(&canonical_name) {
141            drop(tables);
142            let field_ids: Vec<FieldId> =
143                table_columns.iter().map(|column| column.field_id).collect();
144            let _ = self
145                .catalog_service
146                .drop_table(&canonical_name, table_id, &field_ids);
147            if if_not_exists {
148                return Ok(RuntimeStatementResult::CreateTable {
149                    table_name: display_name,
150                });
151            }
152            return Err(Error::CatalogError(format!(
153                "Catalog Error: Table '{}' already exists",
154                display_name
155            )));
156        }
157        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
158        drop(tables);
159
160        if !foreign_keys.is_empty() {
161            let fk_result = self.catalog_service.register_foreign_keys_for_new_table(
162                table_id,
163                &display_name,
164                &canonical_name,
165                &table_columns,
166                &foreign_keys,
167                |table_name| {
168                    let (display, canonical) = canonical_table_name(table_name)?;
169                    let referenced_table = self.lookup_table(&canonical).map_err(|_| {
170                        Error::CatalogError(format!(
171                            "Catalog Error: referenced table '{}' does not exist",
172                            table_name
173                        ))
174                    })?;
175
176                    let columns = referenced_table
177                        .schema
178                        .columns
179                        .iter()
180                        .map(|column| ForeignKeyColumn {
181                            name: column.name.clone(),
182                            data_type: column.data_type.clone(),
183                            nullable: column.nullable,
184                            primary_key: column.primary_key,
185                            unique: column.unique,
186                            field_id: column.field_id,
187                        })
188                        .collect();
189
190                    // Get multi-column unique constraints for this table
191                    let multi_column_uniques = self
192                        .catalog_service
193                        .table_view(&canonical)?
194                        .multi_column_uniques;
195
196                    Ok(ForeignKeyTableInfo {
197                        display_name: display,
198                        canonical_name: canonical,
199                        table_id: referenced_table.table_id(),
200                        columns,
201                        multi_column_uniques,
202                    })
203                },
204                current_time_micros(),
205            );
206
207            if let Err(err) = fk_result {
208                let field_ids: Vec<FieldId> =
209                    table_columns.iter().map(|column| column.field_id).collect();
210                let _ = self
211                    .catalog_service
212                    .drop_table(&canonical_name, table_id, &field_ids);
213                self.remove_table_entry(&canonical_name);
214                return Err(err);
215            }
216        }
217
218        // Register multi-column unique constraints
219        if !multi_column_uniques.is_empty() {
220            // Build column name to field_id lookup
221            let column_lookup: FxHashMap<String, FieldId> = table_columns
222                .iter()
223                .map(|col| (col.name.to_ascii_lowercase(), col.field_id))
224                .collect();
225
226            for unique_spec in &multi_column_uniques {
227                // Map column names to field IDs
228                let field_ids: Result<Vec<FieldId>> = unique_spec
229                    .columns
230                    .iter()
231                    .map(|col_name| {
232                        let normalized = col_name.to_ascii_lowercase();
233                        column_lookup.get(&normalized).copied().ok_or_else(|| {
234                            Error::InvalidArgumentError(format!(
235                                "unknown column '{}' in UNIQUE constraint",
236                                col_name
237                            ))
238                        })
239                    })
240                    .collect();
241
242                let field_ids = field_ids?;
243
244                let registration_result = self.catalog_service.register_multi_column_unique_index(
245                    table_id,
246                    &field_ids,
247                    unique_spec.name.clone(),
248                );
249
250                if let Err(err) = registration_result {
251                    let field_ids: Vec<FieldId> =
252                        table_columns.iter().map(|column| column.field_id).collect();
253                    let _ = self
254                        .catalog_service
255                        .drop_table(&canonical_name, table_id, &field_ids);
256                    self.remove_table_entry(&canonical_name);
257                    return Err(err);
258                }
259            }
260        }
261
262        // Flush table metadata to SysCatalog immediately so fallback contexts (e.g., temporary
263        // contexts needing to access persistent tables) can see the table. This is critical for
264        // temporary views that reference persistent tables.
265        self.metadata.flush_table(table_id)?;
266
267        Ok(RuntimeStatementResult::CreateTable {
268            table_name: display_name,
269        })
270    }
271
272    /// Create a table from batches (CREATE TABLE AS SELECT).
273    pub(super) fn create_table_from_batches(
274        &self,
275        display_name: String,
276        canonical_name: String,
277        schema: Arc<Schema>,
278        batches: Vec<RecordBatch>,
279        if_not_exists: bool,
280    ) -> Result<RuntimeStatementResult<P>> {
281        {
282            let tables = self.tables.read().unwrap();
283            if tables.contains_key(&canonical_name) {
284                if if_not_exists {
285                    return Ok(RuntimeStatementResult::CreateTable {
286                        table_name: display_name,
287                    });
288                }
289                return Err(Error::CatalogError(format!(
290                    "Catalog Error: Table '{}' already exists",
291                    display_name
292                )));
293            }
294        }
295
296        let CreateTableResult {
297            table_id,
298            table,
299            table_columns,
300            column_lookup,
301        } = self.catalog_service.create_table_from_schema(
302            &display_name,
303            &canonical_name,
304            &schema,
305        )?;
306
307        tracing::trace!(
308            "=== CTAS table '{}' created with table_id={} pager={:p} ===",
309            display_name,
310            table_id,
311            &*self.pager
312        );
313
314        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
315        for column in &table_columns {
316            column_defs.push(ExecutorColumn {
317                name: column.name.clone(),
318                data_type: column.data_type.clone(),
319                nullable: column.nullable,
320                primary_key: column.primary_key,
321                unique: column.unique,
322                field_id: column.field_id,
323                check_expr: column.check_expr.clone(),
324            });
325        }
326
327        let schema_arc = Arc::new(ExecutorSchema {
328            columns: column_defs.clone(),
329            lookup: column_lookup,
330        });
331        let table_entry = Arc::new(ExecutorTable {
332            storage: Arc::new(TableStorageAdapter::new(Arc::clone(&table))),
333            table: Arc::clone(&table),
334            schema: schema_arc,
335            next_row_id: AtomicU64::new(0),
336            total_rows: AtomicU64::new(0),
337            multi_column_uniques: RwLock::new(Vec::new()),
338        });
339
340        let creator_snapshot = self.txn_manager.begin_transaction();
341        let creator_txn_id = creator_snapshot.txn_id;
342        let mvcc_builder = TransactionMvccBuilder;
343        let (next_row_id, total_rows) = match self.catalog_service.append_batches_with_mvcc(
344            table.as_ref(),
345            &table_columns,
346            &batches,
347            creator_txn_id,
348            TXN_ID_NONE,
349            0,
350            &mvcc_builder,
351        ) {
352            Ok(result) => {
353                self.txn_manager.mark_committed(creator_txn_id);
354                result
355            }
356            Err(err) => {
357                self.txn_manager.mark_aborted(creator_txn_id);
358                let field_ids: Vec<FieldId> =
359                    table_columns.iter().map(|column| column.field_id).collect();
360                let _ = self
361                    .catalog_service
362                    .drop_table(&canonical_name, table_id, &field_ids);
363                return Err(err);
364            }
365        };
366
367        table_entry.next_row_id.store(next_row_id, Ordering::SeqCst);
368        table_entry.total_rows.store(total_rows, Ordering::SeqCst);
369
370        let mut tables = self.tables.write().unwrap();
371        if tables.contains_key(&canonical_name) {
372            if if_not_exists {
373                return Ok(RuntimeStatementResult::CreateTable {
374                    table_name: display_name,
375                });
376            }
377            return Err(Error::CatalogError(format!(
378                "Catalog Error: Table '{}' already exists",
379                display_name
380            )));
381        }
382        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
383        drop(tables); // Release write lock before catalog operations
384
385        // Flush table metadata to SysCatalog immediately so fallback contexts (e.g., temporary
386        // contexts needing to access persistent tables) can see the table. This is critical for
387        // temporary views that reference persistent tables.
388        self.metadata.flush_table(table_id)?;
389
390        Ok(RuntimeStatementResult::CreateTable {
391            table_name: display_name,
392        })
393    }
394
395    /// Create a new table with the given columns.
396    ///
397    /// Public API method for table creation.
398    pub fn create_table<C, I>(
399        self: &Arc<Self>,
400        name: &str,
401        columns: I,
402    ) -> Result<RuntimeTableHandle<P>>
403    where
404        C: IntoPlanColumnSpec,
405        I: IntoIterator<Item = C>,
406    {
407        self.create_table_with_options(name, columns, false)
408    }
409
410    /// Create a new table if it doesn't already exist.
411    ///
412    /// Public API method for conditional table creation.
413    pub fn create_table_if_not_exists<C, I>(
414        self: &Arc<Self>,
415        name: &str,
416        columns: I,
417    ) -> Result<RuntimeTableHandle<P>>
418    where
419        C: IntoPlanColumnSpec,
420        I: IntoIterator<Item = C>,
421    {
422        self.create_table_with_options(name, columns, true)
423    }
424
425    /// Creates a fluent builder for defining and creating a new table with columns and constraints.
426    ///
427    /// Public API method for advanced table creation with constraints.
428    pub fn create_table_builder(&self, name: &str) -> RuntimeCreateTableBuilder<'_, P> {
429        RuntimeCreateTableBuilder::new(self, name)
430    }
431
432    /// Internal helper for create_table and create_table_if_not_exists.
433    fn create_table_with_options<C, I>(
434        self: &Arc<Self>,
435        name: &str,
436        columns: I,
437        if_not_exists: bool,
438    ) -> Result<RuntimeTableHandle<P>>
439    where
440        C: IntoPlanColumnSpec,
441        I: IntoIterator<Item = C>,
442    {
443        let mut plan = CreateTablePlan::new(name);
444        plan.if_not_exists = if_not_exists;
445        plan.columns = columns
446            .into_iter()
447            .map(|column| column.into_plan_column_spec())
448            .collect();
449        let result = CatalogDdl::create_table(self.as_ref(), plan)?;
450        match result {
451            RuntimeStatementResult::CreateTable { .. } => {
452                RuntimeTableHandle::new(Arc::clone(self), name)
453            }
454            other => Err(Error::InvalidArgumentError(format!(
455                "unexpected statement result {other:?} when creating table"
456            ))),
457        }
458    }
459
460    /// Execute a CREATE TABLE plan - internal storage API.
461    ///
462    /// Use RuntimeSession::execute_statement() instead for external use.
463    pub(crate) fn execute_create_table(
464        &self,
465        plan: CreateTablePlan,
466    ) -> Result<RuntimeStatementResult<P>> {
467        CatalogDdl::create_table(self, plan)
468    }
469}