llkv_runtime/runtime_context/
mod.rs

//! Runtime context submodules
//!
//! This module contains the `RuntimeContext` implementation, split into the logical
//! submodules declared below: `alter`, `constraints`, `delete`, `insert`, `provider`,
//! `query`, `table_access`, `table_creation`, `types`, `update`, and `utils`.
//!
//! Responsibilities covered by the implementation include:
//! - MVCC transaction visibility filtering
//! - String-based expression to field-ID-based expression translation
//! - `types`: Helper types (`PreparedAssignmentValue`, `TableConstraintContext`)
//! - `provider`: ContextProvider for TableProvider trait
8
9use crate::{
10    RuntimeSession, RuntimeStatementResult, RuntimeTableHandle, RuntimeTransactionContext,
11    TXN_ID_AUTO_COMMIT, canonical_table_name, is_table_missing_error,
12};
13use llkv_column_map::store::ColumnStore;
14use llkv_executor::{ExecutorMultiColumnUnique, ExecutorTable};
15use llkv_plan::{
16    AlterTablePlan, CreateIndexPlan, CreateTablePlan, CreateTableSource, DropIndexPlan,
17    DropTablePlan, RenameTablePlan,
18};
19use llkv_result::{Error, Result};
20use llkv_storage::pager::{MemPager, Pager};
21use llkv_table::catalog::TableCatalog;
22use llkv_table::{
23    CatalogDdl, CatalogManager, ConstraintService, MetadataManager, MultiColumnUniqueRegistration,
24    SingleColumnIndexDescriptor, SingleColumnIndexRegistration, SysCatalog, TableId,
25    ensure_multi_column_unique, ensure_single_column_unique, validate_alter_table_operation,
26};
27use llkv_transaction::{TransactionManager, TransactionSnapshot, TxnId, TxnIdManager};
28use rustc_hash::{FxHashMap, FxHashSet};
29use simd_r_drive_entry_handle::EntryHandle;
30use std::sync::{Arc, RwLock};
31
32mod alter;
33mod constraints;
34mod delete;
35mod insert;
36mod provider;
37mod query;
38mod table_access;
39mod table_creation;
40mod types;
41mod update;
42mod utils;
43
44pub(crate) use types::{PreparedAssignmentValue, TableConstraintContext};
45
/// In-memory execution context shared by plan-based queries.
///
/// Important: "lazy loading" here refers to *table metadata only* (schema,
/// executor-side column descriptors, and a small next-row-id counter). We do
/// NOT eagerly load or materialize the table's row data into memory. All
/// row/column data remains on the ColumnStore and is streamed in chunks during
/// query execution. This keeps the memory footprint low even for very large
/// tables.
///
/// Typical resource usage:
/// - Metadata per table: ~100s of bytes to a few KB (schema + field ids)
/// - ExecutorTable struct: small (handles + counters)
/// - Actual table rows: streamed from disk in chunks (never fully resident)
pub struct RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Pager supplied at construction; the `ColumnStore` is opened on a clone of it.
    pub(crate) pager: Arc<P>,
    /// Cache of executor tables keyed by canonical table name. Starts empty and is
    /// filled on demand.
    tables: RwLock<FxHashMap<String, Arc<ExecutorTable<P>>>>,
    /// Canonical names of tables that were dropped via `drop_table`. A cache entry for
    /// such a table may still exist; `create_table` evicts it before re-creating.
    pub(crate) dropped_tables: RwLock<FxHashSet<String>>,
    /// Metadata manager built on top of the shared column store.
    metadata: Arc<MetadataManager<P>>,
    /// Constraint lookups (e.g. foreign keys that reference a given table).
    constraint_service: ConstraintService<P>,
    /// Catalog manager used for DDL bookkeeping, custom types, views, and indexes.
    pub(crate) catalog_service: CatalogManager<P>,
    // Centralized catalog for table/field name resolution
    pub(crate) catalog: Arc<TableCatalog>,
    // Shared column store for all tables in this context
    // This ensures catalog state is synchronized across all tables
    store: Arc<ColumnStore<P>>,
    // Transaction manager for session-based transactions
    transaction_manager:
        TransactionManager<RuntimeTransactionContext<P>, RuntimeTransactionContext<MemPager>>,
    /// Transaction ID manager obtained from `transaction_manager` at construction.
    txn_manager: Arc<TxnIdManager>,
    /// Per-transaction sets of table names.
    /// NOTE(review): presumably the tables each transaction added rows to; the code
    /// that reads and writes this map is outside this file section — confirm.
    txn_tables_with_new_rows: RwLock<FxHashMap<TxnId, FxHashSet<String>>>,
}
80
81impl<P> RuntimeContext<P>
82where
83    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
84{
    /// Create a context over `pager` with its own, freshly created [`TableCatalog`].
    ///
    /// # Panics
    /// Panics if the `ColumnStore` cannot be opened on `pager`.
    pub fn new(pager: Arc<P>) -> Self {
        Self::new_with_catalog_inner(pager, None)
    }
88
    /// Create a context over `pager` that registers its tables in the caller-supplied,
    /// shared [`TableCatalog`] instead of creating a private one.
    ///
    /// # Panics
    /// Panics if the `ColumnStore` cannot be opened on `pager`.
    pub fn new_with_catalog(pager: Arc<P>, catalog: Arc<TableCatalog>) -> Self {
        Self::new_with_catalog_inner(pager, Some(catalog))
    }
92
    /// Shared constructor behind [`Self::new`] and [`Self::new_with_catalog`].
    ///
    /// Opens the `ColumnStore`, restores the persisted transaction counters (falling
    /// back to defaults when they are absent or unreadable), registers the names of all
    /// persisted tables in the table catalog, and wires up the constraint and catalog
    /// services. The executor-table cache starts empty.
    ///
    /// When `shared_catalog` is `Some`, that catalog is reused and "already exists"
    /// registration errors are logged at debug level only.
    ///
    /// # Panics
    /// Panics if the `ColumnStore` cannot be opened on `pager`.
    fn new_with_catalog_inner(pager: Arc<P>, shared_catalog: Option<Arc<TableCatalog>>) -> Self {
        tracing::trace!("RuntimeContext::new called, pager={:p}", &*pager);

        let store = ColumnStore::open(Arc::clone(&pager)).expect("failed to open ColumnStore");
        let catalog = SysCatalog::new(&store);

        // Restore the next transaction id; a missing or unreadable value falls back to
        // the first id after the auto-commit id.
        let next_txn_id = match catalog.get_next_txn_id() {
            Ok(Some(id)) => {
                tracing::debug!("[CONTEXT] Loaded next_txn_id={} from catalog", id);
                id
            }
            Ok(None) => {
                tracing::debug!("[CONTEXT] No persisted next_txn_id found, starting from default");
                TXN_ID_AUTO_COMMIT + 1
            }
            Err(e) => {
                tracing::warn!("[CONTEXT] Failed to load next_txn_id: {}, using default", e);
                TXN_ID_AUTO_COMMIT + 1
            }
        };

        // Restore the last committed transaction id with the same best-effort policy.
        let last_committed = match catalog.get_last_committed_txn_id() {
            Ok(Some(id)) => {
                tracing::debug!("[CONTEXT] Loaded last_committed={} from catalog", id);
                id
            }
            Ok(None) => {
                tracing::debug!(
                    "[CONTEXT] No persisted last_committed found, starting from default"
                );
                TXN_ID_AUTO_COMMIT
            }
            Err(e) => {
                tracing::warn!(
                    "[CONTEXT] Failed to load last_committed: {}, using default",
                    e
                );
                TXN_ID_AUTO_COMMIT
            }
        };

        let store_arc = Arc::new(store);
        let metadata = Arc::new(MetadataManager::new(Arc::clone(&store_arc)));

        // A failure to enumerate table metadata is not fatal: the context starts with
        // no registered table names.
        let loaded_tables = match metadata.all_table_metas() {
            Ok(metas) => {
                tracing::debug!("[CONTEXT] Loaded {} table(s) from catalog", metas.len());
                metas
            }
            Err(e) => {
                tracing::warn!(
                    "[CONTEXT] Failed to load table metas: {}, starting with empty registry",
                    e
                );
                Vec::new()
            }
        };

        let transaction_manager =
            TransactionManager::new_with_initial_state(next_txn_id, last_committed);
        let txn_manager = transaction_manager.txn_manager();

        // LAZY LOADING: Only load table metadata at first access. We intentionally
        // avoid loading any row/column data into memory here. The executor
        // performs streaming reads from the ColumnStore when a query runs, so
        // large tables are never fully materialized.
        //
        // Benefits of this approach:
        // - Instant database open (no upfront I/O for table data)
        // - Lower memory footprint (only metadata cached)
        // - Natural parallelism: if multiple threads request different tables
        //   concurrently, those tables will be loaded concurrently by the
        //   caller threads (no global preload required).
        //
        // Future Optimizations (if profiling shows need):
        // 1. Eager parallel preload of a short "hot" list of tables (rayon)
        // 2. Background preload of catalog entries after startup
        // 3. LRU-based eviction for extremely large deployments
        // 4. Cache compact representations of schemas to reduce per-table RAM
        //
        // Note: `loaded_tables` holds catalog metadata that tells us which tables
        // exist. It is only used below to register table names in the table
        // catalog; no executor table is built from it here.
        tracing::debug!(
            "[CONTEXT] Initialized with lazy loading for {} table(s)",
            loaded_tables.len()
        );

        // Initialize catalog and populate with existing tables
        let (catalog, is_shared_catalog) = match shared_catalog {
            Some(existing) => (existing, true),
            None => (Arc::new(TableCatalog::new()), false),
        };
        for (table_id, table_meta) in &loaded_tables {
            // Entries without a name are skipped; registration failures are logged and
            // never abort construction.
            if let Some(ref table_name) = table_meta.name
                && let Err(e) = catalog.register_table(table_name.as_str(), *table_id)
            {
                match e {
                    // A shared catalog may already know this table; that is expected.
                    Error::CatalogError(ref msg)
                        if is_shared_catalog && msg.contains("already exists") =>
                    {
                        tracing::debug!(
                            "[CONTEXT] Shared catalog already contains table '{}' with id={}",
                            table_name,
                            table_id
                        );
                    }
                    other => {
                        tracing::warn!(
                            "[CONTEXT] Failed to register table '{}' (id={}) in catalog: {}",
                            table_name,
                            table_id,
                            other
                        );
                    }
                }
            }
        }
        tracing::debug!(
            "[CONTEXT] Catalog initialized with {} table(s)",
            catalog.table_count()
        );

        let constraint_service =
            ConstraintService::new(Arc::clone(&metadata), Arc::clone(&catalog));
        let catalog_service = CatalogManager::new(
            Arc::clone(&metadata),
            Arc::clone(&catalog),
            Arc::clone(&store_arc),
        );

        // Load custom types from SysCatalog into catalog_service
        if let Err(e) = catalog_service.load_types_from_catalog() {
            tracing::warn!("[CONTEXT] Failed to load custom types: {}", e);
        }

        Self {
            pager,
            tables: RwLock::new(FxHashMap::default()), // Start with empty table cache
            dropped_tables: RwLock::new(FxHashSet::default()),
            metadata,
            constraint_service,
            catalog_service,
            catalog,
            store: store_arc,
            transaction_manager,
            txn_manager,
            txn_tables_with_new_rows: RwLock::new(FxHashMap::default()),
        }
    }
243
    /// Return the transaction ID manager shared with sessions.
    ///
    /// The returned value is a new `Arc` handle to the same manager held by this context.
    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
        Arc::clone(&self.txn_manager)
    }
