llkv_runtime/runtime_context/
mod.rs

1//! Runtime context submodules
2//!
3//! This module contains the RuntimeContext implementation split into logical submodules:
4//! - `query_translation`: String-based expression to field-ID-based expression translation
5//! - `types`: Helper types (PreparedAssignmentValue, TableConstraintContext)
6//! - `provider`: ContextProvider for TableProvider trait
7
8use crate::{
9    RuntimeSession, RuntimeStatementResult, RuntimeTableHandle, RuntimeTransactionContext,
10    TXN_ID_AUTO_COMMIT, canonical_table_name, is_table_missing_error,
11};
12use llkv_column_map::store::{ColumnStore, ColumnStoreWriteHints};
13use llkv_executor::{ExecutorMultiColumnUnique, ExecutorTable};
14use llkv_plan::{
15    AlterTablePlan, CreateIndexPlan, CreateTablePlan, CreateTableSource, CreateViewPlan,
16    DropIndexPlan, DropTablePlan, DropViewPlan, PlanColumnSpec, RenameTablePlan, SelectPlan,
17};
18use llkv_result::{Error, Result};
19use llkv_storage::pager::{BoxedPager, MemPager, Pager};
20use llkv_table::catalog::TableCatalog;
21use llkv_table::{
22    CatalogDdl, CatalogManager, ConstraintService, MetadataManager, MultiColumnUniqueRegistration,
23    SingleColumnIndexDescriptor, SingleColumnIndexRegistration, SysCatalog, TableId,
24    TriggerEventMeta, TriggerTimingMeta, UniqueKey, build_composite_unique_key,
25    ensure_multi_column_unique, ensure_single_column_unique, validate_alter_table_operation,
26};
27use llkv_transaction::{TransactionManager, TransactionSnapshot, TxnId, TxnIdManager};
28use rustc_hash::{FxHashMap, FxHashSet};
29use simd_r_drive_entry_handle::EntryHandle;
30use std::sync::{Arc, RwLock};
31
32mod alter;
33mod constraints;
34mod delete;
35mod insert;
36mod provider;
37mod query;
38mod table_access;
39mod table_creation;
40mod truncate;
41mod types;
42mod update;
43mod utils;
44
45pub(crate) use types::{PreparedAssignmentValue, TableConstraintContext};
46
47/// In-memory execution context shared by plan-based queries.
48///
49/// Important: "lazy loading" here refers to *table metadata only* (schema,
50/// executor-side column descriptors, and a small next-row-id counter). We do
51/// NOT eagerly load or materialize the table's row data into memory. All
52/// row/column data remains on the ColumnStore and is streamed in chunks during
53/// query execution. This keeps the memory footprint low even for very large
54/// tables.
55///
56/// Typical resource usage:
57/// - Metadata per table: ~100s of bytes to a few KB (schema + field ids)
58/// - ExecutorTable struct: small (handles + counters)
59/// - Actual table rows: streamed from disk in chunks (never fully resident)
60pub struct RuntimeContext<P>
61where
62    P: Pager<Blob = EntryHandle> + Send + Sync,
63{
64    pub(crate) pager: Arc<P>,
65    tables: RwLock<FxHashMap<String, Arc<ExecutorTable<P>>>>,
66    pub(crate) dropped_tables: RwLock<FxHashSet<String>>,
67    metadata: Arc<MetadataManager<P>>,
68    constraint_service: ConstraintService<P>,
69    pub(crate) catalog_service: CatalogManager<P>,
70    // Centralized catalog for table/field name resolution
71    pub(crate) catalog: Arc<TableCatalog>,
72    // Shared column store for all tables in this context
73    // This ensures catalog state is synchronized across all tables
74    store: Arc<ColumnStore<P>>,
75    // Transaction manager for session-based transactions
76    transaction_manager:
77        TransactionManager<RuntimeTransactionContext<P>, RuntimeTransactionContext<MemPager>>,
78    txn_manager: Arc<TxnIdManager>,
79    txn_tables_with_new_rows: RwLock<FxHashMap<TxnId, FxHashSet<String>>>,
80    // Optional fallback context for cross-namespace table lookups. Temporary namespaces use this
81    // to access persistent tables while maintaining separate storage. The fallback shares the
82    // same pager type as the primary context so executor tables can be reused without conversion.
83    // Uses RwLock for interior mutability to allow setting fallback after Arc wrapping.
84    fallback_lookup: RwLock<Option<Arc<RuntimeContext<P>>>>,
85}
86
87impl<P> RuntimeContext<P>
88where
89    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
90{
91    pub fn new(pager: Arc<P>) -> Self {
92        Self::new_with_catalog_inner(pager, None)
93    }
94
95    pub fn new_with_catalog(pager: Arc<P>, catalog: Arc<TableCatalog>) -> Self {
96        Self::new_with_catalog_inner(pager, Some(catalog))
97    }
98
99    fn new_with_catalog_inner(pager: Arc<P>, shared_catalog: Option<Arc<TableCatalog>>) -> Self {
100        tracing::trace!("RuntimeContext::new called, pager={:p}", &*pager);
101
102        let store = ColumnStore::open(Arc::clone(&pager)).expect("failed to open ColumnStore");
103        let catalog = SysCatalog::new(&store);
104
105        let next_txn_id = match catalog.get_next_txn_id() {
106            Ok(Some(id)) => {
107                tracing::debug!("[CONTEXT] Loaded next_txn_id={} from catalog", id);
108                id
109            }
110            Ok(None) => {
111                tracing::debug!("[CONTEXT] No persisted next_txn_id found, starting from default");
112                TXN_ID_AUTO_COMMIT + 1
113            }
114            Err(e) => {
115                tracing::warn!("[CONTEXT] Failed to load next_txn_id: {}, using default", e);
116                TXN_ID_AUTO_COMMIT + 1
117            }
118        };
119
120        let last_committed = match catalog.get_last_committed_txn_id() {
121            Ok(Some(id)) => {
122                tracing::debug!("[CONTEXT] Loaded last_committed={} from catalog", id);
123                id
124            }
125            Ok(None) => {
126                tracing::debug!(
127                    "[CONTEXT] No persisted last_committed found, starting from default"
128                );
129                TXN_ID_AUTO_COMMIT
130            }
131            Err(e) => {
132                tracing::warn!(
133                    "[CONTEXT] Failed to load last_committed: {}, using default",
134                    e
135                );
136                TXN_ID_AUTO_COMMIT
137            }
138        };
139
140        let store_arc = Arc::new(store);
141        let metadata = Arc::new(MetadataManager::new(Arc::clone(&store_arc)));
142
143        let loaded_tables = match metadata.all_table_metas() {
144            Ok(metas) => {
145                tracing::debug!("[CONTEXT] Loaded {} table(s) from catalog", metas.len());
146                metas
147            }
148            Err(e) => {
149                tracing::warn!(
150                    "[CONTEXT] Failed to load table metas: {}, starting with empty registry",
151                    e
152                );
153                Vec::new()
154            }
155        };
156
157        let transaction_manager =
158            TransactionManager::new_with_initial_state(next_txn_id, last_committed);
159        let txn_manager = transaction_manager.txn_manager();
160
161        // LAZY LOADING: Only load table metadata at first access. We intentionally
162        // avoid loading any row/column data into memory here. The executor
163        // performs streaming reads from the ColumnStore when a query runs, so
164        // large tables are never fully materialized.
165        //
166        // Benefits of this approach:
167        // - Instant database open (no upfront I/O for table data)
168        // - Lower memory footprint (only metadata cached)
169        // - Natural parallelism: if multiple threads request different tables
170        //   concurrently, those tables will be loaded concurrently by the
171        //   caller threads (no global preload required).
172        //
173        // Future Optimizations (if profiling shows need):
174        // 1. Eager parallel preload of a short "hot" list of tables (rayon)
175        // 2. Background preload of catalog entries after startup
176        // 3. LRU-based eviction for extremely large deployments
177        // 4. Cache compact representations of schemas to reduce per-table RAM
178        //
179        // Note: `loaded_tables` holds catalog metadata that helped us discover
180        // which tables exist; we discard it here because metadata will be
181        // fetched on-demand during lazy loads.
182        tracing::debug!(
183            "[CONTEXT] Initialized with lazy loading for {} table(s)",
184            loaded_tables.len()
185        );
186
187        // Initialize catalog and populate with existing tables
188        let (catalog, is_shared_catalog) = match shared_catalog {
189            Some(existing) => (existing, true),
190            None => (Arc::new(TableCatalog::new()), false),
191        };
192        for (table_id, table_meta) in &loaded_tables {
193            if let Some(ref table_name) = table_meta.name
194                && let Err(e) = catalog.register_table(table_name.as_str(), *table_id)
195            {
196                match e {
197                    Error::CatalogError(ref msg)
198                        if is_shared_catalog && msg.contains("already exists") =>
199                    {
200                        tracing::debug!(
201                            "[CONTEXT] Shared catalog already contains table '{}' with id={}",
202                            table_name,
203                            table_id
204                        );
205                    }
206                    other => {
207                        tracing::warn!(
208                            "[CONTEXT] Failed to register table '{}' (id={}) in catalog: {}",
209                            table_name,
210                            table_id,
211                            other
212                        );
213                    }
214                }
215            }
216        }
217        tracing::debug!(
218            "[CONTEXT] Catalog initialized with {} table(s)",
219            catalog.table_count()
220        );
221
222        let constraint_service =
223            ConstraintService::new(Arc::clone(&metadata), Arc::clone(&catalog));
224        let catalog_service = CatalogManager::new(
225            Arc::clone(&metadata),
226            Arc::clone(&catalog),
227            Arc::clone(&store_arc),
228        );
229
230        // Load custom types from SysCatalog into catalog_service
231        if let Err(e) = catalog_service.load_types_from_catalog() {
232            tracing::warn!("[CONTEXT] Failed to load custom types: {}", e);
233        }
234
235        Self {
236            pager,
237            tables: RwLock::new(FxHashMap::default()), // Start with empty table cache
238            dropped_tables: RwLock::new(FxHashSet::default()),
239            metadata,
240            constraint_service,
241            catalog_service,
242            catalog,
243            store: store_arc,
244            transaction_manager,
245            txn_manager,
246            txn_tables_with_new_rows: RwLock::new(FxHashMap::default()),
247            fallback_lookup: RwLock::new(None),
248        }
249    }
250
251    /// Return the transaction ID manager shared with sessions.
252    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
253        Arc::clone(&self.txn_manager)
254    }
255
256    /// Return the column store for catalog operations.
257    pub fn store(&self) -> &Arc<ColumnStore<P>> {
258        &self.store
259    }
260
261    /// Expose storage-level write sizing hints for bulk ingest callers.
262    pub fn column_store_write_hints(&self) -> ColumnStoreWriteHints {
263        self.store.write_hints()
264    }
265
266    /// Set a fallback context for cross-pager table lookups. The fallback uses BoxedPager
267    /// to enable access across different underlying pager types (e.g., temporary MemPager
268    /// can fall back to persistent disk pager).
269    pub fn with_fallback_lookup(self, fallback: Arc<RuntimeContext<P>>) -> Self {
270        *self.fallback_lookup.write().unwrap() = Some(fallback);
271        self
272    }
273
274    /// Set a fallback context after construction for contexts already wrapped in Arc.
275    pub fn set_fallback_lookup(&self, fallback: Arc<RuntimeContext<P>>) {
276        *self.fallback_lookup.write().unwrap() = Some(fallback);
277    }
278
279    /// Register a custom type alias (CREATE TYPE/DOMAIN).
280    pub fn register_type(&self, name: String, data_type: sqlparser::ast::DataType) {
281        self.catalog_service.register_type(name, data_type);
282    }
283
284    /// Drop a custom type alias (DROP TYPE/DOMAIN).
285    pub fn drop_type(&self, name: &str) -> Result<()> {
286        self.catalog_service.drop_type(name)?;
287        Ok(())
288    }
289
290    /// Ensure the catalog's next_table_id counter is at least `minimum`.
291    pub fn ensure_next_table_id_at_least(&self, minimum: TableId) -> Result<()> {
292        self.metadata.ensure_next_table_id_at_least(minimum)?;
293        Ok(())
294    }
295
296    /// Internal helper for creating a view that can be called from CatalogDdl trait implementation.
297    fn create_view_internal(
298        self: &Arc<Self>,
299        display_name: &str,
300        view_definition: String,
301        select_plan: SelectPlan,
302        if_not_exists: bool,
303        snapshot: TransactionSnapshot,
304    ) -> Result<()> {
305        let (normalized_display, canonical_name) = canonical_table_name(display_name)?;
306
307        if let Some(existing_id) = self.catalog.table_id(&canonical_name) {
308            let is_view = self.catalog_service.is_view(existing_id)?;
309            if is_view && if_not_exists {
310                return Ok(());
311            }
312
313            let entity = if is_view { "View" } else { "Table" };
314            return Err(Error::CatalogError(format!(
315                "{} '{}' already exists",
316                entity, normalized_display
317            )));
318        }
319
320        let execution = self.execute_select(select_plan, snapshot)?;
321        let column_specs = {
322            let schema = execution.schema();
323            if schema.fields().is_empty() {
324                return Err(Error::InvalidArgumentError(
325                    "CREATE VIEW requires SELECT to project at least one column".into(),
326                ));
327            }
328
329            schema
330                .fields()
331                .iter()
332                .map(|field| {
333                    PlanColumnSpec::new(
334                        field.name(),
335                        field.data_type().clone(),
336                        field.is_nullable(),
337                    )
338                })
339                .collect::<Vec<_>>()
340        };
341        drop(execution);
342
343        self.catalog_service
344            .create_view(&normalized_display, view_definition, column_specs)?;
345
346        self.dropped_tables.write().unwrap().remove(&canonical_name);
347
348        Ok(())
349    }
350
351    /// Create a view by executing its SELECT definition to derive projected columns
352    /// and persisting the metadata into the catalog. The view is registered as a
353    /// catalog entry with column names so subsequent binding can succeed without
354    /// reparsing the stored SQL in higher layers.
355    pub fn create_view(
356        self: &Arc<Self>,
357        display_name: &str,
358        view_definition: String,
359        select_plan: SelectPlan,
360        if_not_exists: bool,
361    ) -> Result<()> {
362        let snapshot = self.default_snapshot();
363        self.create_view_internal(
364            display_name,
365            view_definition,
366            select_plan,
367            if_not_exists,
368            snapshot,
369        )
370    }
371
372    #[allow(clippy::too_many_arguments)]
373    pub fn create_trigger(
374        self: &Arc<Self>,
375        trigger_display_name: &str,
376        canonical_trigger_name: &str,
377        table_display_name: &str,
378        canonical_table_name: &str,
379        timing: TriggerTimingMeta,
380        event: TriggerEventMeta,
381        for_each_row: bool,
382        condition: Option<String>,
383        body_sql: String,
384        if_not_exists: bool,
385    ) -> Result<bool> {
386        self.catalog_service.create_trigger(
387            trigger_display_name,
388            canonical_trigger_name,
389            table_display_name,
390            canonical_table_name,
391            timing,
392            event,
393            for_each_row,
394            condition,
395            body_sql,
396            if_not_exists,
397        )
398    }
399
400    pub fn drop_trigger(
401        self: &Arc<Self>,
402        trigger_display_name: &str,
403        canonical_trigger_name: &str,
404        table_hint_display: Option<&str>,
405        table_hint_canonical: Option<&str>,
406        if_exists: bool,
407    ) -> Result<bool> {
408        self.catalog_service.drop_trigger(
409            trigger_display_name,
410            canonical_trigger_name,
411            table_hint_display,
412            table_hint_canonical,
413            if_exists,
414        )
415    }
416
417    /// Return the stored SQL definition for a view, if it exists.
418    pub fn view_definition(&self, canonical_name: &str) -> Result<Option<String>> {
419        let Some(table_id) = self.catalog.table_id(canonical_name) else {
420            return Ok(None);
421        };
422
423        match self.metadata.table_meta(table_id)? {
424            Some(meta) => Ok(meta.view_definition),
425            None => Ok(None),
426        }
427    }
428
429    /// Check if a table is actually a view by looking at its metadata.
430    /// Returns true if the table exists and has a view_definition.
431    pub fn is_view(&self, table_id: TableId) -> Result<bool> {
432        self.catalog_service.is_view(table_id)
433    }
434
435    /// Drop a view, ignoring missing views when `if_exists` is true.
436    pub fn drop_view(&self, name: &str, if_exists: bool) -> Result<()> {
437        let (display_name, canonical_name) = canonical_table_name(name)?;
438
439        let table_id = match self.catalog.table_id(&canonical_name) {
440            Some(id) => id,
441            None => {
442                if if_exists {
443                    return Ok(());
444                }
445                return Err(Error::CatalogError(format!(
446                    "View '{}' does not exist",
447                    display_name
448                )));
449            }
450        };
451
452        if !self.catalog_service.is_view(table_id)? {
453            return Err(Error::CatalogError(format!(
454                "use DROP TABLE to delete table '{}'",
455                display_name
456            )));
457        }
458
459        self.catalog_service.drop_view(&canonical_name, table_id)?;
460
461        {
462            let mut tables = self.tables.write().unwrap();
463            tables.remove(&canonical_name);
464        }
465
466        self.dropped_tables.write().unwrap().insert(canonical_name);
467
468        Ok(())
469    }
470
471    /// Resolve a type name to its base DataType, recursively following aliases.
472    pub fn resolve_type(&self, data_type: &sqlparser::ast::DataType) -> sqlparser::ast::DataType {
473        self.catalog_service.resolve_type(data_type)
474    }
475
476    /// Persist the next_txn_id to the catalog.
477    pub fn persist_next_txn_id(&self, next_txn_id: TxnId) -> Result<()> {
478        let catalog = SysCatalog::new(&self.store);
479        catalog.put_next_txn_id(next_txn_id)?;
480        let last_committed = self.txn_manager.last_committed();
481        catalog.put_last_committed_txn_id(last_committed)?;
482        tracing::debug!(
483            "[CONTEXT] Persisted next_txn_id={}, last_committed={}",
484            next_txn_id,
485            last_committed
486        );
487        Ok(())
488    }
489
490    /// Construct the default snapshot for auto-commit operations.
491    pub fn default_snapshot(&self) -> TransactionSnapshot {
492        TransactionSnapshot {
493            txn_id: TXN_ID_AUTO_COMMIT,
494            snapshot_id: self.txn_manager.last_committed(),
495        }
496    }
497
498    /// Get the table catalog for schema and table name management.
499    pub fn table_catalog(&self) -> Arc<TableCatalog> {
500        Arc::clone(&self.catalog)
501    }
502
503    /// Enable caching of parent key sets for the specified referencing table.
504    pub fn enable_foreign_key_cache(&self, table_id: TableId) {
505        self.constraint_service.enable_foreign_key_cache(table_id);
506    }
507
508    /// Clear cached foreign key parent sets for the specified referencing table.
509    pub fn clear_foreign_key_cache(&self, table_id: TableId) {
510        self.constraint_service.clear_foreign_key_cache(table_id);
511    }
512
513    /// Access the catalog manager for type registry, view management, and metadata operations.
514    pub fn catalog(&self) -> &CatalogManager<P> {
515        &self.catalog_service
516    }
517
518    /// Get a handle to an existing table by name.
519    pub fn table(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
520        RuntimeTableHandle::new(Arc::clone(self), name)
521    }
522
523    /// Create a table with explicit column definitions - Programmatic API.
524    ///
525    /// This is a **convenience method** for programmatically creating tables with explicit
526    /// column definitions. Use this when you're writing Rust code that needs to create tables
527    /// directly, rather than executing SQL.
528    ///
529    /// # When to use `create_table` vs [`CatalogDdl::create_table`]
530    ///
531    /// **Use `create_table`:**
532    /// - You're writing Rust code (not parsing SQL)
533    /// - You have explicit column definitions
534    /// - You want a simple, ergonomic API: `ctx.create_table("users", vec![...])`
535    /// - You want a `RuntimeTableHandle` to work with immediately
536    /// - **Does NOT support**: CREATE TABLE AS SELECT, foreign keys, namespaces
537    ///
538    /// **Use [`CatalogDdl::create_table`]:**
539    /// - You're implementing SQL execution (already have a parsed `CreateTablePlan`)
540    /// - You need CREATE TABLE AS SELECT support
541    /// - You need foreign key constraints
542    /// - You need namespace support (temporary tables)
543    /// - You need IF NOT EXISTS / OR REPLACE semantics
544    /// - You're working within the transaction system
545    ///
546    /// # Usage Comparison
547    ///
548    /// **Programmatic API** (this method):
549    /// - `ctx.create_table("users", vec![("id", DataType::Int64, false), ...])?`
550    /// - Returns `RuntimeTableHandle` for immediate use
551    /// - Simple, ergonomic, no plan construction needed
552    ///
553    /// **SQL execution API** ([`CatalogDdl::create_table`]):
554    /// - Construct a `CreateTablePlan` with all SQL features
555    /// - Delegates to the [`CatalogDdl`] trait for catalog-aware creation
556    /// - Support for CTAS, foreign keys, namespaces, transactions
557    /// - Returns `RuntimeStatementResult` for consistency with other SQL operations
558    ///
559    /// # Returns
560    /// Returns a [`RuntimeTableHandle`] that provides immediate access to the table.
561    /// Use this for further programmatic operations on the table.
562    /// Returns all table names currently registered in the catalog.
563    pub fn table_names(self: &Arc<Self>) -> Vec<String> {
564        // Use catalog for table names (single source of truth)
565        self.catalog.table_names()
566    }
567}
568
569impl RuntimeContext<BoxedPager> {
570    /// Create a new session for transaction management.
571    /// Each session can have its own independent transaction.
572    pub fn create_session(self: &Arc<Self>) -> RuntimeSession {
573        tracing::debug!("[SESSION] RuntimeContext::create_session called");
574        let namespaces = Arc::new(crate::runtime_session::SessionNamespaces::new(Arc::clone(
575            self,
576        )));
577        let wrapper = RuntimeTransactionContext::new(Arc::clone(self));
578        let inner = self.transaction_manager.create_session(Arc::new(wrapper));
579        tracing::debug!(
580            "[SESSION] Created TransactionSession with session_id (will be logged by transaction manager)"
581        );
582        RuntimeSession::from_parts(inner, namespaces)
583    }
584}
585
586impl<P> CatalogDdl for RuntimeContext<P>
587where
588    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
589{
590    type CreateTableOutput = RuntimeStatementResult<P>;
591    type DropTableOutput = ();
592    type RenameTableOutput = ();
593    type AlterTableOutput = RuntimeStatementResult<P>;
594    type CreateIndexOutput = RuntimeStatementResult<P>;
595    type DropIndexOutput = Option<SingleColumnIndexDescriptor>;
596
597    fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
598        if plan.columns.is_empty() && plan.source.is_none() {
599            return Err(Error::InvalidArgumentError(
600                "CREATE TABLE requires explicit columns or a source".into(),
601            ));
602        }
603
604        let (display_name, canonical_name) = canonical_table_name(&plan.name)?;
605        let CreateTablePlan {
606            name: _,
607            if_not_exists,
608            or_replace,
609            columns,
610            source,
611            namespace: _,
612            foreign_keys,
613            multi_column_uniques,
614        } = plan;
615
616        tracing::trace!(
617            "DEBUG create_table (plan): table='{}' if_not_exists={} columns={}",
618            display_name,
619            if_not_exists,
620            columns.len()
621        );
622        for (idx, col) in columns.iter().enumerate() {
623            tracing::trace!(
624                "  plan column[{}]: name='{}' primary_key={}",
625                idx,
626                col.name,
627                col.primary_key
628            );
629        }
630        let (exists, is_dropped) = {
631            let tables = self.tables.read().unwrap();
632            let in_cache = tables.contains_key(&canonical_name);
633            let is_dropped = self
634                .dropped_tables
635                .read()
636                .unwrap()
637                .contains(&canonical_name);
638            // Table exists if it's in cache and NOT marked as dropped
639            (in_cache && !is_dropped, is_dropped)
640        };
641        tracing::trace!(
642            "DEBUG create_table (plan): exists={}, is_dropped={}",
643            exists,
644            is_dropped
645        );
646
647        // If table was dropped, remove it from cache before creating new one
648        if is_dropped {
649            self.remove_table_entry(&canonical_name);
650            self.dropped_tables.write().unwrap().remove(&canonical_name);
651        }
652
653        if exists {
654            if or_replace {
655                tracing::trace!(
656                    "DEBUG create_table (plan): table '{}' exists and or_replace=true, removing existing table before recreation",
657                    display_name
658                );
659                self.remove_table_entry(&canonical_name);
660            } else if if_not_exists {
661                tracing::trace!(
662                    "DEBUG create_table (plan): table '{}' exists and if_not_exists=true, returning early WITHOUT creating",
663                    display_name
664                );
665                return Ok(RuntimeStatementResult::CreateTable {
666                    table_name: display_name,
667                });
668            } else {
669                return Err(Error::CatalogError(format!(
670                    "Catalog Error: Table '{}' already exists",
671                    display_name
672                )));
673            }
674        }
675
676        match source {
677            Some(CreateTableSource::Batches { schema, batches }) => self.create_table_from_batches(
678                display_name,
679                canonical_name,
680                schema,
681                batches,
682                if_not_exists,
683            ),
684            Some(CreateTableSource::Select { .. }) => Err(Error::Internal(
685                "CreateTableSource::Select should be materialized before reaching RuntimeContext::create_table"
686                    .into(),
687            )),
688            None => self.create_table_from_columns(
689                display_name,
690                canonical_name,
691                columns,
692                foreign_keys,
693                multi_column_uniques,
694                if_not_exists,
695            ),
696        }
697    }
698
699    fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
700        let DropTablePlan { name, if_exists } = plan;
701        let (display_name, canonical_name) = canonical_table_name(&name)?;
702
703        tracing::debug!("drop_table: attempting to drop table '{}'", canonical_name);
704
705        if self.is_table_marked_dropped(&canonical_name) {
706            tracing::debug!(
707                "drop_table: table '{}' already marked dropped; if_exists={}",
708                canonical_name,
709                if_exists
710            );
711            return if if_exists {
712                Ok(())
713            } else {
714                Err(Error::CatalogError(format!(
715                    "Catalog Error: Table '{}' does not exist",
716                    display_name
717                )))
718            };
719        }
720
721        let cached_entry = {
722            let tables = self.tables.read().unwrap();
723            tracing::debug!("drop_table: cache contains {} tables", tables.len());
724            tables.get(&canonical_name).cloned()
725        };
726
727        let table_entry = match cached_entry {
728            Some(entry) => entry,
729            None => {
730                tracing::debug!(
731                    "drop_table: table '{}' not cached; attempting reload",
732                    canonical_name
733                );
734
735                if self.catalog.table_id(&canonical_name).is_none() {
736                    tracing::debug!(
737                        "drop_table: no catalog entry for '{}'; if_exists={}",
738                        canonical_name,
739                        if_exists
740                    );
741                    if if_exists {
742                        return Ok(());
743                    }
744                    return Err(Error::CatalogError(format!(
745                        "Catalog Error: Table '{}' does not exist",
746                        display_name
747                    )));
748                }
749
750                match self.lookup_table(&canonical_name) {
751                    Ok(entry) => entry,
752                    Err(err) => {
753                        tracing::warn!(
754                            "drop_table: failed to reload table '{}': {:?}",
755                            canonical_name,
756                            err
757                        );
758                        if if_exists {
759                            return Ok(());
760                        }
761                        return Err(err);
762                    }
763                }
764            }
765        };
766
767        let column_field_ids = table_entry
768            .schema
769            .columns
770            .iter()
771            .map(|col| col.field_id)
772            .collect::<Vec<_>>();
773        let table_id = table_entry.table.table_id();
774
775        let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
776
777        for detail in referencing {
778            if detail.referencing_table_canonical == canonical_name {
779                continue;
780            }
781
782            if self.is_table_marked_dropped(&detail.referencing_table_canonical) {
783                continue;
784            }
785
786            return Err(Error::CatalogError(format!(
787                "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
788                detail.referencing_table_display
789            )));
790        }
791
792        self.catalog_service
793            .drop_table(&canonical_name, table_id, &column_field_ids)?;
794        tracing::debug!(
795            "[CATALOG] Unregistered table '{}' (table_id={}) from catalog",
796            canonical_name,
797            table_id
798        );
799
800        self.remove_table_entry(&canonical_name);
801        self.dropped_tables
802            .write()
803            .unwrap()
804            .insert(canonical_name.clone());
805        Ok(())
806    }
807
808    fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
809        let RenameTablePlan {
810            current_name,
811            new_name,
812            if_exists,
813        } = plan;
814
815        let (current_display, current_canonical) = canonical_table_name(&current_name)?;
816        let (new_display, new_canonical) = canonical_table_name(&new_name)?;
817
818        if current_canonical == new_canonical && current_display == new_display {
819            return Ok(());
820        }
821
822        if self.is_table_marked_dropped(&current_canonical) {
823            if if_exists {
824                return Ok(());
825            }
826            return Err(Error::CatalogError(format!(
827                "Catalog Error: Table '{}' does not exist",
828                current_display
829            )));
830        }
831
832        let table_id = match self
833            .catalog
834            .table_id(&current_canonical)
835            .or_else(|| self.catalog.table_id(&current_display))
836        {
837            Some(id) => id,
838            None => {
839                if if_exists {
840                    return Ok(());
841                }
842                return Err(Error::CatalogError(format!(
843                    "Catalog Error: Table '{}' does not exist",
844                    current_display
845                )));
846            }
847        };
848
849        if !current_display.eq_ignore_ascii_case(&new_display)
850            && (self.catalog.table_id(&new_canonical).is_some()
851                || self.catalog.table_id(&new_display).is_some())
852        {
853            return Err(Error::CatalogError(format!(
854                "Catalog Error: Table '{}' already exists",
855                new_display
856            )));
857        }
858
859        let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
860        if !referencing.is_empty() {
861            return Err(Error::CatalogError(format!(
862                "Dependency Error: Cannot alter entry \"{}\" because there are entries that depend on it.",
863                current_display
864            )));
865        }
866
867        self.catalog_service
868            .rename_table(table_id, &current_display, &new_display)?;
869
870        let mut tables = self.tables.write().unwrap();
871        if let Some(table) = tables.remove(&current_canonical) {
872            tables.insert(new_canonical.clone(), table);
873        }
874
875        let mut dropped = self.dropped_tables.write().unwrap();
876        dropped.remove(&current_canonical);
877        dropped.remove(&new_canonical);
878
879        Ok(())
880    }
881
882    fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
883        let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
884
885        let view = match self.catalog_service.table_view(&canonical_table) {
886            Ok(view) => view,
887            Err(err) if plan.if_exists && is_table_missing_error(&err) => {
888                return Ok(RuntimeStatementResult::NoOp);
889            }
890            Err(err) => return Err(err),
891        };
892
893        let table_meta = match view.table_meta.as_ref() {
894            Some(meta) => meta,
895            None => {
896                if plan.if_exists {
897                    return Ok(RuntimeStatementResult::NoOp);
898                }
899                return Err(Error::Internal("table metadata missing".into()));
900            }
901        };
902
903        let table_id = table_meta.table_id;
904
905        validate_alter_table_operation(&plan.operation, &view, table_id, &self.catalog_service)?;
906
907        match &plan.operation {
908            llkv_plan::AlterTableOperation::RenameColumn {
909                old_column_name,
910                new_column_name,
911            } => {
912                self.rename_column(&plan.table_name, old_column_name, new_column_name)?;
913            }
914            llkv_plan::AlterTableOperation::SetColumnDataType {
915                column_name,
916                new_data_type,
917            } => {
918                self.alter_column_type(&plan.table_name, column_name, new_data_type)?;
919            }
920            llkv_plan::AlterTableOperation::DropColumn { column_name, .. } => {
921                self.drop_column(&plan.table_name, column_name)?;
922            }
923        }
924
925        Ok(RuntimeStatementResult::NoOp)
926    }
927
928    fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
929        if plan.columns.is_empty() {
930            return Err(Error::InvalidArgumentError(
931                "CREATE INDEX requires at least one column".into(),
932            ));
933        }
934
935        let mut index_name = plan.name.clone();
936        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
937        let table = self.lookup_table(&canonical_name)?;
938
939        let mut column_indices = Vec::with_capacity(plan.columns.len());
940        let mut field_ids = Vec::with_capacity(plan.columns.len());
941        let mut column_names = Vec::with_capacity(plan.columns.len());
942        let mut seen_column_indices = FxHashSet::default();
943
944        for column_plan in &plan.columns {
945            let normalized = column_plan.name.to_ascii_lowercase();
946            let col_idx = table
947                .schema
948                .lookup
949                .get(&normalized)
950                .copied()
951                .ok_or_else(|| {
952                    Error::InvalidArgumentError(format!(
953                        "column '{}' does not exist in table '{}'",
954                        column_plan.name, display_name
955                    ))
956                })?;
957            if !seen_column_indices.insert(col_idx) {
958                return Err(Error::InvalidArgumentError(format!(
959                    "duplicate column '{}' in CREATE INDEX",
960                    column_plan.name
961                )));
962            }
963
964            let column = &table.schema.columns[col_idx];
965            column_indices.push(col_idx);
966            field_ids.push(column.field_id);
967            column_names.push(column.name.clone());
968        }
969
970        if plan.columns.len() == 1 {
971            let field_id = field_ids[0];
972            let column_name = column_names[0].clone();
973            let column_plan = &plan.columns[0];
974
975            if plan.unique {
976                let snapshot = self.default_snapshot();
977                let existing_values =
978                    self.scan_column_values(table.as_ref(), field_id, snapshot)?;
979                ensure_single_column_unique(&existing_values, &[], &column_name)?;
980            }
981
982            let registration = self.catalog_service.register_single_column_index(
983                &display_name,
984                &canonical_name,
985                &table.table,
986                field_id,
987                &column_name,
988                plan.name.clone(),
989                plan.unique,
990                column_plan.ascending,
991                column_plan.nulls_first,
992                plan.if_not_exists,
993            )?;
994
995            let created_name = match registration {
996                SingleColumnIndexRegistration::Created { index_name } => index_name,
997                SingleColumnIndexRegistration::AlreadyExists { index_name } => {
998                    drop(table);
999                    return Ok(RuntimeStatementResult::CreateIndex {
1000                        table_name: display_name,
1001                        index_name: Some(index_name),
1002                    });
1003                }
1004            };
1005
1006            index_name = Some(created_name.clone());
1007
1008            if plan.unique {
1009                if let Some(updated_table) =
1010                    Self::rebuild_executor_table_with_unique(table.as_ref(), field_id)
1011                {
1012                    self.tables
1013                        .write()
1014                        .unwrap()
1015                        .insert(canonical_name.clone(), Arc::clone(&updated_table));
1016                } else {
1017                    self.remove_table_entry(&canonical_name);
1018                }
1019            }
1020
1021            drop(table);
1022
1023            return Ok(RuntimeStatementResult::CreateIndex {
1024                table_name: display_name,
1025                index_name,
1026            });
1027        }
1028
1029        let table_id = table.table_id();
1030
1031        if plan.unique {
1032            // For unique multi-column indexes, validate uniqueness and register
1033            let snapshot = self.default_snapshot();
1034            let existing_rows =
1035                self.scan_multi_column_values(table.as_ref(), &field_ids, snapshot)?;
1036            let mut existing_keys: Vec<UniqueKey> = Vec::with_capacity(existing_rows.len());
1037            for values in existing_rows {
1038                if let Some(key) = build_composite_unique_key(&values, &column_names)? {
1039                    existing_keys.push(key);
1040                }
1041            }
1042            ensure_multi_column_unique(&existing_keys, &[] as &[UniqueKey], &column_names)?;
1043
1044            let executor_entry = ExecutorMultiColumnUnique {
1045                index_name: index_name.clone(),
1046                column_indices: column_indices.clone(),
1047            };
1048
1049            let registration = self.catalog_service.register_multi_column_unique_index(
1050                table_id,
1051                &field_ids,
1052                index_name.clone(),
1053            )?;
1054
1055            match registration {
1056                MultiColumnUniqueRegistration::Created => {
1057                    table.add_multi_column_unique(executor_entry);
1058                }
1059                MultiColumnUniqueRegistration::AlreadyExists {
1060                    index_name: existing,
1061                } => {
1062                    if plan.if_not_exists {
1063                        drop(table);
1064                        return Ok(RuntimeStatementResult::CreateIndex {
1065                            table_name: display_name,
1066                            index_name: existing,
1067                        });
1068                    }
1069                    return Err(Error::CatalogError(format!(
1070                        "Index already exists on columns '{}'",
1071                        column_names.join(", ")
1072                    )));
1073                }
1074            }
1075        } else {
1076            // For non-unique multi-column indexes, register in catalog but no runtime enforcement yet
1077            let name = index_name.clone().ok_or_else(|| {
1078                Error::InvalidArgumentError(
1079                    "Multi-column CREATE INDEX requires an explicit index name".into(),
1080                )
1081            })?;
1082            let created = self.catalog_service.register_multi_column_index(
1083                table_id, &field_ids, name, false, // unique = false
1084            )?;
1085
1086            if !created && !plan.if_not_exists {
1087                return Err(Error::CatalogError(format!(
1088                    "Index already exists on columns '{}'",
1089                    column_names.join(", ")
1090                )));
1091            }
1092        }
1093
1094        Ok(RuntimeStatementResult::CreateIndex {
1095            table_name: display_name,
1096            index_name,
1097        })
1098    }
1099
1100    fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
1101        let descriptor = self.catalog_service.drop_single_column_index(plan)?;
1102
1103        if let Some(descriptor) = &descriptor {
1104            self.remove_table_entry(&descriptor.canonical_table_name);
1105        }
1106
1107        Ok(descriptor)
1108    }
1109
1110    fn create_view(&self, _plan: CreateViewPlan) -> Result<()> {
1111        // This trait method should not be called directly on RuntimeContext.
1112        // Views should be created through RuntimeSession which has the Arc<RuntimeContext>.
1113        Err(Error::Internal(
1114            "create_view on RuntimeContext should be called through RuntimeSession".into(),
1115        ))
1116    }
1117
1118    fn drop_view(&self, plan: DropViewPlan) -> Result<()> {
1119        RuntimeContext::drop_view(self, &plan.name, plan.if_exists)
1120    }
1121}
1122
1123impl<P> RuntimeContext<P>
1124where
1125    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1126{
1127    /// Rebuild an index by dropping and recreating it.
1128    pub(crate) fn reindex_index(
1129        &self,
1130        plan: llkv_plan::ReindexPlan,
1131    ) -> Result<RuntimeStatementResult<P>> {
1132        let canonical_index = plan.canonical_name.to_ascii_lowercase();
1133        let snapshot = self.catalog.snapshot();
1134
1135        // Search for the index across all tables
1136        for canonical_table_name in snapshot.table_names() {
1137            let Some(table_id) = snapshot.table_id(&canonical_table_name) else {
1138                continue;
1139            };
1140
1141            if let Some(entry) = self
1142                .metadata
1143                .single_column_index(table_id, &canonical_index)?
1144            {
1145                // Found the index - rebuild it by unregistering and re-registering
1146                let table = self.lookup_table(&canonical_table_name)?;
1147
1148                // Unregister the physical index
1149                table.table.unregister_sort_index(entry.column_id)?;
1150
1151                // Re-register the physical index (this rebuilds it)
1152                table.table.register_sort_index(entry.column_id)?;
1153
1154                drop(table);
1155
1156                return Ok(RuntimeStatementResult::NoOp);
1157            }
1158        }
1159
1160        // Index not found
1161        Err(Error::CatalogError(format!(
1162            "Index '{}' does not exist",
1163            plan.name
1164        )))
1165    }
1166}