1#![forbid(unsafe_code)]
7
8use std::convert::TryFrom;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use arrow::array::ArrayRef;
13use arrow::datatypes::{DataType, Field, Schema};
14use arrow::record_batch::RecordBatch;
15use llkv_column_map::ColumnStore;
16use llkv_column_map::store::IndexKind;
17use llkv_plan::{DropIndexPlan, ForeignKeySpec, PlanColumnSpec};
18use llkv_result::{Error, Result as LlkvResult};
19use llkv_storage::pager::Pager;
20use rustc_hash::{FxHashMap, FxHashSet};
21use simd_r_drive_entry_handle::EntryHandle;
22
23use super::table_catalog::{FieldDefinition, TableCatalog};
24use crate::constraints::{ConstraintId, ConstraintKind};
25use crate::metadata::{MetadataManager, MultiColumnUniqueRegistration, SingleColumnIndexEntry};
26use crate::sys_catalog::{ColMeta, MultiColumnIndexEntryMeta, SysCatalog};
27use crate::table::Table;
28use crate::types::{FieldId, RowId, TableColumn, TableId};
29use crate::{
30 ForeignKeyColumn, ForeignKeyTableInfo, ForeignKeyView, TableConstraintSummaryView, TableView,
31 ValidatedForeignKey,
32};
33
/// Outcome of a successful table creation performed through [`CatalogManager`].
///
/// Bundles the newly reserved table id, the opened table handle, the column
/// definitions that were registered, and a lookup from ASCII-lowercased column
/// name to the column's position in `table_columns`.
pub struct CreateTableResult<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Identifier reserved for the new table.
    pub table_id: TableId,
    /// Handle to the freshly created table.
    pub table: Arc<Table<P>>,
    /// Column definitions in the order they were supplied.
    pub table_columns: Vec<TableColumn>,
    /// Maps ASCII-lowercased column names to their index in `table_columns`.
    pub column_lookup: FxHashMap<String, usize>,
}
45
/// Result of [`CatalogManager::register_single_column_index`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SingleColumnIndexRegistration {
    /// A new index entry was recorded under `index_name`.
    Created { index_name: String },
    /// An index already existed and `if_not_exists` was requested. `index_name`
    /// is the existing index's recorded name. When the column already carries a
    /// sort index with no recorded entry, it is the column name instead.
    AlreadyExists { index_name: String },
}
52
/// Description of a single-column index that was removed by
/// [`CatalogManager::drop_single_column_index`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SingleColumnIndexDescriptor {
    /// Index name as originally recorded (display form).
    pub index_name: String,
    /// Table the index belonged to.
    pub table_id: TableId,
    /// Catalog (canonical) name of the owning table.
    pub canonical_table_name: String,
    /// Display name from the table metadata, or the canonical name when the
    /// metadata carries no name.
    pub display_table_name: String,
    /// Field id of the indexed column.
    pub field_id: FieldId,
    /// Name of the indexed column.
    pub column_name: String,
    /// Whether the dropped index was a UNIQUE index.
    pub was_unique: bool,
}
64
/// Produces the MVCC bookkeeping columns that accompany user data on append.
///
/// [`CatalogManager::append_batches_with_mvcc`] places the three arrays from
/// `build_insert_columns` ahead of the user columns, in the order row id,
/// created-by, deleted-by. It describes those arrays with the fields returned by
/// `mvcc_fields`.
pub trait MvccColumnBuilder: Send + Sync {
    /// Builds the row-id, created-by, and deleted-by arrays for `row_count`
    /// new rows whose ids start at `start_row_id`.
    ///
    /// NOTE(review): presumably every row is stamped with `creator_txn_id` and
    /// `deleted_marker`. Confirm against the implementations.
    fn build_insert_columns(
        &self,
        row_count: usize,
        start_row_id: RowId,
        creator_txn_id: u64,
        deleted_marker: u64,
    ) -> (ArrayRef, ArrayRef, ArrayRef);

    /// Schema fields for the arrays returned by `build_insert_columns`.
    /// `append_batches_with_mvcc` relies on these lining up, in order, with the
    /// three MVCC arrays.
    fn mvcc_fields(&self) -> Vec<Field>;