248
    /// Return the column store for catalog operations.
    ///
    /// The reference borrows from this context; clone the `Arc` to keep it longer.
    pub fn store(&self) -> &Arc<ColumnStore<P>> {
        &self.store
    }
253
    /// Register a custom type alias (CREATE TYPE/DOMAIN).
    ///
    /// Delegates to the catalog manager, mapping `name` to `data_type`.
    pub fn register_type(&self, name: String, data_type: sqlparser::ast::DataType) {
        self.catalog_service.register_type(name, data_type);
    }
258
    /// Drop a custom type alias (DROP TYPE/DOMAIN).
    ///
    /// # Errors
    /// Propagates any error reported by the catalog manager while dropping `name`.
    pub fn drop_type(&self, name: &str) -> Result<()> {
        self.catalog_service.drop_type(name)?;
        Ok(())
    }
264
    /// Create a view by storing its SQL definition in the catalog.
    /// The view will be registered as a table with a view_definition.
    ///
    /// Returns the [`TableId`] produced by the catalog manager.
    ///
    /// # Errors
    /// Propagates any error reported by the catalog manager.
    pub fn create_view(&self, display_name: &str, view_definition: String) -> Result<TableId> {
        self.catalog_service
            .create_view(display_name, view_definition)
    }
271
    /// Check if a table is actually a view by looking at its metadata.
    /// Returns true if the table exists and has a view_definition.
    ///
    /// # Errors
    /// Propagates any error reported by the catalog manager during the lookup.
    pub fn is_view(&self, table_id: TableId) -> Result<bool> {
        self.catalog_service.is_view(table_id)
    }
