1use crate::{
9 RuntimeCreateTableBuilder, RuntimeStatementResult, RuntimeTableHandle, TXN_ID_NONE,
10 canonical_table_name,
11};
12use arrow::datatypes::Schema;
13use arrow::record_batch::RecordBatch;
14use llkv_executor::{ExecutorColumn, ExecutorSchema, ExecutorTable, current_time_micros};
15use llkv_plan::{
16 CreateTablePlan, ForeignKeySpec, IntoPlanColumnSpec, MultiColumnUniqueSpec, PlanColumnSpec,
17};
18use llkv_result::{Error, Result};
19use llkv_storage::pager::Pager;
20use llkv_table::{
21 CatalogDdl, CreateTableResult, FieldId, ForeignKeyColumn, ForeignKeyTableInfo, Table,
22};
23use llkv_transaction::TransactionMvccBuilder;
24use rustc_hash::FxHashMap;
25use simd_r_drive_entry_handle::EntryHandle;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::{Arc, RwLock};
28
29use super::RuntimeContext;
30
31impl<P> RuntimeContext<P>
32where
33 P: Pager<Blob = EntryHandle> + Send + Sync,
34{
35 pub(super) fn create_table_from_columns(
37 &self,
38 display_name: String,
39 canonical_name: String,
40 columns: Vec<PlanColumnSpec>,
41 foreign_keys: Vec<ForeignKeySpec>,
42 multi_column_uniques: Vec<MultiColumnUniqueSpec>,
43 if_not_exists: bool,
44 ) -> Result<RuntimeStatementResult<P>> {
45 tracing::trace!(
46 "\n=== CREATE_TABLE_FROM_COLUMNS: table='{}' columns={} ===",
47 display_name,
48 columns.len()
49 );
50 for (idx, col) in columns.iter().enumerate() {
51 tracing::trace!(
52 " input column[{}]: name='{}' primary_key={}",
53 idx,
54 col.name,
55 col.primary_key
56 );
57 }
58 if columns.is_empty() {
59 return Err(Error::InvalidArgumentError(
60 "CREATE TABLE requires at least one column".into(),
61 ));
62 }
63
64 {
66 let tables = self.tables.read().unwrap();
67 if tables.contains_key(&canonical_name) {
68 if if_not_exists {
69 return Ok(RuntimeStatementResult::CreateTable {
70 table_name: display_name,
71 });
72 }
73
74 return Err(Error::CatalogError(format!(
75 "Catalog Error: Table '{}' already exists",
76 display_name
77 )));
78 }
79 }
80
81 let CreateTableResult {
82 table_id,
83 table,
84 table_columns,
85 column_lookup,
86 } = Table::create_from_columns(
87 &display_name,
88 &canonical_name,
89 &columns,
90 self.metadata.clone(),
91 self.catalog.clone(),
92 self.store.clone(),
93 )?;
94
95 tracing::trace!(
96 "=== TABLE '{}' CREATED WITH table_id={} pager={:p} ===",
97 display_name,
98 table_id,
99 &*self.pager
100 );
101
102 let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
103 for (idx, column) in table_columns.iter().enumerate() {
104 tracing::trace!(
105 "DEBUG create_table_from_columns[{}]: name='{}' data_type={:?} nullable={} primary_key={} unique={}",
106 idx,
107 column.name,
108 column.data_type,
109 column.nullable,
110 column.primary_key,
111 column.unique
112 );
113 column_defs.push(ExecutorColumn {
114 name: column.name.clone(),
115 data_type: column.data_type.clone(),
116 nullable: column.nullable,
117 primary_key: column.primary_key,
118 unique: column.unique,
119 field_id: column.field_id,
120 check_expr: column.check_expr.clone(),
121 });
122 }
123
124 let schema = Arc::new(ExecutorSchema {
125 columns: column_defs.clone(),
126 lookup: column_lookup,
127 });
128 let table_entry = Arc::new(ExecutorTable {
129 table: Arc::clone(&table),
130 schema,
131 next_row_id: AtomicU64::new(0),
132 total_rows: AtomicU64::new(0),
133 multi_column_uniques: RwLock::new(Vec::new()),
134 });
135
136 let mut tables = self.tables.write().unwrap();
137 if tables.contains_key(&canonical_name) {
138 drop(tables);
139 let field_ids: Vec<FieldId> =
140 table_columns.iter().map(|column| column.field_id).collect();
141 let _ = self
142 .catalog_service
143 .drop_table(&canonical_name, table_id, &field_ids);
144 if if_not_exists {
145 return Ok(RuntimeStatementResult::CreateTable {
146 table_name: display_name,
147 });
148 }
149 return Err(Error::CatalogError(format!(
150 "Catalog Error: Table '{}' already exists",
151 display_name
152 )));
153 }
154 tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
155 drop(tables);
156
157 if !foreign_keys.is_empty() {
158 let fk_result = self.catalog_service.register_foreign_keys_for_new_table(
159 table_id,
160 &display_name,
161 &canonical_name,
162 &table_columns,
163 &foreign_keys,
164 |table_name| {
165 let (display, canonical) = canonical_table_name(table_name)?;
166 let referenced_table = self.lookup_table(&canonical).map_err(|_| {
167 Error::CatalogError(format!(
168 "Catalog Error: referenced table '{}' does not exist",
169 table_name
170 ))
171 })?;
172
173 let columns = referenced_table
174 .schema
175 .columns
176 .iter()
177 .map(|column| ForeignKeyColumn {
178 name: column.name.clone(),
179 data_type: column.data_type.clone(),
180 nullable: column.nullable,
181 primary_key: column.primary_key,
182 unique: column.unique,
183 field_id: column.field_id,
184 })
185 .collect();
186
187 let multi_column_uniques = self
189 .catalog_service
190 .table_view(&canonical)?
191 .multi_column_uniques;
192
193 Ok(ForeignKeyTableInfo {
194 display_name: display,
195 canonical_name: canonical,
196 table_id: referenced_table.table.table_id(),
197 columns,
198 multi_column_uniques,
199 })
200 },
201 current_time_micros(),
202 );
203
204 if let Err(err) = fk_result {
205 let field_ids: Vec<FieldId> =
206 table_columns.iter().map(|column| column.field_id).collect();
207 let _ = self
208 .catalog_service
209 .drop_table(&canonical_name, table_id, &field_ids);
210 self.remove_table_entry(&canonical_name);
211 return Err(err);
212 }
213 }
214
215 if !multi_column_uniques.is_empty() {
217 let column_lookup: FxHashMap<String, FieldId> = table_columns
219 .iter()
220 .map(|col| (col.name.to_ascii_lowercase(), col.field_id))
221 .collect();
222
223 for unique_spec in &multi_column_uniques {
224 let field_ids: Result<Vec<FieldId>> = unique_spec
226 .columns
227 .iter()
228 .map(|col_name| {
229 let normalized = col_name.to_ascii_lowercase();
230 column_lookup.get(&normalized).copied().ok_or_else(|| {
231 Error::InvalidArgumentError(format!(
232 "unknown column '{}' in UNIQUE constraint",
233 col_name
234 ))
235 })
236 })
237 .collect();
238
239 let field_ids = field_ids?;
240
241 let registration_result = self.catalog_service.register_multi_column_unique_index(
242 table_id,
243 &field_ids,
244 unique_spec.name.clone(),
245 );
246
247 if let Err(err) = registration_result {
248 let field_ids: Vec<FieldId> =
249 table_columns.iter().map(|column| column.field_id).collect();
250 let _ = self
251 .catalog_service
252 .drop_table(&canonical_name, table_id, &field_ids);
253 self.remove_table_entry(&canonical_name);
254 return Err(err);
255 }
256 }
257 }
258
259 self.metadata.flush_table(table_id)?;
263
264 Ok(RuntimeStatementResult::CreateTable {
265 table_name: display_name,
266 })
267 }
268
269 pub(super) fn create_table_from_batches(
271 &self,
272 display_name: String,
273 canonical_name: String,
274 schema: Arc<Schema>,
275 batches: Vec<RecordBatch>,
276 if_not_exists: bool,
277 ) -> Result<RuntimeStatementResult<P>> {
278 {
279 let tables = self.tables.read().unwrap();
280 if tables.contains_key(&canonical_name) {
281 if if_not_exists {
282 return Ok(RuntimeStatementResult::CreateTable {
283 table_name: display_name,
284 });
285 }
286 return Err(Error::CatalogError(format!(
287 "Catalog Error: Table '{}' already exists",
288 display_name
289 )));
290 }
291 }
292
293 let CreateTableResult {
294 table_id,
295 table,
296 table_columns,
297 column_lookup,
298 } = self.catalog_service.create_table_from_schema(
299 &display_name,
300 &canonical_name,
301 &schema,
302 )?;
303
304 tracing::trace!(
305 "=== CTAS table '{}' created with table_id={} pager={:p} ===",
306 display_name,
307 table_id,
308 &*self.pager
309 );
310
311 let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
312 for column in &table_columns {
313 column_defs.push(ExecutorColumn {
314 name: column.name.clone(),
315 data_type: column.data_type.clone(),
316 nullable: column.nullable,
317 primary_key: column.primary_key,
318 unique: column.unique,
319 field_id: column.field_id,
320 check_expr: column.check_expr.clone(),
321 });
322 }
323
324 let schema_arc = Arc::new(ExecutorSchema {
325 columns: column_defs.clone(),
326 lookup: column_lookup,
327 });
328 let table_entry = Arc::new(ExecutorTable {
329 table: Arc::clone(&table),
330 schema: schema_arc,
331 next_row_id: AtomicU64::new(0),
332 total_rows: AtomicU64::new(0),
333 multi_column_uniques: RwLock::new(Vec::new()),
334 });
335
336 let creator_snapshot = self.txn_manager.begin_transaction();
337 let creator_txn_id = creator_snapshot.txn_id;
338 let mvcc_builder = TransactionMvccBuilder;
339 let (next_row_id, total_rows) = match self.catalog_service.append_batches_with_mvcc(
340 table.as_ref(),
341 &table_columns,
342 &batches,
343 creator_txn_id,
344 TXN_ID_NONE,
345 0,
346 &mvcc_builder,
347 ) {
348 Ok(result) => {
349 self.txn_manager.mark_committed(creator_txn_id);
350 result
351 }
352 Err(err) => {
353 self.txn_manager.mark_aborted(creator_txn_id);
354 let field_ids: Vec<FieldId> =
355 table_columns.iter().map(|column| column.field_id).collect();
356 let _ = self
357 .catalog_service
358 .drop_table(&canonical_name, table_id, &field_ids);
359 return Err(err);
360 }
361 };
362
363 table_entry.next_row_id.store(next_row_id, Ordering::SeqCst);
364 table_entry.total_rows.store(total_rows, Ordering::SeqCst);
365
366 let mut tables = self.tables.write().unwrap();
367 if tables.contains_key(&canonical_name) {
368 if if_not_exists {
369 return Ok(RuntimeStatementResult::CreateTable {
370 table_name: display_name,
371 });
372 }
373 return Err(Error::CatalogError(format!(
374 "Catalog Error: Table '{}' already exists",
375 display_name
376 )));
377 }
378 tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
379 drop(tables); self.metadata.flush_table(table_id)?;
385
386 Ok(RuntimeStatementResult::CreateTable {
387 table_name: display_name,
388 })
389 }
390
391 pub fn create_table<C, I>(
395 self: &Arc<Self>,
396 name: &str,
397 columns: I,
398 ) -> Result<RuntimeTableHandle<P>>
399 where
400 C: IntoPlanColumnSpec,
401 I: IntoIterator<Item = C>,
402 {
403 self.create_table_with_options(name, columns, false)
404 }
405
406 pub fn create_table_if_not_exists<C, I>(
410 self: &Arc<Self>,
411 name: &str,
412 columns: I,
413 ) -> Result<RuntimeTableHandle<P>>
414 where
415 C: IntoPlanColumnSpec,
416 I: IntoIterator<Item = C>,
417 {
418 self.create_table_with_options(name, columns, true)
419 }
420
421 pub fn create_table_builder(&self, name: &str) -> RuntimeCreateTableBuilder<'_, P> {
425 RuntimeCreateTableBuilder::new(self, name)
426 }
427
428 fn create_table_with_options<C, I>(
430 self: &Arc<Self>,
431 name: &str,
432 columns: I,
433 if_not_exists: bool,
434 ) -> Result<RuntimeTableHandle<P>>
435 where
436 C: IntoPlanColumnSpec,
437 I: IntoIterator<Item = C>,
438 {
439 let mut plan = CreateTablePlan::new(name);
440 plan.if_not_exists = if_not_exists;
441 plan.columns = columns
442 .into_iter()
443 .map(|column| column.into_plan_column_spec())
444 .collect();
445 let result = CatalogDdl::create_table(self.as_ref(), plan)?;
446 match result {
447 RuntimeStatementResult::CreateTable { .. } => {
448 RuntimeTableHandle::new(Arc::clone(self), name)
449 }
450 other => Err(Error::InvalidArgumentError(format!(
451 "unexpected statement result {other:?} when creating table"
452 ))),
453 }
454 }
455
456 pub(crate) fn execute_create_table(
460 &self,
461 plan: CreateTablePlan,
462 ) -> Result<RuntimeStatementResult<P>> {
463 CatalogDdl::create_table(self, plan)
464 }
465}