llkv_runtime/runtime_context/
mod.rs

1//! Runtime context submodules
2//!
3//! This module contains the RuntimeContext implementation split into logical submodules:
4//! - `query_translation`: String-based expression to field-ID-based expression translation
5//! - `types`: Helper types (PreparedAssignmentValue, TableConstraintContext)
6//! - `provider`: ContextProvider for TableProvider trait
7
8use crate::{
9    RuntimeSession, RuntimeStatementResult, RuntimeTableHandle, RuntimeTransactionContext,
10    TXN_ID_AUTO_COMMIT, canonical_table_name, is_table_missing_error,
11};
12use llkv_column_map::store::ColumnStore;
13use llkv_executor::{ExecutorMultiColumnUnique, ExecutorTable};
14use llkv_plan::{
15    AlterTablePlan, CreateIndexPlan, CreateTablePlan, CreateTableSource, CreateViewPlan,
16    DropIndexPlan, DropTablePlan, DropViewPlan, PlanColumnSpec, RenameTablePlan, SelectPlan,
17};
18use llkv_result::{Error, Result};
19use llkv_storage::pager::{BoxedPager, MemPager, Pager};
20use llkv_table::catalog::TableCatalog;
21use llkv_table::{
22    CatalogDdl, CatalogManager, ConstraintService, MetadataManager, MultiColumnUniqueRegistration,
23    SingleColumnIndexDescriptor, SingleColumnIndexRegistration, SysCatalog, TableId,
24    TriggerEventMeta, TriggerTimingMeta, ensure_multi_column_unique, ensure_single_column_unique,
25    validate_alter_table_operation,
26};
27use llkv_transaction::{TransactionManager, TransactionSnapshot, TxnId, TxnIdManager};
28use rustc_hash::{FxHashMap, FxHashSet};
29use simd_r_drive_entry_handle::EntryHandle;
30use std::sync::{Arc, RwLock};
31
32mod alter;
33mod constraints;
34mod delete;
35mod insert;
36mod provider;
37mod query;
38mod table_access;
39mod table_creation;
40mod truncate;
41mod types;
42mod update;
43mod utils;
44
45pub(crate) use types::{PreparedAssignmentValue, TableConstraintContext};
46
47/// In-memory execution context shared by plan-based queries.
48///
49/// Important: "lazy loading" here refers to *table metadata only* (schema,
50/// executor-side column descriptors, and a small next-row-id counter). We do
51/// NOT eagerly load or materialize the table's row data into memory. All
52/// row/column data remains on the ColumnStore and is streamed in chunks during
53/// query execution. This keeps the memory footprint low even for very large
54/// tables.
55///
56/// Typical resource usage:
57/// - Metadata per table: ~100s of bytes to a few KB (schema + field ids)
58/// - ExecutorTable struct: small (handles + counters)
59/// - Actual table rows: streamed from disk in chunks (never fully resident)
60pub struct RuntimeContext<P>
61where
62    P: Pager<Blob = EntryHandle> + Send + Sync,
63{
64    pub(crate) pager: Arc<P>,
65    tables: RwLock<FxHashMap<String, Arc<ExecutorTable<P>>>>,
66    pub(crate) dropped_tables: RwLock<FxHashSet<String>>,
67    metadata: Arc<MetadataManager<P>>,
68    constraint_service: ConstraintService<P>,
69    pub(crate) catalog_service: CatalogManager<P>,
70    // Centralized catalog for table/field name resolution
71    pub(crate) catalog: Arc<TableCatalog>,
72    // Shared column store for all tables in this context
73    // This ensures catalog state is synchronized across all tables
74    store: Arc<ColumnStore<P>>,
75    // Transaction manager for session-based transactions
76    transaction_manager:
77        TransactionManager<RuntimeTransactionContext<P>, RuntimeTransactionContext<MemPager>>,
78    txn_manager: Arc<TxnIdManager>,
79    txn_tables_with_new_rows: RwLock<FxHashMap<TxnId, FxHashSet<String>>>,
80    // Optional fallback context for cross-namespace table lookups. Temporary namespaces use this
81    // to access persistent tables while maintaining separate storage. The fallback shares the
82    // same pager type as the primary context so executor tables can be reused without conversion.
83    fallback_lookup: Option<Arc<RuntimeContext<P>>>,
84}
85
86impl<P> RuntimeContext<P>
87where
88    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
89{
90    pub fn new(pager: Arc<P>) -> Self {
91        Self::new_with_catalog_inner(pager, None)
92    }
93
94    pub fn new_with_catalog(pager: Arc<P>, catalog: Arc<TableCatalog>) -> Self {
95        Self::new_with_catalog_inner(pager, Some(catalog))
96    }
97
98    fn new_with_catalog_inner(pager: Arc<P>, shared_catalog: Option<Arc<TableCatalog>>) -> Self {
99        tracing::trace!("RuntimeContext::new called, pager={:p}", &*pager);
100
101        let store = ColumnStore::open(Arc::clone(&pager)).expect("failed to open ColumnStore");
102        let catalog = SysCatalog::new(&store);
103
104        let next_txn_id = match catalog.get_next_txn_id() {
105            Ok(Some(id)) => {
106                tracing::debug!("[CONTEXT] Loaded next_txn_id={} from catalog", id);
107                id
108            }
109            Ok(None) => {
110                tracing::debug!("[CONTEXT] No persisted next_txn_id found, starting from default");
111                TXN_ID_AUTO_COMMIT + 1
112            }
113            Err(e) => {
114                tracing::warn!("[CONTEXT] Failed to load next_txn_id: {}, using default", e);
115                TXN_ID_AUTO_COMMIT + 1
116            }
117        };
118
119        let last_committed = match catalog.get_last_committed_txn_id() {
120            Ok(Some(id)) => {
121                tracing::debug!("[CONTEXT] Loaded last_committed={} from catalog", id);
122                id
123            }
124            Ok(None) => {
125                tracing::debug!(
126                    "[CONTEXT] No persisted last_committed found, starting from default"
127                );
128                TXN_ID_AUTO_COMMIT
129            }
130            Err(e) => {
131                tracing::warn!(
132                    "[CONTEXT] Failed to load last_committed: {}, using default",
133                    e
134                );
135                TXN_ID_AUTO_COMMIT
136            }
137        };
138
139        let store_arc = Arc::new(store);
140        let metadata = Arc::new(MetadataManager::new(Arc::clone(&store_arc)));
141
142        let loaded_tables = match metadata.all_table_metas() {
143            Ok(metas) => {
144                tracing::debug!("[CONTEXT] Loaded {} table(s) from catalog", metas.len());
145                metas
146            }
147            Err(e) => {
148                tracing::warn!(
149                    "[CONTEXT] Failed to load table metas: {}, starting with empty registry",
150                    e
151                );
152                Vec::new()
153            }
154        };
155
156        let transaction_manager =
157            TransactionManager::new_with_initial_state(next_txn_id, last_committed);
158        let txn_manager = transaction_manager.txn_manager();
159
160        // LAZY LOADING: Only load table metadata at first access. We intentionally
161        // avoid loading any row/column data into memory here. The executor
162        // performs streaming reads from the ColumnStore when a query runs, so
163        // large tables are never fully materialized.
164        //
165        // Benefits of this approach:
166        // - Instant database open (no upfront I/O for table data)
167        // - Lower memory footprint (only metadata cached)
168        // - Natural parallelism: if multiple threads request different tables
169        //   concurrently, those tables will be loaded concurrently by the
170        //   caller threads (no global preload required).
171        //
172        // Future Optimizations (if profiling shows need):
173        // 1. Eager parallel preload of a short "hot" list of tables (rayon)
174        // 2. Background preload of catalog entries after startup
175        // 3. LRU-based eviction for extremely large deployments
176        // 4. Cache compact representations of schemas to reduce per-table RAM
177        //
178        // Note: `loaded_tables` holds catalog metadata that helped us discover
179        // which tables exist; we discard it here because metadata will be
180        // fetched on-demand during lazy loads.
181        tracing::debug!(
182            "[CONTEXT] Initialized with lazy loading for {} table(s)",
183            loaded_tables.len()
184        );
185
186        // Initialize catalog and populate with existing tables
187        let (catalog, is_shared_catalog) = match shared_catalog {
188            Some(existing) => (existing, true),
189            None => (Arc::new(TableCatalog::new()), false),
190        };
191        for (table_id, table_meta) in &loaded_tables {
192            if let Some(ref table_name) = table_meta.name
193                && let Err(e) = catalog.register_table(table_name.as_str(), *table_id)
194            {
195                match e {
196                    Error::CatalogError(ref msg)
197                        if is_shared_catalog && msg.contains("already exists") =>
198                    {
199                        tracing::debug!(
200                            "[CONTEXT] Shared catalog already contains table '{}' with id={}",
201                            table_name,
202                            table_id
203                        );
204                    }
205                    other => {
206                        tracing::warn!(
207                            "[CONTEXT] Failed to register table '{}' (id={}) in catalog: {}",
208                            table_name,
209                            table_id,
210                            other
211                        );
212                    }
213                }
214            }
215        }
216        tracing::debug!(
217            "[CONTEXT] Catalog initialized with {} table(s)",
218            catalog.table_count()
219        );
220
221        let constraint_service =
222            ConstraintService::new(Arc::clone(&metadata), Arc::clone(&catalog));
223        let catalog_service = CatalogManager::new(
224            Arc::clone(&metadata),
225            Arc::clone(&catalog),
226            Arc::clone(&store_arc),
227        );
228
229        // Load custom types from SysCatalog into catalog_service
230        if let Err(e) = catalog_service.load_types_from_catalog() {
231            tracing::warn!("[CONTEXT] Failed to load custom types: {}", e);
232        }
233
234        Self {
235            pager,
236            tables: RwLock::new(FxHashMap::default()), // Start with empty table cache
237            dropped_tables: RwLock::new(FxHashSet::default()),
238            metadata,
239            constraint_service,
240            catalog_service,
241            catalog,
242            store: store_arc,
243            transaction_manager,
244            txn_manager,
245            txn_tables_with_new_rows: RwLock::new(FxHashMap::default()),
246            fallback_lookup: None,
247        }
248    }
249
250    /// Return the transaction ID manager shared with sessions.
251    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
252        Arc::clone(&self.txn_manager)
253    }
254
255    /// Return the column store for catalog operations.
256    pub fn store(&self) -> &Arc<ColumnStore<P>> {
257        &self.store
258    }
259
260    /// Set a fallback context for cross-pager table lookups. The fallback uses BoxedPager
261    /// to enable access across different underlying pager types (e.g., temporary MemPager
262    /// can fall back to persistent disk pager).
263    pub fn with_fallback_lookup(mut self, fallback: Arc<RuntimeContext<P>>) -> Self {
264        self.fallback_lookup = Some(fallback);
265        self
266    }
267
268    /// Register a custom type alias (CREATE TYPE/DOMAIN).
269    pub fn register_type(&self, name: String, data_type: sqlparser::ast::DataType) {
270        self.catalog_service.register_type(name, data_type);
271    }
272
273    /// Drop a custom type alias (DROP TYPE/DOMAIN).
274    pub fn drop_type(&self, name: &str) -> Result<()> {
275        self.catalog_service.drop_type(name)?;
276        Ok(())
277    }
278
279    /// Ensure the catalog's next_table_id counter is at least `minimum`.
280    pub fn ensure_next_table_id_at_least(&self, minimum: TableId) -> Result<()> {
281        self.metadata.ensure_next_table_id_at_least(minimum)?;
282        Ok(())
283    }
284
285    /// Internal helper for creating a view that can be called from CatalogDdl trait implementation.
286    fn create_view_internal(
287        self: &Arc<Self>,
288        display_name: &str,
289        view_definition: String,
290        select_plan: SelectPlan,
291        if_not_exists: bool,
292        snapshot: TransactionSnapshot,
293    ) -> Result<()> {
294        let (normalized_display, canonical_name) = canonical_table_name(display_name)?;
295
296        if let Some(existing_id) = self.catalog.table_id(&canonical_name) {
297            let is_view = self.catalog_service.is_view(existing_id)?;
298            if is_view && if_not_exists {
299                return Ok(());
300            }
301
302            let entity = if is_view { "View" } else { "Table" };
303            return Err(Error::CatalogError(format!(
304                "{} '{}' already exists",
305                entity, normalized_display
306            )));
307        }
308
309        let execution = self.execute_select(select_plan, snapshot)?;
310        let column_specs = {
311            let schema = execution.schema();
312            if schema.fields().is_empty() {
313                return Err(Error::InvalidArgumentError(
314                    "CREATE VIEW requires SELECT to project at least one column".into(),
315                ));
316            }
317
318            schema
319                .fields()
320                .iter()
321                .map(|field| {
322                    PlanColumnSpec::new(
323                        field.name(),
324                        field.data_type().clone(),
325                        field.is_nullable(),
326                    )
327                })
328                .collect::<Vec<_>>()
329        };
330        drop(execution);
331
332        self.catalog_service
333            .create_view(&normalized_display, view_definition, column_specs)?;
334
335        self.dropped_tables.write().unwrap().remove(&canonical_name);
336
337        Ok(())
338    }
339
340    /// Create a view by executing its SELECT definition to derive projected columns
341    /// and persisting the metadata into the catalog. The view is registered as a
342    /// catalog entry with column names so subsequent binding can succeed without
343    /// reparsing the stored SQL in higher layers.
344    pub fn create_view(
345        self: &Arc<Self>,
346        display_name: &str,
347        view_definition: String,
348        select_plan: SelectPlan,
349        if_not_exists: bool,
350    ) -> Result<()> {
351        let snapshot = self.default_snapshot();
352        self.create_view_internal(
353            display_name,
354            view_definition,
355            select_plan,
356            if_not_exists,
357            snapshot,
358        )
359    }
360
361    #[allow(clippy::too_many_arguments)]
362    pub fn create_trigger(
363        self: &Arc<Self>,
364        trigger_display_name: &str,
365        canonical_trigger_name: &str,
366        table_display_name: &str,
367        canonical_table_name: &str,
368        timing: TriggerTimingMeta,
369        event: TriggerEventMeta,
370        for_each_row: bool,
371        condition: Option<String>,
372        body_sql: String,
373        if_not_exists: bool,
374    ) -> Result<bool> {
375        self.catalog_service.create_trigger(
376            trigger_display_name,
377            canonical_trigger_name,
378            table_display_name,
379            canonical_table_name,
380            timing,
381            event,
382            for_each_row,
383            condition,
384            body_sql,
385            if_not_exists,
386        )
387    }
388
389    pub fn drop_trigger(
390        self: &Arc<Self>,
391        trigger_display_name: &str,
392        canonical_trigger_name: &str,
393        table_hint_display: Option<&str>,
394        table_hint_canonical: Option<&str>,
395        if_exists: bool,
396    ) -> Result<bool> {
397        self.catalog_service.drop_trigger(
398            trigger_display_name,
399            canonical_trigger_name,
400            table_hint_display,
401            table_hint_canonical,
402            if_exists,
403        )
404    }
405
406    /// Return the stored SQL definition for a view, if it exists.
407    pub fn view_definition(&self, canonical_name: &str) -> Result<Option<String>> {
408        let Some(table_id) = self.catalog.table_id(canonical_name) else {
409            return Ok(None);
410        };
411
412        match self.metadata.table_meta(table_id)? {
413            Some(meta) => Ok(meta.view_definition),
414            None => Ok(None),
415        }
416    }
417
418    /// Check if a table is actually a view by looking at its metadata.
419    /// Returns true if the table exists and has a view_definition.
420    pub fn is_view(&self, table_id: TableId) -> Result<bool> {
421        self.catalog_service.is_view(table_id)
422    }
423
424    /// Drop a view, ignoring missing views when `if_exists` is true.
425    pub fn drop_view(&self, name: &str, if_exists: bool) -> Result<()> {
426        let (display_name, canonical_name) = canonical_table_name(name)?;
427
428        let table_id = match self.catalog.table_id(&canonical_name) {
429            Some(id) => id,
430            None => {
431                if if_exists {
432                    return Ok(());
433                }
434                return Err(Error::CatalogError(format!(
435                    "View '{}' does not exist",
436                    display_name
437                )));
438            }
439        };
440
441        if !self.catalog_service.is_view(table_id)? {
442            return Err(Error::CatalogError(format!(
443                "use DROP TABLE to delete table '{}'",
444                display_name
445            )));
446        }
447
448        self.catalog_service.drop_view(&canonical_name, table_id)?;
449
450        {
451            let mut tables = self.tables.write().unwrap();
452            tables.remove(&canonical_name);
453        }
454
455        self.dropped_tables.write().unwrap().insert(canonical_name);
456
457        Ok(())
458    }
459
460    /// Resolve a type name to its base DataType, recursively following aliases.
461    pub fn resolve_type(&self, data_type: &sqlparser::ast::DataType) -> sqlparser::ast::DataType {
462        self.catalog_service.resolve_type(data_type)
463    }
464
465    /// Persist the next_txn_id to the catalog.
466    pub fn persist_next_txn_id(&self, next_txn_id: TxnId) -> Result<()> {
467        let catalog = SysCatalog::new(&self.store);
468        catalog.put_next_txn_id(next_txn_id)?;
469        let last_committed = self.txn_manager.last_committed();
470        catalog.put_last_committed_txn_id(last_committed)?;
471        tracing::debug!(
472            "[CONTEXT] Persisted next_txn_id={}, last_committed={}",
473            next_txn_id,
474            last_committed
475        );
476        Ok(())
477    }
478
479    /// Construct the default snapshot for auto-commit operations.
480    pub fn default_snapshot(&self) -> TransactionSnapshot {
481        TransactionSnapshot {
482            txn_id: TXN_ID_AUTO_COMMIT,
483            snapshot_id: self.txn_manager.last_committed(),
484        }
485    }
486
487    /// Get the table catalog for schema and table name management.
488    pub fn table_catalog(&self) -> Arc<TableCatalog> {
489        Arc::clone(&self.catalog)
490    }
491
492    /// Access the catalog manager for type registry, view management, and metadata operations.
493    pub fn catalog(&self) -> &CatalogManager<P> {
494        &self.catalog_service
495    }
496
497    /// Get a handle to an existing table by name.
498    pub fn table(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
499        RuntimeTableHandle::new(Arc::clone(self), name)
500    }
501
502    /// Create a table with explicit column definitions - Programmatic API.
503    ///
504    /// This is a **convenience method** for programmatically creating tables with explicit
505    /// column definitions. Use this when you're writing Rust code that needs to create tables
506    /// directly, rather than executing SQL.
507    ///
508    /// # When to use `create_table` vs [`CatalogDdl::create_table`]
509    ///
510    /// **Use `create_table`:**
511    /// - You're writing Rust code (not parsing SQL)
512    /// - You have explicit column definitions
513    /// - You want a simple, ergonomic API: `ctx.create_table("users", vec![...])`
514    /// - You want a `RuntimeTableHandle` to work with immediately
515    /// - **Does NOT support**: CREATE TABLE AS SELECT, foreign keys, namespaces
516    ///
517    /// **Use [`CatalogDdl::create_table`]:**
518    /// - You're implementing SQL execution (already have a parsed `CreateTablePlan`)
519    /// - You need CREATE TABLE AS SELECT support
520    /// - You need foreign key constraints
521    /// - You need namespace support (temporary tables)
522    /// - You need IF NOT EXISTS / OR REPLACE semantics
523    /// - You're working within the transaction system
524    ///
525    /// # Usage Comparison
526    ///
527    /// **Programmatic API** (this method):
528    /// - `ctx.create_table("users", vec![("id", DataType::Int64, false), ...])?`
529    /// - Returns `RuntimeTableHandle` for immediate use
530    /// - Simple, ergonomic, no plan construction needed
531    ///
532    /// **SQL execution API** ([`CatalogDdl::create_table`]):
533    /// - Construct a `CreateTablePlan` with all SQL features
534    /// - Delegates to the [`CatalogDdl`] trait for catalog-aware creation
535    /// - Support for CTAS, foreign keys, namespaces, transactions
536    /// - Returns `RuntimeStatementResult` for consistency with other SQL operations
537    ///
538    /// # Returns
539    /// Returns a [`RuntimeTableHandle`] that provides immediate access to the table.
540    /// Use this for further programmatic operations on the table.
541    /// Returns all table names currently registered in the catalog.
542    pub fn table_names(self: &Arc<Self>) -> Vec<String> {
543        // Use catalog for table names (single source of truth)
544        self.catalog.table_names()
545    }
546}
547
548impl RuntimeContext<BoxedPager> {
549    /// Create a new session for transaction management.
550    /// Each session can have its own independent transaction.
551    pub fn create_session(self: &Arc<Self>) -> RuntimeSession {
552        tracing::debug!("[SESSION] RuntimeContext::create_session called");
553        let namespaces = Arc::new(crate::runtime_session::SessionNamespaces::new(Arc::clone(
554            self,
555        )));
556        let wrapper = RuntimeTransactionContext::new(Arc::clone(self));
557        let inner = self.transaction_manager.create_session(Arc::new(wrapper));
558        tracing::debug!(
559            "[SESSION] Created TransactionSession with session_id (will be logged by transaction manager)"
560        );
561        RuntimeSession::from_parts(inner, namespaces)
562    }
563}
564
565impl<P> CatalogDdl for RuntimeContext<P>
566where
567    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
568{
569    type CreateTableOutput = RuntimeStatementResult<P>;
570    type DropTableOutput = ();
571    type RenameTableOutput = ();
572    type AlterTableOutput = RuntimeStatementResult<P>;
573    type CreateIndexOutput = RuntimeStatementResult<P>;
574    type DropIndexOutput = Option<SingleColumnIndexDescriptor>;
575
576    fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
577        if plan.columns.is_empty() && plan.source.is_none() {
578            return Err(Error::InvalidArgumentError(
579                "CREATE TABLE requires explicit columns or a source".into(),
580            ));
581        }
582
583        let (display_name, canonical_name) = canonical_table_name(&plan.name)?;
584        let CreateTablePlan {
585            name: _,
586            if_not_exists,
587            or_replace,
588            columns,
589            source,
590            namespace: _,
591            foreign_keys,
592            multi_column_uniques,
593        } = plan;
594
595        tracing::trace!(
596            "DEBUG create_table (plan): table='{}' if_not_exists={} columns={}",
597            display_name,
598            if_not_exists,
599            columns.len()
600        );
601        for (idx, col) in columns.iter().enumerate() {
602            tracing::trace!(
603                "  plan column[{}]: name='{}' primary_key={}",
604                idx,
605                col.name,
606                col.primary_key
607            );
608        }
609        let (exists, is_dropped) = {
610            let tables = self.tables.read().unwrap();
611            let in_cache = tables.contains_key(&canonical_name);
612            let is_dropped = self
613                .dropped_tables
614                .read()
615                .unwrap()
616                .contains(&canonical_name);
617            // Table exists if it's in cache and NOT marked as dropped
618            (in_cache && !is_dropped, is_dropped)
619        };
620        tracing::trace!(
621            "DEBUG create_table (plan): exists={}, is_dropped={}",
622            exists,
623            is_dropped
624        );
625
626        // If table was dropped, remove it from cache before creating new one
627        if is_dropped {
628            self.remove_table_entry(&canonical_name);
629            self.dropped_tables.write().unwrap().remove(&canonical_name);
630        }
631
632        if exists {
633            if or_replace {
634                tracing::trace!(
635                    "DEBUG create_table (plan): table '{}' exists and or_replace=true, removing existing table before recreation",
636                    display_name
637                );
638                self.remove_table_entry(&canonical_name);
639            } else if if_not_exists {
640                tracing::trace!(
641                    "DEBUG create_table (plan): table '{}' exists and if_not_exists=true, returning early WITHOUT creating",
642                    display_name
643                );
644                return Ok(RuntimeStatementResult::CreateTable {
645                    table_name: display_name,
646                });
647            } else {
648                return Err(Error::CatalogError(format!(
649                    "Catalog Error: Table '{}' already exists",
650                    display_name
651                )));
652            }
653        }
654
655        match source {
656            Some(CreateTableSource::Batches { schema, batches }) => self.create_table_from_batches(
657                display_name,
658                canonical_name,
659                schema,
660                batches,
661                if_not_exists,
662            ),
663            Some(CreateTableSource::Select { .. }) => Err(Error::Internal(
664                "CreateTableSource::Select should be materialized before reaching RuntimeContext::create_table"
665                    .into(),
666            )),
667            None => self.create_table_from_columns(
668                display_name,
669                canonical_name,
670                columns,
671                foreign_keys,
672                multi_column_uniques,
673                if_not_exists,
674            ),
675        }
676    }
677
678    fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
679        let DropTablePlan { name, if_exists } = plan;
680        let (display_name, canonical_name) = canonical_table_name(&name)?;
681
682        tracing::debug!("drop_table: attempting to drop table '{}'", canonical_name);
683
684        if self.is_table_marked_dropped(&canonical_name) {
685            tracing::debug!(
686                "drop_table: table '{}' already marked dropped; if_exists={}",
687                canonical_name,
688                if_exists
689            );
690            return if if_exists {
691                Ok(())
692            } else {
693                Err(Error::CatalogError(format!(
694                    "Catalog Error: Table '{}' does not exist",
695                    display_name
696                )))
697            };
698        }
699
700        let cached_entry = {
701            let tables = self.tables.read().unwrap();
702            tracing::debug!("drop_table: cache contains {} tables", tables.len());
703            tables.get(&canonical_name).cloned()
704        };
705
706        let table_entry = match cached_entry {
707            Some(entry) => entry,
708            None => {
709                tracing::debug!(
710                    "drop_table: table '{}' not cached; attempting reload",
711                    canonical_name
712                );
713
714                if self.catalog.table_id(&canonical_name).is_none() {
715                    tracing::debug!(
716                        "drop_table: no catalog entry for '{}'; if_exists={}",
717                        canonical_name,
718                        if_exists
719                    );
720                    if if_exists {
721                        return Ok(());
722                    }
723                    return Err(Error::CatalogError(format!(
724                        "Catalog Error: Table '{}' does not exist",
725                        display_name
726                    )));
727                }
728
729                match self.lookup_table(&canonical_name) {
730                    Ok(entry) => entry,
731                    Err(err) => {
732                        tracing::warn!(
733                            "drop_table: failed to reload table '{}': {:?}",
734                            canonical_name,
735                            err
736                        );
737                        if if_exists {
738                            return Ok(());
739                        }
740                        return Err(err);
741                    }
742                }
743            }
744        };
745
746        let column_field_ids = table_entry
747            .schema
748            .columns
749            .iter()
750            .map(|col| col.field_id)
751            .collect::<Vec<_>>();
752        let table_id = table_entry.table.table_id();
753
754        let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
755
756        for detail in referencing {
757            if detail.referencing_table_canonical == canonical_name {
758                continue;
759            }
760
761            if self.is_table_marked_dropped(&detail.referencing_table_canonical) {
762                continue;
763            }
764
765            return Err(Error::CatalogError(format!(
766                "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
767                detail.referencing_table_display
768            )));
769        }
770
771        self.catalog_service
772            .drop_table(&canonical_name, table_id, &column_field_ids)?;
773        tracing::debug!(
774            "[CATALOG] Unregistered table '{}' (table_id={}) from catalog",
775            canonical_name,
776            table_id
777        );
778
779        self.remove_table_entry(&canonical_name);
780        self.dropped_tables
781            .write()
782            .unwrap()
783            .insert(canonical_name.clone());
784        Ok(())
785    }
786
787    fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
788        let RenameTablePlan {
789            current_name,
790            new_name,
791            if_exists,
792        } = plan;
793
794        let (current_display, current_canonical) = canonical_table_name(&current_name)?;
795        let (new_display, new_canonical) = canonical_table_name(&new_name)?;
796
797        if current_canonical == new_canonical && current_display == new_display {
798            return Ok(());
799        }
800
801        if self.is_table_marked_dropped(&current_canonical) {
802            if if_exists {
803                return Ok(());
804            }
805            return Err(Error::CatalogError(format!(
806                "Catalog Error: Table '{}' does not exist",
807                current_display
808            )));
809        }
810
811        let table_id = match self
812            .catalog
813            .table_id(&current_canonical)
814            .or_else(|| self.catalog.table_id(&current_display))
815        {
816            Some(id) => id,
817            None => {
818                if if_exists {
819                    return Ok(());
820                }
821                return Err(Error::CatalogError(format!(
822                    "Catalog Error: Table '{}' does not exist",
823                    current_display
824                )));
825            }
826        };
827
828        if !current_display.eq_ignore_ascii_case(&new_display)
829            && (self.catalog.table_id(&new_canonical).is_some()
830                || self.catalog.table_id(&new_display).is_some())
831        {
832            return Err(Error::CatalogError(format!(
833                "Catalog Error: Table '{}' already exists",
834                new_display
835            )));
836        }
837
838        let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
839        if !referencing.is_empty() {
840            return Err(Error::CatalogError(format!(
841                "Dependency Error: Cannot alter entry \"{}\" because there are entries that depend on it.",
842                current_display
843            )));
844        }
845
846        self.catalog_service
847            .rename_table(table_id, &current_display, &new_display)?;
848
849        let mut tables = self.tables.write().unwrap();
850        if let Some(table) = tables.remove(&current_canonical) {
851            tables.insert(new_canonical.clone(), table);
852        }
853
854        let mut dropped = self.dropped_tables.write().unwrap();
855        dropped.remove(&current_canonical);
856        dropped.remove(&new_canonical);
857
858        Ok(())
859    }
860
861    fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
862        let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
863
864        let view = match self.catalog_service.table_view(&canonical_table) {
865            Ok(view) => view,
866            Err(err) if plan.if_exists && is_table_missing_error(&err) => {
867                return Ok(RuntimeStatementResult::NoOp);
868            }
869            Err(err) => return Err(err),
870        };
871
872        let table_meta = match view.table_meta.as_ref() {
873            Some(meta) => meta,
874            None => {
875                if plan.if_exists {
876                    return Ok(RuntimeStatementResult::NoOp);
877                }
878                return Err(Error::Internal("table metadata missing".into()));
879            }
880        };
881
882        let table_id = table_meta.table_id;
883
884        validate_alter_table_operation(&plan.operation, &view, table_id, &self.catalog_service)?;
885
886        match &plan.operation {
887            llkv_plan::AlterTableOperation::RenameColumn {
888                old_column_name,
889                new_column_name,
890            } => {
891                self.rename_column(&plan.table_name, old_column_name, new_column_name)?;
892            }
893            llkv_plan::AlterTableOperation::SetColumnDataType {
894                column_name,
895                new_data_type,
896            } => {
897                self.alter_column_type(&plan.table_name, column_name, new_data_type)?;
898            }
899            llkv_plan::AlterTableOperation::DropColumn { column_name, .. } => {
900                self.drop_column(&plan.table_name, column_name)?;
901            }
902        }
903
904        Ok(RuntimeStatementResult::NoOp)
905    }
906
907    fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
908        if plan.columns.is_empty() {
909            return Err(Error::InvalidArgumentError(
910                "CREATE INDEX requires at least one column".into(),
911            ));
912        }
913
914        let mut index_name = plan.name.clone();
915        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
916        let table = self.lookup_table(&canonical_name)?;
917
918        let mut column_indices = Vec::with_capacity(plan.columns.len());
919        let mut field_ids = Vec::with_capacity(plan.columns.len());
920        let mut column_names = Vec::with_capacity(plan.columns.len());
921        let mut seen_column_indices = FxHashSet::default();
922
923        for column_plan in &plan.columns {
924            let normalized = column_plan.name.to_ascii_lowercase();
925            let col_idx = table
926                .schema
927                .lookup
928                .get(&normalized)
929                .copied()
930                .ok_or_else(|| {
931                    Error::InvalidArgumentError(format!(
932                        "column '{}' does not exist in table '{}'",
933                        column_plan.name, display_name
934                    ))
935                })?;
936            if !seen_column_indices.insert(col_idx) {
937                return Err(Error::InvalidArgumentError(format!(
938                    "duplicate column '{}' in CREATE INDEX",
939                    column_plan.name
940                )));
941            }
942
943            let column = &table.schema.columns[col_idx];
944            column_indices.push(col_idx);
945            field_ids.push(column.field_id);
946            column_names.push(column.name.clone());
947        }
948
949        if plan.columns.len() == 1 {
950            let field_id = field_ids[0];
951            let column_name = column_names[0].clone();
952            let column_plan = &plan.columns[0];
953
954            if plan.unique {
955                let snapshot = self.default_snapshot();
956                let existing_values =
957                    self.scan_column_values(table.as_ref(), field_id, snapshot)?;
958                ensure_single_column_unique(&existing_values, &[], &column_name)?;
959            }
960
961            let registration = self.catalog_service.register_single_column_index(
962                &display_name,
963                &canonical_name,
964                &table.table,
965                field_id,
966                &column_name,
967                plan.name.clone(),
968                plan.unique,
969                column_plan.ascending,
970                column_plan.nulls_first,
971                plan.if_not_exists,
972            )?;
973
974            let created_name = match registration {
975                SingleColumnIndexRegistration::Created { index_name } => index_name,
976                SingleColumnIndexRegistration::AlreadyExists { index_name } => {
977                    drop(table);
978                    return Ok(RuntimeStatementResult::CreateIndex {
979                        table_name: display_name,
980                        index_name: Some(index_name),
981                    });
982                }
983            };
984
985            index_name = Some(created_name.clone());
986
987            if plan.unique {
988                if let Some(updated_table) =
989                    Self::rebuild_executor_table_with_unique(table.as_ref(), field_id)
990                {
991                    self.tables
992                        .write()
993                        .unwrap()
994                        .insert(canonical_name.clone(), Arc::clone(&updated_table));
995                } else {
996                    self.remove_table_entry(&canonical_name);
997                }
998            }
999
1000            drop(table);
1001
1002            return Ok(RuntimeStatementResult::CreateIndex {
1003                table_name: display_name,
1004                index_name,
1005            });
1006        }
1007
1008        let table_id = table.table.table_id();
1009
1010        if plan.unique {
1011            // For unique multi-column indexes, validate uniqueness and register
1012            let snapshot = self.default_snapshot();
1013            let existing_rows =
1014                self.scan_multi_column_values(table.as_ref(), &field_ids, snapshot)?;
1015            ensure_multi_column_unique(&existing_rows, &[], &column_names)?;
1016
1017            let executor_entry = ExecutorMultiColumnUnique {
1018                index_name: index_name.clone(),
1019                column_indices: column_indices.clone(),
1020            };
1021
1022            let registration = self.catalog_service.register_multi_column_unique_index(
1023                table_id,
1024                &field_ids,
1025                index_name.clone(),
1026            )?;
1027
1028            match registration {
1029                MultiColumnUniqueRegistration::Created => {
1030                    table.add_multi_column_unique(executor_entry);
1031                }
1032                MultiColumnUniqueRegistration::AlreadyExists {
1033                    index_name: existing,
1034                } => {
1035                    if plan.if_not_exists {
1036                        drop(table);
1037                        return Ok(RuntimeStatementResult::CreateIndex {
1038                            table_name: display_name,
1039                            index_name: existing,
1040                        });
1041                    }
1042                    return Err(Error::CatalogError(format!(
1043                        "Index already exists on columns '{}'",
1044                        column_names.join(", ")
1045                    )));
1046                }
1047            }
1048        } else {
1049            // For non-unique multi-column indexes, register in catalog but no runtime enforcement yet
1050            let name = index_name.clone().ok_or_else(|| {
1051                Error::InvalidArgumentError(
1052                    "Multi-column CREATE INDEX requires an explicit index name".into(),
1053                )
1054            })?;
1055            let created = self.catalog_service.register_multi_column_index(
1056                table_id, &field_ids, name, false, // unique = false
1057            )?;
1058
1059            if !created && !plan.if_not_exists {
1060                return Err(Error::CatalogError(format!(
1061                    "Index already exists on columns '{}'",
1062                    column_names.join(", ")
1063                )));
1064            }
1065        }
1066
1067        Ok(RuntimeStatementResult::CreateIndex {
1068            table_name: display_name,
1069            index_name,
1070        })
1071    }
1072
1073    fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
1074        let descriptor = self.catalog_service.drop_single_column_index(plan)?;
1075
1076        if let Some(descriptor) = &descriptor {
1077            self.remove_table_entry(&descriptor.canonical_table_name);
1078        }
1079
1080        Ok(descriptor)
1081    }
1082
1083    fn create_view(&self, _plan: CreateViewPlan) -> Result<()> {
1084        // This trait method should not be called directly on RuntimeContext.
1085        // Views should be created through RuntimeSession which has the Arc<RuntimeContext>.
1086        Err(Error::Internal(
1087            "create_view on RuntimeContext should be called through RuntimeSession".into(),
1088        ))
1089    }
1090
1091    fn drop_view(&self, plan: DropViewPlan) -> Result<()> {
1092        RuntimeContext::drop_view(self, &plan.name, plan.if_exists)
1093    }
1094}
1095
1096impl<P> RuntimeContext<P>
1097where
1098    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1099{
1100    /// Rebuild an index by dropping and recreating it.
1101    pub(crate) fn reindex_index(
1102        &self,
1103        plan: llkv_plan::ReindexPlan,
1104    ) -> Result<RuntimeStatementResult<P>> {
1105        let canonical_index = plan.canonical_name.to_ascii_lowercase();
1106        let snapshot = self.catalog.snapshot();
1107
1108        // Search for the index across all tables
1109        for canonical_table_name in snapshot.table_names() {
1110            let Some(table_id) = snapshot.table_id(&canonical_table_name) else {
1111                continue;
1112            };
1113
1114            if let Some(entry) = self
1115                .metadata
1116                .single_column_index(table_id, &canonical_index)?
1117            {
1118                // Found the index - rebuild it by unregistering and re-registering
1119                let table = self.lookup_table(&canonical_table_name)?;
1120
1121                // Unregister the physical index
1122                table.table.unregister_sort_index(entry.column_id)?;
1123
1124                // Re-register the physical index (this rebuilds it)
1125                table.table.register_sort_index(entry.column_id)?;
1126
1127                drop(table);
1128
1129                return Ok(RuntimeStatementResult::NoOp);
1130            }
1131        }
1132
1133        // Index not found
1134        Err(Error::CatalogError(format!(
1135            "Index '{}' does not exist",
1136            plan.name
1137        )))
1138    }
1139}