277
    /// Resolve a type name to its base DataType, recursively following aliases.
    ///
    /// The resolution itself is performed by the catalog manager's type registry.
    pub fn resolve_type(&self, data_type: &sqlparser::ast::DataType) -> sqlparser::ast::DataType {
        self.catalog_service.resolve_type(data_type)
    }
282
283    /// Persist the next_txn_id to the catalog.
284    pub fn persist_next_txn_id(&self, next_txn_id: TxnId) -> Result<()> {
285        let catalog = SysCatalog::new(&self.store);
286        catalog.put_next_txn_id(next_txn_id)?;
287        let last_committed = self.txn_manager.last_committed();
288        catalog.put_last_committed_txn_id(last_committed)?;
289        tracing::debug!(
290            "[CONTEXT] Persisted next_txn_id={}, last_committed={}",
291            next_txn_id,
292            last_committed
293        );
294        Ok(())
295    }
296
297    /// Construct the default snapshot for auto-commit operations.
298    pub fn default_snapshot(&self) -> TransactionSnapshot {
299        TransactionSnapshot {
300            txn_id: TXN_ID_AUTO_COMMIT,
301            snapshot_id: self.txn_manager.last_committed(),
302        }
303    }
304
    /// Get the table catalog for schema and table name management.
    ///
    /// Returns a new `Arc` handle to the catalog this context registers tables in.
    pub fn table_catalog(&self) -> Arc<TableCatalog> {
        Arc::clone(&self.catalog)
    }