    /// Builds the schema field for a user column.
    ///
    /// NOTE(review): presumably this records `field_id` in the field's
    /// metadata, judging by the name. Confirm against the implementations.
    fn field_with_metadata(
        &self,
        name: &str,
        data_type: DataType,
        nullable: bool,
        field_id: FieldId,
    ) -> Field;
}
92
93#[derive(Clone)]
98pub struct CatalogManager<P>
99where
100 P: Pager<Blob = EntryHandle> + Send + Sync,
101{
102 metadata: Arc<MetadataManager<P>>,
103 catalog: Arc<TableCatalog>,
104 store: Arc<ColumnStore<P>>,
105 type_registry: Arc<std::sync::RwLock<FxHashMap<String, sqlparser::ast::DataType>>>,
106}
107
108impl<P> CatalogManager<P>
109where
110 P: Pager<Blob = EntryHandle> + Send + Sync,
111{
112 pub fn new(
114 metadata: Arc<MetadataManager<P>>,
115 catalog: Arc<TableCatalog>,
116 store: Arc<ColumnStore<P>>,
117 ) -> Self {
118 Self {
119 metadata,
120 catalog,
121 store,
122 type_registry: Arc::new(std::sync::RwLock::new(FxHashMap::default())),
123 }
124 }
125
126 pub fn load_types_from_catalog(&self) -> LlkvResult<()> {
133 use crate::sys_catalog::SysCatalog;
134
135 let sys_catalog = SysCatalog::new(&self.store);
136 match sys_catalog.all_custom_type_metas() {
137 Ok(type_metas) => {
138 tracing::debug!(
139 "[CATALOG] Loaded {} custom type(s) from catalog",
140 type_metas.len()
141 );
142
143 let mut registry = self.type_registry.write().unwrap();
144 for type_meta in type_metas {
145 if let Ok(parsed_type) = parse_data_type_from_sql(&type_meta.base_type_sql) {
147 registry.insert(type_meta.name.to_lowercase(), parsed_type);
148 } else {
149 tracing::warn!(
150 "[CATALOG] Failed to parse base type SQL for type '{}': {}",
151 type_meta.name,
152 type_meta.base_type_sql
153 );
154 }
155 }
156
157 tracing::debug!(
158 "[CATALOG] Type registry initialized with {} type(s)",
159 registry.len()
160 );
161 Ok(())
162 }
163 Err(e) => {
164 tracing::warn!(
165 "[CATALOG] Failed to load custom types: {}, starting with empty type registry",
166 e
167 );
168 Ok(()) }
170 }
171 }
172
173 pub fn register_type(&self, name: String, data_type: sqlparser::ast::DataType) {
175 let mut registry = self.type_registry.write().unwrap();
176 registry.insert(name.to_lowercase(), data_type);
177 }
178
179 pub fn drop_type(&self, name: &str) -> LlkvResult<()> {
181 let mut registry = self.type_registry.write().unwrap();
182 if registry.remove(&name.to_lowercase()).is_none() {
183 return Err(Error::InvalidArgumentError(format!(
184 "Type '{}' does not exist",
185 name
186 )));
187 }
188 Ok(())
189 }
190
191 pub fn resolve_type(&self, data_type: &sqlparser::ast::DataType) -> sqlparser::ast::DataType {
193 use sqlparser::ast::DataType;
194
195 match data_type {
196 DataType::Custom(obj_name, _) => {
197 let name = obj_name.to_string().to_lowercase();
198 let registry = self.type_registry.read().unwrap();
199 if let Some(base_type) = registry.get(&name) {
200 self.resolve_type(base_type)
202 } else {
203 data_type.clone()
205 }
206 }
207 _ => data_type.clone(),
209 }
210 }
211
212 pub fn create_view(
219 &self,
220 display_name: &str,
221 view_definition: String,
222 ) -> LlkvResult<crate::types::TableId> {
223 use crate::sys_catalog::TableMeta;
224
225 let table_id = self.metadata.reserve_table_id()?;
227
228 let created_at_micros = SystemTime::now()
229 .duration_since(UNIX_EPOCH)
230 .unwrap_or_default()
231 .as_micros() as u64;
232
233 let table_meta = TableMeta {
235 table_id,
236 name: Some(display_name.to_string()),
237 created_at_micros,
238 flags: 0,
239 epoch: 0,
240 view_definition: Some(view_definition),
241 };
242
243 self.metadata.set_table_meta(table_id, table_meta)?;
245 self.metadata.flush_table(table_id)?;
246
247 self.catalog.register_table(display_name, table_id)?;
249
250 tracing::debug!("Created view '{}' with table_id={}", display_name, table_id);
251 Ok(table_id)
252 }
253
254 pub fn is_view(&self, table_id: crate::types::TableId) -> LlkvResult<bool> {
257 match self.metadata.table_meta(table_id)? {
258 Some(meta) => Ok(meta.view_definition.is_some()),
259 None => Ok(false),
260 }
261 }
262
263 pub(crate) fn create_table_from_columns(
272 &self,
273 display_name: &str,
274 canonical_name: &str,
275 columns: &[PlanColumnSpec],
276 ) -> LlkvResult<CreateTableResult<P>> {
277 if columns.is_empty() {
278 return Err(Error::InvalidArgumentError(
279 "CREATE TABLE requires at least one column".into(),
280 ));
281 }
282
283 let mut lookup: FxHashMap<String, usize> =
284 FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
285 let mut table_columns: Vec<TableColumn> = Vec::with_capacity(columns.len());
286
287 for (idx, column) in columns.iter().enumerate() {
288 let normalized = column.name.to_ascii_lowercase();
289 if lookup.insert(normalized.clone(), idx).is_some() {
290 return Err(Error::InvalidArgumentError(format!(
291 "duplicate column name '{}' in table '{}'",
292 column.name, display_name
293 )));
294 }
295
296 table_columns.push(TableColumn {
297 field_id: field_id_for_index(idx)?,
298 name: column.name.clone(),
299 data_type: column.data_type.clone(),
300 nullable: column.nullable,
301 primary_key: column.primary_key,
302 unique: column.unique,
303 check_expr: column.check_expr.clone(),
304 });
305 }
306
307 self.create_table_inner(display_name, canonical_name, table_columns, lookup)
308 }
309
310 pub fn create_table_from_schema(
312 &self,
313 display_name: &str,
314 canonical_name: &str,
315 schema: &Schema,
316 ) -> LlkvResult<CreateTableResult<P>> {
317 if schema.fields().is_empty() {
318 return Err(Error::InvalidArgumentError(
319 "CREATE TABLE AS SELECT requires at least one column".into(),
320 ));
321 }
322
323 let mut lookup: FxHashMap<String, usize> =
324 FxHashMap::with_capacity_and_hasher(schema.fields().len(), Default::default());
325 let mut table_columns: Vec<TableColumn> = Vec::with_capacity(schema.fields().len());
326
327 for (idx, field) in schema.fields().iter().enumerate() {
328 let data_type = match field.data_type() {
329 DataType::Int64
330 | DataType::Float64
331 | DataType::Utf8
332 | DataType::Date32
333 | DataType::Struct(_) => field.data_type().clone(),
334 other => {
335 return Err(Error::InvalidArgumentError(format!(
336 "unsupported column type in CTAS result: {other:?}"
337 )));
338 }
339 };
340
341 let normalized = field.name().to_ascii_lowercase();
342 if lookup.insert(normalized.clone(), idx).is_some() {
343 return Err(Error::InvalidArgumentError(format!(
344 "duplicate column name '{}' in CTAS result",
345 field.name()
346 )));
347 }
348
349 table_columns.push(TableColumn {
350 field_id: field_id_for_index(idx)?,
351 name: field.name().to_string(),
352 data_type,
353 nullable: field.is_nullable(),
354 primary_key: false,
355 unique: false,
356 check_expr: None,
357 });
358 }
359
360 self.create_table_inner(display_name, canonical_name, table_columns, lookup)
361 }
362
363 fn create_table_inner(
364 &self,
365 display_name: &str,
366 _canonical_name: &str,
367 table_columns: Vec<TableColumn>,
368 column_lookup: FxHashMap<String, usize>,
369 ) -> LlkvResult<CreateTableResult<P>> {
370 let table_id = self.metadata.reserve_table_id()?;
371 let timestamp = current_time_micros();
372 let table_meta = crate::sys_catalog::TableMeta {
373 table_id,
374 name: Some(display_name.to_string()),
375 created_at_micros: timestamp,
376 flags: 0,
377 epoch: 0,
378 view_definition: None, };
380
381 self.metadata.set_table_meta(table_id, table_meta)?;
382 self.metadata
383 .apply_column_definitions(table_id, &table_columns, timestamp)?;
384 self.metadata.flush_table(table_id)?;
385
386 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
387
388 if let Err(err) = self.catalog.register_table(display_name, table_id) {
390 self.metadata.remove_table_state(table_id);
391 return Err(err);
392 }
393
394 if let Some(field_resolver) = self.catalog.field_resolver(table_id) {
395 for column in &table_columns {
396 let definition = FieldDefinition::new(&column.name)
397 .with_primary_key(column.primary_key)
398 .with_unique(column.unique)
399 .with_check_expr(column.check_expr.clone());
400 if let Err(err) = field_resolver.register_field(definition) {
401 self.catalog.unregister_table(table_id);
402 self.metadata.remove_table_state(table_id);
403 return Err(err);
404 }
405 }
406 }
407
408 Ok(CreateTableResult {
409 table_id,
410 table: Arc::new(table),
411 table_columns,
412 column_lookup,
413 })
414 }
415
416 pub fn drop_table(
418 &self,
419 canonical_name: &str,
420 table_id: TableId,
421 column_field_ids: &[FieldId],
422 ) -> LlkvResult<()> {
423 self.metadata
424 .prepare_table_drop(table_id, column_field_ids)?;
425 self.metadata.flush_table(table_id)?;
426 self.metadata.remove_table_state(table_id);
427 if let Some(table_id_from_catalog) = self.catalog.table_id(canonical_name) {
428 let _ = self.catalog.unregister_table(table_id_from_catalog);
429 } else {
430 let _ = self.catalog.unregister_table(table_id);
431 }
432 Ok(())
433 }
434
435 pub fn rename_table(
437 &self,
438 table_id: TableId,
439 current_name: &str,
440 new_name: &str,
441 ) -> LlkvResult<()> {
442 if !current_name.eq_ignore_ascii_case(new_name) && self.catalog.table_id(new_name).is_some()
443 {
444 return Err(Error::CatalogError(format!(
445 "Table '{}' already exists",
446 new_name
447 )));
448 }
449
450 let previous_meta = self.metadata.table_meta(table_id)?;
451 let mut prior_snapshot = None;
452 if let Some(mut meta) = previous_meta.clone() {
453 prior_snapshot = Some(meta.clone());
454 meta.name = Some(new_name.to_string());
455 self.metadata.set_table_meta(table_id, meta)?;
456 }
457
458 if let Err(err) = self.catalog.rename_registered_table(current_name, new_name) {
459 if let Some(prior) = prior_snapshot {
460 let _ = self.metadata.set_table_meta(table_id, prior);
461 }
462 return Err(err);
463 }
464
465 if let Some(prior) = prior_snapshot.clone()
466 && let Err(err) = self.metadata.flush_table(table_id)
467 {
468 let _ = self.metadata.set_table_meta(table_id, prior);
469 let _ = self.catalog.rename_registered_table(new_name, current_name);
470 let _ = self.metadata.flush_table(table_id);
471 return Err(err);
472 }
473
474 Ok(())
475 }
476
477 pub fn rename_column(
479 &self,
480 table_id: TableId,
481 old_column_name: &str,
482 new_column_name: &str,
483 ) -> LlkvResult<()> {
484 let (_, field_ids) = self.sorted_user_fields(table_id);
486 let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
487
488 let mut found_col: Option<(u32, ColMeta)> = None;
490 for (idx, meta_opt) in column_metas.iter().enumerate() {
491 if let Some(meta) = meta_opt
492 && let Some(name) = &meta.name
493 && name.eq_ignore_ascii_case(old_column_name)
494 {
495 found_col = Some((field_ids[idx], meta.clone()));
496 break;
497 }
498 }
499
500 let (_field_id, mut col_meta) = found_col.ok_or_else(|| {
501 Error::InvalidArgumentError(format!("column '{}' not found in table", old_column_name))
502 })?;
503
504 col_meta.name = Some(new_column_name.to_string());
506
507 let catalog = SysCatalog::new(&self.store);
509 catalog.put_col_meta(table_id, &col_meta);
510
511 self.metadata.set_column_meta(table_id, col_meta)?;
513
514 if let Some(resolver) = self.catalog.field_resolver(table_id) {
516 resolver.rename_field(old_column_name, new_column_name)?;
517 }
518
519 self.metadata.flush_table(table_id)?;
520
521 Ok(())
522 }
523
524 pub fn alter_column_type(
535 &self,
536 table_id: TableId,
537 column_name: &str,
538 new_data_type: &DataType,
539 ) -> LlkvResult<()> {
540 let (logical_fields, field_ids) = self.sorted_user_fields(table_id);
542 let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
543
544 let mut found_col: Option<(usize, u32, ColMeta)> = None;
546 for (idx, meta_opt) in column_metas.iter().enumerate() {
547 if let Some(meta) = meta_opt
548 && let Some(name) = &meta.name
549 && name.eq_ignore_ascii_case(column_name)
550 {
551 found_col = Some((idx, field_ids[idx], meta.clone()));
552 break;
553 }
554 }
555
556 let (col_idx, _field_id, col_meta) = found_col.ok_or_else(|| {
557 Error::InvalidArgumentError(format!("column '{}' not found in table", column_name))
558 })?;
559
560 let lfid = logical_fields[col_idx];
562 self.store.update_data_type(lfid, new_data_type)?;
563
564 let catalog = SysCatalog::new(&self.store);
566 catalog.put_col_meta(table_id, &col_meta);
567
568 self.metadata.set_column_meta(table_id, col_meta)?;
570
571 Ok(())
572 }
573
574 pub fn drop_column(&self, table_id: TableId, column_name: &str) -> LlkvResult<()> {
576 let (_, field_ids) = self.sorted_user_fields(table_id);
578 let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
579
580 let mut found_col_id: Option<u32> = None;
582 for (idx, meta_opt) in column_metas.iter().enumerate() {
583 if let Some(meta) = meta_opt
584 && let Some(name) = &meta.name
585 && name.eq_ignore_ascii_case(column_name)
586 {
587 found_col_id = Some(field_ids[idx]);
588 break;
589 }
590 }
591
592 let col_id = found_col_id.ok_or_else(|| {
593 Error::InvalidArgumentError(format!("column '{}' not found in table", column_name))
594 })?;
595
596 let catalog = SysCatalog::new(&self.store);
598 catalog.delete_col_meta(table_id, &[col_id])?;
599
600 Ok(())
601 }
602
603 #[allow(clippy::too_many_arguments)]
606 pub fn register_single_column_index(
607 &self,
608 display_name: &str,
609 canonical_name: &str,
610 table: &Table<P>,
611 field_id: FieldId,
612 column_name: &str,
613 index_name: Option<String>,
614 mark_unique: bool,
615 ascending: bool,
616 nulls_first: bool,
617 if_not_exists: bool,
618 ) -> LlkvResult<SingleColumnIndexRegistration> {
619 let table_id = table.table_id();
620 let existing_indexes = table.list_registered_indexes(field_id)?;
621 if existing_indexes.contains(&IndexKind::Sort) {
622 let existing_name = self
623 .metadata
624 .single_column_indexes(table_id)?
625 .into_iter()
626 .find(|entry| entry.column_id == field_id)
627 .map(|entry| entry.index_name)
628 .unwrap_or_else(|| column_name.to_string());
629
630 if if_not_exists {
631 return Ok(SingleColumnIndexRegistration::AlreadyExists {
632 index_name: existing_name,
633 });
634 }
635
636 return Err(Error::CatalogError(format!(
637 "Index already exists on column '{}' in table '{}'",
638 column_name, display_name
639 )));
640 }
641
642 let index_display_name = match index_name {
643 Some(name) => name,
644 None => {
645 self.generate_single_column_index_name(table_id, canonical_name, column_name)?
646 }
647 };
648 if index_display_name.is_empty() {
649 return Err(Error::InvalidArgumentError(
650 "Index name must not be empty".into(),
651 ));
652 }
653 let canonical_index_name = index_display_name.to_ascii_lowercase();
654
655 if let Some(existing) = self
656 .metadata
657 .single_column_index(table_id, &canonical_index_name)?
658 {
659 if if_not_exists {
660 return Ok(SingleColumnIndexRegistration::AlreadyExists {
661 index_name: existing.index_name,
662 });
663 }
664
665 return Err(Error::CatalogError(format!(
666 "Index '{}' already exists on table '{}'",
667 existing.index_name, display_name
668 )));
669 }
670
671 let entry = SingleColumnIndexEntry {
672 index_name: index_display_name.clone(),
673 canonical_name: canonical_index_name,
674 column_id: field_id,
675 column_name: column_name.to_string(),
676 unique: mark_unique,
677 ascending,
678 nulls_first,
679 };
680
681 self.metadata.put_single_column_index(table_id, entry)?;
682 self.metadata.register_sort_index(table_id, field_id)?;
683
684 if mark_unique {
685 let catalog_table_id = self.catalog.table_id(canonical_name).unwrap_or(table_id);
686 if let Some(resolver) = self.catalog.field_resolver(catalog_table_id) {
687 resolver.set_field_unique(column_name, true)?;
688 }
689 }
690
691 self.metadata.flush_table(table_id)?;
692
693 Ok(SingleColumnIndexRegistration::Created {
694 index_name: index_display_name,
695 })
696 }
697
698 pub fn drop_single_column_index(
699 &self,
700 plan: DropIndexPlan,
701 ) -> LlkvResult<Option<SingleColumnIndexDescriptor>> {
702 let canonical_index = plan.canonical_name.to_ascii_lowercase();
703 let snapshot = self.catalog.snapshot();
704
705 for canonical_table_name in snapshot.table_names() {
706 let Some(table_id) = snapshot.table_id(&canonical_table_name) else {
707 continue;
708 };
709
710 if let Some(entry) = self
711 .metadata
712 .single_column_index(table_id, &canonical_index)?
713 {
714 self.metadata
715 .remove_single_column_index(table_id, &canonical_index)?;
716
717 if entry.unique
718 && let Some(resolver) = self.catalog.field_resolver(table_id)
719 {
720 resolver.set_field_unique(&entry.column_name, false)?;
721 }
722
723 self.metadata.flush_table(table_id)?;
724
725 let display_table_name = self
726 .metadata
727 .table_meta(table_id)?
728 .and_then(|meta| meta.name)
729 .unwrap_or_else(|| canonical_table_name.clone());
730
731 return Ok(Some(SingleColumnIndexDescriptor {
732 index_name: entry.index_name,
733 table_id,
734 canonical_table_name,
735 display_table_name,
736 field_id: entry.column_id,
737 column_name: entry.column_name,
738 was_unique: entry.unique,
739 }));
740 }
741 }
742
743 if plan.if_exists {
744 Ok(None)
745 } else {
746 Err(Error::CatalogError(format!(
747 "Index '{}' does not exist",
748 plan.name
749 )))
750 }
751 }
752
753 pub fn register_multi_column_unique_index(
755 &self,
756 table_id: TableId,
757 field_ids: &[FieldId],
758 index_name: Option<String>,
759 ) -> LlkvResult<MultiColumnUniqueRegistration> {
760 let registration = self
761 .metadata
762 .register_multi_column_unique(table_id, field_ids, index_name)?;
763
764 if matches!(registration, MultiColumnUniqueRegistration::Created) {
765 self.metadata.flush_table(table_id)?;
766 }
767
768 Ok(registration)
769 }
770
771 pub fn register_multi_column_index(
775 &self,
776 table_id: TableId,
777 field_ids: &[FieldId],
778 index_name: String,
779 unique: bool,
780 ) -> LlkvResult<bool> {
781 let canonical_name = index_name.to_lowercase();
782
783 if let Some(_existing) = self
785 .metadata
786 .get_multi_column_index(table_id, &canonical_name)?
787 {
788 return Ok(false);
789 }
790
791 let entry = MultiColumnIndexEntryMeta {
793 index_name: Some(index_name),
794 canonical_name,
795 column_ids: field_ids.to_vec(),
796 unique,
797 };
798
799 self.metadata.put_multi_column_index(table_id, entry)?;
800 self.metadata.flush_table(table_id)?;
801
802 Ok(true)
803 }
804
805 fn generate_single_column_index_name(
806 &self,
807 table_id: TableId,
808 canonical_table_name: &str,
809 column_name: &str,
810 ) -> LlkvResult<String> {
811 let table_token = if canonical_table_name.is_empty() {
812 "table".to_string()
813 } else {
814 canonical_table_name.replace('.', "_")
815 };
816 let column_token = column_name.to_ascii_lowercase();
817
818 let mut candidate = format!("{}_{}_idx", table_token, column_token);
819 let mut suffix: u32 = 1;
820 loop {
821 let canonical = candidate.to_ascii_lowercase();
822 if self
823 .metadata
824 .single_column_index(table_id, &canonical)?
825 .is_none()
826 {
827 return Ok(candidate);
828 }
829
830 candidate = format!("{}_{}_idx{}", table_token, column_token, suffix);
831 suffix = suffix.checked_add(1).ok_or_else(|| {
832 Error::InvalidArgumentError("exhausted unique index name generation space".into())
833 })?;
834 }
835 }
836
837 #[allow(clippy::too_many_arguments)]
839 pub fn append_batches_with_mvcc(
840 &self,
841 table: &Table<P>,
842 table_columns: &[TableColumn],
843 batches: &[RecordBatch],
844 creator_txn_id: u64,
845 deleted_marker: u64,
846 starting_row_id: RowId,
847 mvcc_builder: &dyn MvccColumnBuilder,
848 ) -> LlkvResult<(RowId, u64)> {
849 let mut next_row_id = starting_row_id;
850 let mut total_rows: u64 = 0;
851
852 for batch in batches {
853 if batch.num_rows() == 0 {
854 continue;
855 }
856
857 if batch.num_columns() != table_columns.len() {
858 return Err(Error::InvalidArgumentError(format!(
859 "CTAS query returned unexpected column count (expected {}, found {})",
860 table_columns.len(),
861 batch.num_columns()
862 )));
863 }
864
865 let row_count = batch.num_rows();
866
867 let (row_id_array, created_by_array, deleted_by_array) = mvcc_builder
868 .build_insert_columns(row_count, next_row_id, creator_txn_id, deleted_marker);
869
870 let mut arrays: Vec<ArrayRef> = Vec::with_capacity(table_columns.len() + 3);
871 arrays.push(row_id_array);
872 arrays.push(created_by_array);
873 arrays.push(deleted_by_array);
874
875 let mut fields = mvcc_builder.mvcc_fields();
876
877 for (idx, column) in table_columns.iter().enumerate() {
878 let array = batch.column(idx).clone();
879 let field = mvcc_builder.field_with_metadata(
880 &column.name,
881 column.data_type.clone(),
882 column.nullable,
883 column.field_id,
884 );
885 arrays.push(array);
886 fields.push(field);
887 }
888
889 let append_schema = Arc::new(Schema::new(fields));
890 let append_batch = RecordBatch::try_new(append_schema, arrays).map_err(Error::Arrow)?;
891 table.append(&append_batch)?;
892
893 next_row_id = next_row_id.saturating_add(row_count as u64);
894 total_rows = total_rows.saturating_add(row_count as u64);
895 }
896
897 Ok((next_row_id, total_rows))
898 }
899
900 #[allow(clippy::too_many_arguments)]
902 pub fn register_foreign_keys_for_new_table<F>(
903 &self,
904 table_id: TableId,
905 display_name: &str,
906 canonical_name: &str,
907 table_columns: &[TableColumn],
908 specs: &[ForeignKeySpec],
909 lookup_table: F,
910 timestamp_micros: u64,
911 ) -> LlkvResult<Vec<ValidatedForeignKey>>
912 where
913 F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
914 {
915 if specs.is_empty() {
916 return Ok(Vec::new());
917 }
918
919 let referencing_columns: Vec<ForeignKeyColumn> = table_columns
920 .iter()
921 .map(|column| ForeignKeyColumn {
922 name: column.name.clone(),
923 data_type: column.data_type.clone(),
924 nullable: column.nullable,
925 primary_key: column.primary_key,
926 unique: column.unique,
927 field_id: column.field_id,
928 })
929 .collect();
930
931 let multi_column_uniques = {
932 let catalog = SysCatalog::new(&self.store);
933 let all_indexes = catalog.get_multi_column_indexes(table_id)?;
934 all_indexes.into_iter().filter(|idx| idx.unique).collect()
935 };
936
937 let referencing_table = ForeignKeyTableInfo {
938 display_name: display_name.to_string(),
939 canonical_name: canonical_name.to_string(),
940 table_id,
941 columns: referencing_columns,
942 multi_column_uniques,
943 };
944
945 self.metadata.validate_and_register_foreign_keys(
946 &referencing_table,
947 specs,
948 lookup_table,
949 timestamp_micros,
950 )
951 }
952
953 pub fn referenced_table_info(
955 &self,
956 views: &[ForeignKeyView],
957 ) -> LlkvResult<Vec<ForeignKeyTableInfo>> {
958 let mut results = Vec::with_capacity(views.len());
959 for view in views {
960 let Some(table_id) = self.catalog.table_id(&view.referenced_table_canonical) else {
961 return Err(Error::CatalogError(format!(
962 "Catalog Error: referenced table '{}' does not exist",
963 view.referenced_table_display
964 )));
965 };
966
967 let Some(resolver) = self.catalog.field_resolver(table_id) else {
968 return Err(Error::Internal(format!(
969 "catalog resolver missing for table '{}'",
970 view.referenced_table_display
971 )));
972 };
973
974 let mut columns = Vec::with_capacity(view.referenced_field_ids.len());
975 for field_id in &view.referenced_field_ids {
976 let info = resolver.field_info(*field_id).ok_or_else(|| {
977 Error::Internal(format!(
978 "field metadata missing for id {} in table '{}'",
979 field_id, view.referenced_table_display
980 ))
981 })?;
982
983 let data_type = self.metadata.column_data_type(table_id, *field_id)?;
984
985 columns.push(ForeignKeyColumn {
986 name: info.display_name.to_string(),
987 data_type,
988 nullable: !info.constraints.primary_key,
989 primary_key: info.constraints.primary_key,
990 unique: info.constraints.unique,
991 field_id: *field_id,
992 });
993 }
994
995 let multi_column_uniques = {
996 let catalog = SysCatalog::new(&self.store);
997 let all_indexes = catalog.get_multi_column_indexes(table_id)?;
998 all_indexes.into_iter().filter(|idx| idx.unique).collect()
999 };
1000
1001 results.push(ForeignKeyTableInfo {
1002 display_name: view.referenced_table_display.clone(),
1003 canonical_name: view.referenced_table_canonical.clone(),
1004 table_id,
1005 columns,
1006 multi_column_uniques,
1007 });
1008 }
1009
1010 Ok(results)
1011 }
1012
1013 pub fn table_view(&self, canonical_name: &str) -> LlkvResult<TableView> {
1015 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1016 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1017 })?;
1018
1019 let (_, field_ids) = self.sorted_user_fields(table_id);
1020 self.table_view_with_field_ids(table_id, &field_ids)
1021 }
1022
    /// Reconstructs planner column specs (`PlanColumnSpec`) for the table
    /// registered under `canonical_name`, in ascending field-id order.
    ///
    /// PRIMARY KEY and UNIQUE flags come from the table's active constraint
    /// records when such records exist. Otherwise they fall back to the
    /// constraints recorded on the catalog field resolver. CHECK expressions
    /// always come from the resolver. Column types are read from the column
    /// store, and a column is reported nullable exactly when it is not part of
    /// the primary key.
    ///
    /// # Errors
    /// `InvalidArgumentError` for an unknown table. `Internal` when the table
    /// has no field resolver. Metadata and store lookup errors are propagated.
    pub fn table_column_specs(&self, canonical_name: &str) -> LlkvResult<Vec<PlanColumnSpec>> {
        let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;

        let resolver = self
            .catalog
            .field_resolver(table_id)
            .ok_or_else(|| Error::Internal("missing field resolver for table".into()))?;

        let (logical_fields, field_ids) = self.sorted_user_fields(table_id);

        let table_view = self.table_view_with_field_ids(table_id, &field_ids)?;
        let column_metas = table_view.column_metas;
        let constraint_records = table_view.constraint_records;

        // Field ids named by active PRIMARY KEY / single-column UNIQUE records.
        // The `has_*` flags record whether any such record exists at all, which
        // decides between constraint records and the resolver fallback below.
        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
        let mut has_primary_key_records = false;
        let mut has_single_unique_records = false;

        for record in constraint_records
            .iter()
            .filter(|record| record.is_active())
        {
            match &record.kind {
                ConstraintKind::PrimaryKey(pk) => {
                    has_primary_key_records = true;
                    // Primary-key columns are also unique.
                    for field_id in &pk.field_ids {
                        metadata_primary_keys.insert(*field_id);
                        metadata_unique_fields.insert(*field_id);
                    }
                }
                ConstraintKind::Unique(unique) => {
                    // Only single-column UNIQUE constraints mark an individual column.
                    if unique.field_ids.len() == 1 {
                        has_single_unique_records = true;
                        if let Some(field_id) = unique.field_ids.first() {
                            metadata_unique_fields.insert(*field_id);
                        }
                    }
                }
                _ => {}
            }
        }

        let mut specs = Vec::with_capacity(field_ids.len());

        for (idx, lfid) in logical_fields.iter().enumerate() {
            let field_id = lfid.field_id();

            // Columns without a stored name get a synthetic `col_<field_id>` name.
            let column_name = column_metas
                .get(idx)
                .and_then(|meta| meta.as_ref())
                .and_then(|meta| meta.name.clone())
                .unwrap_or_else(|| format!("col_{}", field_id));

            let fallback_constraints = resolver
                .field_constraints_by_name(&column_name)
                .unwrap_or_default();

            let metadata_primary = metadata_primary_keys.contains(&field_id);
            let primary_key = if has_primary_key_records {
                metadata_primary
            } else {
                fallback_constraints.primary_key
            };

            // NOTE(review): once any PK or single-column UNIQUE record exists,
            // uniqueness comes only from constraint records, so a UNIQUE flag held
            // only by the resolver is ignored. Confirm UNIQUE constraints are always
            // recorded alongside PK records.
            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
            let unique = if has_primary_key_records || has_single_unique_records {
                metadata_unique
            } else {
                fallback_constraints.primary_key || fallback_constraints.unique
            };

            let data_type = self.store.data_type(*lfid)?;
            let nullable = !primary_key;

            let mut spec = PlanColumnSpec::new(column_name.clone(), data_type, nullable)
                .with_primary_key(primary_key)
                .with_unique(unique);

            if let Some(check_expr) = fallback_constraints.check_expr.clone() {
                spec = spec.with_check(Some(check_expr));
            }

            specs.push(spec);
        }

        Ok(specs)
    }
