1#![forbid(unsafe_code)]
7
8use std::convert::TryFrom;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use arrow::array::ArrayRef;
13use arrow::datatypes::{DataType, Schema};
14use arrow::record_batch::RecordBatch;
15use llkv_column_map::ColumnStore;
16use llkv_column_map::store::IndexKind;
17use llkv_plan::{ColumnSpec, ForeignKeySpec};
18use llkv_result::{Error, Result as LlkvResult};
19use llkv_storage::pager::Pager;
20use rustc_hash::{FxHashMap, FxHashSet};
21use simd_r_drive_entry_handle::EntryHandle;
22
23use crate::mvcc;
24
25use super::table_catalog::{FieldDefinition, TableCatalog};
26use crate::constraints::ConstraintKind;
27use crate::metadata::{MetadataManager, MultiColumnUniqueRegistration};
28use crate::table::Table;
29use crate::types::{FieldId, RowId, TableColumn, TableId};
30use crate::{
31 ForeignKeyColumn, ForeignKeyTableInfo, ForeignKeyView, TableConstraintSummaryView, TableView,
32 ValidatedForeignKey,
33};
34
35pub struct CreateTableResult<P>
38where
39 P: Pager<Blob = EntryHandle> + Send + Sync,
40{
41 pub table_id: TableId,
42 pub table: Arc<Table<P>>,
43 pub table_columns: Vec<TableColumn>,
44 pub column_lookup: FxHashMap<String, usize>,
45}
46
47#[derive(Clone)]
52pub struct CatalogManager<P>
53where
54 P: Pager<Blob = EntryHandle> + Send + Sync,
55{
56 metadata: Arc<MetadataManager<P>>,
57 catalog: Arc<TableCatalog>,
58 store: Arc<ColumnStore<P>>,
59}
60
61impl<P> CatalogManager<P>
62where
63 P: Pager<Blob = EntryHandle> + Send + Sync,
64{
65 pub fn new(
66 metadata: Arc<MetadataManager<P>>,
67 catalog: Arc<TableCatalog>,
68 store: Arc<ColumnStore<P>>,
69 ) -> Self {
70 Self {
71 metadata,
72 catalog,
73 store,
74 }
75 }
76
77 pub(crate) fn create_table_from_columns(
82 &self,
83 display_name: &str,
84 canonical_name: &str,
85 columns: &[ColumnSpec],
86 ) -> LlkvResult<CreateTableResult<P>> {
87 if columns.is_empty() {
88 return Err(Error::InvalidArgumentError(
89 "CREATE TABLE requires at least one column".into(),
90 ));
91 }
92
93 let mut lookup: FxHashMap<String, usize> =
94 FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
95 let mut table_columns: Vec<TableColumn> = Vec::with_capacity(columns.len());
96
97 for (idx, column) in columns.iter().enumerate() {
98 let normalized = column.name.to_ascii_lowercase();
99 if lookup.insert(normalized.clone(), idx).is_some() {
100 return Err(Error::InvalidArgumentError(format!(
101 "duplicate column name '{}' in table '{}'",
102 column.name, display_name
103 )));
104 }
105
106 table_columns.push(TableColumn {
107 field_id: field_id_for_index(idx)?,
108 name: column.name.clone(),
109 data_type: column.data_type.clone(),
110 nullable: column.nullable,
111 primary_key: column.primary_key,
112 unique: column.unique,
113 check_expr: column.check_expr.clone(),
114 });
115 }
116
117 self.create_table_inner(display_name, canonical_name, table_columns, lookup)
118 }
119
120 pub fn create_table_from_schema(
122 &self,
123 display_name: &str,
124 canonical_name: &str,
125 schema: &Schema,
126 ) -> LlkvResult<CreateTableResult<P>> {
127 if schema.fields().is_empty() {
128 return Err(Error::InvalidArgumentError(
129 "CREATE TABLE AS SELECT requires at least one column".into(),
130 ));
131 }
132
133 let mut lookup: FxHashMap<String, usize> =
134 FxHashMap::with_capacity_and_hasher(schema.fields().len(), Default::default());
135 let mut table_columns: Vec<TableColumn> = Vec::with_capacity(schema.fields().len());
136
137 for (idx, field) in schema.fields().iter().enumerate() {
138 let data_type = match field.data_type() {
139 DataType::Int64
140 | DataType::Float64
141 | DataType::Utf8
142 | DataType::Date32
143 | DataType::Struct(_) => field.data_type().clone(),
144 other => {
145 return Err(Error::InvalidArgumentError(format!(
146 "unsupported column type in CTAS result: {other:?}"
147 )));
148 }
149 };
150
151 let normalized = field.name().to_ascii_lowercase();
152 if lookup.insert(normalized.clone(), idx).is_some() {
153 return Err(Error::InvalidArgumentError(format!(
154 "duplicate column name '{}' in CTAS result",
155 field.name()
156 )));
157 }
158
159 table_columns.push(TableColumn {
160 field_id: field_id_for_index(idx)?,
161 name: field.name().to_string(),
162 data_type,
163 nullable: field.is_nullable(),
164 primary_key: false,
165 unique: false,
166 check_expr: None,
167 });
168 }
169
170 self.create_table_inner(display_name, canonical_name, table_columns, lookup)
171 }
172
173 fn create_table_inner(
174 &self,
175 display_name: &str,
176 _canonical_name: &str,
177 table_columns: Vec<TableColumn>,
178 column_lookup: FxHashMap<String, usize>,
179 ) -> LlkvResult<CreateTableResult<P>> {
180 let table_id = self.metadata.reserve_table_id()?;
181 let timestamp = current_time_micros();
182 let table_meta = crate::sys_catalog::TableMeta {
183 table_id,
184 name: Some(display_name.to_string()),
185 created_at_micros: timestamp,
186 flags: 0,
187 epoch: 0,
188 };
189
190 self.metadata.set_table_meta(table_id, table_meta)?;
191 self.metadata
192 .apply_column_definitions(table_id, &table_columns, timestamp)?;
193 self.metadata.flush_table(table_id)?;
194
195 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
196
197 if let Err(err) = self.catalog.register_table(display_name, table_id) {
199 self.metadata.remove_table_state(table_id);
200 return Err(err);
201 }
202
203 if let Some(field_resolver) = self.catalog.field_resolver(table_id) {
204 for column in &table_columns {
205 let definition = FieldDefinition::new(&column.name)
206 .with_primary_key(column.primary_key)
207 .with_unique(column.unique)
208 .with_check_expr(column.check_expr.clone());
209 if let Err(err) = field_resolver.register_field(definition) {
210 self.catalog.unregister_table(table_id);
211 self.metadata.remove_table_state(table_id);
212 return Err(err);
213 }
214 }
215 }
216
217 Ok(CreateTableResult {
218 table_id,
219 table: Arc::new(table),
220 table_columns,
221 column_lookup,
222 })
223 }
224
225 pub fn drop_table(
227 &self,
228 canonical_name: &str,
229 table_id: TableId,
230 column_field_ids: &[FieldId],
231 ) -> LlkvResult<()> {
232 self.metadata
233 .prepare_table_drop(table_id, column_field_ids)?;
234 self.metadata.flush_table(table_id)?;
235 self.metadata.remove_table_state(table_id);
236 if let Some(table_id_from_catalog) = self.catalog.table_id(canonical_name) {
237 let _ = self.catalog.unregister_table(table_id_from_catalog);
238 } else {
239 let _ = self.catalog.unregister_table(table_id);
240 }
241 Ok(())
242 }
243
244 #[allow(clippy::too_many_arguments)]
247 pub fn register_single_column_index(
248 &self,
249 display_name: &str,
250 canonical_name: &str,
251 table: &Table<P>,
252 field_id: FieldId,
253 column_name: &str,
254 mark_unique: bool,
255 if_not_exists: bool,
256 ) -> LlkvResult<bool> {
257 let existing_indexes = table.list_registered_indexes(field_id)?;
258 if existing_indexes.contains(&IndexKind::Sort) {
259 if if_not_exists {
260 return Ok(false);
261 }
262 return Err(Error::CatalogError(format!(
263 "Index already exists on column '{}' in table '{}'",
264 column_name, display_name
265 )));
266 }
267
268 let table_id = table.table_id();
269 self.metadata.register_sort_index(table_id, field_id)?;
270
271 if mark_unique {
272 let catalog_table_id = self.catalog.table_id(canonical_name).unwrap_or(table_id);
273 if let Some(resolver) = self.catalog.field_resolver(catalog_table_id) {
274 resolver.set_field_unique(column_name, true)?;
275 }
276 }
277
278 self.metadata.flush_table(table_id)?;
279 Ok(true)
280 }
281
282 pub fn register_multi_column_unique_index(
284 &self,
285 table_id: TableId,
286 field_ids: &[FieldId],
287 index_name: Option<String>,
288 ) -> LlkvResult<MultiColumnUniqueRegistration> {
289 let registration = self
290 .metadata
291 .register_multi_column_unique(table_id, field_ids, index_name)?;
292
293 if matches!(registration, MultiColumnUniqueRegistration::Created) {
294 self.metadata.flush_table(table_id)?;
295 }
296
297 Ok(registration)
298 }
299
300 pub fn append_batches_with_mvcc(
302 &self,
303 table: &Table<P>,
304 table_columns: &[TableColumn],
305 batches: &[RecordBatch],
306 creator_txn_id: u64,
307 deleted_marker: u64,
308 starting_row_id: RowId,
309 ) -> LlkvResult<(RowId, u64)> {
310 let mut next_row_id = starting_row_id;
311 let mut total_rows: u64 = 0;
312
313 for batch in batches {
314 if batch.num_rows() == 0 {
315 continue;
316 }
317
318 if batch.num_columns() != table_columns.len() {
319 return Err(Error::InvalidArgumentError(format!(
320 "CTAS query returned unexpected column count (expected {}, found {})",
321 table_columns.len(),
322 batch.num_columns()
323 )));
324 }
325
326 let row_count = batch.num_rows();
327
328 let (row_id_array, created_by_array, deleted_by_array) =
329 mvcc::build_insert_mvcc_columns(
330 row_count,
331 next_row_id,
332 creator_txn_id,
333 deleted_marker,
334 );
335
336 let mut arrays: Vec<ArrayRef> = Vec::with_capacity(table_columns.len() + 3);
337 arrays.push(row_id_array);
338 arrays.push(created_by_array);
339 arrays.push(deleted_by_array);
340
341 let mut fields = mvcc::build_mvcc_fields();
342
343 for (idx, column) in table_columns.iter().enumerate() {
344 let array = batch.column(idx).clone();
345 let field = mvcc::build_field_with_metadata(
346 &column.name,
347 column.data_type.clone(),
348 column.nullable,
349 column.field_id,
350 );
351 arrays.push(array);
352 fields.push(field);
353 }
354
355 let append_schema = Arc::new(Schema::new(fields));
356 let append_batch = RecordBatch::try_new(append_schema, arrays).map_err(Error::Arrow)?;
357 table.append(&append_batch)?;
358
359 next_row_id = next_row_id.saturating_add(row_count as u64);
360 total_rows = total_rows.saturating_add(row_count as u64);
361 }
362
363 Ok((next_row_id, total_rows))
364 }
365
366 #[allow(clippy::too_many_arguments)]
368 pub fn register_foreign_keys_for_new_table<F>(
369 &self,
370 table_id: TableId,
371 display_name: &str,
372 canonical_name: &str,
373 table_columns: &[TableColumn],
374 specs: &[ForeignKeySpec],
375 lookup_table: F,
376 timestamp_micros: u64,
377 ) -> LlkvResult<Vec<ValidatedForeignKey>>
378 where
379 F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
380 {
381 if specs.is_empty() {
382 return Ok(Vec::new());
383 }
384
385 let referencing_columns: Vec<ForeignKeyColumn> = table_columns
386 .iter()
387 .map(|column| ForeignKeyColumn {
388 name: column.name.clone(),
389 data_type: column.data_type.clone(),
390 nullable: column.nullable,
391 primary_key: column.primary_key,
392 unique: column.unique,
393 field_id: column.field_id,
394 })
395 .collect();
396
397 let referencing_table = ForeignKeyTableInfo {
398 display_name: display_name.to_string(),
399 canonical_name: canonical_name.to_string(),
400 table_id,
401 columns: referencing_columns,
402 };
403
404 self.metadata.validate_and_register_foreign_keys(
405 &referencing_table,
406 specs,
407 lookup_table,
408 timestamp_micros,
409 )
410 }
411
412 pub fn referenced_table_info(
414 &self,
415 views: &[ForeignKeyView],
416 ) -> LlkvResult<Vec<ForeignKeyTableInfo>> {
417 let mut results = Vec::with_capacity(views.len());
418 for view in views {
419 let Some(table_id) = self.catalog.table_id(&view.referenced_table_canonical) else {
420 return Err(Error::InvalidArgumentError(format!(
421 "referenced table '{}' does not exist",
422 view.referenced_table_display
423 )));
424 };
425
426 let Some(resolver) = self.catalog.field_resolver(table_id) else {
427 return Err(Error::Internal(format!(
428 "catalog resolver missing for table '{}'",
429 view.referenced_table_display
430 )));
431 };
432
433 let mut columns = Vec::with_capacity(view.referenced_field_ids.len());
434 for field_id in &view.referenced_field_ids {
435 let info = resolver.field_info(*field_id).ok_or_else(|| {
436 Error::Internal(format!(
437 "field metadata missing for id {} in table '{}'",
438 field_id, view.referenced_table_display
439 ))
440 })?;
441
442 let data_type = self.metadata.column_data_type(table_id, *field_id)?;
443
444 columns.push(ForeignKeyColumn {
445 name: info.display_name.to_string(),
446 data_type,
447 nullable: !info.constraints.primary_key,
448 primary_key: info.constraints.primary_key,
449 unique: info.constraints.unique,
450 field_id: *field_id,
451 });
452 }
453
454 results.push(ForeignKeyTableInfo {
455 display_name: view.referenced_table_display.clone(),
456 canonical_name: view.referenced_table_canonical.clone(),
457 table_id,
458 columns,
459 });
460 }
461
462 Ok(results)
463 }
464
465 pub fn table_view(&self, canonical_name: &str) -> LlkvResult<TableView> {
467 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
468 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
469 })?;
470
471 let (_, field_ids) = self.sorted_user_fields(table_id);
472 self.table_view_with_field_ids(table_id, &field_ids)
473 }
474
475 pub fn table_column_specs(&self, canonical_name: &str) -> LlkvResult<Vec<ColumnSpec>> {
477 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
478 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
479 })?;
480
481 let resolver = self
482 .catalog
483 .field_resolver(table_id)
484 .ok_or_else(|| Error::Internal("missing field resolver for table".into()))?;
485
486 let (logical_fields, field_ids) = self.sorted_user_fields(table_id);
487
488 let table_view = self.table_view_with_field_ids(table_id, &field_ids)?;
489 let column_metas = table_view.column_metas;
490 let constraint_records = table_view.constraint_records;
491
492 let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
493 let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
494 let mut has_primary_key_records = false;
495 let mut has_single_unique_records = false;
496
497 for record in constraint_records
498 .iter()
499 .filter(|record| record.is_active())
500 {
501 match &record.kind {
502 ConstraintKind::PrimaryKey(pk) => {
503 has_primary_key_records = true;
504 for field_id in &pk.field_ids {
505 metadata_primary_keys.insert(*field_id);
506 metadata_unique_fields.insert(*field_id);
507 }
508 }
509 ConstraintKind::Unique(unique) => {
510 if unique.field_ids.len() == 1 {
511 has_single_unique_records = true;
512 if let Some(field_id) = unique.field_ids.first() {
513 metadata_unique_fields.insert(*field_id);
514 }
515 }
516 }
517 _ => {}
518 }
519 }
520
521 let mut specs = Vec::with_capacity(field_ids.len());
522
523 for (idx, lfid) in logical_fields.iter().enumerate() {
524 let field_id = lfid.field_id();
525
526 let column_name = column_metas
527 .get(idx)
528 .and_then(|meta| meta.as_ref())
529 .and_then(|meta| meta.name.clone())
530 .unwrap_or_else(|| format!("col_{}", field_id));
531
532 let fallback_constraints = resolver
533 .field_constraints_by_name(&column_name)
534 .unwrap_or_default();
535
536 let metadata_primary = metadata_primary_keys.contains(&field_id);
537 let primary_key = if has_primary_key_records {
538 metadata_primary
539 } else {
540 fallback_constraints.primary_key
541 };
542
543 let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
544 let unique = if has_primary_key_records || has_single_unique_records {
545 metadata_unique
546 } else {
547 fallback_constraints.primary_key || fallback_constraints.unique
548 };
549
550 let data_type = self.store.data_type(*lfid)?;
551 let nullable = !primary_key;
552
553 let mut spec = ColumnSpec::new(column_name.clone(), data_type, nullable)
554 .with_primary_key(primary_key)
555 .with_unique(unique);
556
557 if let Some(check_expr) = fallback_constraints.check_expr.clone() {
558 spec = spec.with_check(Some(check_expr));
559 }
560
561 specs.push(spec);
562 }
563
564 Ok(specs)
565 }
566
567 pub fn foreign_key_views(&self, canonical_name: &str) -> LlkvResult<Vec<ForeignKeyView>> {
569 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
570 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
571 })?;
572
573 self.metadata.foreign_key_views(&self.catalog, table_id)
574 }
575
576 pub fn table_constraint_summary(
578 &self,
579 canonical_name: &str,
580 ) -> LlkvResult<TableConstraintSummaryView> {
581 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
582 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
583 })?;
584
585 let (_, field_ids) = self.sorted_user_fields(table_id);
586 let table_meta = self.metadata.table_meta(table_id)?;
587 let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
588 let constraint_records = self.metadata.constraint_records(table_id)?;
589 let multi_column_uniques = self.metadata.multi_column_uniques(table_id)?;
590
591 Ok(TableConstraintSummaryView {
592 table_meta,
593 column_metas,
594 constraint_records,
595 multi_column_uniques,
596 })
597 }
598
599 fn sorted_user_fields(
600 &self,
601 table_id: TableId,
602 ) -> (Vec<llkv_column_map::types::LogicalFieldId>, Vec<FieldId>) {
603 let mut logical_fields = self.store.user_field_ids_for_table(table_id);
604 logical_fields.sort_by_key(|lfid| lfid.field_id());
605 let field_ids = logical_fields
606 .iter()
607 .map(|lfid| lfid.field_id())
608 .collect::<Vec<_>>();
609
610 (logical_fields, field_ids)
611 }
612
613 fn table_view_with_field_ids(
614 &self,
615 table_id: TableId,
616 field_ids: &[FieldId],
617 ) -> LlkvResult<TableView> {
618 self.metadata.table_view(&self.catalog, table_id, field_ids)
619 }
620
621 pub fn table_names(&self) -> Vec<String> {
627 self.catalog.table_names()
628 }
629
630 pub fn table_id(&self, canonical_name: &str) -> Option<TableId> {
632 self.catalog.table_id(canonical_name)
633 }
634
635 pub fn field_resolver(&self, table_id: TableId) -> Option<crate::catalog::FieldResolver> {
637 self.catalog.field_resolver(table_id)
638 }
639
640 pub fn catalog_snapshot(&self) -> crate::catalog::TableCatalogSnapshot {
642 self.catalog.snapshot()
643 }
644
645 pub fn catalog(&self) -> &Arc<TableCatalog> {
648 &self.catalog
649 }
650}
651
652fn field_id_for_index(idx: usize) -> LlkvResult<FieldId> {
653 FieldId::try_from(idx + 1).map_err(|_| {
654 Error::Internal(format!(
655 "column index {} exceeded supported field id range",
656 idx + 1
657 ))
658 })
659}
660
/// Returns the current wall-clock time in microseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` instead of silently truncating if the microsecond
/// count does not fit into a `u64`.
fn current_time_micros() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}