llkv_runtime/runtime_context/
table_creation.rs

1//! Table creation operations for RuntimeContext.
2//!
3//! This module contains logic for creating new tables:
4//! - Public API methods (create_table, create_table_if_not_exists, create_table_builder)
5//! - CREATE TABLE from column specifications
6//! - CREATE TABLE AS SELECT (from batches)
7
8use crate::{
9    RuntimeCreateTableBuilder, RuntimeStatementResult, RuntimeTableHandle, TXN_ID_NONE,
10    canonical_table_name,
11};
12use arrow::datatypes::Schema;
13use arrow::record_batch::RecordBatch;
14use llkv_executor::{ExecutorColumn, ExecutorSchema, ExecutorTable, current_time_micros};
15use llkv_plan::{
16    CreateTablePlan, ForeignKeySpec, IntoPlanColumnSpec, MultiColumnUniqueSpec, PlanColumnSpec,
17};
18use llkv_result::{Error, Result};
19use llkv_storage::pager::Pager;
20use llkv_table::{
21    CatalogDdl, CreateTableResult, FieldId, ForeignKeyColumn, ForeignKeyTableInfo, Table,
22};
23use llkv_transaction::TransactionMvccBuilder;
24use rustc_hash::FxHashMap;
25use simd_r_drive_entry_handle::EntryHandle;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::{Arc, RwLock};
28
29use super::RuntimeContext;
30
31impl<P> RuntimeContext<P>
32where
33    P: Pager<Blob = EntryHandle> + Send + Sync,
34{
35    /// Create a table from column specifications.
36    pub(super) fn create_table_from_columns(
37        &self,
38        display_name: String,
39        canonical_name: String,
40        columns: Vec<PlanColumnSpec>,
41        foreign_keys: Vec<ForeignKeySpec>,
42        multi_column_uniques: Vec<MultiColumnUniqueSpec>,
43        if_not_exists: bool,
44    ) -> Result<RuntimeStatementResult<P>> {
45        tracing::trace!(
46            "\n=== CREATE_TABLE_FROM_COLUMNS: table='{}' columns={} ===",
47            display_name,
48            columns.len()
49        );
50        for (idx, col) in columns.iter().enumerate() {
51            tracing::trace!(
52                "  input column[{}]: name='{}' primary_key={}",
53                idx,
54                col.name,
55                col.primary_key
56            );
57        }
58        if columns.is_empty() {
59            return Err(Error::InvalidArgumentError(
60                "CREATE TABLE requires at least one column".into(),
61            ));
62        }
63
64        // Avoid repeating catalog work if the table already exists.
65        {
66            let tables = self.tables.read().unwrap();
67            if tables.contains_key(&canonical_name) {
68                if if_not_exists {
69                    return Ok(RuntimeStatementResult::CreateTable {
70                        table_name: display_name,
71                    });
72                }
73
74                return Err(Error::CatalogError(format!(
75                    "Catalog Error: Table '{}' already exists",
76                    display_name
77                )));
78            }
79        }
80
81        let CreateTableResult {
82            table_id,
83            table,
84            table_columns,
85            column_lookup,
86        } = Table::create_from_columns(
87            &display_name,
88            &canonical_name,
89            &columns,
90            self.metadata.clone(),
91            self.catalog.clone(),
92            self.store.clone(),
93        )?;
94
95        tracing::trace!(
96            "=== TABLE '{}' CREATED WITH table_id={} pager={:p} ===",
97            display_name,
98            table_id,
99            &*self.pager
100        );
101
102        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
103        for (idx, column) in table_columns.iter().enumerate() {
104            tracing::trace!(
105                "DEBUG create_table_from_columns[{}]: name='{}' data_type={:?} nullable={} primary_key={} unique={}",
106                idx,
107                column.name,
108                column.data_type,
109                column.nullable,
110                column.primary_key,
111                column.unique
112            );
113            column_defs.push(ExecutorColumn {
114                name: column.name.clone(),
115                data_type: column.data_type.clone(),
116                nullable: column.nullable,
117                primary_key: column.primary_key,
118                unique: column.unique,
119                field_id: column.field_id,
120                check_expr: column.check_expr.clone(),
121            });
122        }
123
124        let schema = Arc::new(ExecutorSchema {
125            columns: column_defs.clone(),
126            lookup: column_lookup,
127        });
128        let table_entry = Arc::new(ExecutorTable {
129            table: Arc::clone(&table),
130            schema,
131            next_row_id: AtomicU64::new(0),
132            total_rows: AtomicU64::new(0),
133            multi_column_uniques: RwLock::new(Vec::new()),
134        });
135
136        let mut tables = self.tables.write().unwrap();
137        if tables.contains_key(&canonical_name) {
138            drop(tables);
139            let field_ids: Vec<FieldId> =
140                table_columns.iter().map(|column| column.field_id).collect();
141            let _ = self
142                .catalog_service
143                .drop_table(&canonical_name, table_id, &field_ids);
144            if if_not_exists {
145                return Ok(RuntimeStatementResult::CreateTable {
146                    table_name: display_name,
147                });
148            }
149            return Err(Error::CatalogError(format!(
150                "Catalog Error: Table '{}' already exists",
151                display_name
152            )));
153        }
154        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
155        drop(tables);
156
157        if !foreign_keys.is_empty() {
158            let fk_result = self.catalog_service.register_foreign_keys_for_new_table(
159                table_id,
160                &display_name,
161                &canonical_name,
162                &table_columns,
163                &foreign_keys,
164                |table_name| {
165                    let (display, canonical) = canonical_table_name(table_name)?;
166                    let referenced_table = self.lookup_table(&canonical).map_err(|_| {
167                        Error::CatalogError(format!(
168                            "Catalog Error: referenced table '{}' does not exist",
169                            table_name
170                        ))
171                    })?;
172
173                    let columns = referenced_table
174                        .schema
175                        .columns
176                        .iter()
177                        .map(|column| ForeignKeyColumn {
178                            name: column.name.clone(),
179                            data_type: column.data_type.clone(),
180                            nullable: column.nullable,
181                            primary_key: column.primary_key,
182                            unique: column.unique,
183                            field_id: column.field_id,
184                        })
185                        .collect();
186
187                    // Get multi-column unique constraints for this table
188                    let multi_column_uniques = self
189                        .catalog_service
190                        .table_view(&canonical)?
191                        .multi_column_uniques;
192
193                    Ok(ForeignKeyTableInfo {
194                        display_name: display,
195                        canonical_name: canonical,
196                        table_id: referenced_table.table.table_id(),
197                        columns,
198                        multi_column_uniques,
199                    })
200                },
201                current_time_micros(),
202            );
203
204            if let Err(err) = fk_result {
205                let field_ids: Vec<FieldId> =
206                    table_columns.iter().map(|column| column.field_id).collect();
207                let _ = self
208                    .catalog_service
209                    .drop_table(&canonical_name, table_id, &field_ids);
210                self.remove_table_entry(&canonical_name);
211                return Err(err);
212            }
213        }
214
215        // Register multi-column unique constraints
216        if !multi_column_uniques.is_empty() {
217            // Build column name to field_id lookup
218            let column_lookup: FxHashMap<String, FieldId> = table_columns
219                .iter()
220                .map(|col| (col.name.to_ascii_lowercase(), col.field_id))
221                .collect();
222
223            for unique_spec in &multi_column_uniques {
224                // Map column names to field IDs
225                let field_ids: Result<Vec<FieldId>> = unique_spec
226                    .columns
227                    .iter()
228                    .map(|col_name| {
229                        let normalized = col_name.to_ascii_lowercase();
230                        column_lookup.get(&normalized).copied().ok_or_else(|| {
231                            Error::InvalidArgumentError(format!(
232                                "unknown column '{}' in UNIQUE constraint",
233                                col_name
234                            ))
235                        })
236                    })
237                    .collect();
238
239                let field_ids = field_ids?;
240
241                let registration_result = self.catalog_service.register_multi_column_unique_index(
242                    table_id,
243                    &field_ids,
244                    unique_spec.name.clone(),
245                );
246
247                if let Err(err) = registration_result {
248                    let field_ids: Vec<FieldId> =
249                        table_columns.iter().map(|column| column.field_id).collect();
250                    let _ = self
251                        .catalog_service
252                        .drop_table(&canonical_name, table_id, &field_ids);
253                    self.remove_table_entry(&canonical_name);
254                    return Err(err);
255                }
256            }
257        }
258
259        Ok(RuntimeStatementResult::CreateTable {
260            table_name: display_name,
261        })
262    }
263
264    /// Create a table from batches (CREATE TABLE AS SELECT).
265    pub(super) fn create_table_from_batches(
266        &self,
267        display_name: String,
268        canonical_name: String,
269        schema: Arc<Schema>,
270        batches: Vec<RecordBatch>,
271        if_not_exists: bool,
272    ) -> Result<RuntimeStatementResult<P>> {
273        {
274            let tables = self.tables.read().unwrap();
275            if tables.contains_key(&canonical_name) {
276                if if_not_exists {
277                    return Ok(RuntimeStatementResult::CreateTable {
278                        table_name: display_name,
279                    });
280                }
281                return Err(Error::CatalogError(format!(
282                    "Catalog Error: Table '{}' already exists",
283                    display_name
284                )));
285            }
286        }
287
288        let CreateTableResult {
289            table_id,
290            table,
291            table_columns,
292            column_lookup,
293        } = self.catalog_service.create_table_from_schema(
294            &display_name,
295            &canonical_name,
296            &schema,
297        )?;
298
299        tracing::trace!(
300            "=== CTAS table '{}' created with table_id={} pager={:p} ===",
301            display_name,
302            table_id,
303            &*self.pager
304        );
305
306        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
307        for column in &table_columns {
308            column_defs.push(ExecutorColumn {
309                name: column.name.clone(),
310                data_type: column.data_type.clone(),
311                nullable: column.nullable,
312                primary_key: column.primary_key,
313                unique: column.unique,
314                field_id: column.field_id,
315                check_expr: column.check_expr.clone(),
316            });
317        }
318
319        let schema_arc = Arc::new(ExecutorSchema {
320            columns: column_defs.clone(),
321            lookup: column_lookup,
322        });
323        let table_entry = Arc::new(ExecutorTable {
324            table: Arc::clone(&table),
325            schema: schema_arc,
326            next_row_id: AtomicU64::new(0),
327            total_rows: AtomicU64::new(0),
328            multi_column_uniques: RwLock::new(Vec::new()),
329        });
330
331        let creator_snapshot = self.txn_manager.begin_transaction();
332        let creator_txn_id = creator_snapshot.txn_id;
333        let mvcc_builder = TransactionMvccBuilder;
334        let (next_row_id, total_rows) = match self.catalog_service.append_batches_with_mvcc(
335            table.as_ref(),
336            &table_columns,
337            &batches,
338            creator_txn_id,
339            TXN_ID_NONE,
340            0,
341            &mvcc_builder,
342        ) {
343            Ok(result) => {
344                self.txn_manager.mark_committed(creator_txn_id);
345                result
346            }
347            Err(err) => {
348                self.txn_manager.mark_aborted(creator_txn_id);
349                let field_ids: Vec<FieldId> =
350                    table_columns.iter().map(|column| column.field_id).collect();
351                let _ = self
352                    .catalog_service
353                    .drop_table(&canonical_name, table_id, &field_ids);
354                return Err(err);
355            }
356        };
357
358        table_entry.next_row_id.store(next_row_id, Ordering::SeqCst);
359        table_entry.total_rows.store(total_rows, Ordering::SeqCst);
360
361        let mut tables = self.tables.write().unwrap();
362        if tables.contains_key(&canonical_name) {
363            if if_not_exists {
364                return Ok(RuntimeStatementResult::CreateTable {
365                    table_name: display_name,
366                });
367            }
368            return Err(Error::CatalogError(format!(
369                "Catalog Error: Table '{}' already exists",
370                display_name
371            )));
372        }
373        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
374        drop(tables); // Release write lock before catalog operations
375
376        Ok(RuntimeStatementResult::CreateTable {
377            table_name: display_name,
378        })
379    }
380
381    /// Create a new table with the given columns.
382    ///
383    /// Public API method for table creation.
384    pub fn create_table<C, I>(
385        self: &Arc<Self>,
386        name: &str,
387        columns: I,
388    ) -> Result<RuntimeTableHandle<P>>
389    where
390        C: IntoPlanColumnSpec,
391        I: IntoIterator<Item = C>,
392    {
393        self.create_table_with_options(name, columns, false)
394    }
395
396    /// Create a new table if it doesn't already exist.
397    ///
398    /// Public API method for conditional table creation.
399    pub fn create_table_if_not_exists<C, I>(
400        self: &Arc<Self>,
401        name: &str,
402        columns: I,
403    ) -> Result<RuntimeTableHandle<P>>
404    where
405        C: IntoPlanColumnSpec,
406        I: IntoIterator<Item = C>,
407    {
408        self.create_table_with_options(name, columns, true)
409    }
410
411    /// Creates a fluent builder for defining and creating a new table with columns and constraints.
412    ///
413    /// Public API method for advanced table creation with constraints.
414    pub fn create_table_builder(&self, name: &str) -> RuntimeCreateTableBuilder<'_, P> {
415        RuntimeCreateTableBuilder::new(self, name)
416    }
417
418    /// Internal helper for create_table and create_table_if_not_exists.
419    fn create_table_with_options<C, I>(
420        self: &Arc<Self>,
421        name: &str,
422        columns: I,
423        if_not_exists: bool,
424    ) -> Result<RuntimeTableHandle<P>>
425    where
426        C: IntoPlanColumnSpec,
427        I: IntoIterator<Item = C>,
428    {
429        let mut plan = CreateTablePlan::new(name);
430        plan.if_not_exists = if_not_exists;
431        plan.columns = columns
432            .into_iter()
433            .map(|column| column.into_plan_column_spec())
434            .collect();
435        let result = CatalogDdl::create_table(self.as_ref(), plan)?;
436        match result {
437            RuntimeStatementResult::CreateTable { .. } => {
438                RuntimeTableHandle::new(Arc::clone(self), name)
439            }
440            other => Err(Error::InvalidArgumentError(format!(
441                "unexpected statement result {other:?} when creating table"
442            ))),
443        }
444    }
445
446    /// Execute a CREATE TABLE plan - internal storage API.
447    ///
448    /// Use RuntimeSession::execute_statement() instead for external use.
449    pub(crate) fn execute_create_table(
450        &self,
451        plan: CreateTablePlan,
452    ) -> Result<RuntimeStatementResult<P>> {
453        CatalogDdl::create_table(self, plan)
454    }
455}