llkv_runtime/runtime_context/
table_creation.rs

1//! Table creation operations for RuntimeContext.
2//!
3//! This module contains logic for creating new tables:
4//! - Public API methods (create_table, create_table_if_not_exists, create_table_builder)
5//! - CREATE TABLE from column specifications
6//! - CREATE TABLE AS SELECT (from batches)
7
8use crate::{
9    RuntimeCreateTableBuilder, RuntimeStatementResult, RuntimeTableHandle, TXN_ID_NONE,
10    canonical_table_name,
11};
12use arrow::datatypes::Schema;
13use arrow::record_batch::RecordBatch;
14use llkv_executor::{ExecutorColumn, ExecutorSchema, ExecutorTable, current_time_micros};
15use llkv_plan::{
16    CreateTablePlan, ForeignKeySpec, IntoPlanColumnSpec, MultiColumnUniqueSpec, PlanColumnSpec,
17};
18use llkv_result::{Error, Result};
19use llkv_storage::pager::Pager;
20use llkv_table::{
21    CatalogDdl, CreateTableResult, FieldId, ForeignKeyColumn, ForeignKeyTableInfo, Table,
22};
23use llkv_transaction::TransactionMvccBuilder;
24use rustc_hash::FxHashMap;
25use simd_r_drive_entry_handle::EntryHandle;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::{Arc, RwLock};
28
29use super::RuntimeContext;
30
31impl<P> RuntimeContext<P>
32where
33    P: Pager<Blob = EntryHandle> + Send + Sync,
34{
35    /// Create a table from column specifications.
36    pub(super) fn create_table_from_columns(
37        &self,
38        display_name: String,
39        canonical_name: String,
40        columns: Vec<PlanColumnSpec>,
41        foreign_keys: Vec<ForeignKeySpec>,
42        multi_column_uniques: Vec<MultiColumnUniqueSpec>,
43        if_not_exists: bool,
44    ) -> Result<RuntimeStatementResult<P>> {
45        tracing::trace!(
46            "\n=== CREATE_TABLE_FROM_COLUMNS: table='{}' columns={} ===",
47            display_name,
48            columns.len()
49        );
50        for (idx, col) in columns.iter().enumerate() {
51            tracing::trace!(
52                "  input column[{}]: name='{}' primary_key={}",
53                idx,
54                col.name,
55                col.primary_key
56            );
57        }
58        if columns.is_empty() {
59            return Err(Error::InvalidArgumentError(
60                "CREATE TABLE requires at least one column".into(),
61            ));
62        }
63
64        // Avoid repeating catalog work if the table already exists.
65        {
66            let tables = self.tables.read().unwrap();
67            if tables.contains_key(&canonical_name) {
68                if if_not_exists {
69                    return Ok(RuntimeStatementResult::CreateTable {
70                        table_name: display_name,
71                    });
72                }
73
74                return Err(Error::CatalogError(format!(
75                    "Catalog Error: Table '{}' already exists",
76                    display_name
77                )));
78            }
79        }
80
81        let CreateTableResult {
82            table_id,
83            table,
84            table_columns,
85            column_lookup,
86        } = Table::create_from_columns(
87            &display_name,
88            &canonical_name,
89            &columns,
90            self.metadata.clone(),
91            self.catalog.clone(),
92            self.store.clone(),
93        )?;
94
95        tracing::trace!(
96            "=== TABLE '{}' CREATED WITH table_id={} pager={:p} ===",
97            display_name,
98            table_id,
99            &*self.pager
100        );
101
102        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
103        for (idx, column) in table_columns.iter().enumerate() {
104            tracing::trace!(
105                "DEBUG create_table_from_columns[{}]: name='{}' data_type={:?} nullable={} primary_key={} unique={}",
106                idx,
107                column.name,
108                column.data_type,
109                column.nullable,
110                column.primary_key,
111                column.unique
112            );
113            column_defs.push(ExecutorColumn {
114                name: column.name.clone(),
115                data_type: column.data_type.clone(),
116                nullable: column.nullable,
117                primary_key: column.primary_key,
118                unique: column.unique,
119                field_id: column.field_id,
120                check_expr: column.check_expr.clone(),
121            });
122        }
123
124        let schema = Arc::new(ExecutorSchema {
125            columns: column_defs.clone(),
126            lookup: column_lookup,
127        });
128        let table_entry = Arc::new(ExecutorTable {
129            table: Arc::clone(&table),
130            schema,
131            next_row_id: AtomicU64::new(0),
132            total_rows: AtomicU64::new(0),
133            multi_column_uniques: RwLock::new(Vec::new()),
134        });
135
136        let mut tables = self.tables.write().unwrap();
137        if tables.contains_key(&canonical_name) {
138            drop(tables);
139            let field_ids: Vec<FieldId> =
140                table_columns.iter().map(|column| column.field_id).collect();
141            let _ = self
142                .catalog_service
143                .drop_table(&canonical_name, table_id, &field_ids);
144            if if_not_exists {
145                return Ok(RuntimeStatementResult::CreateTable {
146                    table_name: display_name,
147                });
148            }
149            return Err(Error::CatalogError(format!(
150                "Catalog Error: Table '{}' already exists",
151                display_name
152            )));
153        }
154        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
155        drop(tables);
156
157        if !foreign_keys.is_empty() {
158            let fk_result = self.catalog_service.register_foreign_keys_for_new_table(
159                table_id,
160                &display_name,
161                &canonical_name,
162                &table_columns,
163                &foreign_keys,
164                |table_name| {
165                    let (display, canonical) = canonical_table_name(table_name)?;
166                    let referenced_table = self.lookup_table(&canonical).map_err(|_| {
167                        Error::CatalogError(format!(
168                            "Catalog Error: referenced table '{}' does not exist",
169                            table_name
170                        ))
171                    })?;
172
173                    let columns = referenced_table
174                        .schema
175                        .columns
176                        .iter()
177                        .map(|column| ForeignKeyColumn {
178                            name: column.name.clone(),
179                            data_type: column.data_type.clone(),
180                            nullable: column.nullable,
181                            primary_key: column.primary_key,
182                            unique: column.unique,
183                            field_id: column.field_id,
184                        })
185                        .collect();
186
187                    // Get multi-column unique constraints for this table
188                    let multi_column_uniques = self
189                        .catalog_service
190                        .table_view(&canonical)?
191                        .multi_column_uniques;
192
193                    Ok(ForeignKeyTableInfo {
194                        display_name: display,
195                        canonical_name: canonical,
196                        table_id: referenced_table.table.table_id(),
197                        columns,
198                        multi_column_uniques,
199                    })
200                },
201                current_time_micros(),
202            );
203
204            if let Err(err) = fk_result {
205                let field_ids: Vec<FieldId> =
206                    table_columns.iter().map(|column| column.field_id).collect();
207                let _ = self
208                    .catalog_service
209                    .drop_table(&canonical_name, table_id, &field_ids);
210                self.remove_table_entry(&canonical_name);
211                return Err(err);
212            }
213        }
214
215        // Register multi-column unique constraints
216        if !multi_column_uniques.is_empty() {
217            // Build column name to field_id lookup
218            let column_lookup: FxHashMap<String, FieldId> = table_columns
219                .iter()
220                .map(|col| (col.name.to_ascii_lowercase(), col.field_id))
221                .collect();
222
223            for unique_spec in &multi_column_uniques {
224                // Map column names to field IDs
225                let field_ids: Result<Vec<FieldId>> = unique_spec
226                    .columns
227                    .iter()
228                    .map(|col_name| {
229                        let normalized = col_name.to_ascii_lowercase();
230                        column_lookup.get(&normalized).copied().ok_or_else(|| {
231                            Error::InvalidArgumentError(format!(
232                                "unknown column '{}' in UNIQUE constraint",
233                                col_name
234                            ))
235                        })
236                    })
237                    .collect();
238
239                let field_ids = field_ids?;
240
241                let registration_result = self.catalog_service.register_multi_column_unique_index(
242                    table_id,
243                    &field_ids,
244                    unique_spec.name.clone(),
245                );
246
247                if let Err(err) = registration_result {
248                    let field_ids: Vec<FieldId> =
249                        table_columns.iter().map(|column| column.field_id).collect();
250                    let _ = self
251                        .catalog_service
252                        .drop_table(&canonical_name, table_id, &field_ids);
253                    self.remove_table_entry(&canonical_name);
254                    return Err(err);
255                }
256            }
257        }
258
259        // Flush table metadata to SysCatalog immediately so fallback contexts (e.g., temporary
260        // contexts needing to access persistent tables) can see the table. This is critical for
261        // temporary views that reference persistent tables.
262        self.metadata.flush_table(table_id)?;
263
264        Ok(RuntimeStatementResult::CreateTable {
265            table_name: display_name,
266        })
267    }
268
269    /// Create a table from batches (CREATE TABLE AS SELECT).
270    pub(super) fn create_table_from_batches(
271        &self,
272        display_name: String,
273        canonical_name: String,
274        schema: Arc<Schema>,
275        batches: Vec<RecordBatch>,
276        if_not_exists: bool,
277    ) -> Result<RuntimeStatementResult<P>> {
278        {
279            let tables = self.tables.read().unwrap();
280            if tables.contains_key(&canonical_name) {
281                if if_not_exists {
282                    return Ok(RuntimeStatementResult::CreateTable {
283                        table_name: display_name,
284                    });
285                }
286                return Err(Error::CatalogError(format!(
287                    "Catalog Error: Table '{}' already exists",
288                    display_name
289                )));
290            }
291        }
292
293        let CreateTableResult {
294            table_id,
295            table,
296            table_columns,
297            column_lookup,
298        } = self.catalog_service.create_table_from_schema(
299            &display_name,
300            &canonical_name,
301            &schema,
302        )?;
303
304        tracing::trace!(
305            "=== CTAS table '{}' created with table_id={} pager={:p} ===",
306            display_name,
307            table_id,
308            &*self.pager
309        );
310
311        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
312        for column in &table_columns {
313            column_defs.push(ExecutorColumn {
314                name: column.name.clone(),
315                data_type: column.data_type.clone(),
316                nullable: column.nullable,
317                primary_key: column.primary_key,
318                unique: column.unique,
319                field_id: column.field_id,
320                check_expr: column.check_expr.clone(),
321            });
322        }
323
324        let schema_arc = Arc::new(ExecutorSchema {
325            columns: column_defs.clone(),
326            lookup: column_lookup,
327        });
328        let table_entry = Arc::new(ExecutorTable {
329            table: Arc::clone(&table),
330            schema: schema_arc,
331            next_row_id: AtomicU64::new(0),
332            total_rows: AtomicU64::new(0),
333            multi_column_uniques: RwLock::new(Vec::new()),
334        });
335
336        let creator_snapshot = self.txn_manager.begin_transaction();
337        let creator_txn_id = creator_snapshot.txn_id;
338        let mvcc_builder = TransactionMvccBuilder;
339        let (next_row_id, total_rows) = match self.catalog_service.append_batches_with_mvcc(
340            table.as_ref(),
341            &table_columns,
342            &batches,
343            creator_txn_id,
344            TXN_ID_NONE,
345            0,
346            &mvcc_builder,
347        ) {
348            Ok(result) => {
349                self.txn_manager.mark_committed(creator_txn_id);
350                result
351            }
352            Err(err) => {
353                self.txn_manager.mark_aborted(creator_txn_id);
354                let field_ids: Vec<FieldId> =
355                    table_columns.iter().map(|column| column.field_id).collect();
356                let _ = self
357                    .catalog_service
358                    .drop_table(&canonical_name, table_id, &field_ids);
359                return Err(err);
360            }
361        };
362
363        table_entry.next_row_id.store(next_row_id, Ordering::SeqCst);
364        table_entry.total_rows.store(total_rows, Ordering::SeqCst);
365
366        let mut tables = self.tables.write().unwrap();
367        if tables.contains_key(&canonical_name) {
368            if if_not_exists {
369                return Ok(RuntimeStatementResult::CreateTable {
370                    table_name: display_name,
371                });
372            }
373            return Err(Error::CatalogError(format!(
374                "Catalog Error: Table '{}' already exists",
375                display_name
376            )));
377        }
378        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
379        drop(tables); // Release write lock before catalog operations
380
381        // Flush table metadata to SysCatalog immediately so fallback contexts (e.g., temporary
382        // contexts needing to access persistent tables) can see the table. This is critical for
383        // temporary views that reference persistent tables.
384        self.metadata.flush_table(table_id)?;
385
386        Ok(RuntimeStatementResult::CreateTable {
387            table_name: display_name,
388        })
389    }
390
391    /// Create a new table with the given columns.
392    ///
393    /// Public API method for table creation.
394    pub fn create_table<C, I>(
395        self: &Arc<Self>,
396        name: &str,
397        columns: I,
398    ) -> Result<RuntimeTableHandle<P>>
399    where
400        C: IntoPlanColumnSpec,
401        I: IntoIterator<Item = C>,
402    {
403        self.create_table_with_options(name, columns, false)
404    }
405
406    /// Create a new table if it doesn't already exist.
407    ///
408    /// Public API method for conditional table creation.
409    pub fn create_table_if_not_exists<C, I>(
410        self: &Arc<Self>,
411        name: &str,
412        columns: I,
413    ) -> Result<RuntimeTableHandle<P>>
414    where
415        C: IntoPlanColumnSpec,
416        I: IntoIterator<Item = C>,
417    {
418        self.create_table_with_options(name, columns, true)
419    }
420
421    /// Creates a fluent builder for defining and creating a new table with columns and constraints.
422    ///
423    /// Public API method for advanced table creation with constraints.
424    pub fn create_table_builder(&self, name: &str) -> RuntimeCreateTableBuilder<'_, P> {
425        RuntimeCreateTableBuilder::new(self, name)
426    }
427
428    /// Internal helper for create_table and create_table_if_not_exists.
429    fn create_table_with_options<C, I>(
430        self: &Arc<Self>,
431        name: &str,
432        columns: I,
433        if_not_exists: bool,
434    ) -> Result<RuntimeTableHandle<P>>
435    where
436        C: IntoPlanColumnSpec,
437        I: IntoIterator<Item = C>,
438    {
439        let mut plan = CreateTablePlan::new(name);
440        plan.if_not_exists = if_not_exists;
441        plan.columns = columns
442            .into_iter()
443            .map(|column| column.into_plan_column_spec())
444            .collect();
445        let result = CatalogDdl::create_table(self.as_ref(), plan)?;
446        match result {
447            RuntimeStatementResult::CreateTable { .. } => {
448                RuntimeTableHandle::new(Arc::clone(self), name)
449            }
450            other => Err(Error::InvalidArgumentError(format!(
451                "unexpected statement result {other:?} when creating table"
452            ))),
453        }
454    }
455
456    /// Execute a CREATE TABLE plan - internal storage API.
457    ///
458    /// Use RuntimeSession::execute_statement() instead for external use.
459    pub(crate) fn execute_create_table(
460        &self,
461        plan: CreateTablePlan,
462    ) -> Result<RuntimeStatementResult<P>> {
463        CatalogDdl::create_table(self, plan)
464    }
465}