1use crate::{
9 RuntimeCreateTableBuilder, RuntimeStatementResult, RuntimeTableHandle, TXN_ID_NONE,
10 canonical_table_name,
11};
12use arrow::datatypes::Schema;
13use arrow::record_batch::RecordBatch;
14use llkv_executor::{ExecutorColumn, ExecutorSchema, ExecutorTable, current_time_micros};
15use llkv_plan::{
16 CreateTablePlan, ForeignKeySpec, IntoPlanColumnSpec, MultiColumnUniqueSpec, PlanColumnSpec,
17};
18use llkv_result::{Error, Result};
19use llkv_storage::pager::Pager;
20use llkv_table::{
21 CatalogDdl, CreateTableResult, FieldId, ForeignKeyColumn, ForeignKeyTableInfo, Table,
22};
23use llkv_transaction::TransactionMvccBuilder;
24use rustc_hash::FxHashMap;
25use simd_r_drive_entry_handle::EntryHandle;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::{Arc, RwLock};
28
29use super::RuntimeContext;
30
31impl<P> RuntimeContext<P>
32where
33 P: Pager<Blob = EntryHandle> + Send + Sync,
34{
35 pub(super) fn create_table_from_columns(
37 &self,
38 display_name: String,
39 canonical_name: String,
40 columns: Vec<PlanColumnSpec>,
41 foreign_keys: Vec<ForeignKeySpec>,
42 multi_column_uniques: Vec<MultiColumnUniqueSpec>,
43 if_not_exists: bool,
44 ) -> Result<RuntimeStatementResult<P>> {
45 tracing::trace!(
46 "\n=== CREATE_TABLE_FROM_COLUMNS: table='{}' columns={} ===",
47 display_name,
48 columns.len()
49 );
50 for (idx, col) in columns.iter().enumerate() {
51 tracing::trace!(
52 " input column[{}]: name='{}' primary_key={}",
53 idx,
54 col.name,
55 col.primary_key
56 );
57 }
58 if columns.is_empty() {
59 return Err(Error::InvalidArgumentError(
60 "CREATE TABLE requires at least one column".into(),
61 ));
62 }
63
64 {
66 let tables = self.tables.read().unwrap();
67 if tables.contains_key(&canonical_name) {
68 if if_not_exists {
69 return Ok(RuntimeStatementResult::CreateTable {
70 table_name: display_name,
71 });
72 }
73
74 return Err(Error::CatalogError(format!(
75 "Catalog Error: Table '{}' already exists",
76 display_name
77 )));
78 }
79 }
80
81 let CreateTableResult {
82 table_id,
83 table,
84 table_columns,
85 column_lookup,
86 } = Table::create_from_columns(
87 &display_name,
88 &canonical_name,
89 &columns,
90 self.metadata.clone(),
91 self.catalog.clone(),
92 self.store.clone(),
93 )?;
94
95 tracing::trace!(
96 "=== TABLE '{}' CREATED WITH table_id={} pager={:p} ===",
97 display_name,
98 table_id,
99 &*self.pager
100 );
101
102 let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
103 for (idx, column) in table_columns.iter().enumerate() {
104 tracing::trace!(
105 "DEBUG create_table_from_columns[{}]: name='{}' data_type={:?} nullable={} primary_key={} unique={}",
106 idx,
107 column.name,
108 column.data_type,
109 column.nullable,
110 column.primary_key,
111 column.unique
112 );
113 column_defs.push(ExecutorColumn {
114 name: column.name.clone(),
115 data_type: column.data_type.clone(),
116 nullable: column.nullable,
117 primary_key: column.primary_key,
118 unique: column.unique,
119 field_id: column.field_id,
120 check_expr: column.check_expr.clone(),
121 });
122 }
123
124 let schema = Arc::new(ExecutorSchema {
125 columns: column_defs.clone(),
126 lookup: column_lookup,
127 });
128 let table_entry = Arc::new(ExecutorTable {
129 table: Arc::clone(&table),
130 schema,
131 next_row_id: AtomicU64::new(0),
132 total_rows: AtomicU64::new(0),
133 multi_column_uniques: RwLock::new(Vec::new()),
134 });
135
136 let mut tables = self.tables.write().unwrap();
137 if tables.contains_key(&canonical_name) {
138 drop(tables);
139 let field_ids: Vec<FieldId> =
140 table_columns.iter().map(|column| column.field_id).collect();
141 let _ = self
142 .catalog_service
143 .drop_table(&canonical_name, table_id, &field_ids);
144 if if_not_exists {
145 return Ok(RuntimeStatementResult::CreateTable {
146 table_name: display_name,
147 });
148 }
149 return Err(Error::CatalogError(format!(
150 "Catalog Error: Table '{}' already exists",
151 display_name
152 )));
153 }
154 tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
155 drop(tables);
156
157 if !foreign_keys.is_empty() {
158 let fk_result = self.catalog_service.register_foreign_keys_for_new_table(
159 table_id,
160 &display_name,
161 &canonical_name,
162 &table_columns,
163 &foreign_keys,
164 |table_name| {
165 let (display, canonical) = canonical_table_name(table_name)?;
166 let referenced_table = self.lookup_table(&canonical).map_err(|_| {
167 Error::CatalogError(format!(
168 "Catalog Error: referenced table '{}' does not exist",
169 table_name
170 ))
171 })?;
172
173 let columns = referenced_table
174 .schema
175 .columns
176 .iter()
177 .map(|column| ForeignKeyColumn {
178 name: column.name.clone(),
179 data_type: column.data_type.clone(),
180 nullable: column.nullable,
181 primary_key: column.primary_key,
182 unique: column.unique,
183 field_id: column.field_id,
184 })
185 .collect();
186
187 let multi_column_uniques = self
189 .catalog_service
190 .table_view(&canonical)?
191 .multi_column_uniques;
192
193 Ok(ForeignKeyTableInfo {
194 display_name: display,
195 canonical_name: canonical,
196 table_id: referenced_table.table.table_id(),
197 columns,
198 multi_column_uniques,
199 })
200 },
201 current_time_micros(),
202 );
203
204 if let Err(err) = fk_result {
205 let field_ids: Vec<FieldId> =
206 table_columns.iter().map(|column| column.field_id).collect();
207 let _ = self
208 .catalog_service
209 .drop_table(&canonical_name, table_id, &field_ids);
210 self.remove_table_entry(&canonical_name);
211 return Err(err);
212 }
213 }
214
215 if !multi_column_uniques.is_empty() {
217 let column_lookup: FxHashMap<String, FieldId> = table_columns
219 .iter()
220 .map(|col| (col.name.to_ascii_lowercase(), col.field_id))
221 .collect();
222
223 for unique_spec in &multi_column_uniques {
224 let field_ids: Result<Vec<FieldId>> = unique_spec
226 .columns
227 .iter()
228 .map(|col_name| {
229 let normalized = col_name.to_ascii_lowercase();
230 column_lookup.get(&normalized).copied().ok_or_else(|| {
231 Error::InvalidArgumentError(format!(
232 "unknown column '{}' in UNIQUE constraint",
233 col_name
234 ))
235 })
236 })
237 .collect();
238
239 let field_ids = field_ids?;
240
241 let registration_result = self.catalog_service.register_multi_column_unique_index(
242 table_id,
243 &field_ids,
244 unique_spec.name.clone(),
245 );
246
247 if let Err(err) = registration_result {
248 let field_ids: Vec<FieldId> =
249 table_columns.iter().map(|column| column.field_id).collect();
250 let _ = self
251 .catalog_service
252 .drop_table(&canonical_name, table_id, &field_ids);
253 self.remove_table_entry(&canonical_name);
254 return Err(err);
255 }
256 }
257 }
258
259 Ok(RuntimeStatementResult::CreateTable {
260 table_name: display_name,
261 })
262 }
263
264 pub(super) fn create_table_from_batches(
266 &self,
267 display_name: String,
268 canonical_name: String,
269 schema: Arc<Schema>,
270 batches: Vec<RecordBatch>,
271 if_not_exists: bool,
272 ) -> Result<RuntimeStatementResult<P>> {
273 {
274 let tables = self.tables.read().unwrap();
275 if tables.contains_key(&canonical_name) {
276 if if_not_exists {
277 return Ok(RuntimeStatementResult::CreateTable {
278 table_name: display_name,
279 });
280 }
281 return Err(Error::CatalogError(format!(
282 "Catalog Error: Table '{}' already exists",
283 display_name
284 )));
285 }
286 }
287
288 let CreateTableResult {
289 table_id,
290 table,
291 table_columns,
292 column_lookup,
293 } = self.catalog_service.create_table_from_schema(
294 &display_name,
295 &canonical_name,
296 &schema,
297 )?;
298
299 tracing::trace!(
300 "=== CTAS table '{}' created with table_id={} pager={:p} ===",
301 display_name,
302 table_id,
303 &*self.pager
304 );
305
306 let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
307 for column in &table_columns {
308 column_defs.push(ExecutorColumn {
309 name: column.name.clone(),
310 data_type: column.data_type.clone(),
311 nullable: column.nullable,
312 primary_key: column.primary_key,
313 unique: column.unique,
314 field_id: column.field_id,
315 check_expr: column.check_expr.clone(),
316 });
317 }
318
319 let schema_arc = Arc::new(ExecutorSchema {
320 columns: column_defs.clone(),
321 lookup: column_lookup,
322 });
323 let table_entry = Arc::new(ExecutorTable {
324 table: Arc::clone(&table),
325 schema: schema_arc,
326 next_row_id: AtomicU64::new(0),
327 total_rows: AtomicU64::new(0),
328 multi_column_uniques: RwLock::new(Vec::new()),
329 });
330
331 let creator_snapshot = self.txn_manager.begin_transaction();
332 let creator_txn_id = creator_snapshot.txn_id;
333 let mvcc_builder = TransactionMvccBuilder;
334 let (next_row_id, total_rows) = match self.catalog_service.append_batches_with_mvcc(
335 table.as_ref(),
336 &table_columns,
337 &batches,
338 creator_txn_id,
339 TXN_ID_NONE,
340 0,
341 &mvcc_builder,
342 ) {
343 Ok(result) => {
344 self.txn_manager.mark_committed(creator_txn_id);
345 result
346 }
347 Err(err) => {
348 self.txn_manager.mark_aborted(creator_txn_id);
349 let field_ids: Vec<FieldId> =
350 table_columns.iter().map(|column| column.field_id).collect();
351 let _ = self
352 .catalog_service
353 .drop_table(&canonical_name, table_id, &field_ids);
354 return Err(err);
355 }
356 };
357
358 table_entry.next_row_id.store(next_row_id, Ordering::SeqCst);
359 table_entry.total_rows.store(total_rows, Ordering::SeqCst);
360
361 let mut tables = self.tables.write().unwrap();
362 if tables.contains_key(&canonical_name) {
363 if if_not_exists {
364 return Ok(RuntimeStatementResult::CreateTable {
365 table_name: display_name,
366 });
367 }
368 return Err(Error::CatalogError(format!(
369 "Catalog Error: Table '{}' already exists",
370 display_name
371 )));
372 }
373 tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
374 drop(tables); Ok(RuntimeStatementResult::CreateTable {
377 table_name: display_name,
378 })
379 }
380
381 pub fn create_table<C, I>(
385 self: &Arc<Self>,
386 name: &str,
387 columns: I,
388 ) -> Result<RuntimeTableHandle<P>>
389 where
390 C: IntoPlanColumnSpec,
391 I: IntoIterator<Item = C>,
392 {
393 self.create_table_with_options(name, columns, false)
394 }
395
396 pub fn create_table_if_not_exists<C, I>(
400 self: &Arc<Self>,
401 name: &str,
402 columns: I,
403 ) -> Result<RuntimeTableHandle<P>>
404 where
405 C: IntoPlanColumnSpec,
406 I: IntoIterator<Item = C>,
407 {
408 self.create_table_with_options(name, columns, true)
409 }
410
411 pub fn create_table_builder(&self, name: &str) -> RuntimeCreateTableBuilder<'_, P> {
415 RuntimeCreateTableBuilder::new(self, name)
416 }
417
418 fn create_table_with_options<C, I>(
420 self: &Arc<Self>,
421 name: &str,
422 columns: I,
423 if_not_exists: bool,
424 ) -> Result<RuntimeTableHandle<P>>
425 where
426 C: IntoPlanColumnSpec,
427 I: IntoIterator<Item = C>,
428 {
429 let mut plan = CreateTablePlan::new(name);
430 plan.if_not_exists = if_not_exists;
431 plan.columns = columns
432 .into_iter()
433 .map(|column| column.into_plan_column_spec())
434 .collect();
435 let result = CatalogDdl::create_table(self.as_ref(), plan)?;
436 match result {
437 RuntimeStatementResult::CreateTable { .. } => {
438 RuntimeTableHandle::new(Arc::clone(self), name)
439 }
440 other => Err(Error::InvalidArgumentError(format!(
441 "unexpected statement result {other:?} when creating table"
442 ))),
443 }
444 }
445
446 pub(crate) fn execute_create_table(
450 &self,
451 plan: CreateTablePlan,
452 ) -> Result<RuntimeStatementResult<P>> {
453 CatalogDdl::create_table(self, plan)
454 }
455}