1#![forbid(unsafe_code)]
7
8use std::convert::TryFrom;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use arrow::array::ArrayRef;
13use arrow::datatypes::{DataType, Schema};
14use arrow::record_batch::RecordBatch;
15use llkv_column_map::ColumnStore;
16use llkv_column_map::store::IndexKind;
17use llkv_plan::{ColumnSpec, ForeignKeySpec};
18use llkv_result::{Error, Result as LlkvResult};
19use llkv_storage::pager::Pager;
20use rustc_hash::{FxHashMap, FxHashSet};
21use simd_r_drive_entry_handle::EntryHandle;
22
23use crate::mvcc;
24
25use super::table_catalog::{FieldDefinition, TableCatalog};
26use crate::constraints::ConstraintKind;
27use crate::metadata::{MetadataManager, MultiColumnUniqueRegistration};
28use crate::table::Table;
29use crate::types::{FieldId, RowId, TableColumn, TableId};
30use crate::{
31 ForeignKeyColumn, ForeignKeyTableInfo, ForeignKeyView, TableConstraintSummaryView, TableView,
32 ValidatedForeignKey,
33};
34
35pub struct CreateTableResult<P>
38where
39 P: Pager<Blob = EntryHandle> + Send + Sync,
40{
41 pub table_id: TableId,
42 pub table: Arc<Table<P>>,
43 pub table_columns: Vec<TableColumn>,
44 pub column_lookup: FxHashMap<String, usize>,
45}
46
47#[derive(Clone)]
52pub struct CatalogManager<P>
53where
54 P: Pager<Blob = EntryHandle> + Send + Sync,
55{
56 metadata: Arc<MetadataManager<P>>,
57 catalog: Arc<TableCatalog>,
58 store: Arc<ColumnStore<P>>,
59}
60
61impl<P> CatalogManager<P>
62where
63 P: Pager<Blob = EntryHandle> + Send + Sync,
64{
65 pub fn new(
67 metadata: Arc<MetadataManager<P>>,
68 catalog: Arc<TableCatalog>,
69 store: Arc<ColumnStore<P>>,
70 ) -> Self {
71 Self {
72 metadata,
73 catalog,
74 store,
75 }
76 }
77
78 pub(crate) fn create_table_from_columns(
83 &self,
84 display_name: &str,
85 canonical_name: &str,
86 columns: &[ColumnSpec],
87 ) -> LlkvResult<CreateTableResult<P>> {
88 if columns.is_empty() {
89 return Err(Error::InvalidArgumentError(
90 "CREATE TABLE requires at least one column".into(),
91 ));
92 }
93
94 let mut lookup: FxHashMap<String, usize> =
95 FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
96 let mut table_columns: Vec<TableColumn> = Vec::with_capacity(columns.len());
97
98 for (idx, column) in columns.iter().enumerate() {
99 let normalized = column.name.to_ascii_lowercase();
100 if lookup.insert(normalized.clone(), idx).is_some() {
101 return Err(Error::InvalidArgumentError(format!(
102 "duplicate column name '{}' in table '{}'",
103 column.name, display_name
104 )));
105 }
106
107 table_columns.push(TableColumn {
108 field_id: field_id_for_index(idx)?,
109 name: column.name.clone(),
110 data_type: column.data_type.clone(),
111 nullable: column.nullable,
112 primary_key: column.primary_key,
113 unique: column.unique,
114 check_expr: column.check_expr.clone(),
115 });
116 }
117
118 self.create_table_inner(display_name, canonical_name, table_columns, lookup)
119 }
120
121 pub fn create_table_from_schema(
123 &self,
124 display_name: &str,
125 canonical_name: &str,
126 schema: &Schema,
127 ) -> LlkvResult<CreateTableResult<P>> {
128 if schema.fields().is_empty() {
129 return Err(Error::InvalidArgumentError(
130 "CREATE TABLE AS SELECT requires at least one column".into(),
131 ));
132 }
133
134 let mut lookup: FxHashMap<String, usize> =
135 FxHashMap::with_capacity_and_hasher(schema.fields().len(), Default::default());
136 let mut table_columns: Vec<TableColumn> = Vec::with_capacity(schema.fields().len());
137
138 for (idx, field) in schema.fields().iter().enumerate() {
139 let data_type = match field.data_type() {
140 DataType::Int64
141 | DataType::Float64
142 | DataType::Utf8
143 | DataType::Date32
144 | DataType::Struct(_) => field.data_type().clone(),
145 other => {
146 return Err(Error::InvalidArgumentError(format!(
147 "unsupported column type in CTAS result: {other:?}"
148 )));
149 }
150 };
151
152 let normalized = field.name().to_ascii_lowercase();
153 if lookup.insert(normalized.clone(), idx).is_some() {
154 return Err(Error::InvalidArgumentError(format!(
155 "duplicate column name '{}' in CTAS result",
156 field.name()
157 )));
158 }
159
160 table_columns.push(TableColumn {
161 field_id: field_id_for_index(idx)?,
162 name: field.name().to_string(),
163 data_type,
164 nullable: field.is_nullable(),
165 primary_key: false,
166 unique: false,
167 check_expr: None,
168 });
169 }
170
171 self.create_table_inner(display_name, canonical_name, table_columns, lookup)
172 }
173
174 fn create_table_inner(
175 &self,
176 display_name: &str,
177 _canonical_name: &str,
178 table_columns: Vec<TableColumn>,
179 column_lookup: FxHashMap<String, usize>,
180 ) -> LlkvResult<CreateTableResult<P>> {
181 let table_id = self.metadata.reserve_table_id()?;
182 let timestamp = current_time_micros();
183 let table_meta = crate::sys_catalog::TableMeta {
184 table_id,
185 name: Some(display_name.to_string()),
186 created_at_micros: timestamp,
187 flags: 0,
188 epoch: 0,
189 };
190
191 self.metadata.set_table_meta(table_id, table_meta)?;
192 self.metadata
193 .apply_column_definitions(table_id, &table_columns, timestamp)?;
194 self.metadata.flush_table(table_id)?;
195
196 let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
197
198 if let Err(err) = self.catalog.register_table(display_name, table_id) {
200 self.metadata.remove_table_state(table_id);
201 return Err(err);
202 }
203
204 if let Some(field_resolver) = self.catalog.field_resolver(table_id) {
205 for column in &table_columns {
206 let definition = FieldDefinition::new(&column.name)
207 .with_primary_key(column.primary_key)
208 .with_unique(column.unique)
209 .with_check_expr(column.check_expr.clone());
210 if let Err(err) = field_resolver.register_field(definition) {
211 self.catalog.unregister_table(table_id);
212 self.metadata.remove_table_state(table_id);
213 return Err(err);
214 }
215 }
216 }
217
218 Ok(CreateTableResult {
219 table_id,
220 table: Arc::new(table),
221 table_columns,
222 column_lookup,
223 })
224 }
225
226 pub fn drop_table(
228 &self,
229 canonical_name: &str,
230 table_id: TableId,
231 column_field_ids: &[FieldId],
232 ) -> LlkvResult<()> {
233 self.metadata
234 .prepare_table_drop(table_id, column_field_ids)?;
235 self.metadata.flush_table(table_id)?;
236 self.metadata.remove_table_state(table_id);
237 if let Some(table_id_from_catalog) = self.catalog.table_id(canonical_name) {
238 let _ = self.catalog.unregister_table(table_id_from_catalog);
239 } else {
240 let _ = self.catalog.unregister_table(table_id);
241 }
242 Ok(())
243 }
244
245 #[allow(clippy::too_many_arguments)]
248 pub fn register_single_column_index(
249 &self,
250 display_name: &str,
251 canonical_name: &str,
252 table: &Table<P>,
253 field_id: FieldId,
254 column_name: &str,
255 mark_unique: bool,
256 if_not_exists: bool,
257 ) -> LlkvResult<bool> {
258 let existing_indexes = table.list_registered_indexes(field_id)?;
259 if existing_indexes.contains(&IndexKind::Sort) {
260 if if_not_exists {
261 return Ok(false);
262 }
263 return Err(Error::CatalogError(format!(
264 "Index already exists on column '{}' in table '{}'",
265 column_name, display_name
266 )));
267 }
268
269 let table_id = table.table_id();
270 self.metadata.register_sort_index(table_id, field_id)?;
271
272 if mark_unique {
273 let catalog_table_id = self.catalog.table_id(canonical_name).unwrap_or(table_id);
274 if let Some(resolver) = self.catalog.field_resolver(catalog_table_id) {
275 resolver.set_field_unique(column_name, true)?;
276 }
277 }
278
279 self.metadata.flush_table(table_id)?;
280 Ok(true)
281 }
282
283 pub fn register_multi_column_unique_index(
285 &self,
286 table_id: TableId,
287 field_ids: &[FieldId],
288 index_name: Option<String>,
289 ) -> LlkvResult<MultiColumnUniqueRegistration> {
290 let registration = self
291 .metadata
292 .register_multi_column_unique(table_id, field_ids, index_name)?;
293
294 if matches!(registration, MultiColumnUniqueRegistration::Created) {
295 self.metadata.flush_table(table_id)?;
296 }
297
298 Ok(registration)
299 }
300
301 pub fn append_batches_with_mvcc(
303 &self,
304 table: &Table<P>,
305 table_columns: &[TableColumn],
306 batches: &[RecordBatch],
307 creator_txn_id: u64,
308 deleted_marker: u64,
309 starting_row_id: RowId,
310 ) -> LlkvResult<(RowId, u64)> {
311 let mut next_row_id = starting_row_id;
312 let mut total_rows: u64 = 0;
313
314 for batch in batches {
315 if batch.num_rows() == 0 {
316 continue;
317 }
318
319 if batch.num_columns() != table_columns.len() {
320 return Err(Error::InvalidArgumentError(format!(
321 "CTAS query returned unexpected column count (expected {}, found {})",
322 table_columns.len(),
323 batch.num_columns()
324 )));
325 }
326
327 let row_count = batch.num_rows();
328
329 let (row_id_array, created_by_array, deleted_by_array) =
330 mvcc::build_insert_mvcc_columns(
331 row_count,
332 next_row_id,
333 creator_txn_id,
334 deleted_marker,
335 );
336
337 let mut arrays: Vec<ArrayRef> = Vec::with_capacity(table_columns.len() + 3);
338 arrays.push(row_id_array);
339 arrays.push(created_by_array);
340 arrays.push(deleted_by_array);
341
342 let mut fields = mvcc::build_mvcc_fields();
343
344 for (idx, column) in table_columns.iter().enumerate() {
345 let array = batch.column(idx).clone();
346 let field = mvcc::build_field_with_metadata(
347 &column.name,
348 column.data_type.clone(),
349 column.nullable,
350 column.field_id,
351 );
352 arrays.push(array);
353 fields.push(field);
354 }
355
356 let append_schema = Arc::new(Schema::new(fields));
357 let append_batch = RecordBatch::try_new(append_schema, arrays).map_err(Error::Arrow)?;
358 table.append(&append_batch)?;
359
360 next_row_id = next_row_id.saturating_add(row_count as u64);
361 total_rows = total_rows.saturating_add(row_count as u64);
362 }
363
364 Ok((next_row_id, total_rows))
365 }
366
367 #[allow(clippy::too_many_arguments)]
369 pub fn register_foreign_keys_for_new_table<F>(
370 &self,
371 table_id: TableId,
372 display_name: &str,
373 canonical_name: &str,
374 table_columns: &[TableColumn],
375 specs: &[ForeignKeySpec],
376 lookup_table: F,
377 timestamp_micros: u64,
378 ) -> LlkvResult<Vec<ValidatedForeignKey>>
379 where
380 F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
381 {
382 if specs.is_empty() {
383 return Ok(Vec::new());
384 }
385
386 let referencing_columns: Vec<ForeignKeyColumn> = table_columns
387 .iter()
388 .map(|column| ForeignKeyColumn {
389 name: column.name.clone(),
390 data_type: column.data_type.clone(),
391 nullable: column.nullable,
392 primary_key: column.primary_key,
393 unique: column.unique,
394 field_id: column.field_id,
395 })
396 .collect();
397
398 let referencing_table = ForeignKeyTableInfo {
399 display_name: display_name.to_string(),
400 canonical_name: canonical_name.to_string(),
401 table_id,
402 columns: referencing_columns,
403 };
404
405 self.metadata.validate_and_register_foreign_keys(
406 &referencing_table,
407 specs,
408 lookup_table,
409 timestamp_micros,
410 )
411 }
412
413 pub fn referenced_table_info(
415 &self,
416 views: &[ForeignKeyView],
417 ) -> LlkvResult<Vec<ForeignKeyTableInfo>> {
418 let mut results = Vec::with_capacity(views.len());
419 for view in views {
420 let Some(table_id) = self.catalog.table_id(&view.referenced_table_canonical) else {
421 return Err(Error::InvalidArgumentError(format!(
422 "referenced table '{}' does not exist",
423 view.referenced_table_display
424 )));
425 };
426
427 let Some(resolver) = self.catalog.field_resolver(table_id) else {
428 return Err(Error::Internal(format!(
429 "catalog resolver missing for table '{}'",
430 view.referenced_table_display
431 )));
432 };
433
434 let mut columns = Vec::with_capacity(view.referenced_field_ids.len());
435 for field_id in &view.referenced_field_ids {
436 let info = resolver.field_info(*field_id).ok_or_else(|| {
437 Error::Internal(format!(
438 "field metadata missing for id {} in table '{}'",
439 field_id, view.referenced_table_display
440 ))
441 })?;
442
443 let data_type = self.metadata.column_data_type(table_id, *field_id)?;
444
445 columns.push(ForeignKeyColumn {
446 name: info.display_name.to_string(),
447 data_type,
448 nullable: !info.constraints.primary_key,
449 primary_key: info.constraints.primary_key,
450 unique: info.constraints.unique,
451 field_id: *field_id,
452 });
453 }
454
455 results.push(ForeignKeyTableInfo {
456 display_name: view.referenced_table_display.clone(),
457 canonical_name: view.referenced_table_canonical.clone(),
458 table_id,
459 columns,
460 });
461 }
462
463 Ok(results)
464 }
465
466 pub fn table_view(&self, canonical_name: &str) -> LlkvResult<TableView> {
468 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
469 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
470 })?;
471
472 let (_, field_ids) = self.sorted_user_fields(table_id);
473 self.table_view_with_field_ids(table_id, &field_ids)
474 }
475
476 pub fn table_column_specs(&self, canonical_name: &str) -> LlkvResult<Vec<ColumnSpec>> {
478 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
479 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
480 })?;
481
482 let resolver = self
483 .catalog
484 .field_resolver(table_id)
485 .ok_or_else(|| Error::Internal("missing field resolver for table".into()))?;
486
487 let (logical_fields, field_ids) = self.sorted_user_fields(table_id);
488
489 let table_view = self.table_view_with_field_ids(table_id, &field_ids)?;
490 let column_metas = table_view.column_metas;
491 let constraint_records = table_view.constraint_records;
492
493 let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
494 let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
495 let mut has_primary_key_records = false;
496 let mut has_single_unique_records = false;
497
498 for record in constraint_records
499 .iter()
500 .filter(|record| record.is_active())
501 {
502 match &record.kind {
503 ConstraintKind::PrimaryKey(pk) => {
504 has_primary_key_records = true;
505 for field_id in &pk.field_ids {
506 metadata_primary_keys.insert(*field_id);
507 metadata_unique_fields.insert(*field_id);
508 }
509 }
510 ConstraintKind::Unique(unique) => {
511 if unique.field_ids.len() == 1 {
512 has_single_unique_records = true;
513 if let Some(field_id) = unique.field_ids.first() {
514 metadata_unique_fields.insert(*field_id);
515 }
516 }
517 }
518 _ => {}
519 }
520 }
521
522 let mut specs = Vec::with_capacity(field_ids.len());
523
524 for (idx, lfid) in logical_fields.iter().enumerate() {
525 let field_id = lfid.field_id();
526
527 let column_name = column_metas
528 .get(idx)
529 .and_then(|meta| meta.as_ref())
530 .and_then(|meta| meta.name.clone())
531 .unwrap_or_else(|| format!("col_{}", field_id));
532
533 let fallback_constraints = resolver
534 .field_constraints_by_name(&column_name)
535 .unwrap_or_default();
536
537 let metadata_primary = metadata_primary_keys.contains(&field_id);
538 let primary_key = if has_primary_key_records {
539 metadata_primary
540 } else {
541 fallback_constraints.primary_key
542 };
543
544 let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
545 let unique = if has_primary_key_records || has_single_unique_records {
546 metadata_unique
547 } else {
548 fallback_constraints.primary_key || fallback_constraints.unique
549 };
550
551 let data_type = self.store.data_type(*lfid)?;
552 let nullable = !primary_key;
553
554 let mut spec = ColumnSpec::new(column_name.clone(), data_type, nullable)
555 .with_primary_key(primary_key)
556 .with_unique(unique);
557
558 if let Some(check_expr) = fallback_constraints.check_expr.clone() {
559 spec = spec.with_check(Some(check_expr));
560 }
561
562 specs.push(spec);
563 }
564
565 Ok(specs)
566 }
567
568 pub fn foreign_key_views(&self, canonical_name: &str) -> LlkvResult<Vec<ForeignKeyView>> {
570 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
571 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
572 })?;
573
574 self.metadata.foreign_key_views(&self.catalog, table_id)
575 }
576
577 pub fn table_constraint_summary(
579 &self,
580 canonical_name: &str,
581 ) -> LlkvResult<TableConstraintSummaryView> {
582 let table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
583 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
584 })?;
585
586 let (_, field_ids) = self.sorted_user_fields(table_id);
587 let table_meta = self.metadata.table_meta(table_id)?;
588 let column_metas = self.metadata.column_metas(table_id, &field_ids)?;
589 let constraint_records = self.metadata.constraint_records(table_id)?;
590 let multi_column_uniques = self.metadata.multi_column_uniques(table_id)?;
591
592 Ok(TableConstraintSummaryView {
593 table_meta,
594 column_metas,
595 constraint_records,
596 multi_column_uniques,
597 })
598 }
599
600 fn sorted_user_fields(
601 &self,
602 table_id: TableId,
603 ) -> (Vec<llkv_column_map::types::LogicalFieldId>, Vec<FieldId>) {
604 let mut logical_fields = self.store.user_field_ids_for_table(table_id);
605 logical_fields.sort_by_key(|lfid| lfid.field_id());
606 let field_ids = logical_fields
607 .iter()
608 .map(|lfid| lfid.field_id())
609 .collect::<Vec<_>>();
610
611 (logical_fields, field_ids)
612 }
613
614 fn table_view_with_field_ids(
615 &self,
616 table_id: TableId,
617 field_ids: &[FieldId],
618 ) -> LlkvResult<TableView> {
619 self.metadata.table_view(&self.catalog, table_id, field_ids)
620 }
621
622 pub fn table_names(&self) -> Vec<String> {
628 self.catalog.table_names()
629 }
630
631 pub fn table_id(&self, canonical_name: &str) -> Option<TableId> {
633 self.catalog.table_id(canonical_name)
634 }
635
636 pub fn field_resolver(&self, table_id: TableId) -> Option<crate::catalog::FieldResolver> {
638 self.catalog.field_resolver(table_id)
639 }
640
641 pub fn catalog_snapshot(&self) -> crate::catalog::TableCatalogSnapshot {
643 self.catalog.snapshot()
644 }
645
646 pub fn catalog(&self) -> &Arc<TableCatalog> {
649 &self.catalog
650 }
651}
652
653fn field_id_for_index(idx: usize) -> LlkvResult<FieldId> {
654 FieldId::try_from(idx + 1).map_err(|_| {
655 Error::Internal(format!(
656 "column index {} exceeded supported field id range",
657 idx + 1
658 ))
659 })
660}
661
/// Returns the current wall-clock time in microseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, this returns 0. A value
/// too large for `u64` saturates to `u64::MAX` instead of being silently
/// truncated by an `as` cast of the `u128` from `Duration::as_micros`.
fn current_time_micros() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX))
        .unwrap_or(0)
}