1114
1115 pub fn foreign_key_views(&self, canonical_name: &str) -> LlkvResult<Vec<ForeignKeyView>> {
1117 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1118 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1119 })?;
1120
1121 self.metadata.foreign_key_views(&self.catalog, table_id)
1122 }
1123
1124 pub fn table_constraint_summary(
1126 &self,
1127 canonical_name: &str,
1128 ) -> LlkvResult<TableConstraintSummaryView> {
1129 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1130 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1131 })?;
1132
1133 let (_, field_ids) = self.sorted_user_fields(table_id);
1134 let table_meta = self.metadata.table_meta(table_id)?;
1135 let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
1136 let constraint_records = self.metadata.constraint_records(table_id)?;
1137 let multi_column_uniques = self.metadata.multi_column_uniques(table_id)?;
1138
1139 Ok(TableConstraintSummaryView {
1140 table_meta,
1141 column_metas,
1142 constraint_records,
1143 multi_column_uniques,
1144 })
1145 }
1146
1147 fn sorted_user_fields(
1148 &self,
1149 table_id: TableId,
1150 ) -> (Vec<llkv_column_map::types::LogicalFieldId>, Vec<FieldId>) {
1151 let mut logical_fields = self.store.user_field_ids_for_table(table_id);
1152 logical_fields.sort_by_key(|lfid| lfid.field_id());
1153 let field_ids = logical_fields
1154 .iter()
1155 .map(|lfid| lfid.field_id())
1156 .collect::<Vec<_>>();
1157
1158 (logical_fields, field_ids)
1159 }
1160
1161 fn table_view_with_field_ids(
1162 &self,
1163 table_id: TableId,
1164 field_ids: &[FieldId],
1165 ) -> LlkvResult<TableView> {
1166 self.metadata.table_view(&self.catalog, table_id, field_ids)
1167 }
1168
1169 pub fn table_names(&self) -> Vec<String> {
1175 self.catalog.table_names()
1176 }
1177
1178 pub fn table_id(&self, canonical_name: &str) -> Option<TableId> {
1180 self.catalog.table_id(canonical_name)
1181 }
1182
1183 pub fn field_resolver(&self, table_id: TableId) -> Option<crate::catalog::FieldResolver> {
1185 self.catalog.field_resolver(table_id)
1186 }
1187
1188 pub fn catalog_snapshot(&self) -> crate::catalog::TableCatalogSnapshot {
1190 self.catalog.snapshot()
1191 }
1192
1193 pub fn catalog(&self) -> &Arc<TableCatalog> {
1196 &self.catalog
1197 }
1198
1199 pub fn foreign_keys_referencing(
1202 &self,
1203 referenced_table_id: TableId,
1204 ) -> LlkvResult<Vec<(TableId, ConstraintId)>> {
1205 self.metadata.foreign_keys_referencing(referenced_table_id)
1206 }
1207
1208 pub fn foreign_key_views_for_table(
1211 &self,
1212 table_id: TableId,
1213 ) -> LlkvResult<Vec<ForeignKeyView>> {
1214 self.metadata.foreign_key_views(&self.catalog, table_id)
1215 }
1216}
1217
1218fn field_id_for_index(idx: usize) -> LlkvResult<FieldId> {
1219 FieldId::try_from(idx + 1).map_err(|_| {
1220 Error::Internal(format!(
1221 "column index {} exceeded supported field id range",
1222 idx + 1
1223 ))
1224 })
1225}
1226
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn current_time_micros() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as u64,
        Err(_) => 0,
    }
}
1235
1236fn parse_data_type_from_sql(sql: &str) -> LlkvResult<sqlparser::ast::DataType> {
1238 use sqlparser::dialect::GenericDialect;
1239 use sqlparser::parser::Parser;
1240
1241 let create_sql = format!("CREATE DOMAIN dummy AS {}", sql);
1243 let dialect = GenericDialect {};
1244
1245 match Parser::parse_sql(&dialect, &create_sql) {
1246 Ok(stmts) if !stmts.is_empty() => {
1247 if let sqlparser::ast::Statement::CreateDomain(create_domain) = &stmts[0] {
1248 Ok(create_domain.data_type.clone())
1249 } else {
1250 Err(Error::InvalidArgumentError(format!(
1251 "Failed to parse type from SQL: {}",
1252 sql
1253 )))
1254 }
1255 }
1256 _ => Err(Error::InvalidArgumentError(format!(
1257 "Failed to parse type from SQL: {}",
1258 sql
1259 ))),
1260 }
1261}