1use crate::{
9 RuntimeSession, RuntimeStatementResult, RuntimeTableHandle, RuntimeTransactionContext,
10 TXN_ID_AUTO_COMMIT, canonical_table_name, is_table_missing_error,
11};
12use llkv_column_map::store::{ColumnStore, ColumnStoreWriteHints};
13use llkv_executor::{ExecutorMultiColumnUnique, ExecutorTable};
14use llkv_plan::{
15 AlterTablePlan, CreateIndexPlan, CreateTablePlan, CreateTableSource, CreateViewPlan,
16 DropIndexPlan, DropTablePlan, DropViewPlan, PlanColumnSpec, RenameTablePlan, SelectPlan,
17};
18use llkv_result::{Error, Result};
19use llkv_storage::pager::{BoxedPager, MemPager, Pager};
20use llkv_table::catalog::TableCatalog;
21use llkv_table::{
22 CatalogDdl, CatalogManager, ConstraintService, MetadataManager, MultiColumnUniqueRegistration,
23 SingleColumnIndexDescriptor, SingleColumnIndexRegistration, SysCatalog, TableId,
24 TriggerEventMeta, TriggerTimingMeta, UniqueKey, build_composite_unique_key,
25 ensure_multi_column_unique, ensure_single_column_unique, validate_alter_table_operation,
26};
27use llkv_transaction::{TransactionManager, TransactionSnapshot, TxnId, TxnIdManager};
28use rustc_hash::{FxHashMap, FxHashSet};
29use simd_r_drive_entry_handle::EntryHandle;
30use std::sync::{Arc, RwLock};
31
32mod alter;
33mod constraints;
34mod delete;
35mod insert;
36mod provider;
37mod query;
38mod table_access;
39mod table_creation;
40mod truncate;
41mod types;
42mod update;
43mod utils;
44
45pub(crate) use types::{PreparedAssignmentValue, TableConstraintContext};
46
47pub struct RuntimeContext<P>
61where
62 P: Pager<Blob = EntryHandle> + Send + Sync,
63{
64 pub(crate) pager: Arc<P>,
65 tables: RwLock<FxHashMap<String, Arc<ExecutorTable<P>>>>,
66 pub(crate) dropped_tables: RwLock<FxHashSet<String>>,
67 metadata: Arc<MetadataManager<P>>,
68 constraint_service: ConstraintService<P>,
69 pub(crate) catalog_service: CatalogManager<P>,
70 pub(crate) catalog: Arc<TableCatalog>,
72 store: Arc<ColumnStore<P>>,
75 transaction_manager:
77 TransactionManager<RuntimeTransactionContext<P>, RuntimeTransactionContext<MemPager>>,
78 txn_manager: Arc<TxnIdManager>,
79 txn_tables_with_new_rows: RwLock<FxHashMap<TxnId, FxHashSet<String>>>,
80 fallback_lookup: RwLock<Option<Arc<RuntimeContext<P>>>>,
85}
86
87impl<P> RuntimeContext<P>
88where
89 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
90{
91 pub fn new(pager: Arc<P>) -> Self {
92 Self::new_with_catalog_inner(pager, None)
93 }
94
95 pub fn new_with_catalog(pager: Arc<P>, catalog: Arc<TableCatalog>) -> Self {
96 Self::new_with_catalog_inner(pager, Some(catalog))
97 }
98
99 fn new_with_catalog_inner(pager: Arc<P>, shared_catalog: Option<Arc<TableCatalog>>) -> Self {
100 tracing::trace!("RuntimeContext::new called, pager={:p}", &*pager);
101
102 let store = ColumnStore::open(Arc::clone(&pager)).expect("failed to open ColumnStore");
103 let catalog = SysCatalog::new(&store);
104
105 let next_txn_id = match catalog.get_next_txn_id() {
106 Ok(Some(id)) => {
107 tracing::debug!("[CONTEXT] Loaded next_txn_id={} from catalog", id);
108 id
109 }
110 Ok(None) => {
111 tracing::debug!("[CONTEXT] No persisted next_txn_id found, starting from default");
112 TXN_ID_AUTO_COMMIT + 1
113 }
114 Err(e) => {
115 tracing::warn!("[CONTEXT] Failed to load next_txn_id: {}, using default", e);
116 TXN_ID_AUTO_COMMIT + 1
117 }
118 };
119
120 let last_committed = match catalog.get_last_committed_txn_id() {
121 Ok(Some(id)) => {
122 tracing::debug!("[CONTEXT] Loaded last_committed={} from catalog", id);
123 id
124 }
125 Ok(None) => {
126 tracing::debug!(
127 "[CONTEXT] No persisted last_committed found, starting from default"
128 );
129 TXN_ID_AUTO_COMMIT
130 }
131 Err(e) => {
132 tracing::warn!(
133 "[CONTEXT] Failed to load last_committed: {}, using default",
134 e
135 );
136 TXN_ID_AUTO_COMMIT
137 }
138 };
139
140 let store_arc = Arc::new(store);
141 let metadata = Arc::new(MetadataManager::new(Arc::clone(&store_arc)));
142
143 let loaded_tables = match metadata.all_table_metas() {
144 Ok(metas) => {
145 tracing::debug!("[CONTEXT] Loaded {} table(s) from catalog", metas.len());
146 metas
147 }
148 Err(e) => {
149 tracing::warn!(
150 "[CONTEXT] Failed to load table metas: {}, starting with empty registry",
151 e
152 );
153 Vec::new()
154 }
155 };
156
157 let transaction_manager =
158 TransactionManager::new_with_initial_state(next_txn_id, last_committed);
159 let txn_manager = transaction_manager.txn_manager();
160
161 tracing::debug!(
183 "[CONTEXT] Initialized with lazy loading for {} table(s)",
184 loaded_tables.len()
185 );
186
187 let (catalog, is_shared_catalog) = match shared_catalog {
189 Some(existing) => (existing, true),
190 None => (Arc::new(TableCatalog::new()), false),
191 };
192 for (table_id, table_meta) in &loaded_tables {
193 if let Some(ref table_name) = table_meta.name
194 && let Err(e) = catalog.register_table(table_name.as_str(), *table_id)
195 {
196 match e {
197 Error::CatalogError(ref msg)
198 if is_shared_catalog && msg.contains("already exists") =>
199 {
200 tracing::debug!(
201 "[CONTEXT] Shared catalog already contains table '{}' with id={}",
202 table_name,
203 table_id
204 );
205 }
206 other => {
207 tracing::warn!(
208 "[CONTEXT] Failed to register table '{}' (id={}) in catalog: {}",
209 table_name,
210 table_id,
211 other
212 );
213 }
214 }
215 }
216 }
217 tracing::debug!(
218 "[CONTEXT] Catalog initialized with {} table(s)",
219 catalog.table_count()
220 );
221
222 let constraint_service =
223 ConstraintService::new(Arc::clone(&metadata), Arc::clone(&catalog));
224 let catalog_service = CatalogManager::new(
225 Arc::clone(&metadata),
226 Arc::clone(&catalog),
227 Arc::clone(&store_arc),
228 );
229
230 if let Err(e) = catalog_service.load_types_from_catalog() {
232 tracing::warn!("[CONTEXT] Failed to load custom types: {}", e);
233 }
234
235 Self {
236 pager,
237 tables: RwLock::new(FxHashMap::default()), dropped_tables: RwLock::new(FxHashSet::default()),
239 metadata,
240 constraint_service,
241 catalog_service,
242 catalog,
243 store: store_arc,
244 transaction_manager,
245 txn_manager,
246 txn_tables_with_new_rows: RwLock::new(FxHashMap::default()),
247 fallback_lookup: RwLock::new(None),
248 }
249 }
250
251 pub fn txn_manager(&self) -> Arc<TxnIdManager> {
253 Arc::clone(&self.txn_manager)
254 }
255
256 pub fn store(&self) -> &Arc<ColumnStore<P>> {
258 &self.store
259 }
260
261 pub fn column_store_write_hints(&self) -> ColumnStoreWriteHints {
263 self.store.write_hints()
264 }
265
266 pub fn with_fallback_lookup(self, fallback: Arc<RuntimeContext<P>>) -> Self {
270 *self.fallback_lookup.write().unwrap() = Some(fallback);
271 self
272 }
273
274 pub fn set_fallback_lookup(&self, fallback: Arc<RuntimeContext<P>>) {
276 *self.fallback_lookup.write().unwrap() = Some(fallback);
277 }
278
279 pub fn register_type(&self, name: String, data_type: sqlparser::ast::DataType) {
281 self.catalog_service.register_type(name, data_type);
282 }
283
284 pub fn drop_type(&self, name: &str) -> Result<()> {
286 self.catalog_service.drop_type(name)?;
287 Ok(())
288 }
289
290 pub fn ensure_next_table_id_at_least(&self, minimum: TableId) -> Result<()> {
292 self.metadata.ensure_next_table_id_at_least(minimum)?;
293 Ok(())
294 }
295
296 fn create_view_internal(
298 self: &Arc<Self>,
299 display_name: &str,
300 view_definition: String,
301 select_plan: SelectPlan,
302 if_not_exists: bool,
303 snapshot: TransactionSnapshot,
304 ) -> Result<()> {
305 let (normalized_display, canonical_name) = canonical_table_name(display_name)?;
306
307 if let Some(existing_id) = self.catalog.table_id(&canonical_name) {
308 let is_view = self.catalog_service.is_view(existing_id)?;
309 if is_view && if_not_exists {
310 return Ok(());
311 }
312
313 let entity = if is_view { "View" } else { "Table" };
314 return Err(Error::CatalogError(format!(
315 "{} '{}' already exists",
316 entity, normalized_display
317 )));
318 }
319
320 let execution = self.execute_select(select_plan, snapshot)?;
321 let column_specs = {
322 let schema = execution.schema();
323 if schema.fields().is_empty() {
324 return Err(Error::InvalidArgumentError(
325 "CREATE VIEW requires SELECT to project at least one column".into(),
326 ));
327 }
328
329 schema
330 .fields()
331 .iter()
332 .map(|field| {
333 PlanColumnSpec::new(
334 field.name(),
335 field.data_type().clone(),
336 field.is_nullable(),
337 )
338 })
339 .collect::<Vec<_>>()
340 };
341 drop(execution);
342
343 self.catalog_service
344 .create_view(&normalized_display, view_definition, column_specs)?;
345
346 self.dropped_tables.write().unwrap().remove(&canonical_name);
347
348 Ok(())
349 }
350
351 pub fn create_view(
356 self: &Arc<Self>,
357 display_name: &str,
358 view_definition: String,
359 select_plan: SelectPlan,
360 if_not_exists: bool,
361 ) -> Result<()> {
362 let snapshot = self.default_snapshot();
363 self.create_view_internal(
364 display_name,
365 view_definition,
366 select_plan,
367 if_not_exists,
368 snapshot,
369 )
370 }
371
372 #[allow(clippy::too_many_arguments)]
373 pub fn create_trigger(
374 self: &Arc<Self>,
375 trigger_display_name: &str,
376 canonical_trigger_name: &str,
377 table_display_name: &str,
378 canonical_table_name: &str,
379 timing: TriggerTimingMeta,
380 event: TriggerEventMeta,
381 for_each_row: bool,
382 condition: Option<String>,
383 body_sql: String,
384 if_not_exists: bool,
385 ) -> Result<bool> {
386 self.catalog_service.create_trigger(
387 trigger_display_name,
388 canonical_trigger_name,
389 table_display_name,
390 canonical_table_name,
391 timing,
392 event,
393 for_each_row,
394 condition,
395 body_sql,
396 if_not_exists,
397 )
398 }
399
400 pub fn drop_trigger(
401 self: &Arc<Self>,
402 trigger_display_name: &str,
403 canonical_trigger_name: &str,
404 table_hint_display: Option<&str>,
405 table_hint_canonical: Option<&str>,
406 if_exists: bool,
407 ) -> Result<bool> {
408 self.catalog_service.drop_trigger(
409 trigger_display_name,
410 canonical_trigger_name,
411 table_hint_display,
412 table_hint_canonical,
413 if_exists,
414 )
415 }
416
417 pub fn view_definition(&self, canonical_name: &str) -> Result<Option<String>> {
419 let Some(table_id) = self.catalog.table_id(canonical_name) else {
420 return Ok(None);
421 };
422
423 match self.metadata.table_meta(table_id)? {
424 Some(meta) => Ok(meta.view_definition),
425 None => Ok(None),
426 }
427 }
428
429 pub fn is_view(&self, table_id: TableId) -> Result<bool> {
432 self.catalog_service.is_view(table_id)
433 }
434
435 pub fn drop_view(&self, name: &str, if_exists: bool) -> Result<()> {
437 let (display_name, canonical_name) = canonical_table_name(name)?;
438
439 let table_id = match self.catalog.table_id(&canonical_name) {
440 Some(id) => id,
441 None => {
442 if if_exists {
443 return Ok(());
444 }
445 return Err(Error::CatalogError(format!(
446 "View '{}' does not exist",
447 display_name
448 )));
449 }
450 };
451
452 if !self.catalog_service.is_view(table_id)? {
453 return Err(Error::CatalogError(format!(
454 "use DROP TABLE to delete table '{}'",
455 display_name
456 )));
457 }
458
459 self.catalog_service.drop_view(&canonical_name, table_id)?;
460
461 {
462 let mut tables = self.tables.write().unwrap();
463 tables.remove(&canonical_name);
464 }
465
466 self.dropped_tables.write().unwrap().insert(canonical_name);
467
468 Ok(())
469 }
470
471 pub fn resolve_type(&self, data_type: &sqlparser::ast::DataType) -> sqlparser::ast::DataType {
473 self.catalog_service.resolve_type(data_type)
474 }
475
476 pub fn persist_next_txn_id(&self, next_txn_id: TxnId) -> Result<()> {
478 let catalog = SysCatalog::new(&self.store);
479 catalog.put_next_txn_id(next_txn_id)?;
480 let last_committed = self.txn_manager.last_committed();
481 catalog.put_last_committed_txn_id(last_committed)?;
482 tracing::debug!(
483 "[CONTEXT] Persisted next_txn_id={}, last_committed={}",
484 next_txn_id,
485 last_committed
486 );
487 Ok(())
488 }
489
490 pub fn default_snapshot(&self) -> TransactionSnapshot {
492 TransactionSnapshot {
493 txn_id: TXN_ID_AUTO_COMMIT,
494 snapshot_id: self.txn_manager.last_committed(),
495 }
496 }
497
498 pub fn table_catalog(&self) -> Arc<TableCatalog> {
500 Arc::clone(&self.catalog)
501 }
502
503 pub fn enable_foreign_key_cache(&self, table_id: TableId) {
505 self.constraint_service.enable_foreign_key_cache(table_id);
506 }
507
508 pub fn clear_foreign_key_cache(&self, table_id: TableId) {
510 self.constraint_service.clear_foreign_key_cache(table_id);
511 }
512
513 pub fn catalog(&self) -> &CatalogManager<P> {
515 &self.catalog_service
516 }
517
518 pub fn table(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
520 RuntimeTableHandle::new(Arc::clone(self), name)
521 }
522
523 pub fn table_names(self: &Arc<Self>) -> Vec<String> {
564 self.catalog.table_names()
566 }
567}
568
569impl RuntimeContext<BoxedPager> {
570 pub fn create_session(self: &Arc<Self>) -> RuntimeSession {
573 tracing::debug!("[SESSION] RuntimeContext::create_session called");
574 let namespaces = Arc::new(crate::runtime_session::SessionNamespaces::new(Arc::clone(
575 self,
576 )));
577 let wrapper = RuntimeTransactionContext::new(Arc::clone(self));
578 let inner = self.transaction_manager.create_session(Arc::new(wrapper));
579 tracing::debug!(
580 "[SESSION] Created TransactionSession with session_id (will be logged by transaction manager)"
581 );
582 RuntimeSession::from_parts(inner, namespaces)
583 }
584}
585
586impl<P> CatalogDdl for RuntimeContext<P>
587where
588 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
589{
590 type CreateTableOutput = RuntimeStatementResult<P>;
591 type DropTableOutput = ();
592 type RenameTableOutput = ();
593 type AlterTableOutput = RuntimeStatementResult<P>;
594 type CreateIndexOutput = RuntimeStatementResult<P>;
595 type DropIndexOutput = Option<SingleColumnIndexDescriptor>;
596
597 fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
598 if plan.columns.is_empty() && plan.source.is_none() {
599 return Err(Error::InvalidArgumentError(
600 "CREATE TABLE requires explicit columns or a source".into(),
601 ));
602 }
603
604 let (display_name, canonical_name) = canonical_table_name(&plan.name)?;
605 let CreateTablePlan {
606 name: _,
607 if_not_exists,
608 or_replace,
609 columns,
610 source,
611 namespace: _,
612 foreign_keys,
613 multi_column_uniques,
614 } = plan;
615
616 tracing::trace!(
617 "DEBUG create_table (plan): table='{}' if_not_exists={} columns={}",
618 display_name,
619 if_not_exists,
620 columns.len()
621 );
622 for (idx, col) in columns.iter().enumerate() {
623 tracing::trace!(
624 " plan column[{}]: name='{}' primary_key={}",
625 idx,
626 col.name,
627 col.primary_key
628 );
629 }
630 let (exists, is_dropped) = {
631 let tables = self.tables.read().unwrap();
632 let in_cache = tables.contains_key(&canonical_name);
633 let is_dropped = self
634 .dropped_tables
635 .read()
636 .unwrap()
637 .contains(&canonical_name);
638 (in_cache && !is_dropped, is_dropped)
640 };
641 tracing::trace!(
642 "DEBUG create_table (plan): exists={}, is_dropped={}",
643 exists,
644 is_dropped
645 );
646
647 if is_dropped {
649 self.remove_table_entry(&canonical_name);
650 self.dropped_tables.write().unwrap().remove(&canonical_name);
651 }
652
653 if exists {
654 if or_replace {
655 tracing::trace!(
656 "DEBUG create_table (plan): table '{}' exists and or_replace=true, removing existing table before recreation",
657 display_name
658 );
659 self.remove_table_entry(&canonical_name);
660 } else if if_not_exists {
661 tracing::trace!(
662 "DEBUG create_table (plan): table '{}' exists and if_not_exists=true, returning early WITHOUT creating",
663 display_name
664 );
665 return Ok(RuntimeStatementResult::CreateTable {
666 table_name: display_name,
667 });
668 } else {
669 return Err(Error::CatalogError(format!(
670 "Catalog Error: Table '{}' already exists",
671 display_name
672 )));
673 }
674 }
675
676 match source {
677 Some(CreateTableSource::Batches { schema, batches }) => self.create_table_from_batches(
678 display_name,
679 canonical_name,
680 schema,
681 batches,
682 if_not_exists,
683 ),
684 Some(CreateTableSource::Select { .. }) => Err(Error::Internal(
685 "CreateTableSource::Select should be materialized before reaching RuntimeContext::create_table"
686 .into(),
687 )),
688 None => self.create_table_from_columns(
689 display_name,
690 canonical_name,
691 columns,
692 foreign_keys,
693 multi_column_uniques,
694 if_not_exists,
695 ),
696 }
697 }
698
699 fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
700 let DropTablePlan { name, if_exists } = plan;
701 let (display_name, canonical_name) = canonical_table_name(&name)?;
702
703 tracing::debug!("drop_table: attempting to drop table '{}'", canonical_name);
704
705 if self.is_table_marked_dropped(&canonical_name) {
706 tracing::debug!(
707 "drop_table: table '{}' already marked dropped; if_exists={}",
708 canonical_name,
709 if_exists
710 );
711 return if if_exists {
712 Ok(())
713 } else {
714 Err(Error::CatalogError(format!(
715 "Catalog Error: Table '{}' does not exist",
716 display_name
717 )))
718 };
719 }
720
721 let cached_entry = {
722 let tables = self.tables.read().unwrap();
723 tracing::debug!("drop_table: cache contains {} tables", tables.len());
724 tables.get(&canonical_name).cloned()
725 };
726
727 let table_entry = match cached_entry {
728 Some(entry) => entry,
729 None => {
730 tracing::debug!(
731 "drop_table: table '{}' not cached; attempting reload",
732 canonical_name
733 );
734
735 if self.catalog.table_id(&canonical_name).is_none() {
736 tracing::debug!(
737 "drop_table: no catalog entry for '{}'; if_exists={}",
738 canonical_name,
739 if_exists
740 );
741 if if_exists {
742 return Ok(());
743 }
744 return Err(Error::CatalogError(format!(
745 "Catalog Error: Table '{}' does not exist",
746 display_name
747 )));
748 }
749
750 match self.lookup_table(&canonical_name) {
751 Ok(entry) => entry,
752 Err(err) => {
753 tracing::warn!(
754 "drop_table: failed to reload table '{}': {:?}",
755 canonical_name,
756 err
757 );
758 if if_exists {
759 return Ok(());
760 }
761 return Err(err);
762 }
763 }
764 }
765 };
766
767 let column_field_ids = table_entry
768 .schema
769 .columns
770 .iter()
771 .map(|col| col.field_id)
772 .collect::<Vec<_>>();
773 let table_id = table_entry.table.table_id();
774
775 let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
776
777 for detail in referencing {
778 if detail.referencing_table_canonical == canonical_name {
779 continue;
780 }
781
782 if self.is_table_marked_dropped(&detail.referencing_table_canonical) {
783 continue;
784 }
785
786 return Err(Error::CatalogError(format!(
787 "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
788 detail.referencing_table_display
789 )));
790 }
791
792 self.catalog_service
793 .drop_table(&canonical_name, table_id, &column_field_ids)?;
794 tracing::debug!(
795 "[CATALOG] Unregistered table '{}' (table_id={}) from catalog",
796 canonical_name,
797 table_id
798 );
799
800 self.remove_table_entry(&canonical_name);
801 self.dropped_tables
802 .write()
803 .unwrap()
804 .insert(canonical_name.clone());
805 Ok(())
806 }
807
808 fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
809 let RenameTablePlan {
810 current_name,
811 new_name,
812 if_exists,
813 } = plan;
814
815 let (current_display, current_canonical) = canonical_table_name(¤t_name)?;
816 let (new_display, new_canonical) = canonical_table_name(&new_name)?;
817
818 if current_canonical == new_canonical && current_display == new_display {
819 return Ok(());
820 }
821
822 if self.is_table_marked_dropped(¤t_canonical) {
823 if if_exists {
824 return Ok(());
825 }
826 return Err(Error::CatalogError(format!(
827 "Catalog Error: Table '{}' does not exist",
828 current_display
829 )));
830 }
831
832 let table_id = match self
833 .catalog
834 .table_id(¤t_canonical)
835 .or_else(|| self.catalog.table_id(¤t_display))
836 {
837 Some(id) => id,
838 None => {
839 if if_exists {
840 return Ok(());
841 }
842 return Err(Error::CatalogError(format!(
843 "Catalog Error: Table '{}' does not exist",
844 current_display
845 )));
846 }
847 };
848
849 if !current_display.eq_ignore_ascii_case(&new_display)
850 && (self.catalog.table_id(&new_canonical).is_some()
851 || self.catalog.table_id(&new_display).is_some())
852 {
853 return Err(Error::CatalogError(format!(
854 "Catalog Error: Table '{}' already exists",
855 new_display
856 )));
857 }
858
859 let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
860 if !referencing.is_empty() {
861 return Err(Error::CatalogError(format!(
862 "Dependency Error: Cannot alter entry \"{}\" because there are entries that depend on it.",
863 current_display
864 )));
865 }
866
867 self.catalog_service
868 .rename_table(table_id, ¤t_display, &new_display)?;
869
870 let mut tables = self.tables.write().unwrap();
871 if let Some(table) = tables.remove(¤t_canonical) {
872 tables.insert(new_canonical.clone(), table);
873 }
874
875 let mut dropped = self.dropped_tables.write().unwrap();
876 dropped.remove(¤t_canonical);
877 dropped.remove(&new_canonical);
878
879 Ok(())
880 }
881
882 fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
883 let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
884
885 let view = match self.catalog_service.table_view(&canonical_table) {
886 Ok(view) => view,
887 Err(err) if plan.if_exists && is_table_missing_error(&err) => {
888 return Ok(RuntimeStatementResult::NoOp);
889 }
890 Err(err) => return Err(err),
891 };
892
893 let table_meta = match view.table_meta.as_ref() {
894 Some(meta) => meta,
895 None => {
896 if plan.if_exists {
897 return Ok(RuntimeStatementResult::NoOp);
898 }
899 return Err(Error::Internal("table metadata missing".into()));
900 }
901 };
902
903 let table_id = table_meta.table_id;
904
905 validate_alter_table_operation(&plan.operation, &view, table_id, &self.catalog_service)?;
906
907 match &plan.operation {
908 llkv_plan::AlterTableOperation::RenameColumn {
909 old_column_name,
910 new_column_name,
911 } => {
912 self.rename_column(&plan.table_name, old_column_name, new_column_name)?;
913 }
914 llkv_plan::AlterTableOperation::SetColumnDataType {
915 column_name,
916 new_data_type,
917 } => {
918 self.alter_column_type(&plan.table_name, column_name, new_data_type)?;
919 }
920 llkv_plan::AlterTableOperation::DropColumn { column_name, .. } => {
921 self.drop_column(&plan.table_name, column_name)?;
922 }
923 }
924
925 Ok(RuntimeStatementResult::NoOp)
926 }
927
928 fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
929 if plan.columns.is_empty() {
930 return Err(Error::InvalidArgumentError(
931 "CREATE INDEX requires at least one column".into(),
932 ));
933 }
934
935 let mut index_name = plan.name.clone();
936 let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
937 let table = self.lookup_table(&canonical_name)?;
938
939 let mut column_indices = Vec::with_capacity(plan.columns.len());
940 let mut field_ids = Vec::with_capacity(plan.columns.len());
941 let mut column_names = Vec::with_capacity(plan.columns.len());
942 let mut seen_column_indices = FxHashSet::default();
943
944 for column_plan in &plan.columns {
945 let normalized = column_plan.name.to_ascii_lowercase();
946 let col_idx = table
947 .schema
948 .lookup
949 .get(&normalized)
950 .copied()
951 .ok_or_else(|| {
952 Error::InvalidArgumentError(format!(
953 "column '{}' does not exist in table '{}'",
954 column_plan.name, display_name
955 ))
956 })?;
957 if !seen_column_indices.insert(col_idx) {
958 return Err(Error::InvalidArgumentError(format!(
959 "duplicate column '{}' in CREATE INDEX",
960 column_plan.name
961 )));
962 }
963
964 let column = &table.schema.columns[col_idx];
965 column_indices.push(col_idx);
966 field_ids.push(column.field_id);
967 column_names.push(column.name.clone());
968 }
969
970 if plan.columns.len() == 1 {
971 let field_id = field_ids[0];
972 let column_name = column_names[0].clone();
973 let column_plan = &plan.columns[0];
974
975 if plan.unique {
976 let snapshot = self.default_snapshot();
977 let existing_values =
978 self.scan_column_values(table.as_ref(), field_id, snapshot)?;
979 ensure_single_column_unique(&existing_values, &[], &column_name)?;
980 }
981
982 let registration = self.catalog_service.register_single_column_index(
983 &display_name,
984 &canonical_name,
985 &table.table,
986 field_id,
987 &column_name,
988 plan.name.clone(),
989 plan.unique,
990 column_plan.ascending,
991 column_plan.nulls_first,
992 plan.if_not_exists,
993 )?;
994
995 let created_name = match registration {
996 SingleColumnIndexRegistration::Created { index_name } => index_name,
997 SingleColumnIndexRegistration::AlreadyExists { index_name } => {
998 drop(table);
999 return Ok(RuntimeStatementResult::CreateIndex {
1000 table_name: display_name,
1001 index_name: Some(index_name),
1002 });
1003 }
1004 };
1005
1006 index_name = Some(created_name.clone());
1007
1008 if plan.unique {
1009 if let Some(updated_table) =
1010 Self::rebuild_executor_table_with_unique(table.as_ref(), field_id)
1011 {
1012 self.tables
1013 .write()
1014 .unwrap()
1015 .insert(canonical_name.clone(), Arc::clone(&updated_table));
1016 } else {
1017 self.remove_table_entry(&canonical_name);
1018 }
1019 }
1020
1021 drop(table);
1022
1023 return Ok(RuntimeStatementResult::CreateIndex {
1024 table_name: display_name,
1025 index_name,
1026 });
1027 }
1028
1029 let table_id = table.table_id();
1030
1031 if plan.unique {
1032 let snapshot = self.default_snapshot();
1034 let existing_rows =
1035 self.scan_multi_column_values(table.as_ref(), &field_ids, snapshot)?;
1036 let mut existing_keys: Vec<UniqueKey> = Vec::with_capacity(existing_rows.len());
1037 for values in existing_rows {
1038 if let Some(key) = build_composite_unique_key(&values, &column_names)? {
1039 existing_keys.push(key);
1040 }
1041 }
1042 ensure_multi_column_unique(&existing_keys, &[] as &[UniqueKey], &column_names)?;
1043
1044 let executor_entry = ExecutorMultiColumnUnique {
1045 index_name: index_name.clone(),
1046 column_indices: column_indices.clone(),
1047 };
1048
1049 let registration = self.catalog_service.register_multi_column_unique_index(
1050 table_id,
1051 &field_ids,
1052 index_name.clone(),
1053 )?;
1054
1055 match registration {
1056 MultiColumnUniqueRegistration::Created => {
1057 table.add_multi_column_unique(executor_entry);
1058 }
1059 MultiColumnUniqueRegistration::AlreadyExists {
1060 index_name: existing,
1061 } => {
1062 if plan.if_not_exists {
1063 drop(table);
1064 return Ok(RuntimeStatementResult::CreateIndex {
1065 table_name: display_name,
1066 index_name: existing,
1067 });
1068 }
1069 return Err(Error::CatalogError(format!(
1070 "Index already exists on columns '{}'",
1071 column_names.join(", ")
1072 )));
1073 }
1074 }
1075 } else {
1076 let name = index_name.clone().ok_or_else(|| {
1078 Error::InvalidArgumentError(
1079 "Multi-column CREATE INDEX requires an explicit index name".into(),
1080 )
1081 })?;
1082 let created = self.catalog_service.register_multi_column_index(
1083 table_id, &field_ids, name, false, )?;
1085
1086 if !created && !plan.if_not_exists {
1087 return Err(Error::CatalogError(format!(
1088 "Index already exists on columns '{}'",
1089 column_names.join(", ")
1090 )));
1091 }
1092 }
1093
1094 Ok(RuntimeStatementResult::CreateIndex {
1095 table_name: display_name,
1096 index_name,
1097 })
1098 }
1099
1100 fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
1101 let descriptor = self.catalog_service.drop_single_column_index(plan)?;
1102
1103 if let Some(descriptor) = &descriptor {
1104 self.remove_table_entry(&descriptor.canonical_table_name);
1105 }
1106
1107 Ok(descriptor)
1108 }
1109
1110 fn create_view(&self, _plan: CreateViewPlan) -> Result<()> {
1111 Err(Error::Internal(
1114 "create_view on RuntimeContext should be called through RuntimeSession".into(),
1115 ))
1116 }
1117
1118 fn drop_view(&self, plan: DropViewPlan) -> Result<()> {
1119 RuntimeContext::drop_view(self, &plan.name, plan.if_exists)
1120 }
1121}
1122
1123impl<P> RuntimeContext<P>
1124where
1125 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1126{
1127 pub(crate) fn reindex_index(
1129 &self,
1130 plan: llkv_plan::ReindexPlan,
1131 ) -> Result<RuntimeStatementResult<P>> {
1132 let canonical_index = plan.canonical_name.to_ascii_lowercase();
1133 let snapshot = self.catalog.snapshot();
1134
1135 for canonical_table_name in snapshot.table_names() {
1137 let Some(table_id) = snapshot.table_id(&canonical_table_name) else {
1138 continue;
1139 };
1140
1141 if let Some(entry) = self
1142 .metadata
1143 .single_column_index(table_id, &canonical_index)?
1144 {
1145 let table = self.lookup_table(&canonical_table_name)?;
1147
1148 table.table.unregister_sort_index(entry.column_id)?;
1150
1151 table.table.register_sort_index(entry.column_id)?;
1153
1154 drop(table);
1155
1156 return Ok(RuntimeStatementResult::NoOp);
1157 }
1158 }
1159
1160 Err(Error::CatalogError(format!(
1162 "Index '{}' does not exist",
1163 plan.name
1164 )))
1165 }
1166}