1#![forbid(unsafe_code)]
7
8use std::convert::TryFrom;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use arrow::array::ArrayRef;
13use arrow::datatypes::{DataType, Field, Schema};
14use arrow::record_batch::RecordBatch;
15use llkv_column_map::ColumnStore;
16use llkv_column_map::store::IndexKind;
17use llkv_compute::time::current_time_micros;
18use llkv_plan::{DropIndexPlan, ForeignKeySpec, PlanColumnSpec};
19use llkv_result::{Error, Result as LlkvResult};
20use llkv_storage::pager::Pager;
21use rustc_hash::{FxHashMap, FxHashSet};
22use simd_r_drive_entry_handle::EntryHandle;
23
24use super::table_catalog::{FieldDefinition, TableCatalog};
25use crate::constraints::{ConstraintId, ConstraintKind};
26use crate::metadata::{MetadataManager, MultiColumnUniqueRegistration, SingleColumnIndexEntry};
27use crate::sys_catalog::{
28 ColMeta, MultiColumnIndexEntryMeta, SysCatalog, TriggerEntryMeta, TriggerEventMeta,
29 TriggerTimingMeta,
30};
31use crate::table::Table;
32use crate::types::{FieldId, RowId, TableColumn, TableId};
33use crate::{
34 ForeignKeyColumn, ForeignKeyTableInfo, ForeignKeyView, TableConstraintSummaryView, TableView,
35 ValidatedForeignKey,
36};
37
38pub struct CreateTableResult<P>
41where
42 P: Pager<Blob = EntryHandle> + Send + Sync,
43{
44 pub table_id: TableId,
45 pub table: Arc<Table<P>>,
46 pub table_columns: Vec<TableColumn>,
47 pub column_lookup: FxHashMap<String, usize>,
48}
49
50#[derive(Debug, Clone, PartialEq, Eq)]
52pub enum SingleColumnIndexRegistration {
53 Created { index_name: String },
54 AlreadyExists { index_name: String },
55}
56
57#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct SingleColumnIndexDescriptor {
60 pub index_name: String,
61 pub table_id: TableId,
62 pub canonical_table_name: String,
63 pub display_table_name: String,
64 pub field_id: FieldId,
65 pub column_name: String,
66 pub was_unique: bool,
67}
68
69pub trait MvccColumnBuilder: Send + Sync {
75 fn build_insert_columns(
77 &self,
78 row_count: usize,
79 start_row_id: RowId,
80 creator_txn_id: u64,
81 deleted_marker: u64,
82 ) -> (ArrayRef, ArrayRef, ArrayRef);
83
84 fn mvcc_fields(&self) -> Vec<Field>;
86
87 fn field_with_metadata(
89 &self,
90 name: &str,
91 data_type: DataType,
92 nullable: bool,
93 field_id: FieldId,
94 ) -> Field;
95}
96
97#[derive(Clone)]
102pub struct CatalogManager<P>
103where
104 P: Pager<Blob = EntryHandle> + Send + Sync,
105{
106 metadata: Arc<MetadataManager<P>>,
107 catalog: Arc<TableCatalog>,
108 store: Arc<ColumnStore<P>>,
109 type_registry: Arc<std::sync::RwLock<FxHashMap<String, sqlparser::ast::DataType>>>,
110}
111
112impl<P> CatalogManager<P>
113where
114 P: Pager<Blob = EntryHandle> + Send + Sync,
115{
116 pub fn new(
118 metadata: Arc<MetadataManager<P>>,
119 catalog: Arc<TableCatalog>,
120 store: Arc<ColumnStore<P>>,
121 ) -> Self {
122 Self {
123 metadata,
124 catalog,
125 store,
126 type_registry: Arc::new(std::sync::RwLock::new(FxHashMap::default())),
127 }
128 }
129
130 pub fn load_types_from_catalog(&self) -> LlkvResult<()> {
137 use crate::sys_catalog::SysCatalog;
138
139 let sys_catalog = SysCatalog::new(&self.store);
140 match sys_catalog.all_custom_type_metas() {
141 Ok(type_metas) => {
142 tracing::debug!(
143 "[CATALOG] Loaded {} custom type(s) from catalog",
144 type_metas.len()
145 );
146
147 let mut registry = self.type_registry.write().unwrap();
148 for type_meta in type_metas {
149 if let Ok(parsed_type) = parse_data_type_from_sql(&type_meta.base_type_sql) {
151 registry.insert(type_meta.name.to_lowercase(), parsed_type);
152 } else {
153 tracing::warn!(
154 "[CATALOG] Failed to parse base type SQL for type '{}': {}",
155 type_meta.name,
156 type_meta.base_type_sql
157 );
158 }
159 }
160
161 tracing::debug!(
162 "[CATALOG] Type registry initialized with {} type(s)",
163 registry.len()
164 );
165 Ok(())
166 }
167 Err(e) => {
168 tracing::warn!(
169 "[CATALOG] Failed to load custom types: {}, starting with empty type registry",
170 e
171 );
172 Ok(()) }
174 }
175 }
176
177 pub fn register_type(&self, name: String, data_type: sqlparser::ast::DataType) {
179 let mut registry = self.type_registry.write().unwrap();
180 registry.insert(name.to_lowercase(), data_type);
181 }
182
183 pub fn drop_type(&self, name: &str) -> LlkvResult<()> {
185 let mut registry = self.type_registry.write().unwrap();
186 if registry.remove(&name.to_lowercase()).is_none() {
187 return Err(Error::InvalidArgumentError(format!(
188 "Type '{}' does not exist",
189 name
190 )));
191 }
192 Ok(())
193 }
194
195 pub fn resolve_type(&self, data_type: &sqlparser::ast::DataType) -> sqlparser::ast::DataType {
197 use sqlparser::ast::DataType;
198
199 match data_type {
200 DataType::Custom(obj_name, _) => {
201 let name = obj_name.to_string().to_lowercase();
202 let registry = self.type_registry.read().unwrap();
203 if let Some(base_type) = registry.get(&name) {
204 self.resolve_type(base_type)
206 } else {
207 data_type.clone()
209 }
210 }
211 _ => data_type.clone(),
213 }
214 }
215
216 pub fn create_view(
223 &self,
224 display_name: &str,
225 view_definition: String,
226 column_specs: Vec<PlanColumnSpec>,
227 ) -> LlkvResult<crate::types::TableId> {
228 if column_specs.is_empty() {
229 return Err(Error::InvalidArgumentError(
230 "CREATE VIEW requires at least one column".into(),
231 ));
232 }
233
234 use crate::sys_catalog::TableMeta;
235
236 let table_id = self.metadata.reserve_table_id()?;
238
239 let created_at_micros = SystemTime::now()
240 .duration_since(UNIX_EPOCH)
241 .unwrap_or_default()
242 .as_micros() as u64;
243
244 let table_meta = TableMeta {
246 table_id,
247 name: Some(display_name.to_string()),
248 created_at_micros,
249 flags: 0,
250 epoch: 0,
251 view_definition: Some(view_definition),
252 };
253
254 let mut table_columns = Vec::with_capacity(column_specs.len());
255 for (idx, spec) in column_specs.iter().enumerate() {
256 let field_id = field_id_for_index(idx)?;
257 table_columns.push(TableColumn {
258 field_id,
259 name: spec.name.clone(),
260 data_type: spec.data_type.clone(),
261 nullable: spec.nullable,
262 primary_key: spec.primary_key,
263 unique: spec.unique,
264 check_expr: spec.check_expr.clone(),
265 });
266 }
267
268 self.metadata.set_table_meta(table_id, table_meta)?;
270 self.metadata
271 .apply_column_definitions(table_id, &table_columns, created_at_micros)?;
272 self.metadata.flush_table(table_id)?;
273
274 self.catalog.register_table(display_name, table_id)?;
276
277 if let Some(field_resolver) = self.catalog.field_resolver(table_id) {
278 for column in &table_columns {
279 let definition = FieldDefinition::new(&column.name)
280 .with_primary_key(column.primary_key)
281 .with_unique(column.unique)
282 .with_check_expr(column.check_expr.clone());
283 if let Err(err) = field_resolver.register_field(definition) {
284 self.catalog.unregister_table(table_id);
285 self.metadata.remove_table_state(table_id);
286 return Err(err);
287 }
288 }
289 }
290
291 tracing::debug!("Created view '{}' with table_id={}", display_name, table_id);
292 Ok(table_id)
293 }
294
295 pub fn is_view(&self, table_id: crate::types::TableId) -> LlkvResult<bool> {
298 match self.metadata.table_meta(table_id)? {
299 Some(meta) => Ok(meta.view_definition.is_some()),
300 None => Ok(false),
301 }
302 }
303
304 pub fn drop_view(&self, canonical_name: &str, table_id: TableId) -> LlkvResult<()> {
306 let (_, field_ids) = self.sorted_user_fields(table_id);
307 self.metadata.prepare_table_drop(table_id, &field_ids)?;
308 self.metadata.flush_table(table_id)?;
309 self.metadata.remove_table_state(table_id);
310
311 if let Some(table_id_from_catalog) = self.catalog.table_id(canonical_name) {
312 let _ = self.catalog.unregister_table(table_id_from_catalog);
313 } else {
314 let _ = self.catalog.unregister_table(table_id);
315 }
316
317 Ok(())
318 }
319
320 pub(crate) fn create_table_from_columns(
329 &self,
330 display_name: &str,
331 canonical_name: &str,
332 columns: &[PlanColumnSpec],
333 ) -> LlkvResult<CreateTableResult<P>> {
334 if columns.is_empty() {
335 return Err(Error::InvalidArgumentError(
336 "CREATE TABLE requires at least one column".into(),
337 ));
338 }
339
340 let mut lookup: FxHashMap<String, usize> =
341 FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
342 let mut table_columns: Vec<TableColumn> = Vec::with_capacity(columns.len());
343
344 for (idx, column) in columns.iter().enumerate() {
345 let normalized = column.name.to_ascii_lowercase();
346 if lookup.insert(normalized.clone(), idx).is_some() {
347 return Err(Error::InvalidArgumentError(format!(
348 "duplicate column name '{}' in table '{}'",
349 column.name, display_name
350 )));
351 }
352
353 table_columns.push(TableColumn {
354 field_id: field_id_for_index(idx)?,
355 name: column.name.clone(),
356 data_type: column.data_type.clone(),
357 nullable: column.nullable,
358 primary_key: column.primary_key,
359 unique: column.unique,
360 check_expr: column.check_expr.clone(),
361 });
362 }
363
364 self.create_table_inner(display_name, canonical_name, table_columns, lookup)
365 }
366
367 pub fn create_table_from_schema(
369 &self,
370 display_name: &str,
371 canonical_name: &str,
372 schema: &Schema,
373 ) -> LlkvResult<CreateTableResult<P>> {
374 if schema.fields().is_empty() {
375 return Err(Error::InvalidArgumentError(
376 "CREATE TABLE AS SELECT requires at least one column".into(),
377 ));
378 }
379
380 let mut lookup: FxHashMap<String, usize> =
381 FxHashMap::with_capacity_and_hasher(schema.fields().len(), Default::default());
382 let mut table_columns: Vec<TableColumn> = Vec::with_capacity(schema.fields().len());
383
384 for (idx, field) in schema.fields().iter().enumerate() {
385 let data_type = match field.data_type() {
387 DataType::Int64
388 | DataType::Int32
389 | DataType::Float64
390 | DataType::Utf8
391 | DataType::Date32
392 | DataType::Boolean
393 | DataType::Struct(_)
394 | DataType::Decimal128(_, _) => field.data_type().clone(),
395 other => {
396 return Err(Error::InvalidArgumentError(format!(
397 "unsupported column type in CTAS result: {other:?}"
398 )));
399 }
400 };
401
402 let normalized = field.name().to_ascii_lowercase();
403 if lookup.insert(normalized.clone(), idx).is_some() {
404 return Err(Error::InvalidArgumentError(format!(
405 "duplicate column name '{}' in CTAS result",
406 field.name()
407 )));
408 }
409
410 table_columns.push(TableColumn {
411 field_id: field_id_for_index(idx)?,
412 name: field.name().to_string(),
413 data_type,
414 nullable: field.is_nullable(),
415 primary_key: false,
416 unique: false,
417 check_expr: None,
418 });
419 }
420
421 self.create_table_inner(display_name, canonical_name, table_columns, lookup)
422 }
423
424 fn create_table_inner(
425 &self,
426 display_name: &str,
427 _canonical_name: &str,
428 table_columns: Vec<TableColumn>,
429 column_lookup: FxHashMap<String, usize>,
430 ) -> LlkvResult<CreateTableResult<P>> {
431 let table_id = self.metadata.reserve_table_id()?;
432 let timestamp = current_time_micros();
433 let table_meta = crate::sys_catalog::TableMeta {
434 table_id,
435 name: Some(display_name.to_string()),
436 created_at_micros: timestamp,
437 flags: 0,
438 epoch: 0,
439 view_definition: None, };
441
442 self.metadata.set_table_meta(table_id, table_meta)?;
443 self.metadata
444 .apply_column_definitions(table_id, &table_columns, timestamp)?;
445 self.metadata.flush_table(table_id)?;
446
447 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
448
449 tracing::debug!(
451 "[CATALOG_REGISTER] Registering table '{}' (id={}) in catalog @ {:p}",
452 display_name,
453 table_id,
454 &*self.catalog
455 );
456 if let Err(err) = self.catalog.register_table(display_name, table_id) {
457 self.metadata.remove_table_state(table_id);
458 return Err(err);
459 }
460
461 if let Some(field_resolver) = self.catalog.field_resolver(table_id) {
462 for column in &table_columns {
463 let definition = FieldDefinition::new(&column.name)
464 .with_primary_key(column.primary_key)
465 .with_unique(column.unique)
466 .with_check_expr(column.check_expr.clone());
467 if let Err(err) = field_resolver.register_field(definition) {
468 self.catalog.unregister_table(table_id);
469 self.metadata.remove_table_state(table_id);
470 return Err(err);
471 }
472 }
473 }
474
475 Ok(CreateTableResult {
476 table_id,
477 table: Arc::new(table),
478 table_columns,
479 column_lookup,
480 })
481 }
482
483 pub fn drop_table(
485 &self,
486 canonical_name: &str,
487 table_id: TableId,
488 column_field_ids: &[FieldId],
489 ) -> LlkvResult<()> {
490 use llkv_types::LogicalFieldId;
491
492 self.metadata
493 .prepare_table_drop(table_id, column_field_ids)?;
494 self.metadata.flush_table(table_id)?;
495 self.metadata.remove_table_state(table_id);
496
497 for field_id in column_field_ids {
498 let logical_field_id = LogicalFieldId::for_user(table_id, *field_id);
499 self.store.remove_column(logical_field_id)?;
500 }
501
502 self.store
503 .remove_column(LogicalFieldId::for_mvcc_created_by(table_id))?;
504 self.store
505 .remove_column(LogicalFieldId::for_mvcc_deleted_by(table_id))?;
506
507 if let Some(table_id_from_catalog) = self.catalog.table_id(canonical_name) {
508 let _ = self.catalog.unregister_table(table_id_from_catalog);
509 } else {
510 let _ = self.catalog.unregister_table(table_id);
511 }
512 Ok(())
513 }
514
515 pub fn rename_table(
517 &self,
518 table_id: TableId,
519 current_name: &str,
520 new_name: &str,
521 ) -> LlkvResult<()> {
522 if !current_name.eq_ignore_ascii_case(new_name) && self.catalog.table_id(new_name).is_some()
523 {
524 return Err(Error::CatalogError(format!(
525 "Table '{}' already exists",
526 new_name
527 )));
528 }
529
530 let previous_meta = self.metadata.table_meta(table_id)?;
531 let mut prior_snapshot = None;
532 if let Some(mut meta) = previous_meta.clone() {
533 prior_snapshot = Some(meta.clone());
534 meta.name = Some(new_name.to_string());
535 self.metadata.set_table_meta(table_id, meta)?;
536 }
537
538 if let Err(err) = self.catalog.rename_registered_table(current_name, new_name) {
539 if let Some(prior) = prior_snapshot {
540 let _ = self.metadata.set_table_meta(table_id, prior);
541 }
542 return Err(err);
543 }
544
545 if let Some(prior) = prior_snapshot.clone()
546 && let Err(err) = self.metadata.flush_table(table_id)
547 {
548 let _ = self.metadata.set_table_meta(table_id, prior);
549 let _ = self.catalog.rename_registered_table(new_name, current_name);
550 let _ = self.metadata.flush_table(table_id);
551 return Err(err);
552 }
553
554 Ok(())
555 }
556
557 pub fn rename_column(
559 &self,
560 table_id: TableId,
561 old_column_name: &str,
562 new_column_name: &str,
563 ) -> LlkvResult<()> {
564 let (_, field_ids) = self.sorted_user_fields(table_id);
566 let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
567
568 let mut found_col: Option<(u32, ColMeta)> = None;
570 for (idx, meta_opt) in column_metas.iter().enumerate() {
571 if let Some(meta) = meta_opt
572 && let Some(name) = &meta.name
573 && name.eq_ignore_ascii_case(old_column_name)
574 {
575 found_col = Some((field_ids[idx], meta.clone()));
576 break;
577 }
578 }
579
580 let (_field_id, mut col_meta) = found_col.ok_or_else(|| {
581 Error::InvalidArgumentError(format!("column '{}' not found in table", old_column_name))
582 })?;
583
584 col_meta.name = Some(new_column_name.to_string());
586
587 let catalog = SysCatalog::new(&self.store);
589 catalog.put_col_meta(table_id, &col_meta);
590
591 self.metadata.set_column_meta(table_id, col_meta)?;
593
594 if let Some(resolver) = self.catalog.field_resolver(table_id) {
596 resolver.rename_field(old_column_name, new_column_name)?;
597 }
598
599 self.metadata.flush_table(table_id)?;
600
601 Ok(())
602 }
603
604 pub fn alter_column_type(
615 &self,
616 table_id: TableId,
617 column_name: &str,
618 new_data_type: &DataType,
619 ) -> LlkvResult<()> {
620 let (logical_fields, field_ids) = self.sorted_user_fields(table_id);
622 let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
623
624 let mut found_col: Option<(usize, u32, ColMeta)> = None;
626 for (idx, meta_opt) in column_metas.iter().enumerate() {
627 if let Some(meta) = meta_opt
628 && let Some(name) = &meta.name
629 && name.eq_ignore_ascii_case(column_name)
630 {
631 found_col = Some((idx, field_ids[idx], meta.clone()));
632 break;
633 }
634 }
635
636 let (col_idx, _field_id, col_meta) = found_col.ok_or_else(|| {
637 Error::InvalidArgumentError(format!("column '{}' not found in table", column_name))
638 })?;
639
640 let lfid = logical_fields[col_idx];
642 self.store.update_data_type(lfid, new_data_type)?;
643
644 let catalog = SysCatalog::new(&self.store);
646 catalog.put_col_meta(table_id, &col_meta);
647
648 self.metadata.set_column_meta(table_id, col_meta)?;
650
651 Ok(())
652 }
653
654 pub fn drop_column(&self, table_id: TableId, column_name: &str) -> LlkvResult<()> {
656 let (_, field_ids) = self.sorted_user_fields(table_id);
658 let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
659
660 let mut found_col_id: Option<u32> = None;
662 for (idx, meta_opt) in column_metas.iter().enumerate() {
663 if let Some(meta) = meta_opt
664 && let Some(name) = &meta.name
665 && name.eq_ignore_ascii_case(column_name)
666 {
667 found_col_id = Some(field_ids[idx]);
668 break;
669 }
670 }
671
672 let col_id = found_col_id.ok_or_else(|| {
673 Error::InvalidArgumentError(format!("column '{}' not found in table", column_name))
674 })?;
675
676 use llkv_types::{LogicalFieldId, LogicalStorageNamespace};
678 let logical_field_id =
679 LogicalFieldId::from_parts(LogicalStorageNamespace::UserData, table_id, col_id);
680 self.store.remove_column(logical_field_id)?;
681
682 let catalog = SysCatalog::new(&self.store);
684 catalog.delete_col_meta(table_id, &[col_id])?;
685
686 Ok(())
687 }
688
689 #[allow(clippy::too_many_arguments)]
692 pub fn register_single_column_index(
693 &self,
694 display_name: &str,
695 canonical_name: &str,
696 table: &Table<P>,
697 field_id: FieldId,
698 column_name: &str,
699 index_name: Option<String>,
700 mark_unique: bool,
701 ascending: bool,
702 nulls_first: bool,
703 if_not_exists: bool,
704 ) -> LlkvResult<SingleColumnIndexRegistration> {
705 let table_id = table.table_id();
706 let existing_indexes = table.list_registered_indexes(field_id)?;
707 if existing_indexes.contains(&IndexKind::Sort) {
708 let existing_name = self
709 .metadata
710 .single_column_indexes(table_id)?
711 .into_iter()
712 .find(|entry| entry.column_id == field_id)
713 .map(|entry| entry.index_name)
714 .unwrap_or_else(|| column_name.to_string());
715
716 if if_not_exists {
717 return Ok(SingleColumnIndexRegistration::AlreadyExists {
718 index_name: existing_name,
719 });
720 }
721
722 return Err(Error::CatalogError(format!(
723 "Index already exists on column '{}' in table '{}'",
724 column_name, display_name
725 )));
726 }
727
728 let index_display_name = match index_name {
729 Some(name) => name,
730 None => {
731 self.generate_single_column_index_name(table_id, canonical_name, column_name)?
732 }
733 };
734 if index_display_name.is_empty() {
735 return Err(Error::InvalidArgumentError(
736 "Index name must not be empty".into(),
737 ));
738 }
739 let canonical_index_name = index_display_name.to_ascii_lowercase();
740
741 if let Some(existing) = self
742 .metadata
743 .single_column_index(table_id, &canonical_index_name)?
744 {
745 if if_not_exists {
746 return Ok(SingleColumnIndexRegistration::AlreadyExists {
747 index_name: existing.index_name,
748 });
749 }
750
751 return Err(Error::CatalogError(format!(
752 "Index '{}' already exists on table '{}'",
753 existing.index_name, display_name
754 )));
755 }
756
757 let entry = SingleColumnIndexEntry {
758 index_name: index_display_name.clone(),
759 canonical_name: canonical_index_name,
760 column_id: field_id,
761 column_name: column_name.to_string(),
762 unique: mark_unique,
763 ascending,
764 nulls_first,
765 };
766
767 self.metadata.put_single_column_index(table_id, entry)?;
768 self.metadata.register_sort_index(table_id, field_id)?;
769
770 if mark_unique {
771 let catalog_table_id = self.catalog.table_id(canonical_name).unwrap_or(table_id);
772 if let Some(resolver) = self.catalog.field_resolver(catalog_table_id) {
773 resolver.set_field_unique(column_name, true)?;
774 }
775 }
776
777 self.metadata.flush_table(table_id)?;
778
779 Ok(SingleColumnIndexRegistration::Created {
780 index_name: index_display_name,
781 })
782 }
783
784 pub fn drop_single_column_index(
785 &self,
786 plan: DropIndexPlan,
787 ) -> LlkvResult<Option<SingleColumnIndexDescriptor>> {
788 let canonical_index = plan.canonical_name.to_ascii_lowercase();
789 let snapshot = self.catalog.snapshot();
790
791 for canonical_table_name in snapshot.table_names() {
792 let Some(table_id) = snapshot.table_id(&canonical_table_name) else {
793 continue;
794 };
795
796 if let Some(entry) = self
797 .metadata
798 .single_column_index(table_id, &canonical_index)?
799 {
800 self.metadata
801 .remove_single_column_index(table_id, &canonical_index)?;
802
803 if entry.unique
804 && let Some(resolver) = self.catalog.field_resolver(table_id)
805 {
806 resolver.set_field_unique(&entry.column_name, false)?;
807 }
808
809 self.metadata.flush_table(table_id)?;
810
811 let display_table_name = self
812 .metadata
813 .table_meta(table_id)?
814 .and_then(|meta| meta.name)
815 .unwrap_or_else(|| canonical_table_name.clone());
816
817 return Ok(Some(SingleColumnIndexDescriptor {
818 index_name: entry.index_name,
819 table_id,
820 canonical_table_name,
821 display_table_name,
822 field_id: entry.column_id,
823 column_name: entry.column_name,
824 was_unique: entry.unique,
825 }));
826 }
827 }
828
829 if plan.if_exists {
830 Ok(None)
831 } else {
832 Err(Error::CatalogError(format!(
833 "Index '{}' does not exist",
834 plan.name
835 )))
836 }
837 }
838
839 pub fn register_multi_column_unique_index(
841 &self,
842 table_id: TableId,
843 field_ids: &[FieldId],
844 index_name: Option<String>,
845 ) -> LlkvResult<MultiColumnUniqueRegistration> {
846 let registration = self
847 .metadata
848 .register_multi_column_unique(table_id, field_ids, index_name)?;
849
850 if matches!(registration, MultiColumnUniqueRegistration::Created) {
851 self.metadata.flush_table(table_id)?;
852 }
853
854 Ok(registration)
855 }
856
857 #[allow(clippy::too_many_arguments)]
858 pub fn create_trigger(
859 &self,
860 trigger_display_name: &str,
861 canonical_trigger_name: &str,
862 table_display_name: &str,
863 canonical_table_name: &str,
864 timing: TriggerTimingMeta,
865 event: TriggerEventMeta,
866 for_each_row: bool,
867 condition: Option<String>,
868 body_sql: String,
869 if_not_exists: bool,
870 ) -> LlkvResult<bool> {
871 let Some(table_id) = self.catalog.table_id(canonical_table_name) else {
872 return Err(Error::CatalogError(format!(
873 "Table '{}' does not exist",
874 table_display_name
875 )));
876 };
877
878 let table_meta = self.metadata.table_meta(table_id)?;
879 let is_view = table_meta
880 .as_ref()
881 .and_then(|meta| meta.view_definition.as_ref())
882 .is_some();
883
884 match timing {
885 TriggerTimingMeta::InsteadOf => {
886 if !is_view {
887 return Err(Error::InvalidArgumentError(format!(
888 "INSTEAD OF trigger '{}' requires a view target",
889 trigger_display_name
890 )));
891 }
892 }
893 _ => {
894 if is_view {
895 return Err(Error::InvalidArgumentError(format!(
896 "Trigger '{}' must use INSTEAD OF when targeting a view",
897 trigger_display_name
898 )));
899 }
900 }
901 }
902
903 if self
904 .metadata
905 .trigger(table_id, canonical_trigger_name)?
906 .is_some()
907 {
908 if if_not_exists {
909 return Ok(false);
910 }
911 return Err(Error::CatalogError(format!(
912 "Trigger '{}' already exists",
913 trigger_display_name
914 )));
915 }
916
917 let entry = TriggerEntryMeta {
918 name: trigger_display_name.to_string(),
919 canonical_name: canonical_trigger_name.to_string(),
920 timing,
921 event,
922 for_each_row,
923 condition,
924 body_sql,
925 };
926
927 self.metadata.insert_trigger(table_id, entry)?;
928 self.metadata.flush_table(table_id)?;
929 Ok(true)
930 }
931
932 pub fn drop_trigger(
933 &self,
934 trigger_display_name: &str,
935 canonical_trigger_name: &str,
936 table_hint_display: Option<&str>,
937 table_hint_canonical: Option<&str>,
938 if_exists: bool,
939 ) -> LlkvResult<bool> {
940 let mut candidate_tables: Vec<(TableId, String)> = Vec::new();
941
942 if let Some(canonical_table) = table_hint_canonical {
943 match self.catalog.table_id(canonical_table) {
944 Some(table_id) => candidate_tables.push((table_id, canonical_table.to_string())),
945 None => {
946 if if_exists {
947 return Ok(false);
948 }
949 let display = table_hint_display.unwrap_or(canonical_table);
950 return Err(Error::CatalogError(format!(
951 "Table '{}' does not exist",
952 display
953 )));
954 }
955 }
956 } else {
957 let snapshot = self.catalog.snapshot();
958 for canonical_table in snapshot.table_names() {
959 if let Some(table_id) = snapshot.table_id(&canonical_table) {
960 candidate_tables.push((table_id, canonical_table));
961 }
962 }
963 }
964
965 for (table_id, canonical_table) in candidate_tables {
966 if self
967 .metadata
968 .remove_trigger(table_id, canonical_trigger_name)?
969 {
970 self.metadata.flush_table(table_id)?;
971 return Ok(true);
972 } else if table_hint_canonical.is_some()
973 && table_hint_canonical
974 .unwrap()
975 .eq_ignore_ascii_case(&canonical_table)
976 {
977 break;
978 }
979 }
980
981 if if_exists {
982 Ok(false)
983 } else {
984 Err(Error::CatalogError(format!(
985 "Trigger '{}' does not exist",
986 trigger_display_name
987 )))
988 }
989 }
990
991 pub fn register_multi_column_index(
995 &self,
996 table_id: TableId,
997 field_ids: &[FieldId],
998 index_name: String,
999 unique: bool,
1000 ) -> LlkvResult<bool> {
1001 let canonical_name = index_name.to_lowercase();
1002
1003 if let Some(_existing) = self
1005 .metadata
1006 .get_multi_column_index(table_id, &canonical_name)?
1007 {
1008 return Ok(false);
1009 }
1010
1011 let entry = MultiColumnIndexEntryMeta {
1013 index_name: Some(index_name),
1014 canonical_name,
1015 column_ids: field_ids.to_vec(),
1016 unique,
1017 };
1018
1019 self.metadata.put_multi_column_index(table_id, entry)?;
1020 self.metadata.flush_table(table_id)?;
1021
1022 Ok(true)
1023 }
1024
1025 fn generate_single_column_index_name(
1026 &self,
1027 table_id: TableId,
1028 canonical_table_name: &str,
1029 column_name: &str,
1030 ) -> LlkvResult<String> {
1031 let table_token = if canonical_table_name.is_empty() {
1032 "table".to_string()
1033 } else {
1034 canonical_table_name.replace('.', "_")
1035 };
1036 let column_token = column_name.to_ascii_lowercase();
1037
1038 let mut candidate = format!("{}_{}_idx", table_token, column_token);
1039 let mut suffix: u32 = 1;
1040 loop {
1041 let canonical = candidate.to_ascii_lowercase();
1042 if self
1043 .metadata
1044 .single_column_index(table_id, &canonical)?
1045 .is_none()
1046 {
1047 return Ok(candidate);
1048 }
1049
1050 candidate = format!("{}_{}_idx{}", table_token, column_token, suffix);
1051 suffix = suffix.checked_add(1).ok_or_else(|| {
1052 Error::InvalidArgumentError("exhausted unique index name generation space".into())
1053 })?;
1054 }
1055 }
1056
1057 #[allow(clippy::too_many_arguments)]
1059 pub fn append_batches_with_mvcc(
1060 &self,
1061 table: &Table<P>,
1062 table_columns: &[TableColumn],
1063 batches: &[RecordBatch],
1064 creator_txn_id: u64,
1065 deleted_marker: u64,
1066 starting_row_id: RowId,
1067 mvcc_builder: &dyn MvccColumnBuilder,
1068 ) -> LlkvResult<(RowId, u64)> {
1069 let mut next_row_id = starting_row_id;
1070 let mut total_rows: u64 = 0;
1071
1072 for batch in batches {
1073 if batch.num_rows() == 0 {
1074 continue;
1075 }
1076
1077 if batch.num_columns() != table_columns.len() {
1078 return Err(Error::InvalidArgumentError(format!(
1079 "CTAS query returned unexpected column count (expected {}, found {})",
1080 table_columns.len(),
1081 batch.num_columns()
1082 )));
1083 }
1084
1085 let row_count = batch.num_rows();
1086
1087 let (row_id_array, created_by_array, deleted_by_array) = mvcc_builder
1088 .build_insert_columns(row_count, next_row_id, creator_txn_id, deleted_marker);
1089
1090 let mut arrays: Vec<ArrayRef> = Vec::with_capacity(table_columns.len() + 3);
1091 arrays.push(row_id_array);
1092 arrays.push(created_by_array);
1093 arrays.push(deleted_by_array);
1094
1095 let mut fields = mvcc_builder.mvcc_fields();
1096
1097 for (idx, column) in table_columns.iter().enumerate() {
1098 let array = batch.column(idx).clone();
1099 let field = mvcc_builder.field_with_metadata(
1100 &column.name,
1101 column.data_type.clone(),
1102 column.nullable,
1103 column.field_id,
1104 );
1105 arrays.push(array);
1106 fields.push(field);
1107 }
1108
1109 let append_schema = Arc::new(Schema::new(fields));
1110 let append_batch = RecordBatch::try_new(append_schema, arrays).map_err(Error::Arrow)?;
1111 table.append(&append_batch)?;
1112
1113 next_row_id = next_row_id.saturating_add(row_count as u64);
1114 total_rows = total_rows.saturating_add(row_count as u64);
1115 }
1116
1117 Ok((next_row_id, total_rows))
1118 }
1119
1120 #[allow(clippy::too_many_arguments)]
1122 pub fn register_foreign_keys_for_new_table<F>(
1123 &self,
1124 table_id: TableId,
1125 display_name: &str,
1126 canonical_name: &str,
1127 table_columns: &[TableColumn],
1128 specs: &[ForeignKeySpec],
1129 lookup_table: F,
1130 timestamp_micros: u64,
1131 ) -> LlkvResult<Vec<ValidatedForeignKey>>
1132 where
1133 F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
1134 {
1135 if specs.is_empty() {
1136 return Ok(Vec::new());
1137 }
1138
1139 let referencing_columns: Vec<ForeignKeyColumn> = table_columns
1140 .iter()
1141 .map(|column| ForeignKeyColumn {
1142 name: column.name.clone(),
1143 data_type: column.data_type.clone(),
1144 nullable: column.nullable,
1145 primary_key: column.primary_key,
1146 unique: column.unique,
1147 field_id: column.field_id,
1148 })
1149 .collect();
1150
1151 let multi_column_uniques = {
1152 let catalog = SysCatalog::new(&self.store);
1153 let all_indexes = catalog.get_multi_column_indexes(table_id)?;
1154 all_indexes.into_iter().filter(|idx| idx.unique).collect()
1155 };
1156
1157 let referencing_table = ForeignKeyTableInfo {
1158 display_name: display_name.to_string(),
1159 canonical_name: canonical_name.to_string(),
1160 table_id,
1161 columns: referencing_columns,
1162 multi_column_uniques,
1163 };
1164
1165 self.metadata.validate_and_register_foreign_keys(
1166 &referencing_table,
1167 specs,
1168 lookup_table,
1169 timestamp_micros,
1170 )
1171 }
1172
1173 pub fn referenced_table_info(
1175 &self,
1176 views: &[ForeignKeyView],
1177 ) -> LlkvResult<Vec<ForeignKeyTableInfo>> {
1178 let mut results = Vec::with_capacity(views.len());
1179 for view in views {
1180 let Some(table_id) = self.catalog.table_id(&view.referenced_table_canonical) else {
1181 return Err(Error::CatalogError(format!(
1182 "Catalog Error: referenced table '{}' does not exist",
1183 view.referenced_table_display
1184 )));
1185 };
1186
1187 let Some(resolver) = self.catalog.field_resolver(table_id) else {
1188 return Err(Error::Internal(format!(
1189 "catalog resolver missing for table '{}'",
1190 view.referenced_table_display
1191 )));
1192 };
1193
1194 let mut columns = Vec::with_capacity(view.referenced_field_ids.len());
1195 for field_id in &view.referenced_field_ids {
1196 let info = resolver.field_info(*field_id).ok_or_else(|| {
1197 Error::Internal(format!(
1198 "field metadata missing for id {} in table '{}'",
1199 field_id, view.referenced_table_display
1200 ))
1201 })?;
1202
1203 let data_type = self.metadata.column_data_type(table_id, *field_id)?;
1204
1205 columns.push(ForeignKeyColumn {
1206 name: info.display_name.to_string(),
1207 data_type,
1208 nullable: !info.constraints.primary_key,
1209 primary_key: info.constraints.primary_key,
1210 unique: info.constraints.unique,
1211 field_id: *field_id,
1212 });
1213 }
1214
1215 let multi_column_uniques = {
1216 let catalog = SysCatalog::new(&self.store);
1217 let all_indexes = catalog.get_multi_column_indexes(table_id)?;
1218 all_indexes.into_iter().filter(|idx| idx.unique).collect()
1219 };
1220
1221 results.push(ForeignKeyTableInfo {
1222 display_name: view.referenced_table_display.clone(),
1223 canonical_name: view.referenced_table_canonical.clone(),
1224 table_id,
1225 columns,
1226 multi_column_uniques,
1227 });
1228 }
1229
1230 Ok(results)
1231 }
1232
1233 pub fn table_view(&self, canonical_name: &str) -> LlkvResult<TableView> {
1235 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1236 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1237 })?;
1238
1239 let (_, field_ids) = self.sorted_user_fields(table_id);
1240 self.table_view_with_field_ids(table_id, &field_ids)
1241 }
1242
1243 pub fn table_column_specs(&self, canonical_name: &str) -> LlkvResult<Vec<PlanColumnSpec>> {
1245 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1246 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1247 })?;
1248
1249 let resolver = self
1250 .catalog
1251 .field_resolver(table_id)
1252 .ok_or_else(|| Error::Internal("missing field resolver for table".into()))?;
1253
1254 let (logical_fields, field_ids) = self.sorted_user_fields(table_id);
1255
1256 let table_view = self.table_view_with_field_ids(table_id, &field_ids)?;
1257 let column_metas = table_view.column_metas;
1258 let constraint_records = table_view.constraint_records;
1259
1260 let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
1261 let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
1262 let mut has_primary_key_records = false;
1263 let mut has_single_unique_records = false;
1264
1265 for record in constraint_records
1266 .iter()
1267 .filter(|record| record.is_active())
1268 {
1269 match &record.kind {
1270 ConstraintKind::PrimaryKey(pk) => {
1271 has_primary_key_records = true;
1272 for field_id in &pk.field_ids {
1273 metadata_primary_keys.insert(*field_id);
1274 metadata_unique_fields.insert(*field_id);
1275 }
1276 }
1277 ConstraintKind::Unique(unique) => {
1278 if unique.field_ids.len() == 1 {
1279 has_single_unique_records = true;
1280 if let Some(field_id) = unique.field_ids.first() {
1281 metadata_unique_fields.insert(*field_id);
1282 }
1283 }
1284 }
1285 _ => {}
1286 }
1287 }
1288
1289 let mut specs = Vec::with_capacity(field_ids.len());
1290
1291 for (idx, lfid) in logical_fields.iter().enumerate() {
1292 let field_id = lfid.field_id();
1293
1294 let column_name = column_metas
1295 .get(idx)
1296 .and_then(|meta| meta.as_ref())
1297 .and_then(|meta| meta.name.clone())
1298 .unwrap_or_else(|| format!("col_{}", field_id));
1299
1300 let fallback_constraints = resolver
1301 .field_constraints_by_name(&column_name)
1302 .unwrap_or_default();
1303
1304 let metadata_primary = metadata_primary_keys.contains(&field_id);
1305 let primary_key = if has_primary_key_records {
1306 metadata_primary
1307 } else {
1308 fallback_constraints.primary_key
1309 };
1310
1311 let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
1312 let unique = if has_primary_key_records || has_single_unique_records {
1313 metadata_unique
1314 } else {
1315 fallback_constraints.primary_key || fallback_constraints.unique
1316 };
1317
1318 let data_type = self.store.data_type(*lfid)?;
1319 let nullable = !primary_key;
1320
1321 let mut spec = PlanColumnSpec::new(column_name.clone(), data_type, nullable)
1322 .with_primary_key(primary_key)
1323 .with_unique(unique);
1324
1325 if let Some(check_expr) = fallback_constraints.check_expr.clone() {
1326 spec = spec.with_check(Some(check_expr));
1327 }
1328
1329 specs.push(spec);
1330 }
1331
1332 Ok(specs)
1333 }
1334
1335 pub fn foreign_key_views(&self, canonical_name: &str) -> LlkvResult<Vec<ForeignKeyView>> {
1337 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1338 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1339 })?;
1340
1341 self.metadata.foreign_key_views(&self.catalog, table_id)
1342 }
1343
1344 pub fn table_constraint_summary(
1346 &self,
1347 canonical_name: &str,
1348 ) -> LlkvResult<TableConstraintSummaryView> {
1349 tracing::trace!(
1350 "[TABLE_CONSTRAINT_SUMMARY] Looking up table '{}' in catalog @ {:p}",
1351 canonical_name,
1352 &*self.catalog
1353 );
1354 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
1355 tracing::error!(
1356 "[TABLE_CONSTRAINT_SUMMARY] Table '{}' NOT FOUND in catalog @ {:p}",
1357 canonical_name,
1358 &*self.catalog
1359 );
1360 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
1361 })?;
1362 tracing::trace!(
1363 "[TABLE_CONSTRAINT_SUMMARY] Found table '{}' with id={} in catalog",
1364 canonical_name,
1365 table_id
1366 );
1367
1368 let (_, field_ids) = self.sorted_user_fields(table_id);
1369 let table_meta = self.metadata.table_meta(table_id)?;
1370 let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
1371 let constraint_records = self.metadata.constraint_records(table_id)?;
1372 let multi_column_uniques = self.metadata.multi_column_uniques(table_id)?;
1373 let constraint_names = self.metadata.constraint_names(table_id)?;
1374
1375 Ok(TableConstraintSummaryView {
1376 table_meta,
1377 column_metas,
1378 constraint_records,
1379 multi_column_uniques,
1380 constraint_names,
1381 })
1382 }
1383
1384 fn sorted_user_fields(
1385 &self,
1386 table_id: TableId,
1387 ) -> (Vec<llkv_types::LogicalFieldId>, Vec<FieldId>) {
1388 let mut logical_fields = self.store.user_field_ids_for_table(table_id);
1389 logical_fields.sort_by_key(|lfid| lfid.field_id());
1390 let field_ids = logical_fields
1391 .iter()
1392 .map(|lfid| lfid.field_id())
1393 .collect::<Vec<_>>();
1394
1395 (logical_fields, field_ids)
1396 }
1397
1398 fn table_view_with_field_ids(
1399 &self,
1400 table_id: TableId,
1401 field_ids: &[FieldId],
1402 ) -> LlkvResult<TableView> {
1403 self.metadata.table_view(&self.catalog, table_id, field_ids)
1404 }
1405
1406 pub fn table_names(&self) -> Vec<String> {
1412 self.catalog.table_names()
1413 }
1414
1415 pub fn table_id(&self, canonical_name: &str) -> Option<TableId> {
1417 self.catalog.table_id(canonical_name)
1418 }
1419
1420 pub fn field_resolver(&self, table_id: TableId) -> Option<crate::catalog::FieldResolver> {
1422 self.catalog.field_resolver(table_id)
1423 }
1424
1425 pub fn catalog_snapshot(&self) -> crate::catalog::TableCatalogSnapshot {
1427 self.catalog.snapshot()
1428 }
1429
1430 pub fn catalog(&self) -> &Arc<TableCatalog> {
1433 &self.catalog
1434 }
1435
1436 pub fn foreign_keys_referencing(
1439 &self,
1440 referenced_table_id: TableId,
1441 ) -> LlkvResult<Vec<(TableId, ConstraintId)>> {
1442 self.metadata.foreign_keys_referencing(referenced_table_id)
1443 }
1444
1445 pub fn foreign_key_views_for_table(
1448 &self,
1449 table_id: TableId,
1450 ) -> LlkvResult<Vec<ForeignKeyView>> {
1451 self.metadata.foreign_key_views(&self.catalog, table_id)
1452 }
1453}
1454
1455fn field_id_for_index(idx: usize) -> LlkvResult<FieldId> {
1456 FieldId::try_from(idx + 1).map_err(|_| {
1457 Error::Internal(format!(
1458 "column index {} exceeded supported field id range",
1459 idx + 1
1460 ))
1461 })
1462}
1463
1464fn parse_data_type_from_sql(sql: &str) -> LlkvResult<sqlparser::ast::DataType> {
1466 use sqlparser::dialect::GenericDialect;
1467 use sqlparser::parser::Parser;
1468
1469 let create_sql = format!("CREATE DOMAIN dummy AS {}", sql);
1471 let dialect = GenericDialect {};
1472
1473 match Parser::parse_sql(&dialect, &create_sql) {
1474 Ok(stmts) if !stmts.is_empty() => {
1475 if let sqlparser::ast::Statement::CreateDomain(create_domain) = &stmts[0] {
1476 Ok(create_domain.data_type.clone())
1477 } else {
1478 Err(Error::InvalidArgumentError(format!(
1479 "Failed to parse type from SQL: {}",
1480 sql
1481 )))
1482 }
1483 }
1484 _ => Err(Error::InvalidArgumentError(format!(
1485 "Failed to parse type from SQL: {}",
1486 sql
1487 ))),
1488 }
1489}