309
    /// Access the catalog manager for type registry, view management, and metadata operations.
    ///
    /// Note that this returns the [`CatalogManager`], not the [`TableCatalog`]; use
    /// [`Self::table_catalog`] for the latter.
    pub fn catalog(&self) -> &CatalogManager<P> {
        &self.catalog_service
    }
314
315    /// Create a new session for transaction management.
316    /// Each session can have its own independent transaction.
317    pub fn create_session(self: &Arc<Self>) -> RuntimeSession<P> {
318        tracing::debug!("[SESSION] RuntimeContext::create_session called");
319        let namespaces = Arc::new(crate::runtime_session::SessionNamespaces::new(Arc::clone(
320            self,
321        )));
322        let wrapper = RuntimeTransactionContext::new(Arc::clone(self));
323        let inner = self.transaction_manager.create_session(Arc::new(wrapper));
324        tracing::debug!(
325            "[SESSION] Created TransactionSession with session_id (will be logged by transaction manager)"
326        );
327        RuntimeSession::from_parts(inner, namespaces)
328    }
329
    /// Get a handle to an existing table by name.
    ///
    /// # Errors
    /// Propagates whatever error [`RuntimeTableHandle::new`] reports for `name`.
    pub fn table(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
        RuntimeTableHandle::new(Arc::clone(self), name)
    }
