1use crate::{
9 RuntimeCreateTableBuilder, RuntimeStatementResult, RuntimeTableHandle, TXN_ID_NONE,
10 canonical_table_name,
11};
12use arrow::datatypes::Schema;
13use arrow::record_batch::RecordBatch;
14use llkv_executor::{
15 ExecutorColumn, ExecutorSchema, ExecutorTable, TableStorageAdapter, current_time_micros,
16};
17use llkv_plan::{
18 CreateTablePlan, ForeignKeySpec, IntoPlanColumnSpec, MultiColumnUniqueSpec, PlanColumnSpec,
19};
20use llkv_result::{Error, Result};
21use llkv_storage::pager::Pager;
22use llkv_table::{
23 CatalogDdl, CreateTableResult, FieldId, ForeignKeyColumn, ForeignKeyTableInfo, Table,
24};
25use llkv_transaction::TransactionMvccBuilder;
26use rustc_hash::FxHashMap;
27use simd_r_drive_entry_handle::EntryHandle;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::{Arc, RwLock};
30
31use super::RuntimeContext;
32
33impl<P> RuntimeContext<P>
34where
35 P: Pager<Blob = EntryHandle> + Send + Sync,
36{
37 pub(super) fn create_table_from_columns(
39 &self,
40 display_name: String,
41 canonical_name: String,
42 columns: Vec<PlanColumnSpec>,
43 foreign_keys: Vec<ForeignKeySpec>,
44 multi_column_uniques: Vec<MultiColumnUniqueSpec>,
45 if_not_exists: bool,
46 ) -> Result<RuntimeStatementResult<P>> {
47 tracing::trace!(
48 "\n=== CREATE_TABLE_FROM_COLUMNS: table='{}' columns={} ===",
49 display_name,
50 columns.len()
51 );
52 for (idx, col) in columns.iter().enumerate() {
53 tracing::trace!(
54 " input column[{}]: name='{}' primary_key={}",
55 idx,
56 col.name,
57 col.primary_key
58 );
59 }
60 if columns.is_empty() {
61 return Err(Error::InvalidArgumentError(
62 "CREATE TABLE requires at least one column".into(),
63 ));
64 }
65
66 {
68 let tables = self.tables.read().unwrap();
69 if tables.contains_key(&canonical_name) {
70 if if_not_exists {
71 return Ok(RuntimeStatementResult::CreateTable {
72 table_name: display_name,
73 });
74 }
75
76 return Err(Error::CatalogError(format!(
77 "Catalog Error: Table '{}' already exists",
78 display_name
79 )));
80 }
81 }
82
83 let CreateTableResult {
84 table_id,
85 table,
86 table_columns,
87 column_lookup,
88 } = Table::create_from_columns(
89 &display_name,
90 &canonical_name,
91 &columns,
92 self.metadata.clone(),
93 self.catalog.clone(),
94 self.store.clone(),
95 )?;
96
97 tracing::trace!(
98 "=== TABLE '{}' CREATED WITH table_id={} pager={:p} ===",
99 display_name,
100 table_id,
101 &*self.pager
102 );
103
104 let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
105 for (idx, column) in table_columns.iter().enumerate() {
106 tracing::trace!(
107 "DEBUG create_table_from_columns[{}]: name='{}' data_type={:?} nullable={} primary_key={} unique={}",
108 idx,
109 column.name,
110 column.data_type,
111 column.nullable,
112 column.primary_key,
113 column.unique
114 );
115 column_defs.push(ExecutorColumn {
116 name: column.name.clone(),
117 data_type: column.data_type.clone(),
118 nullable: column.nullable,
119 primary_key: column.primary_key,
120 unique: column.unique,
121 field_id: column.field_id,
122 check_expr: column.check_expr.clone(),
123 });
124 }
125
126 let schema = Arc::new(ExecutorSchema {
127 columns: column_defs.clone(),
128 lookup: column_lookup,
129 });
130 let table_entry = Arc::new(ExecutorTable {
131 storage: Arc::new(TableStorageAdapter::new(Arc::clone(&table))),
132 table: Arc::clone(&table),
133 schema,
134 next_row_id: AtomicU64::new(0),
135 total_rows: AtomicU64::new(0),
136 multi_column_uniques: RwLock::new(Vec::new()),
137 });
138
139 let mut tables = self.tables.write().unwrap();
140 if tables.contains_key(&canonical_name) {
141 drop(tables);
142 let field_ids: Vec<FieldId> =
143 table_columns.iter().map(|column| column.field_id).collect();
144 let _ = self
145 .catalog_service
146 .drop_table(&canonical_name, table_id, &field_ids);
147 if if_not_exists {
148 return Ok(RuntimeStatementResult::CreateTable {
149 table_name: display_name,
150 });
151 }
152 return Err(Error::CatalogError(format!(
153 "Catalog Error: Table '{}' already exists",
154 display_name
155 )));
156 }
157 tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
158 drop(tables);
159
160 if !foreign_keys.is_empty() {
161 let fk_result = self.catalog_service.register_foreign_keys_for_new_table(
162 table_id,
163 &display_name,
164 &canonical_name,
165 &table_columns,
166 &foreign_keys,
167 |table_name| {
168 let (display, canonical) = canonical_table_name(table_name)?;
169 let referenced_table = self.lookup_table(&canonical).map_err(|_| {
170 Error::CatalogError(format!(
171 "Catalog Error: referenced table '{}' does not exist",
172 table_name
173 ))
174 })?;
175
176 let columns = referenced_table
177 .schema
178 .columns
179 .iter()
180 .map(|column| ForeignKeyColumn {
181 name: column.name.clone(),
182 data_type: column.data_type.clone(),
183 nullable: column.nullable,
184 primary_key: column.primary_key,
185 unique: column.unique,
186 field_id: column.field_id,
187 })
188 .collect();
189
190 let multi_column_uniques = self
192 .catalog_service
193 .table_view(&canonical)?
194 .multi_column_uniques;
195
196 Ok(ForeignKeyTableInfo {
197 display_name: display,
198 canonical_name: canonical,
199 table_id: referenced_table.table_id(),
200 columns,
201 multi_column_uniques,
202 })
203 },
204 current_time_micros(),
205 );
206
207 if let Err(err) = fk_result {
208 let field_ids: Vec<FieldId> =
209 table_columns.iter().map(|column| column.field_id).collect();
210 let _ = self
211 .catalog_service
212 .drop_table(&canonical_name, table_id, &field_ids);
213 self.remove_table_entry(&canonical_name);
214 return Err(err);
215 }
216 }
217
218 if !multi_column_uniques.is_empty() {
220 let column_lookup: FxHashMap<String, FieldId> = table_columns
222 .iter()
223 .map(|col| (col.name.to_ascii_lowercase(), col.field_id))
224 .collect();
225
226 for unique_spec in &multi_column_uniques {
227 let field_ids: Result<Vec<FieldId>> = unique_spec
229 .columns
230 .iter()
231 .map(|col_name| {
232 let normalized = col_name.to_ascii_lowercase();
233 column_lookup.get(&normalized).copied().ok_or_else(|| {
234 Error::InvalidArgumentError(format!(
235 "unknown column '{}' in UNIQUE constraint",
236 col_name
237 ))
238 })
239 })
240 .collect();
241
242 let field_ids = field_ids?;
243
244 let registration_result = self.catalog_service.register_multi_column_unique_index(
245 table_id,
246 &field_ids,
247 unique_spec.name.clone(),
248 );
249
250 if let Err(err) = registration_result {
251 let field_ids: Vec<FieldId> =
252 table_columns.iter().map(|column| column.field_id).collect();
253 let _ = self
254 .catalog_service
255 .drop_table(&canonical_name, table_id, &field_ids);
256 self.remove_table_entry(&canonical_name);
257 return Err(err);
258 }
259 }
260 }
261
262 self.metadata.flush_table(table_id)?;
266
267 Ok(RuntimeStatementResult::CreateTable {
268 table_name: display_name,
269 })
270 }
271
272 pub(super) fn create_table_from_batches(
274 &self,
275 display_name: String,
276 canonical_name: String,
277 schema: Arc<Schema>,
278 batches: Vec<RecordBatch>,
279 if_not_exists: bool,
280 ) -> Result<RuntimeStatementResult<P>> {
281 {
282 let tables = self.tables.read().unwrap();
283 if tables.contains_key(&canonical_name) {
284 if if_not_exists {
285 return Ok(RuntimeStatementResult::CreateTable {
286 table_name: display_name,
287 });
288 }
289 return Err(Error::CatalogError(format!(
290 "Catalog Error: Table '{}' already exists",
291 display_name
292 )));
293 }
294 }
295
296 let CreateTableResult {
297 table_id,
298 table,
299 table_columns,
300 column_lookup,
301 } = self.catalog_service.create_table_from_schema(
302 &display_name,
303 &canonical_name,
304 &schema,
305 )?;
306
307 tracing::trace!(
308 "=== CTAS table '{}' created with table_id={} pager={:p} ===",
309 display_name,
310 table_id,
311 &*self.pager
312 );
313
314 let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
315 for column in &table_columns {
316 column_defs.push(ExecutorColumn {
317 name: column.name.clone(),
318 data_type: column.data_type.clone(),
319 nullable: column.nullable,
320 primary_key: column.primary_key,
321 unique: column.unique,
322 field_id: column.field_id,
323 check_expr: column.check_expr.clone(),
324 });
325 }
326
327 let schema_arc = Arc::new(ExecutorSchema {
328 columns: column_defs.clone(),
329 lookup: column_lookup,
330 });
331 let table_entry = Arc::new(ExecutorTable {
332 storage: Arc::new(TableStorageAdapter::new(Arc::clone(&table))),
333 table: Arc::clone(&table),
334 schema: schema_arc,
335 next_row_id: AtomicU64::new(0),
336 total_rows: AtomicU64::new(0),
337 multi_column_uniques: RwLock::new(Vec::new()),
338 });
339
340 let creator_snapshot = self.txn_manager.begin_transaction();
341 let creator_txn_id = creator_snapshot.txn_id;
342 let mvcc_builder = TransactionMvccBuilder;
343 let (next_row_id, total_rows) = match self.catalog_service.append_batches_with_mvcc(
344 table.as_ref(),
345 &table_columns,
346 &batches,
347 creator_txn_id,
348 TXN_ID_NONE,
349 0,
350 &mvcc_builder,
351 ) {
352 Ok(result) => {
353 self.txn_manager.mark_committed(creator_txn_id);
354 result
355 }
356 Err(err) => {
357 self.txn_manager.mark_aborted(creator_txn_id);
358 let field_ids: Vec<FieldId> =
359 table_columns.iter().map(|column| column.field_id).collect();
360 let _ = self
361 .catalog_service
362 .drop_table(&canonical_name, table_id, &field_ids);
363 return Err(err);
364 }
365 };
366
367 table_entry.next_row_id.store(next_row_id, Ordering::SeqCst);
368 table_entry.total_rows.store(total_rows, Ordering::SeqCst);
369
370 let mut tables = self.tables.write().unwrap();
371 if tables.contains_key(&canonical_name) {
372 if if_not_exists {
373 return Ok(RuntimeStatementResult::CreateTable {
374 table_name: display_name,
375 });
376 }
377 return Err(Error::CatalogError(format!(
378 "Catalog Error: Table '{}' already exists",
379 display_name
380 )));
381 }
382 tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
383 drop(tables); self.metadata.flush_table(table_id)?;
389
390 Ok(RuntimeStatementResult::CreateTable {
391 table_name: display_name,
392 })
393 }
394
395 pub fn create_table<C, I>(
399 self: &Arc<Self>,
400 name: &str,
401 columns: I,
402 ) -> Result<RuntimeTableHandle<P>>
403 where
404 C: IntoPlanColumnSpec,
405 I: IntoIterator<Item = C>,
406 {
407 self.create_table_with_options(name, columns, false)
408 }
409
410 pub fn create_table_if_not_exists<C, I>(
414 self: &Arc<Self>,
415 name: &str,
416 columns: I,
417 ) -> Result<RuntimeTableHandle<P>>
418 where
419 C: IntoPlanColumnSpec,
420 I: IntoIterator<Item = C>,
421 {
422 self.create_table_with_options(name, columns, true)
423 }
424
425 pub fn create_table_builder(&self, name: &str) -> RuntimeCreateTableBuilder<'_, P> {
429 RuntimeCreateTableBuilder::new(self, name)
430 }
431
432 fn create_table_with_options<C, I>(
434 self: &Arc<Self>,
435 name: &str,
436 columns: I,
437 if_not_exists: bool,
438 ) -> Result<RuntimeTableHandle<P>>
439 where
440 C: IntoPlanColumnSpec,
441 I: IntoIterator<Item = C>,
442 {
443 let mut plan = CreateTablePlan::new(name);
444 plan.if_not_exists = if_not_exists;
445 plan.columns = columns
446 .into_iter()
447 .map(|column| column.into_plan_column_spec())
448 .collect();
449 let result = CatalogDdl::create_table(self.as_ref(), plan)?;
450 match result {
451 RuntimeStatementResult::CreateTable { .. } => {
452 RuntimeTableHandle::new(Arc::clone(self), name)
453 }
454 other => Err(Error::InvalidArgumentError(format!(
455 "unexpected statement result {other:?} when creating table"
456 ))),
457 }
458 }
459
460 pub(crate) fn execute_create_table(
464 &self,
465 plan: CreateTablePlan,
466 ) -> Result<RuntimeStatementResult<P>> {
467 CatalogDdl::create_table(self, plan)
468 }
469}