1use crate::{
10 RuntimeSession, RuntimeStatementResult, RuntimeTableHandle, RuntimeTransactionContext,
11 TXN_ID_AUTO_COMMIT, canonical_table_name, is_table_missing_error,
12};
13use llkv_column_map::store::ColumnStore;
14use llkv_executor::{ExecutorMultiColumnUnique, ExecutorTable};
15use llkv_plan::{
16 AlterTablePlan, CreateIndexPlan, CreateTablePlan, CreateTableSource, DropIndexPlan,
17 DropTablePlan, RenameTablePlan,
18};
19use llkv_result::{Error, Result};
20use llkv_storage::pager::{MemPager, Pager};
21use llkv_table::catalog::TableCatalog;
22use llkv_table::{
23 CatalogDdl, CatalogManager, ConstraintService, MetadataManager, MultiColumnUniqueRegistration,
24 SingleColumnIndexDescriptor, SingleColumnIndexRegistration, SysCatalog, TableId,
25 ensure_multi_column_unique, ensure_single_column_unique, validate_alter_table_operation,
26};
27use llkv_transaction::{TransactionManager, TransactionSnapshot, TxnId, TxnIdManager};
28use rustc_hash::{FxHashMap, FxHashSet};
29use simd_r_drive_entry_handle::EntryHandle;
30use std::sync::{Arc, RwLock};
31
32mod alter;
33mod constraints;
34mod delete;
35mod insert;
36mod provider;
37mod query;
38mod table_access;
39mod table_creation;
40mod types;
41mod update;
42mod utils;
43
44pub(crate) use types::{PreparedAssignmentValue, TableConstraintContext};
45
46pub struct RuntimeContext<P>
60where
61 P: Pager<Blob = EntryHandle> + Send + Sync,
62{
63 pub(crate) pager: Arc<P>,
64 tables: RwLock<FxHashMap<String, Arc<ExecutorTable<P>>>>,
65 pub(crate) dropped_tables: RwLock<FxHashSet<String>>,
66 metadata: Arc<MetadataManager<P>>,
67 constraint_service: ConstraintService<P>,
68 pub(crate) catalog_service: CatalogManager<P>,
69 pub(crate) catalog: Arc<TableCatalog>,
71 store: Arc<ColumnStore<P>>,
74 transaction_manager:
76 TransactionManager<RuntimeTransactionContext<P>, RuntimeTransactionContext<MemPager>>,
77 txn_manager: Arc<TxnIdManager>,
78 txn_tables_with_new_rows: RwLock<FxHashMap<TxnId, FxHashSet<String>>>,
79}
80
81impl<P> RuntimeContext<P>
82where
83 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
84{
85 pub fn new(pager: Arc<P>) -> Self {
86 Self::new_with_catalog_inner(pager, None)
87 }
88
89 pub fn new_with_catalog(pager: Arc<P>, catalog: Arc<TableCatalog>) -> Self {
90 Self::new_with_catalog_inner(pager, Some(catalog))
91 }
92
93 fn new_with_catalog_inner(pager: Arc<P>, shared_catalog: Option<Arc<TableCatalog>>) -> Self {
94 tracing::trace!("RuntimeContext::new called, pager={:p}", &*pager);
95
96 let store = ColumnStore::open(Arc::clone(&pager)).expect("failed to open ColumnStore");
97 let catalog = SysCatalog::new(&store);
98
99 let next_txn_id = match catalog.get_next_txn_id() {
100 Ok(Some(id)) => {
101 tracing::debug!("[CONTEXT] Loaded next_txn_id={} from catalog", id);
102 id
103 }
104 Ok(None) => {
105 tracing::debug!("[CONTEXT] No persisted next_txn_id found, starting from default");
106 TXN_ID_AUTO_COMMIT + 1
107 }
108 Err(e) => {
109 tracing::warn!("[CONTEXT] Failed to load next_txn_id: {}, using default", e);
110 TXN_ID_AUTO_COMMIT + 1
111 }
112 };
113
114 let last_committed = match catalog.get_last_committed_txn_id() {
115 Ok(Some(id)) => {
116 tracing::debug!("[CONTEXT] Loaded last_committed={} from catalog", id);
117 id
118 }
119 Ok(None) => {
120 tracing::debug!(
121 "[CONTEXT] No persisted last_committed found, starting from default"
122 );
123 TXN_ID_AUTO_COMMIT
124 }
125 Err(e) => {
126 tracing::warn!(
127 "[CONTEXT] Failed to load last_committed: {}, using default",
128 e
129 );
130 TXN_ID_AUTO_COMMIT
131 }
132 };
133
134 let store_arc = Arc::new(store);
135 let metadata = Arc::new(MetadataManager::new(Arc::clone(&store_arc)));
136
137 let loaded_tables = match metadata.all_table_metas() {
138 Ok(metas) => {
139 tracing::debug!("[CONTEXT] Loaded {} table(s) from catalog", metas.len());
140 metas
141 }
142 Err(e) => {
143 tracing::warn!(
144 "[CONTEXT] Failed to load table metas: {}, starting with empty registry",
145 e
146 );
147 Vec::new()
148 }
149 };
150
151 let transaction_manager =
152 TransactionManager::new_with_initial_state(next_txn_id, last_committed);
153 let txn_manager = transaction_manager.txn_manager();
154
155 tracing::debug!(
177 "[CONTEXT] Initialized with lazy loading for {} table(s)",
178 loaded_tables.len()
179 );
180
181 let (catalog, is_shared_catalog) = match shared_catalog {
183 Some(existing) => (existing, true),
184 None => (Arc::new(TableCatalog::new()), false),
185 };
186 for (table_id, table_meta) in &loaded_tables {
187 if let Some(ref table_name) = table_meta.name
188 && let Err(e) = catalog.register_table(table_name.as_str(), *table_id)
189 {
190 match e {
191 Error::CatalogError(ref msg)
192 if is_shared_catalog && msg.contains("already exists") =>
193 {
194 tracing::debug!(
195 "[CONTEXT] Shared catalog already contains table '{}' with id={}",
196 table_name,
197 table_id
198 );
199 }
200 other => {
201 tracing::warn!(
202 "[CONTEXT] Failed to register table '{}' (id={}) in catalog: {}",
203 table_name,
204 table_id,
205 other
206 );
207 }
208 }
209 }
210 }
211 tracing::debug!(
212 "[CONTEXT] Catalog initialized with {} table(s)",
213 catalog.table_count()
214 );
215
216 let constraint_service =
217 ConstraintService::new(Arc::clone(&metadata), Arc::clone(&catalog));
218 let catalog_service = CatalogManager::new(
219 Arc::clone(&metadata),
220 Arc::clone(&catalog),
221 Arc::clone(&store_arc),
222 );
223
224 if let Err(e) = catalog_service.load_types_from_catalog() {
226 tracing::warn!("[CONTEXT] Failed to load custom types: {}", e);
227 }
228
229 Self {
230 pager,
231 tables: RwLock::new(FxHashMap::default()), dropped_tables: RwLock::new(FxHashSet::default()),
233 metadata,
234 constraint_service,
235 catalog_service,
236 catalog,
237 store: store_arc,
238 transaction_manager,
239 txn_manager,
240 txn_tables_with_new_rows: RwLock::new(FxHashMap::default()),
241 }
242 }
243
244 pub fn txn_manager(&self) -> Arc<TxnIdManager> {
246 Arc::clone(&self.txn_manager)
247 }
248
249 pub fn store(&self) -> &Arc<ColumnStore<P>> {
251 &self.store
252 }
253
254 pub fn register_type(&self, name: String, data_type: sqlparser::ast::DataType) {
256 self.catalog_service.register_type(name, data_type);
257 }
258
259 pub fn drop_type(&self, name: &str) -> Result<()> {
261 self.catalog_service.drop_type(name)?;
262 Ok(())
263 }
264
265 pub fn create_view(&self, display_name: &str, view_definition: String) -> Result<TableId> {
268 self.catalog_service
269 .create_view(display_name, view_definition)
270 }
271
272 pub fn is_view(&self, table_id: TableId) -> Result<bool> {
275 self.catalog_service.is_view(table_id)
276 }
277
278 pub fn resolve_type(&self, data_type: &sqlparser::ast::DataType) -> sqlparser::ast::DataType {
280 self.catalog_service.resolve_type(data_type)
281 }
282
283 pub fn persist_next_txn_id(&self, next_txn_id: TxnId) -> Result<()> {
285 let catalog = SysCatalog::new(&self.store);
286 catalog.put_next_txn_id(next_txn_id)?;
287 let last_committed = self.txn_manager.last_committed();
288 catalog.put_last_committed_txn_id(last_committed)?;
289 tracing::debug!(
290 "[CONTEXT] Persisted next_txn_id={}, last_committed={}",
291 next_txn_id,
292 last_committed
293 );
294 Ok(())
295 }
296
297 pub fn default_snapshot(&self) -> TransactionSnapshot {
299 TransactionSnapshot {
300 txn_id: TXN_ID_AUTO_COMMIT,
301 snapshot_id: self.txn_manager.last_committed(),
302 }
303 }
304
305 pub fn table_catalog(&self) -> Arc<TableCatalog> {
307 Arc::clone(&self.catalog)
308 }
309
310 pub fn catalog(&self) -> &CatalogManager<P> {
312 &self.catalog_service
313 }
314
315 pub fn create_session(self: &Arc<Self>) -> RuntimeSession<P> {
318 tracing::debug!("[SESSION] RuntimeContext::create_session called");
319 let namespaces = Arc::new(crate::runtime_session::SessionNamespaces::new(Arc::clone(
320 self,
321 )));
322 let wrapper = RuntimeTransactionContext::new(Arc::clone(self));
323 let inner = self.transaction_manager.create_session(Arc::new(wrapper));
324 tracing::debug!(
325 "[SESSION] Created TransactionSession with session_id (will be logged by transaction manager)"
326 );
327 RuntimeSession::from_parts(inner, namespaces)
328 }
329
330 pub fn table(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
332 RuntimeTableHandle::new(Arc::clone(self), name)
333 }
334
335 pub fn table_names(self: &Arc<Self>) -> Vec<String> {
376 self.catalog.table_names()
378 }
379}
380
381impl<P> CatalogDdl for RuntimeContext<P>
382where
383 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
384{
385 type CreateTableOutput = RuntimeStatementResult<P>;
386 type DropTableOutput = ();
387 type RenameTableOutput = ();
388 type AlterTableOutput = RuntimeStatementResult<P>;
389 type CreateIndexOutput = RuntimeStatementResult<P>;
390 type DropIndexOutput = Option<SingleColumnIndexDescriptor>;
391
392 fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
393 if plan.columns.is_empty() && plan.source.is_none() {
394 return Err(Error::InvalidArgumentError(
395 "CREATE TABLE requires explicit columns or a source".into(),
396 ));
397 }
398
399 let (display_name, canonical_name) = canonical_table_name(&plan.name)?;
400 let CreateTablePlan {
401 name: _,
402 if_not_exists,
403 or_replace,
404 columns,
405 source,
406 namespace: _,
407 foreign_keys,
408 multi_column_uniques,
409 } = plan;
410
411 tracing::trace!(
412 "DEBUG create_table (plan): table='{}' if_not_exists={} columns={}",
413 display_name,
414 if_not_exists,
415 columns.len()
416 );
417 for (idx, col) in columns.iter().enumerate() {
418 tracing::trace!(
419 " plan column[{}]: name='{}' primary_key={}",
420 idx,
421 col.name,
422 col.primary_key
423 );
424 }
425 let (exists, is_dropped) = {
426 let tables = self.tables.read().unwrap();
427 let in_cache = tables.contains_key(&canonical_name);
428 let is_dropped = self
429 .dropped_tables
430 .read()
431 .unwrap()
432 .contains(&canonical_name);
433 (in_cache && !is_dropped, is_dropped)
435 };
436 tracing::trace!(
437 "DEBUG create_table (plan): exists={}, is_dropped={}",
438 exists,
439 is_dropped
440 );
441
442 if is_dropped {
444 self.remove_table_entry(&canonical_name);
445 self.dropped_tables.write().unwrap().remove(&canonical_name);
446 }
447
448 if exists {
449 if or_replace {
450 tracing::trace!(
451 "DEBUG create_table (plan): table '{}' exists and or_replace=true, removing existing table before recreation",
452 display_name
453 );
454 self.remove_table_entry(&canonical_name);
455 } else if if_not_exists {
456 tracing::trace!(
457 "DEBUG create_table (plan): table '{}' exists and if_not_exists=true, returning early WITHOUT creating",
458 display_name
459 );
460 return Ok(RuntimeStatementResult::CreateTable {
461 table_name: display_name,
462 });
463 } else {
464 return Err(Error::CatalogError(format!(
465 "Catalog Error: Table '{}' already exists",
466 display_name
467 )));
468 }
469 }
470
471 match source {
472 Some(CreateTableSource::Batches { schema, batches }) => self.create_table_from_batches(
473 display_name,
474 canonical_name,
475 schema,
476 batches,
477 if_not_exists,
478 ),
479 Some(CreateTableSource::Select { .. }) => Err(Error::Internal(
480 "CreateTableSource::Select should be materialized before reaching RuntimeContext::create_table"
481 .into(),
482 )),
483 None => self.create_table_from_columns(
484 display_name,
485 canonical_name,
486 columns,
487 foreign_keys,
488 multi_column_uniques,
489 if_not_exists,
490 ),
491 }
492 }
493
494 fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
495 let DropTablePlan { name, if_exists } = plan;
496 let (display_name, canonical_name) = canonical_table_name(&name)?;
497
498 tracing::debug!("drop_table: attempting to drop table '{}'", canonical_name);
499
500 let cached_entry = {
501 let tables = self.tables.read().unwrap();
502 tracing::debug!("drop_table: cache contains {} tables", tables.len());
503 tables.get(&canonical_name).cloned()
504 };
505
506 let table_entry = match cached_entry {
507 Some(entry) => entry,
508 None => {
509 tracing::debug!(
510 "drop_table: table '{}' not cached; attempting reload",
511 canonical_name
512 );
513
514 if self.catalog.table_id(&canonical_name).is_none() {
515 tracing::debug!(
516 "drop_table: no catalog entry for '{}'; if_exists={}",
517 canonical_name,
518 if_exists
519 );
520 if if_exists {
521 return Ok(());
522 }
523 return Err(Error::CatalogError(format!(
524 "Catalog Error: Table '{}' does not exist",
525 display_name
526 )));
527 }
528
529 match self.lookup_table(&canonical_name) {
530 Ok(entry) => entry,
531 Err(err) => {
532 tracing::warn!(
533 "drop_table: failed to reload table '{}': {:?}",
534 canonical_name,
535 err
536 );
537 if if_exists {
538 return Ok(());
539 }
540 return Err(err);
541 }
542 }
543 }
544 };
545
546 let column_field_ids = table_entry
547 .schema
548 .columns
549 .iter()
550 .map(|col| col.field_id)
551 .collect::<Vec<_>>();
552 let table_id = table_entry.table.table_id();
553
554 let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
555
556 for detail in referencing {
557 if detail.referencing_table_canonical == canonical_name {
558 continue;
559 }
560
561 if self.is_table_marked_dropped(&detail.referencing_table_canonical) {
562 continue;
563 }
564
565 return Err(Error::CatalogError(format!(
566 "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
567 detail.referencing_table_display
568 )));
569 }
570
571 self.catalog_service
572 .drop_table(&canonical_name, table_id, &column_field_ids)?;
573 tracing::debug!(
574 "[CATALOG] Unregistered table '{}' (table_id={}) from catalog",
575 canonical_name,
576 table_id
577 );
578
579 self.dropped_tables
580 .write()
581 .unwrap()
582 .insert(canonical_name.clone());
583 Ok(())
584 }
585
586 fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
587 let RenameTablePlan {
588 current_name,
589 new_name,
590 if_exists,
591 } = plan;
592
593 let (current_display, current_canonical) = canonical_table_name(¤t_name)?;
594 let (new_display, new_canonical) = canonical_table_name(&new_name)?;
595
596 if current_canonical == new_canonical && current_display == new_display {
597 return Ok(());
598 }
599
600 if self.is_table_marked_dropped(¤t_canonical) {
601 if if_exists {
602 return Ok(());
603 }
604 return Err(Error::CatalogError(format!(
605 "Catalog Error: Table '{}' does not exist",
606 current_display
607 )));
608 }
609
610 let table_id = match self
611 .catalog
612 .table_id(¤t_canonical)
613 .or_else(|| self.catalog.table_id(¤t_display))
614 {
615 Some(id) => id,
616 None => {
617 if if_exists {
618 return Ok(());
619 }
620 return Err(Error::CatalogError(format!(
621 "Catalog Error: Table '{}' does not exist",
622 current_display
623 )));
624 }
625 };
626
627 if !current_display.eq_ignore_ascii_case(&new_display)
628 && (self.catalog.table_id(&new_canonical).is_some()
629 || self.catalog.table_id(&new_display).is_some())
630 {
631 return Err(Error::CatalogError(format!(
632 "Catalog Error: Table '{}' already exists",
633 new_display
634 )));
635 }
636
637 let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
638 if !referencing.is_empty() {
639 return Err(Error::CatalogError(format!(
640 "Dependency Error: Cannot alter entry \"{}\" because there are entries that depend on it.",
641 current_display
642 )));
643 }
644
645 self.catalog_service
646 .rename_table(table_id, ¤t_display, &new_display)?;
647
648 let mut tables = self.tables.write().unwrap();
649 if let Some(table) = tables.remove(¤t_canonical) {
650 tables.insert(new_canonical.clone(), table);
651 }
652
653 let mut dropped = self.dropped_tables.write().unwrap();
654 dropped.remove(¤t_canonical);
655 dropped.remove(&new_canonical);
656
657 Ok(())
658 }
659
660 fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
661 let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
662
663 let view = match self.catalog_service.table_view(&canonical_table) {
664 Ok(view) => view,
665 Err(err) if plan.if_exists && is_table_missing_error(&err) => {
666 return Ok(RuntimeStatementResult::NoOp);
667 }
668 Err(err) => return Err(err),
669 };
670
671 let table_meta = match view.table_meta.as_ref() {
672 Some(meta) => meta,
673 None => {
674 if plan.if_exists {
675 return Ok(RuntimeStatementResult::NoOp);
676 }
677 return Err(Error::Internal("table metadata missing".into()));
678 }
679 };
680
681 let table_id = table_meta.table_id;
682
683 validate_alter_table_operation(&plan.operation, &view, table_id, &self.catalog_service)?;
684
685 match &plan.operation {
686 llkv_plan::AlterTableOperation::RenameColumn {
687 old_column_name,
688 new_column_name,
689 } => {
690 self.rename_column(&plan.table_name, old_column_name, new_column_name)?;
691 }
692 llkv_plan::AlterTableOperation::SetColumnDataType {
693 column_name,
694 new_data_type,
695 } => {
696 self.alter_column_type(&plan.table_name, column_name, new_data_type)?;
697 }
698 llkv_plan::AlterTableOperation::DropColumn { column_name, .. } => {
699 self.drop_column(&plan.table_name, column_name)?;
700 }
701 }
702
703 Ok(RuntimeStatementResult::NoOp)
704 }
705
706 fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
707 if plan.columns.is_empty() {
708 return Err(Error::InvalidArgumentError(
709 "CREATE INDEX requires at least one column".into(),
710 ));
711 }
712
713 for column_plan in &plan.columns {
714 if !column_plan.ascending || column_plan.nulls_first {
715 return Err(Error::InvalidArgumentError(
716 "only ASC indexes with NULLS LAST are supported".into(),
717 ));
718 }
719 }
720
721 let mut index_name = plan.name.clone();
722 let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
723 let table = self.lookup_table(&canonical_name)?;
724
725 let mut column_indices = Vec::with_capacity(plan.columns.len());
726 let mut field_ids = Vec::with_capacity(plan.columns.len());
727 let mut column_names = Vec::with_capacity(plan.columns.len());
728 let mut seen_column_indices = FxHashSet::default();
729
730 for column_plan in &plan.columns {
731 let normalized = column_plan.name.to_ascii_lowercase();
732 let col_idx = table
733 .schema
734 .lookup
735 .get(&normalized)
736 .copied()
737 .ok_or_else(|| {
738 Error::InvalidArgumentError(format!(
739 "column '{}' does not exist in table '{}'",
740 column_plan.name, display_name
741 ))
742 })?;
743 if !seen_column_indices.insert(col_idx) {
744 return Err(Error::InvalidArgumentError(format!(
745 "duplicate column '{}' in CREATE INDEX",
746 column_plan.name
747 )));
748 }
749
750 let column = &table.schema.columns[col_idx];
751 column_indices.push(col_idx);
752 field_ids.push(column.field_id);
753 column_names.push(column.name.clone());
754 }
755
756 if plan.columns.len() == 1 {
757 let field_id = field_ids[0];
758 let column_name = column_names[0].clone();
759
760 if plan.unique {
761 let snapshot = self.default_snapshot();
762 let existing_values =
763 self.scan_column_values(table.as_ref(), field_id, snapshot)?;
764 ensure_single_column_unique(&existing_values, &[], &column_name)?;
765 }
766
767 let registration = self.catalog_service.register_single_column_index(
768 &display_name,
769 &canonical_name,
770 &table.table,
771 field_id,
772 &column_name,
773 plan.name.clone(),
774 plan.unique,
775 plan.if_not_exists,
776 )?;
777
778 let created_name = match registration {
779 SingleColumnIndexRegistration::Created { index_name } => index_name,
780 SingleColumnIndexRegistration::AlreadyExists { index_name } => {
781 drop(table);
782 return Ok(RuntimeStatementResult::CreateIndex {
783 table_name: display_name,
784 index_name: Some(index_name),
785 });
786 }
787 };
788
789 index_name = Some(created_name.clone());
790
791 if let Some(updated_table) =
792 Self::rebuild_executor_table_with_unique(table.as_ref(), field_id)
793 {
794 self.tables
795 .write()
796 .unwrap()
797 .insert(canonical_name.clone(), Arc::clone(&updated_table));
798 } else {
799 self.remove_table_entry(&canonical_name);
800 }
801
802 drop(table);
803
804 return Ok(RuntimeStatementResult::CreateIndex {
805 table_name: display_name,
806 index_name,
807 });
808 }
809
810 if !plan.unique {
811 return Err(Error::InvalidArgumentError(
812 "multi-column CREATE INDEX currently supports UNIQUE indexes only".into(),
813 ));
814 }
815
816 let table_id = table.table.table_id();
817
818 let snapshot = self.default_snapshot();
819 let existing_rows = self.scan_multi_column_values(table.as_ref(), &field_ids, snapshot)?;
820 ensure_multi_column_unique(&existing_rows, &[], &column_names)?;
821
822 let executor_entry = ExecutorMultiColumnUnique {
823 index_name: index_name.clone(),
824 column_indices: column_indices.clone(),
825 };
826
827 let registration = self.catalog_service.register_multi_column_unique_index(
828 table_id,
829 &field_ids,
830 index_name.clone(),
831 )?;
832
833 match registration {
834 MultiColumnUniqueRegistration::Created => {
835 table.add_multi_column_unique(executor_entry);
836 }
837 MultiColumnUniqueRegistration::AlreadyExists {
838 index_name: existing,
839 } => {
840 if plan.if_not_exists {
841 drop(table);
842 return Ok(RuntimeStatementResult::CreateIndex {
843 table_name: display_name,
844 index_name: existing,
845 });
846 }
847 return Err(Error::CatalogError(format!(
848 "Index already exists on columns '{}'",
849 column_names.join(", ")
850 )));
851 }
852 }
853
854 Ok(RuntimeStatementResult::CreateIndex {
855 table_name: display_name,
856 index_name,
857 })
858 }
859
860 fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
861 let descriptor = self.catalog_service.drop_single_column_index(plan)?;
862
863 if let Some(descriptor) = &descriptor {
864 self.remove_table_entry(&descriptor.canonical_table_name);
865 }
866
867 Ok(descriptor)
868 }
869}