334
    /// Returns all table names currently registered in the catalog.
    ///
    /// The names are read from the table catalog rather than from the executor-table
    /// cache, so tables that have not been loaded yet are included as well.
    pub fn table_names(self: &Arc<Self>) -> Vec<String> {
        // Use catalog for table names (single source of truth)
        self.catalog.table_names()
    }
379}
380
381impl<P> CatalogDdl for RuntimeContext<P>
382where
383    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
384{
385    type CreateTableOutput = RuntimeStatementResult<P>;
386    type DropTableOutput = ();
387    type RenameTableOutput = ();
388    type AlterTableOutput = RuntimeStatementResult<P>;
389    type CreateIndexOutput = RuntimeStatementResult<P>;
390    type DropIndexOutput = Option<SingleColumnIndexDescriptor>;
391
392    fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
393        if plan.columns.is_empty() && plan.source.is_none() {
394            return Err(Error::InvalidArgumentError(
395                "CREATE TABLE requires explicit columns or a source".into(),
396            ));
397        }
398
399        let (display_name, canonical_name) = canonical_table_name(&plan.name)?;
400        let CreateTablePlan {
401            name: _,
402            if_not_exists,
403            or_replace,
404            columns,
405            source,
406            namespace: _,
407            foreign_keys,
408            multi_column_uniques,
409        } = plan;
410
411        tracing::trace!(
412            "DEBUG create_table (plan): table='{}' if_not_exists={} columns={}",
413            display_name,
414            if_not_exists,
415            columns.len()
416        );
417        for (idx, col) in columns.iter().enumerate() {
418            tracing::trace!(
419                "  plan column[{}]: name='{}' primary_key={}",
420                idx,
421                col.name,
422                col.primary_key
423            );
424        }
425        let (exists, is_dropped) = {
426            let tables = self.tables.read().unwrap();
427            let in_cache = tables.contains_key(&canonical_name);
428            let is_dropped = self
429                .dropped_tables
430                .read()
431                .unwrap()
432                .contains(&canonical_name);
433            // Table exists if it's in cache and NOT marked as dropped
434            (in_cache && !is_dropped, is_dropped)
435        };
436        tracing::trace!(
437            "DEBUG create_table (plan): exists={}, is_dropped={}",
438            exists,
439            is_dropped
440        );
441
442        // If table was dropped, remove it from cache before creating new one
443        if is_dropped {
444            self.remove_table_entry(&canonical_name);
445            self.dropped_tables.write().unwrap().remove(&canonical_name);
446        }
447
448        if exists {
449            if or_replace {
450                tracing::trace!(
451                    "DEBUG create_table (plan): table '{}' exists and or_replace=true, removing existing table before recreation",
452                    display_name
453                );
454                self.remove_table_entry(&canonical_name);
455            } else if if_not_exists {
456                tracing::trace!(
457                    "DEBUG create_table (plan): table '{}' exists and if_not_exists=true, returning early WITHOUT creating",
458                    display_name
459                );
460                return Ok(RuntimeStatementResult::CreateTable {
461                    table_name: display_name,
462                });
463            } else {
464                return Err(Error::CatalogError(format!(
465                    "Catalog Error: Table '{}' already exists",
466                    display_name
467                )));
468            }
469        }
470
471        match source {
472            Some(CreateTableSource::Batches { schema, batches }) => self.create_table_from_batches(
473                display_name,
474                canonical_name,
475                schema,
476                batches,
477                if_not_exists,
478            ),
479            Some(CreateTableSource::Select { .. }) => Err(Error::Internal(
480                "CreateTableSource::Select should be materialized before reaching RuntimeContext::create_table"
481                    .into(),
482            )),
483            None => self.create_table_from_columns(
484                display_name,
485                canonical_name,
486                columns,
487                foreign_keys,
488                multi_column_uniques,
489                if_not_exists,
490            ),
491        }
492    }
493
494    fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
495        let DropTablePlan { name, if_exists } = plan;
496        let (display_name, canonical_name) = canonical_table_name(&name)?;
497
498        tracing::debug!("drop_table: attempting to drop table '{}'", canonical_name);
499
500        let cached_entry = {
501            let tables = self.tables.read().unwrap();
502            tracing::debug!("drop_table: cache contains {} tables", tables.len());
503            tables.get(&canonical_name).cloned()
504        };
505
506        let table_entry = match cached_entry {
507            Some(entry) => entry,
508            None => {
509                tracing::debug!(
510                    "drop_table: table '{}' not cached; attempting reload",
511                    canonical_name
512                );
513
514                if self.catalog.table_id(&canonical_name).is_none() {
515                    tracing::debug!(
516                        "drop_table: no catalog entry for '{}'; if_exists={}",
517                        canonical_name,
518                        if_exists
519                    );
520                    if if_exists {
521                        return Ok(());
522                    }
523                    return Err(Error::CatalogError(format!(
524                        "Catalog Error: Table '{}' does not exist",
525                        display_name
526                    )));
527                }
528
529                match self.lookup_table(&canonical_name) {
530                    Ok(entry) => entry,
531                    Err(err) => {
532                        tracing::warn!(
533                            "drop_table: failed to reload table '{}': {:?}",
534                            canonical_name,
535                            err
536                        );
537                        if if_exists {
538                            return Ok(());
539                        }
540                        return Err(err);
541                    }
542                }
543            }
544        };
545
546        let column_field_ids = table_entry
547            .schema
548            .columns
549            .iter()
550            .map(|col| col.field_id)
551            .collect::<Vec<_>>();
552        let table_id = table_entry.table.table_id();
553
554        let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
555
556        for detail in referencing {
557            if detail.referencing_table_canonical == canonical_name {
558                continue;
559            }
560
561            if self.is_table_marked_dropped(&detail.referencing_table_canonical) {
562                continue;
563            }
564
565            return Err(Error::CatalogError(format!(
566                "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
567                detail.referencing_table_display
568            )));
569        }
570
571        self.catalog_service
572            .drop_table(&canonical_name, table_id, &column_field_ids)?;
573        tracing::debug!(
574            "[CATALOG] Unregistered table '{}' (table_id={}) from catalog",
575            canonical_name,
576            table_id
577        );
578
579        self.dropped_tables
580            .write()
581            .unwrap()
582            .insert(canonical_name.clone());
583        Ok(())
584    }
585
586    fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
587        let RenameTablePlan {
588            current_name,
589            new_name,
590            if_exists,
591        } = plan;
592
593        let (current_display, current_canonical) = canonical_table_name(&current_name)?;
594        let (new_display, new_canonical) = canonical_table_name(&new_name)?;
595
596        if current_canonical == new_canonical && current_display == new_display {
597            return Ok(());
598        }
599
600        if self.is_table_marked_dropped(&current_canonical) {
601            if if_exists {
602                return Ok(());
603            }
604            return Err(Error::CatalogError(format!(
605                "Catalog Error: Table '{}' does not exist",
606                current_display
607            )));
608        }
609
610        let table_id = match self
611            .catalog
612            .table_id(&current_canonical)
613            .or_else(|| self.catalog.table_id(&current_display))
614        {
615            Some(id) => id,
616            None => {
617                if if_exists {
618                    return Ok(());
619                }
620                return Err(Error::CatalogError(format!(
621                    "Catalog Error: Table '{}' does not exist",
622                    current_display
623                )));
624            }
625        };
626
627        if !current_display.eq_ignore_ascii_case(&new_display)
628            && (self.catalog.table_id(&new_canonical).is_some()
629                || self.catalog.table_id(&new_display).is_some())
630        {
631            return Err(Error::CatalogError(format!(
632                "Catalog Error: Table '{}' already exists",
633                new_display
634            )));
635        }
636
637        let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
638        if !referencing.is_empty() {
639            return Err(Error::CatalogError(format!(
640                "Dependency Error: Cannot alter entry \"{}\" because there are entries that depend on it.",
641                current_display
642            )));
643        }
644
645        self.catalog_service
646            .rename_table(table_id, &current_display, &new_display)?;
647
648        let mut tables = self.tables.write().unwrap();
649        if let Some(table) = tables.remove(&current_canonical) {
650            tables.insert(new_canonical.clone(), table);
651        }
652
653        let mut dropped = self.dropped_tables.write().unwrap();
654        dropped.remove(&current_canonical);
655        dropped.remove(&new_canonical);
656
657        Ok(())
658    }
659
660    fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
661        let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
662
663        let view = match self.catalog_service.table_view(&canonical_table) {
664            Ok(view) => view,
665            Err(err) if plan.if_exists && is_table_missing_error(&err) => {
666                return Ok(RuntimeStatementResult::NoOp);
667            }
668            Err(err) => return Err(err),
669        };
670
671        let table_meta = match view.table_meta.as_ref() {
672            Some(meta) => meta,
673            None => {
674                if plan.if_exists {
675                    return Ok(RuntimeStatementResult::NoOp);
676                }
677                return Err(Error::Internal("table metadata missing".into()));
678            }
679        };
680
681        let table_id = table_meta.table_id;
682
683        validate_alter_table_operation(&plan.operation, &view, table_id, &self.catalog_service)?;
684
685        match &plan.operation {
686            llkv_plan::AlterTableOperation::RenameColumn {
687                old_column_name,
688                new_column_name,
689            } => {
690                self.rename_column(&plan.table_name, old_column_name, new_column_name)?;
691            }
692            llkv_plan::AlterTableOperation::SetColumnDataType {
693                column_name,
694                new_data_type,
695            } => {
696                self.alter_column_type(&plan.table_name, column_name, new_data_type)?;
697            }
698            llkv_plan::AlterTableOperation::DropColumn { column_name, .. } => {
699                self.drop_column(&plan.table_name, column_name)?;
700            }
701        }
702
703        Ok(RuntimeStatementResult::NoOp)
704    }
705
706    fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
707        if plan.columns.is_empty() {
708            return Err(Error::InvalidArgumentError(
709                "CREATE INDEX requires at least one column".into(),
710            ));
711        }
712
713        for column_plan in &plan.columns {
714            if !column_plan.ascending || column_plan.nulls_first {
715                return Err(Error::InvalidArgumentError(
716                    "only ASC indexes with NULLS LAST are supported".into(),
717                ));
718            }
719        }
720
721        let mut index_name = plan.name.clone();
722        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
723        let table = self.lookup_table(&canonical_name)?;
724
725        let mut column_indices = Vec::with_capacity(plan.columns.len());
726        let mut field_ids = Vec::with_capacity(plan.columns.len());
727        let mut column_names = Vec::with_capacity(plan.columns.len());
728        let mut seen_column_indices = FxHashSet::default();
729
730        for column_plan in &plan.columns {
731            let normalized = column_plan.name.to_ascii_lowercase();
732            let col_idx = table
733                .schema
734                .lookup
735                .get(&normalized)
736                .copied()
737                .ok_or_else(|| {
738                    Error::InvalidArgumentError(format!(
739                        "column '{}' does not exist in table '{}'",
740                        column_plan.name, display_name
741                    ))
742                })?;
743            if !seen_column_indices.insert(col_idx) {
744                return Err(Error::InvalidArgumentError(format!(
745                    "duplicate column '{}' in CREATE INDEX",
746                    column_plan.name
747                )));
748            }
749
750            let column = &table.schema.columns[col_idx];
751            column_indices.push(col_idx);
752            field_ids.push(column.field_id);
753            column_names.push(column.name.clone());
754        }
755
756        if plan.columns.len() == 1 {
757            let field_id = field_ids[0];
758            let column_name = column_names[0].clone();
759
760            if plan.unique {
761                let snapshot = self.default_snapshot();
762                let existing_values =
763                    self.scan_column_values(table.as_ref(), field_id, snapshot)?;
764                ensure_single_column_unique(&existing_values, &[], &column_name)?;
765            }
766
767            let registration = self.catalog_service.register_single_column_index(
768                &display_name,
769                &canonical_name,
770                &table.table,
771                field_id,
772                &column_name,
773                plan.name.clone(),
774                plan.unique,
775                plan.if_not_exists,
776            )?;
777
778            let created_name = match registration {
779                SingleColumnIndexRegistration::Created { index_name } => index_name,
780                SingleColumnIndexRegistration::AlreadyExists { index_name } => {
781                    drop(table);
782                    return Ok(RuntimeStatementResult::CreateIndex {
783                        table_name: display_name,
784                        index_name: Some(index_name),
785                    });
786                }
787            };
788
789            index_name = Some(created_name.clone());
790
791            if let Some(updated_table) =
792                Self::rebuild_executor_table_with_unique(table.as_ref(), field_id)
793            {
794                self.tables
795                    .write()
796                    .unwrap()
797                    .insert(canonical_name.clone(), Arc::clone(&updated_table));
798            } else {
799                self.remove_table_entry(&canonical_name);
800            }
801
802            drop(table);
803
804            return Ok(RuntimeStatementResult::CreateIndex {
805                table_name: display_name,
806                index_name,
807            });
808        }
809
810        if !plan.unique {
811            return Err(Error::InvalidArgumentError(
812                "multi-column CREATE INDEX currently supports UNIQUE indexes only".into(),
813            ));
814        }
815
816        let table_id = table.table.table_id();
817
818        let snapshot = self.default_snapshot();
819        let existing_rows = self.scan_multi_column_values(table.as_ref(), &field_ids, snapshot)?;
820        ensure_multi_column_unique(&existing_rows, &[], &column_names)?;
821
822        let executor_entry = ExecutorMultiColumnUnique {
823            index_name: index_name.clone(),
824            column_indices: column_indices.clone(),
825        };
826
827        let registration = self.catalog_service.register_multi_column_unique_index(
828            table_id,
829            &field_ids,
830            index_name.clone(),
831        )?;
832
833        match registration {
834            MultiColumnUniqueRegistration::Created => {
835                table.add_multi_column_unique(executor_entry);
836            }
837            MultiColumnUniqueRegistration::AlreadyExists {
838                index_name: existing,
839            } => {
840                if plan.if_not_exists {
841                    drop(table);
842                    return Ok(RuntimeStatementResult::CreateIndex {
843                        table_name: display_name,
844                        index_name: existing,
845                    });
846                }
847                return Err(Error::CatalogError(format!(
848                    "Index already exists on columns '{}'",
849                    column_names.join(", ")
850                )));
851            }
852        }
853
854        Ok(RuntimeStatementResult::CreateIndex {
855            table_name: display_name,
856            index_name,
857        })
858    }
859
860    fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
861        let descriptor = self.catalog_service.drop_single_column_index(plan)?;
862
863        if let Some(descriptor) = &descriptor {
864            self.remove_table_entry(&descriptor.canonical_table_name);
865        }
866
867        Ok(descriptor)
868    }
869}