1use crate::{
9 RuntimeSession, RuntimeStatementResult, RuntimeTableHandle, RuntimeTransactionContext,
10 TXN_ID_AUTO_COMMIT, canonical_table_name, is_table_missing_error,
11};
12use llkv_column_map::store::ColumnStore;
13use llkv_executor::{ExecutorMultiColumnUnique, ExecutorTable};
14use llkv_plan::{
15 AlterTablePlan, CreateIndexPlan, CreateTablePlan, CreateTableSource, CreateViewPlan,
16 DropIndexPlan, DropTablePlan, DropViewPlan, PlanColumnSpec, RenameTablePlan, SelectPlan,
17};
18use llkv_result::{Error, Result};
19use llkv_storage::pager::{BoxedPager, MemPager, Pager};
20use llkv_table::catalog::TableCatalog;
21use llkv_table::{
22 CatalogDdl, CatalogManager, ConstraintService, MetadataManager, MultiColumnUniqueRegistration,
23 SingleColumnIndexDescriptor, SingleColumnIndexRegistration, SysCatalog, TableId,
24 TriggerEventMeta, TriggerTimingMeta, ensure_multi_column_unique, ensure_single_column_unique,
25 validate_alter_table_operation,
26};
27use llkv_transaction::{TransactionManager, TransactionSnapshot, TxnId, TxnIdManager};
28use rustc_hash::{FxHashMap, FxHashSet};
29use simd_r_drive_entry_handle::EntryHandle;
30use std::sync::{Arc, RwLock};
31
32mod alter;
33mod constraints;
34mod delete;
35mod insert;
36mod provider;
37mod query;
38mod table_access;
39mod table_creation;
40mod truncate;
41mod types;
42mod update;
43mod utils;
44
45pub(crate) use types::{PreparedAssignmentValue, TableConstraintContext};
46
/// Shared runtime state for one database instance: storage handles, catalog
/// services, the executor-table cache, and transaction bookkeeping.
///
/// `P` is the pager type backing the column store.
pub struct RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Pager supplied to the constructor; the column store is opened on top of it.
    pub(crate) pager: Arc<P>,
    /// Cache of executor tables keyed by canonical table name. It starts empty
    /// at construction and entries are removed when a table or view is dropped.
    tables: RwLock<FxHashMap<String, Arc<ExecutorTable<P>>>>,
    /// Canonical names of tables/views that were dropped through this context.
    pub(crate) dropped_tables: RwLock<FxHashSet<String>>,
    /// Metadata manager built over the column store (table metas, index metadata).
    metadata: Arc<MetadataManager<P>>,
    /// Constraint service; used here to look up foreign keys that reference a table.
    constraint_service: ConstraintService<P>,
    /// Catalog manager that performs the DDL operations (views, triggers, types,
    /// tables, indexes) this context delegates to.
    pub(crate) catalog_service: CatalogManager<P>,
    /// In-memory name -> table-id catalog. May be shared with other contexts when
    /// supplied through `new_with_catalog`.
    pub(crate) catalog: Arc<TableCatalog>,
    /// Column store opened on `pager`.
    store: Arc<ColumnStore<P>>,
    /// Transaction manager; `create_session` obtains transaction sessions from it.
    transaction_manager:
        TransactionManager<RuntimeTransactionContext<P>, RuntimeTransactionContext<MemPager>>,
    /// Transaction-id manager obtained from `transaction_manager` at construction.
    txn_manager: Arc<TxnIdManager>,
    /// Per-transaction set of table names.
    /// NOTE(review): presumably the tables that received new rows in that
    /// transaction; the code that populates it is not in this part of the file.
    txn_tables_with_new_rows: RwLock<FxHashMap<TxnId, FxHashSet<String>>>,
    /// Optional secondary context installed by `with_fallback_lookup`; `None`
    /// after construction. How lookups consult it is not visible here.
    fallback_lookup: Option<Arc<RuntimeContext<P>>>,
}
85
86impl<P> RuntimeContext<P>
87where
88 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
89{
/// Creates a context on top of `pager` with its own, freshly created
/// [`TableCatalog`].
///
/// # Panics
///
/// Panics if the column store cannot be opened on `pager`.
pub fn new(pager: Arc<P>) -> Self {
    Self::new_with_catalog_inner(pager, None)
}
93
/// Creates a context on top of `pager` that reuses the caller-supplied
/// `catalog` instead of creating a private one.
///
/// # Panics
///
/// Panics if the column store cannot be opened on `pager`.
pub fn new_with_catalog(pager: Arc<P>, catalog: Arc<TableCatalog>) -> Self {
    Self::new_with_catalog_inner(pager, Some(catalog))
}
97
98 fn new_with_catalog_inner(pager: Arc<P>, shared_catalog: Option<Arc<TableCatalog>>) -> Self {
99 tracing::trace!("RuntimeContext::new called, pager={:p}", &*pager);
100
101 let store = ColumnStore::open(Arc::clone(&pager)).expect("failed to open ColumnStore");
102 let catalog = SysCatalog::new(&store);
103
104 let next_txn_id = match catalog.get_next_txn_id() {
105 Ok(Some(id)) => {
106 tracing::debug!("[CONTEXT] Loaded next_txn_id={} from catalog", id);
107 id
108 }
109 Ok(None) => {
110 tracing::debug!("[CONTEXT] No persisted next_txn_id found, starting from default");
111 TXN_ID_AUTO_COMMIT + 1
112 }
113 Err(e) => {
114 tracing::warn!("[CONTEXT] Failed to load next_txn_id: {}, using default", e);
115 TXN_ID_AUTO_COMMIT + 1
116 }
117 };
118
119 let last_committed = match catalog.get_last_committed_txn_id() {
120 Ok(Some(id)) => {
121 tracing::debug!("[CONTEXT] Loaded last_committed={} from catalog", id);
122 id
123 }
124 Ok(None) => {
125 tracing::debug!(
126 "[CONTEXT] No persisted last_committed found, starting from default"
127 );
128 TXN_ID_AUTO_COMMIT
129 }
130 Err(e) => {
131 tracing::warn!(
132 "[CONTEXT] Failed to load last_committed: {}, using default",
133 e
134 );
135 TXN_ID_AUTO_COMMIT
136 }
137 };
138
139 let store_arc = Arc::new(store);
140 let metadata = Arc::new(MetadataManager::new(Arc::clone(&store_arc)));
141
142 let loaded_tables = match metadata.all_table_metas() {
143 Ok(metas) => {
144 tracing::debug!("[CONTEXT] Loaded {} table(s) from catalog", metas.len());
145 metas
146 }
147 Err(e) => {
148 tracing::warn!(
149 "[CONTEXT] Failed to load table metas: {}, starting with empty registry",
150 e
151 );
152 Vec::new()
153 }
154 };
155
156 let transaction_manager =
157 TransactionManager::new_with_initial_state(next_txn_id, last_committed);
158 let txn_manager = transaction_manager.txn_manager();
159
160 tracing::debug!(
182 "[CONTEXT] Initialized with lazy loading for {} table(s)",
183 loaded_tables.len()
184 );
185
186 let (catalog, is_shared_catalog) = match shared_catalog {
188 Some(existing) => (existing, true),
189 None => (Arc::new(TableCatalog::new()), false),
190 };
191 for (table_id, table_meta) in &loaded_tables {
192 if let Some(ref table_name) = table_meta.name
193 && let Err(e) = catalog.register_table(table_name.as_str(), *table_id)
194 {
195 match e {
196 Error::CatalogError(ref msg)
197 if is_shared_catalog && msg.contains("already exists") =>
198 {
199 tracing::debug!(
200 "[CONTEXT] Shared catalog already contains table '{}' with id={}",
201 table_name,
202 table_id
203 );
204 }
205 other => {
206 tracing::warn!(
207 "[CONTEXT] Failed to register table '{}' (id={}) in catalog: {}",
208 table_name,
209 table_id,
210 other
211 );
212 }
213 }
214 }
215 }
216 tracing::debug!(
217 "[CONTEXT] Catalog initialized with {} table(s)",
218 catalog.table_count()
219 );
220
221 let constraint_service =
222 ConstraintService::new(Arc::clone(&metadata), Arc::clone(&catalog));
223 let catalog_service = CatalogManager::new(
224 Arc::clone(&metadata),
225 Arc::clone(&catalog),
226 Arc::clone(&store_arc),
227 );
228
229 if let Err(e) = catalog_service.load_types_from_catalog() {
231 tracing::warn!("[CONTEXT] Failed to load custom types: {}", e);
232 }
233
234 Self {
235 pager,
236 tables: RwLock::new(FxHashMap::default()), dropped_tables: RwLock::new(FxHashSet::default()),
238 metadata,
239 constraint_service,
240 catalog_service,
241 catalog,
242 store: store_arc,
243 transaction_manager,
244 txn_manager,
245 txn_tables_with_new_rows: RwLock::new(FxHashMap::default()),
246 fallback_lookup: None,
247 }
248 }
249
/// Returns a new `Arc` handle to the transaction-id manager held by this context.
pub fn txn_manager(&self) -> Arc<TxnIdManager> {
    Arc::clone(&self.txn_manager)
}
254
/// Borrows the column store that was opened on this context's pager.
pub fn store(&self) -> &Arc<ColumnStore<P>> {
    &self.store
}
259
/// Builder-style setter: stores `fallback` as this context's fallback lookup
/// and returns the context by value.
///
/// NOTE(review): how the fallback is consulted during lookups is not visible
/// in this part of the file.
pub fn with_fallback_lookup(mut self, fallback: Arc<RuntimeContext<P>>) -> Self {
    self.fallback_lookup = Some(fallback);
    self
}
267
/// Registers the custom type `name` as `data_type` by forwarding to the
/// catalog manager.
pub fn register_type(&self, name: String, data_type: sqlparser::ast::DataType) {
    self.catalog_service.register_type(name, data_type);
}
272
/// Drops the custom type `name` through the catalog manager.
///
/// # Errors
///
/// Propagates any error returned by the catalog manager's `drop_type`.
pub fn drop_type(&self, name: &str) -> Result<()> {
    self.catalog_service.drop_type(name)?;
    Ok(())
}
278
/// Asks the metadata manager to ensure its next table id is at least `minimum`.
///
/// # Errors
///
/// Propagates any error returned by the metadata manager.
pub fn ensure_next_table_id_at_least(&self, minimum: TableId) -> Result<()> {
    self.metadata.ensure_next_table_id_at_least(minimum)?;
    Ok(())
}
284
285 fn create_view_internal(
287 self: &Arc<Self>,
288 display_name: &str,
289 view_definition: String,
290 select_plan: SelectPlan,
291 if_not_exists: bool,
292 snapshot: TransactionSnapshot,
293 ) -> Result<()> {
294 let (normalized_display, canonical_name) = canonical_table_name(display_name)?;
295
296 if let Some(existing_id) = self.catalog.table_id(&canonical_name) {
297 let is_view = self.catalog_service.is_view(existing_id)?;
298 if is_view && if_not_exists {
299 return Ok(());
300 }
301
302 let entity = if is_view { "View" } else { "Table" };
303 return Err(Error::CatalogError(format!(
304 "{} '{}' already exists",
305 entity, normalized_display
306 )));
307 }
308
309 let execution = self.execute_select(select_plan, snapshot)?;
310 let column_specs = {
311 let schema = execution.schema();
312 if schema.fields().is_empty() {
313 return Err(Error::InvalidArgumentError(
314 "CREATE VIEW requires SELECT to project at least one column".into(),
315 ));
316 }
317
318 schema
319 .fields()
320 .iter()
321 .map(|field| {
322 PlanColumnSpec::new(
323 field.name(),
324 field.data_type().clone(),
325 field.is_nullable(),
326 )
327 })
328 .collect::<Vec<_>>()
329 };
330 drop(execution);
331
332 self.catalog_service
333 .create_view(&normalized_display, view_definition, column_specs)?;
334
335 self.dropped_tables.write().unwrap().remove(&canonical_name);
336
337 Ok(())
338 }
339
/// Creates a view using the context's default (auto-commit) snapshot.
///
/// Takes the snapshot from `default_snapshot` and delegates everything else
/// to `create_view_internal`.
pub fn create_view(
    self: &Arc<Self>,
    display_name: &str,
    view_definition: String,
    select_plan: SelectPlan,
    if_not_exists: bool,
) -> Result<()> {
    let snapshot = self.default_snapshot();
    self.create_view_internal(
        display_name,
        view_definition,
        select_plan,
        if_not_exists,
        snapshot,
    )
}
360
/// Creates a trigger by forwarding every argument unchanged to the catalog
/// manager's `create_trigger`.
///
/// NOTE(review): the meaning of the returned `bool` is defined by the catalog
/// manager — presumably whether a trigger was actually created; confirm there.
// The argument list mirrors the catalog manager's signature one-to-one.
#[allow(clippy::too_many_arguments)]
pub fn create_trigger(
    self: &Arc<Self>,
    trigger_display_name: &str,
    canonical_trigger_name: &str,
    table_display_name: &str,
    canonical_table_name: &str,
    timing: TriggerTimingMeta,
    event: TriggerEventMeta,
    for_each_row: bool,
    condition: Option<String>,
    body_sql: String,
    if_not_exists: bool,
) -> Result<bool> {
    self.catalog_service.create_trigger(
        trigger_display_name,
        canonical_trigger_name,
        table_display_name,
        canonical_table_name,
        timing,
        event,
        for_each_row,
        condition,
        body_sql,
        if_not_exists,
    )
}
388
/// Drops a trigger by forwarding every argument unchanged to the catalog
/// manager's `drop_trigger`. The table hints are optional.
///
/// NOTE(review): the meaning of the returned `bool` is defined by the catalog
/// manager — presumably whether a trigger was actually removed; confirm there.
pub fn drop_trigger(
    self: &Arc<Self>,
    trigger_display_name: &str,
    canonical_trigger_name: &str,
    table_hint_display: Option<&str>,
    table_hint_canonical: Option<&str>,
    if_exists: bool,
) -> Result<bool> {
    self.catalog_service.drop_trigger(
        trigger_display_name,
        canonical_trigger_name,
        table_hint_display,
        table_hint_canonical,
        if_exists,
    )
}
405
406 pub fn view_definition(&self, canonical_name: &str) -> Result<Option<String>> {
408 let Some(table_id) = self.catalog.table_id(canonical_name) else {
409 return Ok(None);
410 };
411
412 match self.metadata.table_meta(table_id)? {
413 Some(meta) => Ok(meta.view_definition),
414 None => Ok(None),
415 }
416 }
417
/// Reports whether `table_id` refers to a view, as determined by the catalog
/// manager.
pub fn is_view(&self, table_id: TableId) -> Result<bool> {
    self.catalog_service.is_view(table_id)
}
423
424 pub fn drop_view(&self, name: &str, if_exists: bool) -> Result<()> {
426 let (display_name, canonical_name) = canonical_table_name(name)?;
427
428 let table_id = match self.catalog.table_id(&canonical_name) {
429 Some(id) => id,
430 None => {
431 if if_exists {
432 return Ok(());
433 }
434 return Err(Error::CatalogError(format!(
435 "View '{}' does not exist",
436 display_name
437 )));
438 }
439 };
440
441 if !self.catalog_service.is_view(table_id)? {
442 return Err(Error::CatalogError(format!(
443 "use DROP TABLE to delete table '{}'",
444 display_name
445 )));
446 }
447
448 self.catalog_service.drop_view(&canonical_name, table_id)?;
449
450 {
451 let mut tables = self.tables.write().unwrap();
452 tables.remove(&canonical_name);
453 }
454
455 self.dropped_tables.write().unwrap().insert(canonical_name);
456
457 Ok(())
458 }
459
/// Resolves `data_type` through the catalog manager and returns the resulting
/// type.
///
/// NOTE(review): presumably this expands registered custom type names to their
/// underlying types; the resolution logic lives in the catalog manager.
pub fn resolve_type(&self, data_type: &sqlparser::ast::DataType) -> sqlparser::ast::DataType {
    self.catalog_service.resolve_type(data_type)
}
464
/// Persists the transaction counters to the system catalog.
///
/// Writes `next_txn_id` first, then the transaction-id manager's current
/// last-committed id.
///
/// # Errors
///
/// Returns the first write failure; if writing `next_txn_id` fails, the
/// last-committed id is not written.
pub fn persist_next_txn_id(&self, next_txn_id: TxnId) -> Result<()> {
    let catalog = SysCatalog::new(&self.store);
    catalog.put_next_txn_id(next_txn_id)?;
    // Read only after the first write succeeded, so the stored value is as
    // recent as possible.
    let last_committed = self.txn_manager.last_committed();
    catalog.put_last_committed_txn_id(last_committed)?;
    tracing::debug!(
        "[CONTEXT] Persisted next_txn_id={}, last_committed={}",
        next_txn_id,
        last_committed
    );
    Ok(())
}
478
/// Builds the snapshot used for auto-commit work: the transaction id is
/// `TXN_ID_AUTO_COMMIT` and the snapshot id is the last committed transaction
/// id at the time of the call.
pub fn default_snapshot(&self) -> TransactionSnapshot {
    TransactionSnapshot {
        txn_id: TXN_ID_AUTO_COMMIT,
        snapshot_id: self.txn_manager.last_committed(),
    }
}
486
/// Returns a new `Arc` handle to the in-memory table catalog.
pub fn table_catalog(&self) -> Arc<TableCatalog> {
    Arc::clone(&self.catalog)
}
491
/// Borrows the catalog manager (note: not the `TableCatalog`; see
/// `table_catalog` for that).
pub fn catalog(&self) -> &CatalogManager<P> {
    &self.catalog_service
}
496
/// Creates a [`RuntimeTableHandle`] for `name`, bound to a clone of this
/// context's `Arc`.
///
/// # Errors
///
/// Propagates whatever `RuntimeTableHandle::new` returns for `name`.
pub fn table(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
    RuntimeTableHandle::new(Arc::clone(self), name)
}
501
/// Returns the table names currently registered in the in-memory catalog.
pub fn table_names(self: &Arc<Self>) -> Vec<String> {
    self.catalog.table_names()
}
546}
547
impl RuntimeContext<BoxedPager> {
    /// Creates a new [`RuntimeSession`] bound to this context.
    ///
    /// The session combines a fresh set of session namespaces with a
    /// transaction session obtained from the transaction manager; both hold a
    /// clone of this context's `Arc`.
    pub fn create_session(self: &Arc<Self>) -> RuntimeSession {
        tracing::debug!("[SESSION] RuntimeContext::create_session called");
        let namespaces = Arc::new(crate::runtime_session::SessionNamespaces::new(Arc::clone(
            self,
        )));
        // The transaction manager works through a wrapper around the context.
        let wrapper = RuntimeTransactionContext::new(Arc::clone(self));
        let inner = self.transaction_manager.create_session(Arc::new(wrapper));
        tracing::debug!(
            "[SESSION] Created TransactionSession with session_id (will be logged by transaction manager)"
        );
        RuntimeSession::from_parts(inner, namespaces)
    }
}
564
565impl<P> CatalogDdl for RuntimeContext<P>
566where
567 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
568{
// Result types of the DDL operations. Table creation, ALTER TABLE, and index
// creation report a statement result; DROP/RENAME TABLE return nothing; DROP
// INDEX returns the descriptor of the removed single-column index, if any.
type CreateTableOutput = RuntimeStatementResult<P>;
type DropTableOutput = ();
type RenameTableOutput = ();
type AlterTableOutput = RuntimeStatementResult<P>;
type CreateIndexOutput = RuntimeStatementResult<P>;
type DropIndexOutput = Option<SingleColumnIndexDescriptor>;
575
576 fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
577 if plan.columns.is_empty() && plan.source.is_none() {
578 return Err(Error::InvalidArgumentError(
579 "CREATE TABLE requires explicit columns or a source".into(),
580 ));
581 }
582
583 let (display_name, canonical_name) = canonical_table_name(&plan.name)?;
584 let CreateTablePlan {
585 name: _,
586 if_not_exists,
587 or_replace,
588 columns,
589 source,
590 namespace: _,
591 foreign_keys,
592 multi_column_uniques,
593 } = plan;
594
595 tracing::trace!(
596 "DEBUG create_table (plan): table='{}' if_not_exists={} columns={}",
597 display_name,
598 if_not_exists,
599 columns.len()
600 );
601 for (idx, col) in columns.iter().enumerate() {
602 tracing::trace!(
603 " plan column[{}]: name='{}' primary_key={}",
604 idx,
605 col.name,
606 col.primary_key
607 );
608 }
609 let (exists, is_dropped) = {
610 let tables = self.tables.read().unwrap();
611 let in_cache = tables.contains_key(&canonical_name);
612 let is_dropped = self
613 .dropped_tables
614 .read()
615 .unwrap()
616 .contains(&canonical_name);
617 (in_cache && !is_dropped, is_dropped)
619 };
620 tracing::trace!(
621 "DEBUG create_table (plan): exists={}, is_dropped={}",
622 exists,
623 is_dropped
624 );
625
626 if is_dropped {
628 self.remove_table_entry(&canonical_name);
629 self.dropped_tables.write().unwrap().remove(&canonical_name);
630 }
631
632 if exists {
633 if or_replace {
634 tracing::trace!(
635 "DEBUG create_table (plan): table '{}' exists and or_replace=true, removing existing table before recreation",
636 display_name
637 );
638 self.remove_table_entry(&canonical_name);
639 } else if if_not_exists {
640 tracing::trace!(
641 "DEBUG create_table (plan): table '{}' exists and if_not_exists=true, returning early WITHOUT creating",
642 display_name
643 );
644 return Ok(RuntimeStatementResult::CreateTable {
645 table_name: display_name,
646 });
647 } else {
648 return Err(Error::CatalogError(format!(
649 "Catalog Error: Table '{}' already exists",
650 display_name
651 )));
652 }
653 }
654
655 match source {
656 Some(CreateTableSource::Batches { schema, batches }) => self.create_table_from_batches(
657 display_name,
658 canonical_name,
659 schema,
660 batches,
661 if_not_exists,
662 ),
663 Some(CreateTableSource::Select { .. }) => Err(Error::Internal(
664 "CreateTableSource::Select should be materialized before reaching RuntimeContext::create_table"
665 .into(),
666 )),
667 None => self.create_table_from_columns(
668 display_name,
669 canonical_name,
670 columns,
671 foreign_keys,
672 multi_column_uniques,
673 if_not_exists,
674 ),
675 }
676 }
677
678 fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
679 let DropTablePlan { name, if_exists } = plan;
680 let (display_name, canonical_name) = canonical_table_name(&name)?;
681
682 tracing::debug!("drop_table: attempting to drop table '{}'", canonical_name);
683
684 if self.is_table_marked_dropped(&canonical_name) {
685 tracing::debug!(
686 "drop_table: table '{}' already marked dropped; if_exists={}",
687 canonical_name,
688 if_exists
689 );
690 return if if_exists {
691 Ok(())
692 } else {
693 Err(Error::CatalogError(format!(
694 "Catalog Error: Table '{}' does not exist",
695 display_name
696 )))
697 };
698 }
699
700 let cached_entry = {
701 let tables = self.tables.read().unwrap();
702 tracing::debug!("drop_table: cache contains {} tables", tables.len());
703 tables.get(&canonical_name).cloned()
704 };
705
706 let table_entry = match cached_entry {
707 Some(entry) => entry,
708 None => {
709 tracing::debug!(
710 "drop_table: table '{}' not cached; attempting reload",
711 canonical_name
712 );
713
714 if self.catalog.table_id(&canonical_name).is_none() {
715 tracing::debug!(
716 "drop_table: no catalog entry for '{}'; if_exists={}",
717 canonical_name,
718 if_exists
719 );
720 if if_exists {
721 return Ok(());
722 }
723 return Err(Error::CatalogError(format!(
724 "Catalog Error: Table '{}' does not exist",
725 display_name
726 )));
727 }
728
729 match self.lookup_table(&canonical_name) {
730 Ok(entry) => entry,
731 Err(err) => {
732 tracing::warn!(
733 "drop_table: failed to reload table '{}': {:?}",
734 canonical_name,
735 err
736 );
737 if if_exists {
738 return Ok(());
739 }
740 return Err(err);
741 }
742 }
743 }
744 };
745
746 let column_field_ids = table_entry
747 .schema
748 .columns
749 .iter()
750 .map(|col| col.field_id)
751 .collect::<Vec<_>>();
752 let table_id = table_entry.table.table_id();
753
754 let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
755
756 for detail in referencing {
757 if detail.referencing_table_canonical == canonical_name {
758 continue;
759 }
760
761 if self.is_table_marked_dropped(&detail.referencing_table_canonical) {
762 continue;
763 }
764
765 return Err(Error::CatalogError(format!(
766 "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
767 detail.referencing_table_display
768 )));
769 }
770
771 self.catalog_service
772 .drop_table(&canonical_name, table_id, &column_field_ids)?;
773 tracing::debug!(
774 "[CATALOG] Unregistered table '{}' (table_id={}) from catalog",
775 canonical_name,
776 table_id
777 );
778
779 self.remove_table_entry(&canonical_name);
780 self.dropped_tables
781 .write()
782 .unwrap()
783 .insert(canonical_name.clone());
784 Ok(())
785 }
786
787 fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
788 let RenameTablePlan {
789 current_name,
790 new_name,
791 if_exists,
792 } = plan;
793
794 let (current_display, current_canonical) = canonical_table_name(¤t_name)?;
795 let (new_display, new_canonical) = canonical_table_name(&new_name)?;
796
797 if current_canonical == new_canonical && current_display == new_display {
798 return Ok(());
799 }
800
801 if self.is_table_marked_dropped(¤t_canonical) {
802 if if_exists {
803 return Ok(());
804 }
805 return Err(Error::CatalogError(format!(
806 "Catalog Error: Table '{}' does not exist",
807 current_display
808 )));
809 }
810
811 let table_id = match self
812 .catalog
813 .table_id(¤t_canonical)
814 .or_else(|| self.catalog.table_id(¤t_display))
815 {
816 Some(id) => id,
817 None => {
818 if if_exists {
819 return Ok(());
820 }
821 return Err(Error::CatalogError(format!(
822 "Catalog Error: Table '{}' does not exist",
823 current_display
824 )));
825 }
826 };
827
828 if !current_display.eq_ignore_ascii_case(&new_display)
829 && (self.catalog.table_id(&new_canonical).is_some()
830 || self.catalog.table_id(&new_display).is_some())
831 {
832 return Err(Error::CatalogError(format!(
833 "Catalog Error: Table '{}' already exists",
834 new_display
835 )));
836 }
837
838 let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
839 if !referencing.is_empty() {
840 return Err(Error::CatalogError(format!(
841 "Dependency Error: Cannot alter entry \"{}\" because there are entries that depend on it.",
842 current_display
843 )));
844 }
845
846 self.catalog_service
847 .rename_table(table_id, ¤t_display, &new_display)?;
848
849 let mut tables = self.tables.write().unwrap();
850 if let Some(table) = tables.remove(¤t_canonical) {
851 tables.insert(new_canonical.clone(), table);
852 }
853
854 let mut dropped = self.dropped_tables.write().unwrap();
855 dropped.remove(¤t_canonical);
856 dropped.remove(&new_canonical);
857
858 Ok(())
859 }
860
861 fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
862 let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
863
864 let view = match self.catalog_service.table_view(&canonical_table) {
865 Ok(view) => view,
866 Err(err) if plan.if_exists && is_table_missing_error(&err) => {
867 return Ok(RuntimeStatementResult::NoOp);
868 }
869 Err(err) => return Err(err),
870 };
871
872 let table_meta = match view.table_meta.as_ref() {
873 Some(meta) => meta,
874 None => {
875 if plan.if_exists {
876 return Ok(RuntimeStatementResult::NoOp);
877 }
878 return Err(Error::Internal("table metadata missing".into()));
879 }
880 };
881
882 let table_id = table_meta.table_id;
883
884 validate_alter_table_operation(&plan.operation, &view, table_id, &self.catalog_service)?;
885
886 match &plan.operation {
887 llkv_plan::AlterTableOperation::RenameColumn {
888 old_column_name,
889 new_column_name,
890 } => {
891 self.rename_column(&plan.table_name, old_column_name, new_column_name)?;
892 }
893 llkv_plan::AlterTableOperation::SetColumnDataType {
894 column_name,
895 new_data_type,
896 } => {
897 self.alter_column_type(&plan.table_name, column_name, new_data_type)?;
898 }
899 llkv_plan::AlterTableOperation::DropColumn { column_name, .. } => {
900 self.drop_column(&plan.table_name, column_name)?;
901 }
902 }
903
904 Ok(RuntimeStatementResult::NoOp)
905 }
906
/// Creates an index over one or more columns of an existing table.
///
/// Column names are resolved case-insensitively (ASCII) against the table
/// schema; unknown or repeated columns are rejected. Three paths follow:
///
/// * **Single column** – for a unique index the existing values are scanned
///   and checked first; the index is then registered with the catalog manager
///   and, for a unique index, the cached executor table is rebuilt or dropped.
/// * **Multi-column unique** – existing rows are scanned and checked, then the
///   unique index is registered and attached to the executor table.
/// * **Multi-column non-unique** – requires an explicit index name and is only
///   registered with the catalog manager.
///
/// # Errors
///
/// Fails on an empty column list, unknown/duplicate columns, a uniqueness
/// violation in existing data, a missing name for a multi-column non-unique
/// index, or an already existing multi-column index when `if_not_exists` is
/// not set. Lookup, scan, and registration failures are propagated.
fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
    if plan.columns.is_empty() {
        return Err(Error::InvalidArgumentError(
            "CREATE INDEX requires at least one column".into(),
        ));
    }

    let mut index_name = plan.name.clone();
    let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
    let table = self.lookup_table(&canonical_name)?;

    // Parallel vectors describing the indexed columns, in plan order.
    let mut column_indices = Vec::with_capacity(plan.columns.len());
    let mut field_ids = Vec::with_capacity(plan.columns.len());
    let mut column_names = Vec::with_capacity(plan.columns.len());
    let mut seen_column_indices = FxHashSet::default();

    for column_plan in &plan.columns {
        // The schema lookup is keyed by the ASCII-lowercased column name.
        let normalized = column_plan.name.to_ascii_lowercase();
        let col_idx = table
            .schema
            .lookup
            .get(&normalized)
            .copied()
            .ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "column '{}' does not exist in table '{}'",
                    column_plan.name, display_name
                ))
            })?;
        if !seen_column_indices.insert(col_idx) {
            return Err(Error::InvalidArgumentError(format!(
                "duplicate column '{}' in CREATE INDEX",
                column_plan.name
            )));
        }

        let column = &table.schema.columns[col_idx];
        column_indices.push(col_idx);
        field_ids.push(column.field_id);
        column_names.push(column.name.clone());
    }

    // Single-column path.
    if plan.columns.len() == 1 {
        let field_id = field_ids[0];
        let column_name = column_names[0].clone();
        let column_plan = &plan.columns[0];

        if plan.unique {
            // Existing data must already satisfy the constraint.
            let snapshot = self.default_snapshot();
            let existing_values =
                self.scan_column_values(table.as_ref(), field_id, snapshot)?;
            ensure_single_column_unique(&existing_values, &[], &column_name)?;
        }

        let registration = self.catalog_service.register_single_column_index(
            &display_name,
            &canonical_name,
            &table.table,
            field_id,
            &column_name,
            plan.name.clone(),
            plan.unique,
            column_plan.ascending,
            column_plan.nulls_first,
            plan.if_not_exists,
        )?;

        let created_name = match registration {
            SingleColumnIndexRegistration::Created { index_name } => index_name,
            SingleColumnIndexRegistration::AlreadyExists { index_name } => {
                // Nothing was created; report the existing index name.
                drop(table);
                return Ok(RuntimeStatementResult::CreateIndex {
                    table_name: display_name,
                    index_name: Some(index_name),
                });
            }
        };

        index_name = Some(created_name.clone());

        if plan.unique {
            // Replace the cached table with a rebuilt one that knows about the
            // unique column; if no rebuilt table is produced, remove the entry.
            if let Some(updated_table) =
                Self::rebuild_executor_table_with_unique(table.as_ref(), field_id)
            {
                self.tables
                    .write()
                    .unwrap()
                    .insert(canonical_name.clone(), Arc::clone(&updated_table));
            } else {
                self.remove_table_entry(&canonical_name);
            }
        }

        drop(table);

        return Ok(RuntimeStatementResult::CreateIndex {
            table_name: display_name,
            index_name,
        });
    }

    // Multi-column paths.
    let table_id = table.table.table_id();

    if plan.unique {
        let snapshot = self.default_snapshot();
        let existing_rows =
            self.scan_multi_column_values(table.as_ref(), &field_ids, snapshot)?;
        ensure_multi_column_unique(&existing_rows, &[], &column_names)?;

        let executor_entry = ExecutorMultiColumnUnique {
            index_name: index_name.clone(),
            column_indices: column_indices.clone(),
        };

        let registration = self.catalog_service.register_multi_column_unique_index(
            table_id,
            &field_ids,
            index_name.clone(),
        )?;

        match registration {
            MultiColumnUniqueRegistration::Created => {
                // Attach the constraint to the executor table only after the
                // catalog accepted it.
                table.add_multi_column_unique(executor_entry);
            }
            MultiColumnUniqueRegistration::AlreadyExists {
                index_name: existing,
            } => {
                if plan.if_not_exists {
                    drop(table);
                    return Ok(RuntimeStatementResult::CreateIndex {
                        table_name: display_name,
                        index_name: existing,
                    });
                }
                return Err(Error::CatalogError(format!(
                    "Index already exists on columns '{}'",
                    column_names.join(", ")
                )));
            }
        }
    } else {
        let name = index_name.clone().ok_or_else(|| {
            Error::InvalidArgumentError(
                "Multi-column CREATE INDEX requires an explicit index name".into(),
            )
        })?;
        // NOTE(review): the meaning of the trailing `false` argument is defined
        // by `register_multi_column_index`; presumably the "unique" flag — confirm.
        let created = self.catalog_service.register_multi_column_index(
            table_id, &field_ids, name, false, )?;

        if !created && !plan.if_not_exists {
            return Err(Error::CatalogError(format!(
                "Index already exists on columns '{}'",
                column_names.join(", ")
            )));
        }
    }

    Ok(RuntimeStatementResult::CreateIndex {
        table_name: display_name,
        index_name,
    })
}
1072
1073 fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
1074 let descriptor = self.catalog_service.drop_single_column_index(plan)?;
1075
1076 if let Some(descriptor) = &descriptor {
1077 self.remove_table_entry(&descriptor.canonical_table_name);
1078 }
1079
1080 Ok(descriptor)
1081 }
1082
/// Not supported on the context directly: always returns an internal error
/// telling the caller to go through `RuntimeSession`. The plan is ignored.
fn create_view(&self, _plan: CreateViewPlan) -> Result<()> {
    Err(Error::Internal(
        "create_view on RuntimeContext should be called through RuntimeSession".into(),
    ))
}
1090
/// Trait entry point for DROP VIEW; unpacks the plan and calls the inherent
/// `RuntimeContext::drop_view`.
fn drop_view(&self, plan: DropViewPlan) -> Result<()> {
    // Fully qualified so the inherent method is called, not this trait method.
    RuntimeContext::drop_view(self, &plan.name, plan.if_exists)
}
1094}
1095
1096impl<P> RuntimeContext<P>
1097where
1098 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1099{
1100 pub(crate) fn reindex_index(
1102 &self,
1103 plan: llkv_plan::ReindexPlan,
1104 ) -> Result<RuntimeStatementResult<P>> {
1105 let canonical_index = plan.canonical_name.to_ascii_lowercase();
1106 let snapshot = self.catalog.snapshot();
1107
1108 for canonical_table_name in snapshot.table_names() {
1110 let Some(table_id) = snapshot.table_id(&canonical_table_name) else {
1111 continue;
1112 };
1113
1114 if let Some(entry) = self
1115 .metadata
1116 .single_column_index(table_id, &canonical_index)?
1117 {
1118 let table = self.lookup_table(&canonical_table_name)?;
1120
1121 table.table.unregister_sort_index(entry.column_id)?;
1123
1124 table.table.register_sort_index(entry.column_id)?;
1126
1127 drop(table);
1128
1129 return Ok(RuntimeStatementResult::NoOp);
1130 }
1131 }
1132
1133 Err(Error::CatalogError(format!(
1135 "Index '{}' does not exist",
1136 plan.name
1137 )))
1138 }
1139}