llkv_runtime/
lib.rs

1//! Query execution runtime for LLKV.
2//!
3//! This crate provides the runtime API (see [`RuntimeEngine`]) for executing SQL plans with full
4//! transaction support. It coordinates between the transaction layer, storage layer,
5//! and query executor to provide a complete database runtime.
6//!
7//! # Key Components
8//!
9//! - **[`RuntimeEngine`]**: Main execution engine for SQL operations
10//! - **[`RuntimeSession`]**: Session-level interface with transaction management
11//! - **[`TransactionContext`]**: Single-transaction execution context
12//! - **Table Provider**: Integration with the query executor for table access
13//!
14//! # Transaction Support
15//!
16//! The runtime supports both:
17//! - **Auto-commit**: Single-statement transactions (uses `TXN_ID_AUTO_COMMIT`)
18//! - **Multi-statement**: Explicit BEGIN/COMMIT/ROLLBACK transactions
19//!
20//! # MVCC Integration
21//!
22//! All data modifications automatically include MVCC metadata:
23//! - `row_id`: Unique row identifier
24//! - `created_by`: Transaction ID that created the row
25//! - `deleted_by`: Transaction ID that deleted the row (or `TXN_ID_NONE`)
26//!
27//! The runtime ensures these columns are injected and managed consistently.
28
29#![forbid(unsafe_code)]
30
31pub mod storage_namespace;
32
33use std::fmt;
34use std::marker::PhantomData;
35use std::mem;
36use std::ops::Bound;
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::sync::{Arc, RwLock};
39use std::time::{SystemTime, UNIX_EPOCH};
40
41use rustc_hash::{FxHashMap, FxHashSet};
42
43use arrow::array::{
44    Array, ArrayRef, BooleanBuilder, Date32Builder, Float64Builder, Int64Builder, StringBuilder,
45    UInt64Array, UInt64Builder,
46};
47use arrow::datatypes::{DataType, Field, FieldRef, Schema};
48use arrow::record_batch::RecordBatch;
49use llkv_column_map::ColumnStore;
50use llkv_column_map::store::{GatherNullPolicy, ROW_ID_COLUMN_NAME};
51use llkv_column_map::types::LogicalFieldId;
52use llkv_expr::expr::{Expr as LlkvExpr, Filter, Operator, ScalarExpr};
53use llkv_result::Error;
54use llkv_storage::pager::{BoxedPager, MemPager, Pager};
55use llkv_table::catalog::{FieldConstraints, FieldDefinition, TableCatalog};
56use llkv_table::table::{RowIdFilter, ScanProjection, ScanStreamOptions, Table};
57use llkv_table::types::{FieldId, ROW_ID_FIELD_ID, RowId};
58use llkv_table::{
59    CatalogManager, ConstraintColumnInfo, ConstraintService, CreateTableResult, ForeignKeyColumn,
60    ForeignKeyTableInfo, ForeignKeyView, InsertColumnConstraint, InsertMultiColumnUnique,
61    InsertUniqueColumn, MetadataManager, MultiColumnUniqueEntryMeta, MultiColumnUniqueRegistration,
62    SysCatalog, TableConstraintSummaryView, TableView, UniqueKey, build_composite_unique_key,
63    canonical_table_name, constraints::ConstraintKind, ensure_multi_column_unique,
64    ensure_single_column_unique,
65};
66use simd_r_drive_entry_handle::EntryHandle;
67use sqlparser::ast::{
68    Expr as SqlExpr, FunctionArg, FunctionArgExpr, GroupByExpr, ObjectName, ObjectNamePart, Select,
69    SelectItem, SelectItemQualifiedWildcardKind, TableAlias, TableFactor, UnaryOperator, Value,
70    ValueWithSpan,
71};
72use time::{Date, Month};
73
74pub type Result<T> = llkv_result::Result<T>;
75
76// Re-export plan structures from llkv-plan
77pub use llkv_plan::{
78    AggregateExpr, AggregateFunction, AssignmentValue, ColumnAssignment, ColumnNullability,
79    ColumnSpec, CreateIndexPlan, CreateTablePlan, CreateTableSource, DeletePlan, ForeignKeyAction,
80    ForeignKeySpec, IndexColumnPlan, InsertPlan, InsertSource, IntoColumnSpec, NotNull, Nullable,
81    OrderByPlan, OrderSortType, OrderTarget, PlanOperation, PlanStatement, PlanValue, SelectPlan,
82    SelectProjection, UpdatePlan,
83};
84
85// Execution structures from llkv-executor
86use llkv_executor::{ExecutorColumn, ExecutorMultiColumnUnique, ExecutorSchema, ExecutorTable};
87pub use llkv_executor::{QueryExecutor, RowBatch, SelectExecution, TableProvider};
88
89use crate::storage_namespace::{
90    PersistentNamespace, StorageNamespace, StorageNamespaceRegistry, TemporaryNamespace,
91};
92
93// Import transaction structures from llkv-transaction for internal use.
94pub use llkv_transaction::TransactionKind;
95use llkv_transaction::{
96    RowVersion, TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionContext, TransactionManager,
97    TransactionResult, TxnId, TxnIdManager, mvcc::TransactionSnapshot,
98};
99
100// Internal low-level transaction session type (from llkv-transaction)
101use llkv_transaction::TransactionSession;
102
103// Note: RuntimeSession is the high-level wrapper that users should use instead of the lower-level TransactionSession API
104
105use llkv_table::mvcc;
106
/// Constraint metadata resolved for a single table, bundled so INSERT/UPDATE
/// validation can consult it without repeated catalog lookups.
struct TableConstraintContext {
    /// Field ids for the table's columns, in schema order.
    schema_field_ids: Vec<FieldId>,
    /// Per-column constraints applied when inserting rows.
    column_constraints: Vec<InsertColumnConstraint>,
    /// Single-column UNIQUE constraints.
    unique_columns: Vec<InsertUniqueColumn>,
    /// Multi-column UNIQUE constraints.
    multi_column_uniques: Vec<InsertMultiColumnUnique>,
    /// PRIMARY KEY, modeled as a multi-column unique constraint, if any.
    primary_key: Option<InsertMultiColumnUnique>,
}
114
/// Result of running a plan statement.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum RuntimeStatementResult<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// A table was created.
    CreateTable {
        table_name: String,
    },
    /// An index was created.
    CreateIndex {
        table_name: String,
        index_name: Option<String>,
    },
    /// The statement produced no observable result.
    NoOp,
    /// Rows were inserted into `table_name`.
    Insert {
        table_name: String,
        rows_inserted: usize,
    },
    /// Rows were updated in `table_name`.
    Update {
        table_name: String,
        rows_updated: usize,
    },
    /// Rows were deleted from `table_name`.
    Delete {
        table_name: String,
        rows_deleted: usize,
    },
    /// A SELECT completed; `execution` yields the result batches and
    /// `schema` describes the output columns.
    Select {
        table_name: String,
        schema: Arc<Schema>,
        execution: SelectExecution<P>,
    },
    /// A transaction-control statement (BEGIN/COMMIT/ROLLBACK) completed.
    Transaction {
        kind: TransactionKind,
    },
}
151
152impl<P> fmt::Debug for RuntimeStatementResult<P>
153where
154    P: Pager<Blob = EntryHandle> + Send + Sync,
155{
156    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157        match self {
158            RuntimeStatementResult::CreateTable { table_name } => f
159                .debug_struct("CreateTable")
160                .field("table_name", table_name)
161                .finish(),
162            RuntimeStatementResult::CreateIndex {
163                table_name,
164                index_name,
165            } => f
166                .debug_struct("CreateIndex")
167                .field("table_name", table_name)
168                .field("index_name", index_name)
169                .finish(),
170            RuntimeStatementResult::NoOp => f.debug_struct("NoOp").finish(),
171            RuntimeStatementResult::Insert {
172                table_name,
173                rows_inserted,
174            } => f
175                .debug_struct("Insert")
176                .field("table_name", table_name)
177                .field("rows_inserted", rows_inserted)
178                .finish(),
179            RuntimeStatementResult::Update {
180                table_name,
181                rows_updated,
182            } => f
183                .debug_struct("Update")
184                .field("table_name", table_name)
185                .field("rows_updated", rows_updated)
186                .finish(),
187            RuntimeStatementResult::Delete {
188                table_name,
189                rows_deleted,
190            } => f
191                .debug_struct("Delete")
192                .field("table_name", table_name)
193                .field("rows_deleted", rows_deleted)
194                .finish(),
195            RuntimeStatementResult::Select {
196                table_name, schema, ..
197            } => f
198                .debug_struct("Select")
199                .field("table_name", table_name)
200                .field("schema", schema)
201                .finish(),
202            RuntimeStatementResult::Transaction { kind } => {
203                f.debug_struct("Transaction").field("kind", kind).finish()
204            }
205        }
206    }
207}
208
/// Represents how a column assignment should be materialized during UPDATE/INSERT.
enum PreparedAssignmentValue {
    /// A constant value known at planning time.
    Literal(PlanValue),
    /// A computed value; `expr_index` is an index into a separately prepared
    /// expression list (presumably built by the assignment-preparation code —
    /// confirm at the usage site).
    Expression { expr_index: usize },
}
214
215impl<P> RuntimeStatementResult<P>
216where
217    P: Pager<Blob = EntryHandle> + Send + Sync,
218{
219    /// Convert a StatementResult from one pager type to another.
220    /// Only works for non-SELECT results (CreateTable, Insert, Update, Delete, NoOp, Transaction).
221    #[allow(dead_code)]
222    pub(crate) fn convert_pager_type<Q>(self) -> Result<RuntimeStatementResult<Q>>
223    where
224        Q: Pager<Blob = EntryHandle> + Send + Sync,
225    {
226        match self {
227            RuntimeStatementResult::CreateTable { table_name } => {
228                Ok(RuntimeStatementResult::CreateTable { table_name })
229            }
230            RuntimeStatementResult::CreateIndex {
231                table_name,
232                index_name,
233            } => Ok(RuntimeStatementResult::CreateIndex {
234                table_name,
235                index_name,
236            }),
237            RuntimeStatementResult::NoOp => Ok(RuntimeStatementResult::NoOp),
238            RuntimeStatementResult::Insert {
239                table_name,
240                rows_inserted,
241            } => Ok(RuntimeStatementResult::Insert {
242                table_name,
243                rows_inserted,
244            }),
245            RuntimeStatementResult::Update {
246                table_name,
247                rows_updated,
248            } => Ok(RuntimeStatementResult::Update {
249                table_name,
250                rows_updated,
251            }),
252            RuntimeStatementResult::Delete {
253                table_name,
254                rows_deleted,
255            } => Ok(RuntimeStatementResult::Delete {
256                table_name,
257                rows_deleted,
258            }),
259            RuntimeStatementResult::Transaction { kind } => {
260                Ok(RuntimeStatementResult::Transaction { kind })
261            }
262            RuntimeStatementResult::Select { .. } => Err(Error::Internal(
263                "Cannot convert SELECT result between pager types in transaction".into(),
264            )),
265        }
266    }
267}
268
269/// Return the table name referenced by a plan statement, if any.
270///
271/// This is a small helper used by higher-level engines (for example the
272/// SQL front-end) to provide better error messages when a statement fails
273/// with a table-related error. It intentionally returns an `Option<&str>` so
274/// callers can decide how to report missing table context.
275pub fn statement_table_name(statement: &PlanStatement) -> Option<&str> {
276    match statement {
277        PlanStatement::CreateTable(plan) => Some(&plan.name),
278        PlanStatement::CreateIndex(plan) => Some(&plan.table),
279        PlanStatement::Insert(plan) => Some(&plan.table),
280        PlanStatement::Update(plan) => Some(&plan.table),
281        PlanStatement::Delete(plan) => Some(&plan.table),
282        PlanStatement::Select(plan) => {
283            // Return Some only for single-table queries
284            if plan.tables.len() == 1 {
285                Some(&plan.tables[0].table)
286            } else {
287                None
288            }
289        }
290        PlanStatement::BeginTransaction
291        | PlanStatement::CommitTransaction
292        | PlanStatement::RollbackTransaction => None,
293    }
294}
295
296// ============================================================================
297// Plan Structures (now in llkv-plan and re-exported above)
298// ============================================================================
299//
300// The following types are defined in llkv-plan and re-exported:
301// - plan values, CreateTablePlan, ColumnSpec, IntoColumnSpec
302// - InsertPlan, InsertSource, UpdatePlan, DeletePlan
303// - SelectPlan, SelectProjection, AggregateExpr, AggregateFunction
304// - OrderByPlan, OrderSortType, OrderTarget
305// - PlanOperation
306//
307// This separation allows plans to be used independently of execution logic.
308// ============================================================================
309
310// Transaction management is now handled by llkv-transaction crate.
311// The SessionTransaction and TableDeltaState types are re-exported from there.
312
/// Wrapper for Context that implements TransactionContext.
pub struct RuntimeContextWrapper<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// The wrapped runtime context.
    ctx: Arc<RuntimeContext<P>>,
    /// MVCC snapshot governing row visibility; seeded from the context's
    /// default snapshot and replaced via `update_snapshot`/`set_snapshot`.
    snapshot: RwLock<TransactionSnapshot>,
}
321
322impl<P> RuntimeContextWrapper<P>
323where
324    P: Pager<Blob = EntryHandle> + Send + Sync,
325{
326    fn new(ctx: Arc<RuntimeContext<P>>) -> Self {
327        let snapshot = ctx.default_snapshot();
328        Self {
329            ctx,
330            snapshot: RwLock::new(snapshot),
331        }
332    }
333
334    fn update_snapshot(&self, snapshot: TransactionSnapshot) {
335        let mut guard = self.snapshot.write().expect("snapshot lock poisoned");
336        *guard = snapshot;
337    }
338
339    fn current_snapshot(&self) -> TransactionSnapshot {
340        *self.snapshot.read().expect("snapshot lock poisoned")
341    }
342
343    fn context(&self) -> &Arc<RuntimeContext<P>> {
344        &self.ctx
345    }
346
347    fn ctx(&self) -> &RuntimeContext<P> {
348        &self.ctx
349    }
350}
351
/// Per-session storage namespaces: the shared persistent namespace plus a
/// session-private temporary namespace, tracked by a shared registry.
struct SessionNamespaces<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    /// Namespace backed by the session's durable pager `P`.
    persistent: Arc<PersistentNamespace<P>>,
    /// In-memory namespace for temporary tables; its tables are cleared when
    /// this struct is dropped.
    temporary: Option<Arc<TemporaryNamespace<BoxedPager>>>,
    /// Registry resolving canonical table names to their owning namespace.
    registry: Arc<RwLock<StorageNamespaceRegistry>>,
}
360
361impl<P> SessionNamespaces<P>
362where
363    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
364{
365    fn new(base_context: Arc<RuntimeContext<P>>) -> Self {
366        let persistent = Arc::new(PersistentNamespace::new(
367            storage_namespace::PERSISTENT_NAMESPACE_ID.to_string(),
368            Arc::clone(&base_context),
369        ));
370
371        let mut registry = StorageNamespaceRegistry::new(persistent.namespace_id().clone());
372        registry.register_namespace(Arc::clone(&persistent), Vec::<String>::new(), false);
373
374        let temporary = {
375            let temp_pager = Arc::new(BoxedPager::from_arc(Arc::new(MemPager::default())));
376            let temp_context = Arc::new(RuntimeContext::new(temp_pager));
377            let namespace = Arc::new(TemporaryNamespace::new(
378                storage_namespace::TEMPORARY_NAMESPACE_ID.to_string(),
379                temp_context,
380            ));
381            registry.register_namespace(
382                Arc::clone(&namespace),
383                vec![storage_namespace::TEMPORARY_NAMESPACE_ID.to_string()],
384                true,
385            );
386            namespace
387        };
388
389        Self {
390            persistent,
391            temporary: Some(temporary),
392            registry: Arc::new(RwLock::new(registry)),
393        }
394    }
395
396    fn persistent(&self) -> Arc<PersistentNamespace<P>> {
397        Arc::clone(&self.persistent)
398    }
399
400    fn temporary(&self) -> Option<Arc<TemporaryNamespace<BoxedPager>>> {
401        self.temporary.as_ref().map(Arc::clone)
402    }
403
404    fn registry(&self) -> Arc<RwLock<StorageNamespaceRegistry>> {
405        Arc::clone(&self.registry)
406    }
407}
408
409impl<P> Drop for SessionNamespaces<P>
410where
411    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
412{
413    fn drop(&mut self) {
414        if let Some(temp) = &self.temporary {
415            temp.clear_tables();
416        }
417    }
418}
419
/// A session for executing operations with optional transaction support.
///
/// This is a high-level wrapper around the transaction machinery that provides
/// a clean API for users. Operations can be executed directly or within a transaction.
pub struct RuntimeSession<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    // TODO: Allow generic pager type
    // NOTE: Sessions always embed a `MemPager` for temporary namespaces; extend the
    // wrapper when pluggable temp storage is supported.
    /// Low-level transaction session: the base context uses pager `P`,
    /// while in-transaction staging contexts use `MemPager`.
    inner: TransactionSession<RuntimeContextWrapper<P>, RuntimeContextWrapper<MemPager>>,
    /// Persistent + temporary namespaces and their registry, shared across
    /// clones of this session.
    namespaces: Arc<SessionNamespaces<P>>,
}
434
435impl<P> RuntimeSession<P>
436where
437    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
438{
439    /// Clone this session (reuses the same underlying TransactionSession).
440    /// This is necessary to maintain transaction state across Engine clones.
441    pub(crate) fn clone_session(&self) -> Self {
442        Self {
443            inner: self.inner.clone_session(),
444            namespaces: self.namespaces.clone(),
445        }
446    }
447
    /// Shared handle to this session's storage-namespace registry.
    pub fn namespace_registry(&self) -> Arc<RwLock<StorageNamespaceRegistry>> {
        self.namespaces.registry()
    }
451
452    fn resolve_namespace_for_table(&self, canonical: &str) -> storage_namespace::NamespaceId {
453        self.namespace_registry()
454            .read()
455            .expect("namespace registry poisoned")
456            .namespace_for_table(canonical)
457    }
458
459    fn namespace_for_select_plan(
460        &self,
461        plan: &SelectPlan,
462    ) -> Option<storage_namespace::NamespaceId> {
463        if plan.tables.len() != 1 {
464            return None;
465        }
466
467        let qualified = plan.tables[0].qualified_name();
468        let (_, canonical) = canonical_table_name(&qualified).ok()?;
469        Some(self.resolve_namespace_for_table(&canonical))
470    }
471
472    fn select_from_temporary(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
473        let temp_namespace = self
474            .temporary_namespace()
475            .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
476
477        let table_name = if plan.tables.len() == 1 {
478            plan.tables[0].qualified_name()
479        } else {
480            String::new()
481        };
482
483        let execution = temp_namespace.context().execute_select(plan.clone())?;
484        let schema = execution.schema();
485        let batches = execution.collect()?;
486
487        let combined = if batches.is_empty() {
488            RecordBatch::new_empty(Arc::clone(&schema))
489        } else if batches.len() == 1 {
490            batches.into_iter().next().unwrap()
491        } else {
492            let refs: Vec<&RecordBatch> = batches.iter().collect();
493            arrow::compute::concat_batches(&schema, refs)?
494        };
495
496        let execution =
497            SelectExecution::from_batch(table_name.clone(), Arc::clone(&schema), combined);
498
499        Ok(RuntimeStatementResult::Select {
500            execution,
501            table_name,
502            schema,
503        })
504    }
505
    /// Shared handle to this session's persistent namespace.
    fn persistent_namespace(&self) -> Arc<PersistentNamespace<P>> {
        self.namespaces.persistent()
    }
509
510    #[allow(dead_code)]
511    fn temporary_namespace(&self) -> Option<Arc<TemporaryNamespace<BoxedPager>>> {
512        self.namespaces.temporary()
513    }
514
    /// Begin a transaction in this session.
    /// Creates an empty staging context for new tables created within the transaction.
    /// Existing tables are accessed via MVCC visibility filtering - NO data copying occurs.
    pub fn begin_transaction(&self) -> Result<RuntimeStatementResult<P>> {
        // Fresh in-memory pager: staging state never touches durable storage.
        let staging_pager = Arc::new(MemPager::default());
        tracing::trace!(
            "BEGIN_TRANSACTION: Created staging pager at {:p}",
            &*staging_pager
        );
        let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));

        // Staging context is EMPTY - used only for tables created within the transaction.
        // Existing tables are read from base context with MVCC visibility filtering.
        // No data copying occurs at BEGIN - this is pure MVCC.

        let staging_wrapper = Arc::new(RuntimeContextWrapper::new(staging_ctx));

        self.inner.begin_transaction(staging_wrapper)?;
        Ok(RuntimeStatementResult::Transaction {
            kind: TransactionKind::Begin,
        })
    }
537
    /// Mark the current transaction as aborted due to an error.
    /// This should be called when any error occurs during a transaction.
    /// A subsequent COMMIT then acts as a ROLLBACK (see `commit_transaction`).
    pub fn abort_transaction(&self) {
        self.inner.abort_transaction();
    }
543
544    /// Check if this session has an active transaction.
545    pub fn has_active_transaction(&self) -> bool {
546        let result = self.inner.has_active_transaction();
547        tracing::trace!("SESSION: has_active_transaction() = {}", result);
548        result
549    }
550
    /// Check if the current transaction has been aborted due to an error.
    pub fn is_aborted(&self) -> bool {
        self.inner.is_aborted()
    }
555
    /// Commit the current transaction and apply changes to the base context.
    /// If the transaction was aborted, this acts as a ROLLBACK instead.
    ///
    /// Steps: (1) finish the low-level transaction and collect staged
    /// operations, (2) reject the commit if it touches a table concurrently
    /// dropped by another transaction, (3) replay staged operations onto the
    /// base context, (4) restore the default auto-commit snapshot, and
    /// (5) persist the next transaction id after a successful commit.
    pub fn commit_transaction(&self) -> Result<RuntimeStatementResult<P>> {
        tracing::trace!("Session::commit_transaction called");
        let (tx_result, operations) = self.inner.commit_transaction()?;
        tracing::trace!(
            "Session::commit_transaction got {} operations",
            operations.len()
        );

        // Conflict check: refuse to replay writes against tables that were
        // dropped while this transaction was open.
        if !operations.is_empty() {
            let dropped_tables = self
                .inner
                .context()
                .ctx()
                .dropped_tables
                .read()
                .unwrap()
                .clone();
            if !dropped_tables.is_empty() {
                for operation in &operations {
                    let table_name_opt = match operation {
                        PlanOperation::Insert(plan) => Some(plan.table.as_str()),
                        PlanOperation::Update(plan) => Some(plan.table.as_str()),
                        PlanOperation::Delete(plan) => Some(plan.table.as_str()),
                        _ => None,
                    };
                    if let Some(table_name) = table_name_opt {
                        let (_, canonical) = canonical_table_name(table_name)?;
                        if dropped_tables.contains(&canonical) {
                            self.abort_transaction();
                            return Err(Error::TransactionContextError(
                                "another transaction has dropped this table".into(),
                            ));
                        }
                    }
                }
            }
        }

        // Extract the transaction kind from the transaction module's result
        let kind = match tx_result {
            TransactionResult::Transaction { kind } => kind,
            _ => {
                return Err(Error::Internal(
                    "commit_transaction returned non-transaction result".into(),
                ));
            }
        };
        tracing::trace!("Session::commit_transaction kind={:?}", kind);

        // Only replay operations if there are any (empty if transaction was aborted)
        for operation in operations {
            match operation {
                PlanOperation::CreateTable(plan) => {
                    TransactionContext::create_table_plan(&**self.inner.context(), plan)?;
                }
                PlanOperation::Insert(plan) => {
                    TransactionContext::insert(&**self.inner.context(), plan)?;
                }
                PlanOperation::Update(plan) => {
                    TransactionContext::update(&**self.inner.context(), plan)?;
                }
                PlanOperation::Delete(plan) => {
                    TransactionContext::delete(&**self.inner.context(), plan)?;
                }
                _ => {}
            }
        }

        // Reset the base context snapshot to the default auto-commit view now that
        // the transaction has been replayed onto the base tables.
        let base_ctx = self.inner.context();
        let default_snapshot = base_ctx.ctx().default_snapshot();
        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);

        // Persist the next_txn_id to the catalog after a successful commit
        if matches!(kind, TransactionKind::Commit) {
            let ctx = base_ctx.ctx();
            let next_txn_id = ctx.txn_manager().current_next_txn_id();
            if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
                tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
            }
        }

        // Return a StatementResult with the correct kind (Commit or Rollback)
        Ok(RuntimeStatementResult::Transaction { kind })
    }
644
645    /// Rollback the current transaction, discarding all changes.
646    pub fn rollback_transaction(&self) -> Result<RuntimeStatementResult<P>> {
647        self.inner.rollback_transaction()?;
648        let base_ctx = self.inner.context();
649        let default_snapshot = base_ctx.ctx().default_snapshot();
650        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
651        Ok(RuntimeStatementResult::Transaction {
652            kind: TransactionKind::Rollback,
653        })
654    }
655
656    fn materialize_create_table_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
657        if let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take() {
658            let select_result = self.select(*select_plan)?;
659            let (schema, batches) = match select_result {
660                RuntimeStatementResult::Select {
661                    schema, execution, ..
662                } => {
663                    let batches = execution.collect()?;
664                    (schema, batches)
665                }
666                _ => {
667                    return Err(Error::Internal(
668                        "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
669                    ));
670                }
671            };
672            plan.source = Some(CreateTableSource::Batches { schema, batches });
673        }
674        Ok(plan)
675    }
676
677    /// Create a table (outside or inside transaction).
678    pub fn create_table_plan(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
679        let mut plan = self.materialize_create_table_plan(plan)?;
680        let namespace_id = plan
681            .namespace
682            .clone()
683            .unwrap_or_else(|| storage_namespace::PERSISTENT_NAMESPACE_ID.to_string());
684        plan.namespace = Some(namespace_id.clone());
685
686        match namespace_id.as_str() {
687            storage_namespace::TEMPORARY_NAMESPACE_ID => {
688                let temp_namespace = self
689                    .temporary_namespace()
690                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
691                temp_namespace.create_table(plan)?.convert_pager_type::<P>()
692            }
693            storage_namespace::PERSISTENT_NAMESPACE_ID => {
694                if self.has_active_transaction() {
695                    let table_name = plan.name.clone();
696                    match self
697                        .inner
698                        .execute_operation(PlanOperation::CreateTable(plan))
699                    {
700                        Ok(_) => Ok(RuntimeStatementResult::CreateTable { table_name }),
701                        Err(e) => {
702                            // If an error occurs during a transaction, abort it
703                            self.abort_transaction();
704                            Err(e)
705                        }
706                    }
707                } else {
708                    self.persistent_namespace().create_table(plan)
709                }
710            }
711            other => Err(Error::InvalidArgumentError(format!(
712                "Unknown storage namespace '{}'",
713                other
714            ))),
715        }
716    }
717
718    pub fn drop_table(&self, name: &str, if_exists: bool) -> Result<()> {
719        let (_, canonical_table) = canonical_table_name(name)?;
720        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
721
722        match namespace_id.as_str() {
723            storage_namespace::TEMPORARY_NAMESPACE_ID => {
724                let temp_namespace = self
725                    .temporary_namespace()
726                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
727                temp_namespace.drop_table(name, if_exists)
728            }
729            storage_namespace::PERSISTENT_NAMESPACE_ID => {
730                self.persistent_namespace().drop_table(name, if_exists)
731            }
732            other => Err(Error::InvalidArgumentError(format!(
733                "Unknown storage namespace '{}'",
734                other
735            ))),
736        }
737    }
738    /// Create an index (auto-commit only for now).
739    pub fn create_index(&self, plan: CreateIndexPlan) -> Result<RuntimeStatementResult<P>> {
740        let (_, canonical_table) = canonical_table_name(&plan.table)?;
741        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
742
743        match namespace_id.as_str() {
744            storage_namespace::TEMPORARY_NAMESPACE_ID => {
745                let temp_namespace = self
746                    .temporary_namespace()
747                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
748                temp_namespace.create_index(plan)?.convert_pager_type::<P>()
749            }
750            storage_namespace::PERSISTENT_NAMESPACE_ID => {
751                if self.has_active_transaction() {
752                    return Err(Error::InvalidArgumentError(
753                        "CREATE INDEX is not supported inside an active transaction".into(),
754                    ));
755                }
756
757                self.persistent_namespace().create_index(plan)
758            }
759            other => Err(Error::InvalidArgumentError(format!(
760                "Unknown storage namespace '{}'",
761                other
762            ))),
763        }
764    }
765
766    fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
767        let InsertPlan {
768            table,
769            columns,
770            source,
771        } = plan;
772
773        match source {
774            InsertSource::Rows(rows) => {
775                let count = rows.len();
776                Ok((
777                    InsertPlan {
778                        table,
779                        columns,
780                        source: InsertSource::Rows(rows),
781                    },
782                    count,
783                ))
784            }
785            InsertSource::Batches(batches) => {
786                let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
787                Ok((
788                    InsertPlan {
789                        table,
790                        columns,
791                        source: InsertSource::Batches(batches),
792                    },
793                    count,
794                ))
795            }
796            InsertSource::Select { plan: select_plan } => {
797                let select_result = self.select(*select_plan)?;
798                let rows = match select_result {
799                    RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
800                    _ => {
801                        return Err(Error::Internal(
802                            "expected Select result when executing INSERT ... SELECT".into(),
803                        ));
804                    }
805                };
806                let count = rows.len();
807                Ok((
808                    InsertPlan {
809                        table,
810                        columns,
811                        source: InsertSource::Rows(rows),
812                    },
813                    count,
814                ))
815            }
816        }
817    }
818
    /// Insert rows (outside or inside transaction).
    ///
    /// The plan is first normalized (see `normalize_insert_plan`) so that
    /// `INSERT ... SELECT` sources are materialized and the inserted row
    /// count is known up front. Routing then depends on the table's
    /// namespace: temporary tables go through the session-local context;
    /// persistent tables go through the active transaction when one exists,
    /// otherwise through an auto-commit statement under the default snapshot.
    ///
    /// # Errors
    ///
    /// Propagates normalization, namespace-resolution, and insert failures.
    /// Inside an active transaction, only constraint violations abort the
    /// transaction; other errors leave it running.
    pub fn insert(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
        tracing::trace!("Session::insert called for table={}", plan.table);
        let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
        let table_name = plan.table.clone();
        let (_, canonical_table) = canonical_table_name(&plan.table)?;
        let namespace_id = self.resolve_namespace_for_table(&canonical_table);

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                // Insert via the temporary context; the converted result is
                // discarded and a fresh Insert result built from the counted rows.
                temp_namespace
                    .context()
                    .insert(plan)?
                    .convert_pager_type::<P>()?;
                Ok(RuntimeStatementResult::Insert {
                    rows_inserted,
                    table_name,
                })
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                if self.has_active_transaction() {
                    match self.inner.execute_operation(PlanOperation::Insert(plan)) {
                        Ok(_) => {
                            tracing::trace!("Session::insert succeeded for table={}", table_name);
                            Ok(RuntimeStatementResult::Insert {
                                rows_inserted,
                                table_name,
                            })
                        }
                        Err(e) => {
                            tracing::trace!(
                                "Session::insert failed for table={}, error={:?}",
                                table_name,
                                e
                            );
                            // Constraint violations poison the transaction;
                            // other errors (e.g. catalog lookups) do not.
                            if matches!(e, Error::ConstraintError(_)) {
                                tracing::trace!("Transaction is_aborted=true");
                                self.abort_transaction();
                            }
                            Err(e)
                        }
                    }
                } else {
                    // Auto-commit path: pin to the latest committed snapshot.
                    let context = self.inner.context();
                    let default_snapshot = context.ctx().default_snapshot();
                    TransactionContext::set_snapshot(&**context, default_snapshot);
                    TransactionContext::insert(&**context, plan)?;
                    Ok(RuntimeStatementResult::Insert {
                        rows_inserted,
                        table_name,
                    })
                }
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }
881
882    /// Select rows (outside or inside transaction).
883    pub fn select(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
884        if let Some(namespace_id) = self.namespace_for_select_plan(&plan)
885            && namespace_id == storage_namespace::TEMPORARY_NAMESPACE_ID
886        {
887            return self.select_from_temporary(plan);
888        }
889
890        if self.has_active_transaction() {
891            let tx_result = match self
892                .inner
893                .execute_operation(PlanOperation::Select(plan.clone()))
894            {
895                Ok(result) => result,
896                Err(e) => {
897                    // Only abort transaction on specific errors (constraint violations, etc.)
898                    // Don't abort on catalog errors (table doesn't exist) or similar
899                    if matches!(e, Error::ConstraintError(_)) {
900                        self.abort_transaction();
901                    }
902                    return Err(e);
903                }
904            };
905            match tx_result {
906                TransactionResult::Select {
907                    table_name,
908                    schema,
909                    execution: staging_execution,
910                } => {
911                    // Convert from staging (MemPager) execution to base pager execution
912                    // by collecting batches and rebuilding
913                    let batches = staging_execution.collect().unwrap_or_default();
914                    let combined = if batches.is_empty() {
915                        RecordBatch::new_empty(Arc::clone(&schema))
916                    } else if batches.len() == 1 {
917                        batches.into_iter().next().unwrap()
918                    } else {
919                        let refs: Vec<&RecordBatch> = batches.iter().collect();
920                        arrow::compute::concat_batches(&schema, refs)?
921                    };
922
923                    let execution = SelectExecution::from_batch(
924                        table_name.clone(),
925                        Arc::clone(&schema),
926                        combined,
927                    );
928
929                    Ok(RuntimeStatementResult::Select {
930                        execution,
931                        table_name,
932                        schema,
933                    })
934                }
935                _ => Err(Error::Internal("expected Select result".into())),
936            }
937        } else {
938            // Call via TransactionContext trait
939            let context = self.inner.context();
940            let default_snapshot = context.ctx().default_snapshot();
941            TransactionContext::set_snapshot(&**context, default_snapshot);
942            let table_name = if plan.tables.len() == 1 {
943                plan.tables[0].qualified_name()
944            } else {
945                String::new()
946            };
947            let execution = TransactionContext::execute_select(&**context, plan)?;
948            let schema = execution.schema();
949            Ok(RuntimeStatementResult::Select {
950                execution,
951                table_name,
952                schema,
953            })
954        }
955    }
956
957    /// Convenience helper to fetch all rows from a table within this session.
958    pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
959        let plan =
960            SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
961        match self.select(plan)? {
962            RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
963            other => Err(Error::Internal(format!(
964                "expected Select result when reading table '{table}', got {:?}",
965                other
966            ))),
967        }
968    }
969
970    /// Update rows (outside or inside transaction).
971    pub fn update(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
972        let (_, canonical_table) = canonical_table_name(&plan.table)?;
973        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
974
975        match namespace_id.as_str() {
976            storage_namespace::TEMPORARY_NAMESPACE_ID => {
977                let temp_namespace = self
978                    .temporary_namespace()
979                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
980                temp_namespace
981                    .context()
982                    .update(plan)?
983                    .convert_pager_type::<P>()
984            }
985            storage_namespace::PERSISTENT_NAMESPACE_ID => {
986                if self.has_active_transaction() {
987                    let table_name = plan.table.clone();
988                    let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
989                        Ok(result) => result,
990                        Err(e) => {
991                            // If an error occurs during a transaction, abort it
992                            self.abort_transaction();
993                            return Err(e);
994                        }
995                    };
996                    match result {
997                        TransactionResult::Update {
998                            rows_matched: _,
999                            rows_updated,
1000                        } => Ok(RuntimeStatementResult::Update {
1001                            rows_updated,
1002                            table_name,
1003                        }),
1004                        _ => Err(Error::Internal("expected Update result".into())),
1005                    }
1006                } else {
1007                    // Call via TransactionContext trait
1008                    let context = self.inner.context();
1009                    let default_snapshot = context.ctx().default_snapshot();
1010                    TransactionContext::set_snapshot(&**context, default_snapshot);
1011                    let table_name = plan.table.clone();
1012                    let result = TransactionContext::update(&**context, plan)?;
1013                    match result {
1014                        TransactionResult::Update {
1015                            rows_matched: _,
1016                            rows_updated,
1017                        } => Ok(RuntimeStatementResult::Update {
1018                            rows_updated,
1019                            table_name,
1020                        }),
1021                        _ => Err(Error::Internal("expected Update result".into())),
1022                    }
1023                }
1024            }
1025            other => Err(Error::InvalidArgumentError(format!(
1026                "Unknown storage namespace '{}'",
1027                other
1028            ))),
1029        }
1030    }
1031
1032    /// Delete rows (outside or inside transaction).
1033    pub fn delete(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
1034        let (_, canonical_table) = canonical_table_name(&plan.table)?;
1035        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1036
1037        match namespace_id.as_str() {
1038            storage_namespace::TEMPORARY_NAMESPACE_ID => {
1039                let temp_namespace = self
1040                    .temporary_namespace()
1041                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1042                temp_namespace
1043                    .context()
1044                    .delete(plan)?
1045                    .convert_pager_type::<P>()
1046            }
1047            storage_namespace::PERSISTENT_NAMESPACE_ID => {
1048                if self.has_active_transaction() {
1049                    let table_name = plan.table.clone();
1050                    let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
1051                        Ok(result) => result,
1052                        Err(e) => {
1053                            // If an error occurs during a transaction, abort it
1054                            self.abort_transaction();
1055                            return Err(e);
1056                        }
1057                    };
1058                    match result {
1059                        TransactionResult::Delete { rows_deleted } => {
1060                            Ok(RuntimeStatementResult::Delete {
1061                                rows_deleted,
1062                                table_name,
1063                            })
1064                        }
1065                        _ => Err(Error::Internal("expected Delete result".into())),
1066                    }
1067                } else {
1068                    // Call via TransactionContext trait
1069                    let context = self.inner.context();
1070                    let default_snapshot = context.ctx().default_snapshot();
1071                    TransactionContext::set_snapshot(&**context, default_snapshot);
1072                    let table_name = plan.table.clone();
1073                    let result = TransactionContext::delete(&**context, plan)?;
1074                    match result {
1075                        TransactionResult::Delete { rows_deleted } => {
1076                            Ok(RuntimeStatementResult::Delete {
1077                                rows_deleted,
1078                                table_name,
1079                            })
1080                        }
1081                        _ => Err(Error::Internal("expected Delete result".into())),
1082                    }
1083                }
1084            }
1085            other => Err(Error::InvalidArgumentError(format!(
1086                "Unknown storage namespace '{}'",
1087                other
1088            ))),
1089        }
1090    }
1091}
1092
/// High-level entry point pairing a shared [`RuntimeContext`] with a single
/// [`RuntimeSession`] for executing plan statements.
pub struct RuntimeEngine<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    // Shared runtime state (catalog, store, transaction manager).
    context: Arc<RuntimeContext<P>>,
    // This engine's session; `Clone` reuses it so transaction state is shared.
    session: RuntimeSession<P>,
}
1100
1101impl<P> Clone for RuntimeEngine<P>
1102where
1103    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1104{
1105    fn clone(&self) -> Self {
1106        // IMPORTANT: Reuse the same session to maintain transaction state!
1107        // Creating a new session would break multi-statement transactions.
1108        tracing::debug!("[ENGINE] RuntimeEngine::clone() called - reusing same session");
1109        Self {
1110            context: Arc::clone(&self.context),
1111            session: self.session.clone_session(),
1112        }
1113    }
1114}
1115
1116impl<P> RuntimeEngine<P>
1117where
1118    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1119{
1120    pub fn new(pager: Arc<P>) -> Self {
1121        let context = Arc::new(RuntimeContext::new(pager));
1122        Self::from_context(context)
1123    }
1124
1125    pub fn from_context(context: Arc<RuntimeContext<P>>) -> Self {
1126        tracing::debug!("[ENGINE] RuntimeEngine::from_context - creating new session");
1127        let session = context.create_session();
1128        tracing::debug!("[ENGINE] RuntimeEngine::from_context - created session");
1129        Self { context, session }
1130    }
1131
1132    pub fn context(&self) -> Arc<RuntimeContext<P>> {
1133        Arc::clone(&self.context)
1134    }
1135
1136    pub fn session(&self) -> &RuntimeSession<P> {
1137        &self.session
1138    }
1139
1140    pub fn execute_statement(&self, statement: PlanStatement) -> Result<RuntimeStatementResult<P>> {
1141        match statement {
1142            PlanStatement::BeginTransaction => self.session.begin_transaction(),
1143            PlanStatement::CommitTransaction => self.session.commit_transaction(),
1144            PlanStatement::RollbackTransaction => self.session.rollback_transaction(),
1145            PlanStatement::CreateTable(plan) => self.session.create_table_plan(plan),
1146            PlanStatement::CreateIndex(plan) => self.session.create_index(plan),
1147            PlanStatement::Insert(plan) => self.session.insert(plan),
1148            PlanStatement::Update(plan) => self.session.update(plan),
1149            PlanStatement::Delete(plan) => self.session.delete(plan),
1150            PlanStatement::Select(plan) => self.session.select(plan),
1151        }
1152    }
1153
1154    pub fn execute_all<I>(&self, statements: I) -> Result<Vec<RuntimeStatementResult<P>>>
1155    where
1156        I: IntoIterator<Item = PlanStatement>,
1157    {
1158        let mut results = Vec::new();
1159        for statement in statements {
1160            results.push(self.execute_statement(statement)?);
1161        }
1162        Ok(results)
1163    }
1164}
1165
/// In-memory execution context shared by plan-based queries.
///
/// Important: "lazy loading" here refers to *table metadata only* (schema,
/// executor-side column descriptors, and a small next-row-id counter). We do
/// NOT eagerly load or materialize the table's row data into memory. All
/// row/column data remains on the ColumnStore and is streamed in chunks during
/// query execution. This keeps the memory footprint low even for very large
/// tables.
///
/// Typical resource usage:
/// - Metadata per table: ~100s of bytes to a few KB (schema + field ids)
/// - ExecutorTable struct: small (handles + counters)
/// - Actual table rows: streamed from disk in chunks (never fully resident)
pub struct RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Backing pager shared with the column store.
    pager: Arc<P>,
    // Lazily-populated cache of executor table metadata, keyed by canonical name.
    tables: RwLock<FxHashMap<String, Arc<ExecutorTable<P>>>>,
    // Canonical names of tables marked as dropped; consulted before treating
    // a cached `tables` entry as existing.
    dropped_tables: RwLock<FxHashSet<String>>,
    // Table metadata manager backed by the shared column store.
    metadata: Arc<MetadataManager<P>>,
    // Constraint enforcement service (uses metadata + name catalog).
    constraint_service: ConstraintService<P>,
    // Catalog management service (uses metadata, name catalog, and store).
    catalog_service: CatalogManager<P>,
    // Centralized catalog for table/field name resolution
    catalog: Arc<TableCatalog>,
    // Shared column store for all tables in this context
    // This ensures catalog state is synchronized across all tables
    store: Arc<ColumnStore<P>>,
    // Transaction manager for session-based transactions
    transaction_manager:
        TransactionManager<RuntimeContextWrapper<P>, RuntimeContextWrapper<MemPager>>,
    // Transaction-id manager; the same instance backs `transaction_manager`.
    txn_manager: Arc<TxnIdManager>,
    // Per-transaction set of table names — presumably tables that received
    // new rows during that transaction; confirm against usage sites.
    txn_tables_with_new_rows: RwLock<FxHashMap<TxnId, FxHashSet<String>>>,
}
1200
1201impl<P> RuntimeContext<P>
1202where
1203    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1204{
    /// Create a context with its own private [`TableCatalog`].
    pub fn new(pager: Arc<P>) -> Self {
        Self::new_with_catalog_inner(pager, None)
    }
1208
    /// Create a context that registers its tables in a shared [`TableCatalog`].
    pub fn new_with_catalog(pager: Arc<P>, catalog: Arc<TableCatalog>) -> Self {
        Self::new_with_catalog_inner(pager, Some(catalog))
    }
1212
    /// Shared constructor: open the column store, restore transaction-id state
    /// and table metadata from the system catalog, and wire up the services.
    ///
    /// `shared_catalog` is `Some` when several contexts should resolve names
    /// through one [`TableCatalog`]; duplicate registrations are then
    /// tolerated (debug-logged) rather than warned about.
    ///
    /// Panics if the [`ColumnStore`] cannot be opened on `pager`.
    fn new_with_catalog_inner(pager: Arc<P>, shared_catalog: Option<Arc<TableCatalog>>) -> Self {
        tracing::trace!("RuntimeContext::new called, pager={:p}", &*pager);

        let store = ColumnStore::open(Arc::clone(&pager)).expect("failed to open ColumnStore");
        // NOTE: this `catalog` is the on-disk SysCatalog; it is shadowed below
        // by the in-memory TableCatalog.
        let catalog = SysCatalog::new(&store);

        // Restore the transaction-id high-water mark; fall back to the first
        // post-auto-commit id when absent or unreadable.
        let next_txn_id = match catalog.get_next_txn_id() {
            Ok(Some(id)) => {
                tracing::debug!("[CONTEXT] Loaded next_txn_id={} from catalog", id);
                id
            }
            Ok(None) => {
                tracing::debug!("[CONTEXT] No persisted next_txn_id found, starting from default");
                TXN_ID_AUTO_COMMIT + 1
            }
            Err(e) => {
                tracing::warn!("[CONTEXT] Failed to load next_txn_id: {}, using default", e);
                TXN_ID_AUTO_COMMIT + 1
            }
        };

        // Restore the last-committed watermark the same way.
        let last_committed = match catalog.get_last_committed_txn_id() {
            Ok(Some(id)) => {
                tracing::debug!("[CONTEXT] Loaded last_committed={} from catalog", id);
                id
            }
            Ok(None) => {
                tracing::debug!(
                    "[CONTEXT] No persisted last_committed found, starting from default"
                );
                TXN_ID_AUTO_COMMIT
            }
            Err(e) => {
                tracing::warn!(
                    "[CONTEXT] Failed to load last_committed: {}, using default",
                    e
                );
                TXN_ID_AUTO_COMMIT
            }
        };

        let store_arc = Arc::new(store);
        let metadata = Arc::new(MetadataManager::new(Arc::clone(&store_arc)));

        // Discover which tables exist; a failure degrades to an empty registry.
        let loaded_tables = match metadata.all_table_metas() {
            Ok(metas) => {
                tracing::debug!("[CONTEXT] Loaded {} table(s) from catalog", metas.len());
                metas
            }
            Err(e) => {
                tracing::warn!(
                    "[CONTEXT] Failed to load table metas: {}, starting with empty registry",
                    e
                );
                Vec::new()
            }
        };

        let transaction_manager =
            TransactionManager::new_with_initial_state(next_txn_id, last_committed);
        let txn_manager = transaction_manager.txn_manager();

        // LAZY LOADING: Only load table metadata at first access. We intentionally
        // avoid loading any row/column data into memory here. The executor
        // performs streaming reads from the ColumnStore when a query runs, so
        // large tables are never fully materialized.
        //
        // Benefits of this approach:
        // - Instant database open (no upfront I/O for table data)
        // - Lower memory footprint (only metadata cached)
        // - Natural parallelism: if multiple threads request different tables
        //   concurrently, those tables will be loaded concurrently by the
        //   caller threads (no global preload required).
        //
        // Future Optimizations (if profiling shows need):
        // 1. Eager parallel preload of a short "hot" list of tables (rayon)
        // 2. Background preload of catalog entries after startup
        // 3. LRU-based eviction for extremely large deployments
        // 4. Cache compact representations of schemas to reduce per-table RAM
        //
        // Note: `loaded_tables` holds catalog metadata that helped us discover
        // which tables exist; we discard it here because metadata will be
        // fetched on-demand during lazy loads.
        tracing::debug!(
            "[CONTEXT] Initialized with lazy loading for {} table(s)",
            loaded_tables.len()
        );

        // Initialize catalog and populate with existing tables
        let (catalog, is_shared_catalog) = match shared_catalog {
            Some(existing) => (existing, true),
            None => (Arc::new(TableCatalog::new()), false),
        };
        for (table_id, table_meta) in &loaded_tables {
            if let Some(ref table_name) = table_meta.name
                && let Err(e) = catalog.register_table(table_name.as_str(), *table_id)
            {
                // For a shared catalog, "already exists" is expected when
                // another context registered the table first.
                match e {
                    Error::CatalogError(ref msg)
                        if is_shared_catalog && msg.contains("already exists") =>
                    {
                        tracing::debug!(
                            "[CONTEXT] Shared catalog already contains table '{}' with id={}",
                            table_name,
                            table_id
                        );
                    }
                    other => {
                        tracing::warn!(
                            "[CONTEXT] Failed to register table '{}' (id={}) in catalog: {}",
                            table_name,
                            table_id,
                            other
                        );
                    }
                }
            }
        }
        tracing::debug!(
            "[CONTEXT] Catalog initialized with {} table(s)",
            catalog.table_count()
        );

        let constraint_service =
            ConstraintService::new(Arc::clone(&metadata), Arc::clone(&catalog));
        let catalog_service = CatalogManager::new(
            Arc::clone(&metadata),
            Arc::clone(&catalog),
            Arc::clone(&store_arc),
        );

        Self {
            pager,
            tables: RwLock::new(FxHashMap::default()), // Start with empty table cache
            dropped_tables: RwLock::new(FxHashSet::default()),
            metadata,
            constraint_service,
            catalog_service,
            catalog,
            store: store_arc,
            transaction_manager,
            txn_manager,
            txn_tables_with_new_rows: RwLock::new(FxHashMap::default()),
        }
    }
1358
    /// Return the transaction ID manager shared with sessions.
    ///
    /// The same `Arc` backs `transaction_manager`, so ids observed through
    /// this handle match those used by session transactions.
    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
        Arc::clone(&self.txn_manager)
    }
1363
1364    /// Persist the next_txn_id to the catalog.
1365    pub fn persist_next_txn_id(&self, next_txn_id: TxnId) -> Result<()> {
1366        let catalog = SysCatalog::new(&self.store);
1367        catalog.put_next_txn_id(next_txn_id)?;
1368        let last_committed = self.txn_manager.last_committed();
1369        catalog.put_last_committed_txn_id(last_committed)?;
1370        tracing::debug!(
1371            "[CONTEXT] Persisted next_txn_id={}, last_committed={}",
1372            next_txn_id,
1373            last_committed
1374        );
1375        Ok(())
1376    }
1377
1378    fn build_executor_multi_column_uniques(
1379        table: &ExecutorTable<P>,
1380        stored: &[MultiColumnUniqueEntryMeta],
1381    ) -> Vec<ExecutorMultiColumnUnique> {
1382        let mut results = Vec::with_capacity(stored.len());
1383
1384        'outer: for entry in stored {
1385            if entry.column_ids.is_empty() {
1386                continue;
1387            }
1388
1389            let mut column_indices = Vec::with_capacity(entry.column_ids.len());
1390            for field_id in &entry.column_ids {
1391                if let Some((idx, _)) = table
1392                    .schema
1393                    .columns
1394                    .iter()
1395                    .enumerate()
1396                    .find(|(_, col)| &col.field_id == field_id)
1397                {
1398                    column_indices.push(idx);
1399                } else {
1400                    tracing::warn!(
1401                        "[CATALOG] Skipping persisted multi-column UNIQUE {:?} for table_id={} missing field_id {}",
1402                        entry.index_name,
1403                        table.table.table_id(),
1404                        field_id
1405                    );
1406                    continue 'outer;
1407                }
1408            }
1409
1410            results.push(ExecutorMultiColumnUnique {
1411                index_name: entry.index_name.clone(),
1412                column_indices,
1413            });
1414        }
1415
1416        results
1417    }
1418
    /// Construct the default snapshot for auto-commit operations.
    ///
    /// The snapshot runs under `TXN_ID_AUTO_COMMIT` and observes everything
    /// up to the most recently committed transaction.
    pub fn default_snapshot(&self) -> TransactionSnapshot {
        TransactionSnapshot {
            txn_id: TXN_ID_AUTO_COMMIT,
            snapshot_id: self.txn_manager.last_committed(),
        }
    }
1426
    /// Get the table catalog for schema and table name management.
    ///
    /// Returns a clone of the shared handle; when the context was built via
    /// [`Self::new_with_catalog`] this catalog may be shared across contexts.
    pub fn table_catalog(&self) -> Arc<TableCatalog> {
        Arc::clone(&self.catalog)
    }
1431
1432    /// Create a new session for transaction management.
1433    /// Each session can have its own independent transaction.
1434    pub fn create_session(self: &Arc<Self>) -> RuntimeSession<P> {
1435        tracing::debug!("[SESSION] RuntimeContext::create_session called");
1436        let namespaces = Arc::new(SessionNamespaces::new(Arc::clone(self)));
1437        let wrapper = RuntimeContextWrapper::new(Arc::clone(self));
1438        let inner = self.transaction_manager.create_session(Arc::new(wrapper));
1439        tracing::debug!(
1440            "[SESSION] Created TransactionSession with session_id (will be logged by transaction manager)"
1441        );
1442        RuntimeSession { inner, namespaces }
1443    }
1444
    /// Get a handle to an existing table by name.
    ///
    /// Errors are surfaced by [`RuntimeTableHandle::new`] (e.g. when the
    /// table cannot be resolved).
    pub fn table(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
        RuntimeTableHandle::new(Arc::clone(self), name)
    }
1449
    /// Check if there's an active transaction (checks if ANY session has a transaction).
    ///
    /// A `true` here only means *some* session holds a transaction, not
    /// necessarily the caller's — hence the deprecation in favor of
    /// session-level checks.
    #[deprecated(note = "Use session-based transactions instead")]
    pub fn has_active_transaction(&self) -> bool {
        self.transaction_manager.has_active_transaction()
    }
1455
    /// Create a table named `name` from the given column specs.
    ///
    /// Thin wrapper over `create_table_with_options` passing `false` for the
    /// if-not-exists flag, so an existing table is an error.
    pub fn create_table<C, I>(
        self: &Arc<Self>,
        name: &str,
        columns: I,
    ) -> Result<RuntimeTableHandle<P>>
    where
        C: IntoColumnSpec,
        I: IntoIterator<Item = C>,
    {
        self.create_table_with_options(name, columns, false)
    }
1467
    /// Create a table named `name`, tolerating an existing table.
    ///
    /// Thin wrapper over `create_table_with_options` passing `true` for the
    /// if-not-exists flag.
    pub fn create_table_if_not_exists<C, I>(
        self: &Arc<Self>,
        name: &str,
        columns: I,
    ) -> Result<RuntimeTableHandle<P>>
    where
        C: IntoColumnSpec,
        I: IntoIterator<Item = C>,
    {
        self.create_table_with_options(name, columns, true)
    }
1479
    /// Executes a `CREATE TABLE` described by a fully-formed [`CreateTablePlan`].
    ///
    /// Honors `IF NOT EXISTS` (silently succeeds when the table exists) and
    /// `OR REPLACE` (removes the existing table entry first), then dispatches
    /// on the plan's optional data source: pre-built record batches or plain
    /// column definitions.
    ///
    /// # Errors
    /// - `InvalidArgumentError` if the plan has neither columns nor a source.
    /// - `CatalogError` if the table already exists and no flag permits reuse.
    /// - `Internal` if a `Select` source reaches this layer unmaterialized.
    pub fn create_table_plan(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
        if plan.columns.is_empty() && plan.source.is_none() {
            return Err(Error::InvalidArgumentError(
                "CREATE TABLE requires explicit columns or a source".into(),
            ));
        }

        let (display_name, canonical_name) = canonical_table_name(&plan.name)?;
        // Destructure after canonicalizing; `name` and `namespace` are no
        // longer needed past this point.
        let CreateTablePlan {
            name: _,
            if_not_exists,
            or_replace,
            columns,
            source,
            namespace: _,
            foreign_keys,
        } = plan;

        tracing::trace!(
            "DEBUG create_table_plan: table='{}' if_not_exists={} columns={}",
            display_name,
            if_not_exists,
            columns.len()
        );
        for (idx, col) in columns.iter().enumerate() {
            tracing::trace!(
                "  plan column[{}]: name='{}' primary_key={}",
                idx,
                col.name,
                col.primary_key
            );
        }
        // Snapshot existence state under short-lived read locks. A table that
        // is still in the cache but marked dropped does NOT count as existing.
        let (exists, is_dropped) = {
            let tables = self.tables.read().unwrap();
            let in_cache = tables.contains_key(&canonical_name);
            let is_dropped = self
                .dropped_tables
                .read()
                .unwrap()
                .contains(&canonical_name);
            // Table exists if it's in cache and NOT marked as dropped
            (in_cache && !is_dropped, is_dropped)
        };
        tracing::trace!(
            "DEBUG create_table_plan: exists={}, is_dropped={}",
            exists,
            is_dropped
        );

        // If table was dropped, remove it from cache before creating new one
        if is_dropped {
            self.remove_table_entry(&canonical_name);
            self.dropped_tables.write().unwrap().remove(&canonical_name);
        }

        if exists {
            if or_replace {
                tracing::trace!(
                    "DEBUG create_table_plan: table '{}' exists and or_replace=true, removing existing table before recreation",
                    display_name
                );
                self.remove_table_entry(&canonical_name);
            } else if if_not_exists {
                tracing::trace!(
                    "DEBUG create_table_plan: table '{}' exists and if_not_exists=true, returning early WITHOUT creating",
                    display_name
                );
                return Ok(RuntimeStatementResult::CreateTable {
                    table_name: display_name,
                });
            } else {
                return Err(Error::CatalogError(format!(
                    "Catalog Error: Table '{}' already exists",
                    display_name
                )));
            }
        }

        // Dispatch on the data source carried by the plan.
        match source {
            Some(CreateTableSource::Batches { schema, batches }) => self.create_table_from_batches(
                display_name,
                canonical_name,
                schema,
                batches,
                if_not_exists,
            ),
            Some(CreateTableSource::Select { .. }) => Err(Error::Internal(
                "CreateTableSource::Select should be materialized before reaching RuntimeContext::create_table_plan"
                    .into(),
            )),
            None => self.create_table_from_columns(
                display_name,
                canonical_name,
                columns,
                foreign_keys,
                if_not_exists,
            ),
        }
    }
1579
    /// Executes a `CREATE INDEX` described by a [`CreateIndexPlan`].
    ///
    /// Single-column indexes (optionally UNIQUE) are registered through the
    /// catalog service; multi-column indexes are currently limited to UNIQUE
    /// and are also tracked as executor-level constraints. Only ascending,
    /// NULLS LAST orderings are supported.
    ///
    /// # Errors
    /// - `InvalidArgumentError` for empty column lists, unsupported orderings,
    ///   unknown or duplicate columns, or non-UNIQUE multi-column requests.
    /// - `CatalogError` if an equivalent index already exists and the plan did
    ///   not specify `IF NOT EXISTS`.
    pub fn create_index(&self, plan: CreateIndexPlan) -> Result<RuntimeStatementResult<P>> {
        if plan.columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE INDEX requires at least one column".into(),
            ));
        }

        // Only ASC + NULLS LAST orderings are currently implemented.
        for column_plan in &plan.columns {
            if !column_plan.ascending || column_plan.nulls_first {
                return Err(Error::InvalidArgumentError(
                    "only ASC indexes with NULLS LAST are supported".into(),
                ));
            }
        }

        let index_name = plan.name.clone();
        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
        let table = self.lookup_table(&canonical_name)?;

        // Resolve each requested column against the table schema, rejecting
        // unknown names and duplicates.
        let mut column_indices = Vec::with_capacity(plan.columns.len());
        let mut field_ids = Vec::with_capacity(plan.columns.len());
        let mut column_names = Vec::with_capacity(plan.columns.len());
        let mut seen_column_indices = FxHashSet::default();

        for column_plan in &plan.columns {
            // Schema lookup keys are lowercased column names.
            let normalized = column_plan.name.to_ascii_lowercase();
            let col_idx = table
                .schema
                .lookup
                .get(&normalized)
                .copied()
                .ok_or_else(|| {
                    Error::InvalidArgumentError(format!(
                        "column '{}' does not exist in table '{}'",
                        column_plan.name, display_name
                    ))
                })?;
            if !seen_column_indices.insert(col_idx) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate column '{}' in CREATE INDEX",
                    column_plan.name
                )));
            }

            let column = &table.schema.columns[col_idx];
            column_indices.push(col_idx);
            field_ids.push(column.field_id);
            column_names.push(column.name.clone());
        }

        // Single-column path: register via the catalog service; for UNIQUE
        // indexes, validate existing visible data first.
        if plan.columns.len() == 1 {
            let field_id = field_ids[0];
            let column_name = column_names[0].clone();

            if plan.unique {
                // Ensure uniqueness already holds before recording the
                // constraint.
                let snapshot = self.default_snapshot();
                let existing_values =
                    self.scan_column_values(table.as_ref(), field_id, snapshot)?;
                ensure_single_column_unique(&existing_values, &[], &column_name)?;
            }

            let created = self.catalog_service.register_single_column_index(
                &display_name,
                &canonical_name,
                &table.table,
                field_id,
                &column_name,
                plan.unique,
                plan.if_not_exists,
            )?;

            if !created {
                // Index already existed and if_not_exists was true
                return Ok(RuntimeStatementResult::CreateIndex {
                    table_name: display_name,
                    index_name,
                });
            }

            // Refresh the cached executor table so the unique flag is
            // reflected; evict the cache entry when the rebuild yields None
            // so the next lookup reloads from the catalog.
            if let Some(updated_table) =
                Self::rebuild_executor_table_with_unique(table.as_ref(), field_id)
            {
                self.tables
                    .write()
                    .unwrap()
                    .insert(canonical_name.clone(), Arc::clone(&updated_table));
            } else {
                self.remove_table_entry(&canonical_name);
            }

            drop(table);

            return Ok(RuntimeStatementResult::CreateIndex {
                table_name: display_name,
                index_name,
            });
        }

        // Multi-column path: only UNIQUE composite indexes are implemented.
        if !plan.unique {
            return Err(Error::InvalidArgumentError(
                "multi-column CREATE INDEX currently supports UNIQUE indexes only".into(),
            ));
        }

        let table_id = table.table.table_id();

        // Validate that existing visible rows already satisfy the composite
        // uniqueness constraint.
        let snapshot = self.default_snapshot();
        let existing_rows = self.scan_multi_column_values(table.as_ref(), &field_ids, snapshot)?;
        ensure_multi_column_unique(&existing_rows, &[], &column_names)?;

        let executor_entry = ExecutorMultiColumnUnique {
            index_name: index_name.clone(),
            column_indices: column_indices.clone(),
        };

        let registration = self.catalog_service.register_multi_column_unique_index(
            table_id,
            &field_ids,
            index_name.clone(),
        )?;

        match registration {
            MultiColumnUniqueRegistration::Created => {
                // Attach the constraint to the in-memory executor table.
                table.add_multi_column_unique(executor_entry);
            }
            MultiColumnUniqueRegistration::AlreadyExists {
                index_name: existing,
            } => {
                if plan.if_not_exists {
                    drop(table);
                    return Ok(RuntimeStatementResult::CreateIndex {
                        table_name: display_name,
                        index_name: existing,
                    });
                }
                return Err(Error::CatalogError(format!(
                    "Index already exists on columns '{}'",
                    column_names.join(", ")
                )));
            }
        }

        Ok(RuntimeStatementResult::CreateIndex {
            table_name: display_name,
            index_name,
        })
    }
1727
    /// Returns all table names currently registered in the catalog.
    ///
    /// Delegates to the catalog rather than the runtime's table cache so the
    /// result reflects the authoritative registry.
    pub fn table_names(self: &Arc<Self>) -> Vec<String> {
        // Use catalog for table names (single source of truth)
        self.catalog.table_names()
    }
1733
    /// Returns metadata snapshot for a table including columns, types, and constraints.
    ///
    /// `canonical_name` must already be canonicalized (see `canonical_table_name`).
    pub fn table_view(&self, canonical_name: &str) -> Result<TableView> {
        self.catalog_service.table_view(canonical_name)
    }
1738
    /// Narrows `row_ids` to the rows visible under `snapshot`, delegating MVCC
    /// visibility evaluation to the shared snapshot filter helper.
    fn filter_visible_row_ids(
        &self,
        table: &ExecutorTable<P>,
        row_ids: Vec<RowId>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RowId>> {
        filter_row_ids_for_snapshot(table.table.as_ref(), row_ids, &self.txn_manager, snapshot)
    }
1747
1748    /// Creates a fluent builder for defining and creating a new table with columns and constraints.
1749    pub fn create_table_builder(&self, name: &str) -> RuntimeCreateTableBuilder<'_, P> {
1750        RuntimeCreateTableBuilder {
1751            ctx: self,
1752            plan: CreateTablePlan::new(name),
1753        }
1754    }
1755
1756    /// Returns column specifications for a table including names, types, and constraint flags.
1757    pub fn table_column_specs(self: &Arc<Self>, name: &str) -> Result<Vec<ColumnSpec>> {
1758        let (_, canonical_name) = canonical_table_name(name)?;
1759        self.catalog_service.table_column_specs(&canonical_name)
1760    }
1761
1762    /// Returns foreign key relationships for a table, both referencing and referenced constraints.
1763    pub fn foreign_key_views(self: &Arc<Self>, name: &str) -> Result<Vec<ForeignKeyView>> {
1764        let (_, canonical_name) = canonical_table_name(name)?;
1765        self.catalog_service.foreign_key_views(&canonical_name)
1766    }
1767
1768    /// Exports all rows from a table as a `RowBatch`, useful for data migration or inspection.
1769    pub fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<RowBatch> {
1770        let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
1771        handle.lazy()?.collect_rows()
1772    }
1773
    /// Plan-dispatch shim: `CREATE TABLE` statements funnel through
    /// [`Self::create_table_plan`].
    fn execute_create_table(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
        self.create_table_plan(plan)
    }
1777
1778    fn create_table_with_options<C, I>(
1779        self: &Arc<Self>,
1780        name: &str,
1781        columns: I,
1782        if_not_exists: bool,
1783    ) -> Result<RuntimeTableHandle<P>>
1784    where
1785        C: IntoColumnSpec,
1786        I: IntoIterator<Item = C>,
1787    {
1788        let mut plan = CreateTablePlan::new(name);
1789        plan.if_not_exists = if_not_exists;
1790        plan.columns = columns
1791            .into_iter()
1792            .map(|column| column.into_column_spec())
1793            .collect();
1794        let result = self.create_table_plan(plan)?;
1795        match result {
1796            RuntimeStatementResult::CreateTable { .. } => {
1797                RuntimeTableHandle::new(Arc::clone(self), name)
1798            }
1799            other => Err(Error::InvalidArgumentError(format!(
1800                "unexpected statement result {other:?} when creating table"
1801            ))),
1802        }
1803    }
1804
1805    pub fn insert(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
1806        // For non-transactional inserts, use TXN_ID_AUTO_COMMIT directly
1807        // instead of creating a temporary transaction
1808        let snapshot = TransactionSnapshot {
1809            txn_id: TXN_ID_AUTO_COMMIT,
1810            snapshot_id: self.txn_manager.last_committed(),
1811        };
1812        self.insert_with_snapshot(plan, snapshot)
1813    }
1814
    /// Inserts rows or pre-built batches under an explicit MVCC snapshot.
    ///
    /// Resolves the target table, then dispatches on the plan's source.
    /// `InsertSource::Select` must have been materialized into rows or batches
    /// by a higher layer before reaching this method.
    ///
    /// # Panics
    /// Panics if the insert path reports `Error::NotFound` after the table
    /// lookup already succeeded — that combination indicates a runtime bug.
    pub fn insert_with_snapshot(
        &self,
        plan: InsertPlan,
        snapshot: TransactionSnapshot,
    ) -> Result<RuntimeStatementResult<P>> {
        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
        let table = self.lookup_table(&canonical_name)?;

        // Targeted debug for 'keys' table only
        // NOTE(review): trace hook hard-coded to a table literally named
        // "keys" — looks like leftover targeted debugging; consider removing
        // or gating behind a feature once the investigation is done.
        if display_name == "keys" {
            tracing::trace!(
                "\n[KEYS] INSERT starting - table_id={}, context_pager={:p}",
                table.table.table_id(),
                &*self.pager
            );
            tracing::trace!(
                "[KEYS] Table has {} columns, primary_key columns: {:?}",
                table.schema.columns.len(),
                table
                    .schema
                    .columns
                    .iter()
                    .filter(|c| c.primary_key)
                    .map(|c| &c.name)
                    .collect::<Vec<_>>()
            );
        }

        // Dispatch on the insert source; Select must already be materialized.
        let result = match plan.source {
            InsertSource::Rows(rows) => self.insert_rows(
                table.as_ref(),
                display_name.clone(),
                canonical_name.clone(),
                rows,
                plan.columns,
                snapshot,
            ),
            InsertSource::Batches(batches) => self.insert_batches(
                table.as_ref(),
                display_name.clone(),
                canonical_name.clone(),
                batches,
                plan.columns,
                snapshot,
            ),
            InsertSource::Select { .. } => Err(Error::Internal(
                "InsertSource::Select should be materialized before reaching RuntimeContext::insert"
                    .into(),
            )),
        };

        if display_name == "keys" {
            tracing::trace!(
                "[KEYS] INSERT completed: {:?}",
                result
                    .as_ref()
                    .map(|_| "OK")
                    .map_err(|e| format!("{:?}", e))
            );
        }

        // NotFound after a successful lookup can only be a logic error; fail
        // loudly rather than surfacing a misleading error to the caller.
        if matches!(result, Err(Error::NotFound)) {
            panic!(
                "BUG: insert yielded Error::NotFound for table '{}'. \
                 This should never happen: insert should never return NotFound after successful table lookup. \
                 This indicates a logic error in the runtime.",
                display_name
            );
        }

        result
    }
1887
1888    /// Get raw batches from a table including row_ids, optionally filtered.
1889    /// This is used for transaction seeding where we need to preserve existing row_ids.
1890    pub fn get_batches_with_row_ids(
1891        &self,
1892        table_name: &str,
1893        filter: Option<LlkvExpr<'static, String>>,
1894    ) -> Result<Vec<RecordBatch>> {
1895        self.get_batches_with_row_ids_with_snapshot(table_name, filter, self.default_snapshot())
1896    }
1897
    /// Like [`Self::get_batches_with_row_ids`], but evaluates MVCC visibility
    /// against the caller-provided snapshot.
    ///
    /// Each returned batch carries a leading `row_id` column followed by the
    /// table's user columns (with MVCC field metadata attached). Tables with
    /// no user columns still yield row_id-only batches.
    pub fn get_batches_with_row_ids_with_snapshot(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RecordBatch>> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

        // Translate the caller's filter, or synthesize a full-table scan over
        // the first column when no filter was supplied.
        let filter_expr = match filter {
            Some(expr) => translate_predicate(expr, table.schema.as_ref())?,
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                full_table_scan_filter(field_id)
            }
        };

        // First, get the row_ids that match the filter
        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Drop rows that are not visible under `snapshot` before gathering data.
        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
        if visible_row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Scan to get the column data without materializing full columns
        let table_id = table.table.table_id();

        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len());

        // row_id is always the first output column.
        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for column in &table.schema.columns {
            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
            logical_fields.push(logical_field_id);
            let field = mvcc::build_field_with_metadata(
                &column.name,
                column.data_type.clone(),
                column.nullable,
                column.field_id,
            );
            fields.push(field);
        }

        let schema = Arc::new(Schema::new(fields));

        if logical_fields.is_empty() {
            // Tables without user columns should still return row_id batches.
            let mut row_id_builder = UInt64Builder::with_capacity(visible_row_ids.len());
            for row_id in &visible_row_ids {
                row_id_builder.append_value(*row_id);
            }
            let arrays: Vec<ArrayRef> = vec![Arc::new(row_id_builder.finish()) as ArrayRef];
            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            return Ok(vec![batch]);
        }

        // Stream the selected columns chunk-by-chunk, prepending each chunk's
        // row ids to the emitted RecordBatch.
        let mut stream = table.table.stream_columns(
            Arc::from(logical_fields),
            visible_row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut batches = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(chunk.batch().num_columns() + 1);

            let mut row_id_builder = UInt64Builder::with_capacity(chunk.len());
            for row_id in chunk.row_ids() {
                row_id_builder.append_value(*row_id);
            }
            arrays.push(Arc::new(row_id_builder.finish()) as ArrayRef);

            let chunk_batch = chunk.into_batch();
            for column_array in chunk_batch.columns() {
                arrays.push(column_array.clone());
            }

            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            batches.push(batch);
        }

        Ok(batches)
    }
1991
1992    /// Append batches directly to a table, preserving row_ids from the batches.
1993    /// This is used for transaction seeding where we need to preserve existing row_ids.
1994    pub fn append_batches_with_row_ids(
1995        &self,
1996        table_name: &str,
1997        batches: Vec<RecordBatch>,
1998    ) -> Result<usize> {
1999        let (_, canonical_name) = canonical_table_name(table_name)?;
2000        let table = self.lookup_table(&canonical_name)?;
2001
2002        let mut total_rows = 0;
2003        for batch in batches {
2004            if batch.num_rows() == 0 {
2005                continue;
2006            }
2007
2008            // Verify the batch has a row_id column
2009            let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
2010                Error::InvalidArgumentError(
2011                    "batch must contain row_id column for direct append".into(),
2012                )
2013            })?;
2014
2015            // Append the batch directly to the underlying table
2016            table.table.append(&batch)?;
2017            total_rows += batch.num_rows();
2018        }
2019
2020        Ok(total_rows)
2021    }
2022
2023    pub fn update(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
2024        let snapshot = self.txn_manager.begin_transaction();
2025        match self.update_with_snapshot(plan, snapshot) {
2026            Ok(result) => {
2027                self.txn_manager.mark_committed(snapshot.txn_id);
2028                Ok(result)
2029            }
2030            Err(err) => {
2031                self.txn_manager.mark_aborted(snapshot.txn_id);
2032                Err(err)
2033            }
2034        }
2035    }
2036
2037    pub fn update_with_snapshot(
2038        &self,
2039        plan: UpdatePlan,
2040        snapshot: TransactionSnapshot,
2041    ) -> Result<RuntimeStatementResult<P>> {
2042        let UpdatePlan {
2043            table,
2044            assignments,
2045            filter,
2046        } = plan;
2047        let (display_name, canonical_name) = canonical_table_name(&table)?;
2048        let table = self.lookup_table(&canonical_name)?;
2049        if let Some(filter) = filter {
2050            self.update_filtered_rows(
2051                table.as_ref(),
2052                display_name,
2053                canonical_name,
2054                assignments,
2055                filter,
2056                snapshot,
2057            )
2058        } else {
2059            self.update_all_rows(
2060                table.as_ref(),
2061                display_name,
2062                canonical_name,
2063                assignments,
2064                snapshot,
2065            )
2066        }
2067    }
2068
2069    pub fn delete(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
2070        let snapshot = self.txn_manager.begin_transaction();
2071        match self.delete_with_snapshot(plan, snapshot) {
2072            Ok(result) => {
2073                self.txn_manager.mark_committed(snapshot.txn_id);
2074                Ok(result)
2075            }
2076            Err(err) => {
2077                self.txn_manager.mark_aborted(snapshot.txn_id);
2078                Err(err)
2079            }
2080        }
2081    }
2082
2083    pub fn delete_with_snapshot(
2084        &self,
2085        plan: DeletePlan,
2086        snapshot: TransactionSnapshot,
2087    ) -> Result<RuntimeStatementResult<P>> {
2088        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
2089        // Get table - will be checked against snapshot during actual deletion
2090        let table = match self.tables.read().unwrap().get(&canonical_name) {
2091            Some(t) => Arc::clone(t),
2092            None => return Err(Error::NotFound),
2093        };
2094        match plan.filter {
2095            Some(filter) => self.delete_filtered_rows(
2096                table.as_ref(),
2097                display_name,
2098                canonical_name.clone(),
2099                filter,
2100                snapshot,
2101            ),
2102            None => self.delete_all_rows(table.as_ref(), display_name, canonical_name, snapshot),
2103        }
2104    }
2105
    /// Returns a runtime handle for interacting with the named table.
    pub fn table_handle(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
        RuntimeTableHandle::new(Arc::clone(self), name)
    }
2109
2110    pub fn execute_select(self: &Arc<Self>, plan: SelectPlan) -> Result<SelectExecution<P>> {
2111        let snapshot = self.default_snapshot();
2112        self.execute_select_with_snapshot(plan, snapshot)
2113    }
2114
2115    pub fn execute_select_with_snapshot(
2116        self: &Arc<Self>,
2117        plan: SelectPlan,
2118        snapshot: TransactionSnapshot,
2119    ) -> Result<SelectExecution<P>> {
2120        // Handle SELECT without FROM clause (e.g., SELECT 42, SELECT {'a': 1})
2121        if plan.tables.is_empty() {
2122            let provider: Arc<dyn TableProvider<P>> = Arc::new(ContextProvider {
2123                context: Arc::clone(self),
2124            });
2125            let executor = QueryExecutor::new(provider);
2126            // No row filter needed since there's no table
2127            return executor.execute_select_with_filter(plan, None);
2128        }
2129
2130        // Resolve canonical names for all tables (don't verify existence - executor will handle it with snapshot)
2131        let mut canonical_tables = Vec::new();
2132        for table_ref in &plan.tables {
2133            let qualified = table_ref.qualified_name();
2134            let (_display, canonical) = canonical_table_name(&qualified)?;
2135            // Parse canonical back into schema.table
2136            let parts: Vec<&str> = canonical.split('.').collect();
2137            let canon_ref = if parts.len() >= 2 {
2138                llkv_plan::TableRef::new(parts[0], parts[1])
2139            } else {
2140                llkv_plan::TableRef::new("", &canonical)
2141            };
2142            canonical_tables.push(canon_ref);
2143        }
2144
2145        let mut canonical_plan = plan.clone();
2146        canonical_plan.tables = canonical_tables;
2147
2148        let provider: Arc<dyn TableProvider<P>> = Arc::new(ContextProvider {
2149            context: Arc::clone(self),
2150        });
2151        let executor = QueryExecutor::new(provider);
2152
2153        // For single-table queries, apply MVCC filtering
2154        let row_filter: Option<Arc<dyn RowIdFilter<P>>> = if canonical_plan.tables.len() == 1 {
2155            Some(Arc::new(MvccRowIdFilter::new(
2156                Arc::clone(&self.txn_manager),
2157                snapshot,
2158            )))
2159        } else {
2160            // TODO: Extend MVCC filtering once joins propagate per-table snapshots.
2161            // Multi-table plans rely on executor-level visibility checks
2162            None
2163        };
2164
2165        executor.execute_select_with_filter(canonical_plan, row_filter)
2166    }
2167
    /// Creates a table from explicit column definitions, registering it in the
    /// catalog, the runtime table cache, and (when present) the foreign-key
    /// registry.
    ///
    /// Existence is checked twice: once up front to skip catalog work, and
    /// again under the write lock after catalog creation to close the window
    /// where a concurrent creator won the race — the loser rolls back its
    /// catalog entry. Foreign-key registration failure likewise rolls back
    /// both the catalog entry and the cache entry.
    fn create_table_from_columns(
        &self,
        display_name: String,
        canonical_name: String,
        columns: Vec<ColumnSpec>,
        foreign_keys: Vec<ForeignKeySpec>,
        if_not_exists: bool,
    ) -> Result<RuntimeStatementResult<P>> {
        tracing::trace!(
            "\n=== CREATE_TABLE_FROM_COLUMNS: table='{}' columns={} ===",
            display_name,
            columns.len()
        );
        for (idx, col) in columns.iter().enumerate() {
            tracing::trace!(
                "  input column[{}]: name='{}' primary_key={}",
                idx,
                col.name,
                col.primary_key
            );
        }
        if columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE TABLE requires at least one column".into(),
            ));
        }

        // Avoid repeating catalog work if the table already exists.
        {
            let tables = self.tables.read().unwrap();
            if tables.contains_key(&canonical_name) {
                if if_not_exists {
                    return Ok(RuntimeStatementResult::CreateTable {
                        table_name: display_name,
                    });
                }

                return Err(Error::CatalogError(format!(
                    "Catalog Error: Table '{}' already exists",
                    display_name
                )));
            }
        }

        // Create the physical table and its catalog entries.
        let CreateTableResult {
            table_id,
            table,
            table_columns,
            column_lookup,
        } = Table::create_from_columns(
            &display_name,
            &canonical_name,
            &columns,
            self.metadata.clone(),
            self.catalog.clone(),
            self.store.clone(),
        )?;

        tracing::trace!(
            "=== TABLE '{}' CREATED WITH table_id={} pager={:p} ===",
            display_name,
            table_id,
            &*self.pager
        );

        // Mirror the catalog's column metadata into executor-side definitions.
        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
        for (idx, column) in table_columns.iter().enumerate() {
            tracing::trace!(
                "DEBUG create_table_from_columns[{}]: name='{}' data_type={:?} nullable={} primary_key={} unique={}",
                idx,
                column.name,
                column.data_type,
                column.nullable,
                column.primary_key,
                column.unique
            );
            column_defs.push(ExecutorColumn {
                name: column.name.clone(),
                data_type: column.data_type.clone(),
                nullable: column.nullable,
                primary_key: column.primary_key,
                unique: column.unique,
                field_id: column.field_id,
                check_expr: column.check_expr.clone(),
            });
        }

        let schema = Arc::new(ExecutorSchema {
            columns: column_defs.clone(),
            lookup: column_lookup,
        });
        let table_entry = Arc::new(ExecutorTable {
            table: Arc::clone(&table),
            schema,
            next_row_id: AtomicU64::new(0),
            total_rows: AtomicU64::new(0),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        // Second existence check under the write lock: if a concurrent
        // creator inserted the table meanwhile, roll back our catalog entry.
        let mut tables = self.tables.write().unwrap();
        if tables.contains_key(&canonical_name) {
            drop(tables);
            let field_ids: Vec<FieldId> =
                table_columns.iter().map(|column| column.field_id).collect();
            let _ = self
                .catalog_service
                .drop_table(&canonical_name, table_id, &field_ids);
            if if_not_exists {
                return Ok(RuntimeStatementResult::CreateTable {
                    table_name: display_name,
                });
            }
            return Err(Error::CatalogError(format!(
                "Catalog Error: Table '{}' already exists",
                display_name
            )));
        }
        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
        drop(tables);

        if !foreign_keys.is_empty() {
            // The closure resolves each referenced table's metadata for the
            // FK registrar; unknown references surface as argument errors.
            let fk_result = self.catalog_service.register_foreign_keys_for_new_table(
                table_id,
                &display_name,
                &canonical_name,
                &table_columns,
                &foreign_keys,
                |table_name| {
                    let (display, canonical) = canonical_table_name(table_name)?;
                    let referenced_table = self.lookup_table(&canonical).map_err(|_| {
                        Error::InvalidArgumentError(format!(
                            "referenced table '{}' does not exist",
                            table_name
                        ))
                    })?;

                    let columns = referenced_table
                        .schema
                        .columns
                        .iter()
                        .map(|column| ForeignKeyColumn {
                            name: column.name.clone(),
                            data_type: column.data_type.clone(),
                            nullable: column.nullable,
                            primary_key: column.primary_key,
                            unique: column.unique,
                            field_id: column.field_id,
                        })
                        .collect();

                    Ok(ForeignKeyTableInfo {
                        display_name: display,
                        canonical_name: canonical,
                        table_id: referenced_table.table.table_id(),
                        columns,
                    })
                },
                current_time_micros(),
            );

            // FK registration failed: undo both the catalog entry and the
            // cache entry so the table does not half-exist.
            if let Err(err) = fk_result {
                let field_ids: Vec<FieldId> =
                    table_columns.iter().map(|column| column.field_id).collect();
                let _ = self
                    .catalog_service
                    .drop_table(&canonical_name, table_id, &field_ids);
                self.remove_table_entry(&canonical_name);
                return Err(err);
            }
        }

        Ok(RuntimeStatementResult::CreateTable {
            table_name: display_name,
        })
    }
2343
2344    fn create_table_from_batches(
2345        &self,
2346        display_name: String,
2347        canonical_name: String,
2348        schema: Arc<Schema>,
2349        batches: Vec<RecordBatch>,
2350        if_not_exists: bool,
2351    ) -> Result<RuntimeStatementResult<P>> {
2352        {
2353            let tables = self.tables.read().unwrap();
2354            if tables.contains_key(&canonical_name) {
2355                if if_not_exists {
2356                    return Ok(RuntimeStatementResult::CreateTable {
2357                        table_name: display_name,
2358                    });
2359                }
2360                return Err(Error::CatalogError(format!(
2361                    "Catalog Error: Table '{}' already exists",
2362                    display_name
2363                )));
2364            }
2365        }
2366
2367        let CreateTableResult {
2368            table_id,
2369            table,
2370            table_columns,
2371            column_lookup,
2372        } = self.catalog_service.create_table_from_schema(
2373            &display_name,
2374            &canonical_name,
2375            &schema,
2376        )?;
2377
2378        tracing::trace!(
2379            "=== CTAS table '{}' created with table_id={} pager={:p} ===",
2380            display_name,
2381            table_id,
2382            &*self.pager
2383        );
2384
2385        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
2386        for column in &table_columns {
2387            column_defs.push(ExecutorColumn {
2388                name: column.name.clone(),
2389                data_type: column.data_type.clone(),
2390                nullable: column.nullable,
2391                primary_key: column.primary_key,
2392                unique: column.unique,
2393                field_id: column.field_id,
2394                check_expr: column.check_expr.clone(),
2395            });
2396        }
2397
2398        let schema_arc = Arc::new(ExecutorSchema {
2399            columns: column_defs.clone(),
2400            lookup: column_lookup,
2401        });
2402        let table_entry = Arc::new(ExecutorTable {
2403            table: Arc::clone(&table),
2404            schema: schema_arc,
2405            next_row_id: AtomicU64::new(0),
2406            total_rows: AtomicU64::new(0),
2407            multi_column_uniques: RwLock::new(Vec::new()),
2408        });
2409
2410        let creator_snapshot = self.txn_manager.begin_transaction();
2411        let creator_txn_id = creator_snapshot.txn_id;
2412        let (next_row_id, total_rows) = match self.catalog_service.append_batches_with_mvcc(
2413            table.as_ref(),
2414            &table_columns,
2415            &batches,
2416            creator_txn_id,
2417            TXN_ID_NONE,
2418            0,
2419        ) {
2420            Ok(result) => {
2421                self.txn_manager.mark_committed(creator_txn_id);
2422                result
2423            }
2424            Err(err) => {
2425                self.txn_manager.mark_aborted(creator_txn_id);
2426                let field_ids: Vec<FieldId> =
2427                    table_columns.iter().map(|column| column.field_id).collect();
2428                let _ = self
2429                    .catalog_service
2430                    .drop_table(&canonical_name, table_id, &field_ids);
2431                return Err(err);
2432            }
2433        };
2434
2435        table_entry.next_row_id.store(next_row_id, Ordering::SeqCst);
2436        table_entry.total_rows.store(total_rows, Ordering::SeqCst);
2437
2438        let mut tables = self.tables.write().unwrap();
2439        if tables.contains_key(&canonical_name) {
2440            if if_not_exists {
2441                return Ok(RuntimeStatementResult::CreateTable {
2442                    table_name: display_name,
2443                });
2444            }
2445            return Err(Error::CatalogError(format!(
2446                "Catalog Error: Table '{}' already exists",
2447                display_name
2448            )));
2449        }
2450        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
2451        drop(tables); // Release write lock before catalog operations
2452
2453        Ok(RuntimeStatementResult::CreateTable {
2454            table_name: display_name,
2455        })
2456    }
2457
2458    fn rebuild_executor_table_with_unique(
2459        table: &ExecutorTable<P>,
2460        field_id: FieldId,
2461    ) -> Option<Arc<ExecutorTable<P>>> {
2462        let mut columns = table.schema.columns.clone();
2463        let mut found = false;
2464        for column in &mut columns {
2465            if column.field_id == field_id {
2466                column.unique = true;
2467                found = true;
2468                break;
2469            }
2470        }
2471        if !found {
2472            return None;
2473        }
2474
2475        let schema = Arc::new(ExecutorSchema {
2476            columns,
2477            lookup: table.schema.lookup.clone(),
2478        });
2479
2480        let next_row_id = table.next_row_id.load(Ordering::SeqCst);
2481        let total_rows = table.total_rows.load(Ordering::SeqCst);
2482        let uniques = table.multi_column_uniques();
2483
2484        Some(Arc::new(ExecutorTable {
2485            table: Arc::clone(&table.table),
2486            schema,
2487            next_row_id: AtomicU64::new(next_row_id),
2488            total_rows: AtomicU64::new(total_rows),
2489            multi_column_uniques: RwLock::new(uniques),
2490        }))
2491    }
2492
2493    fn record_table_with_new_rows(&self, txn_id: TxnId, canonical_name: String) {
2494        if txn_id == TXN_ID_AUTO_COMMIT {
2495            return;
2496        }
2497
2498        let mut guard = self.txn_tables_with_new_rows.write().unwrap();
2499        guard.entry(txn_id).or_default().insert(canonical_name);
2500    }
2501
    /// Collect the values of every row that was created by `txn_id` and is
    /// still live (not deleted) in `table`.
    ///
    /// Used at commit time to re-validate constraints against rows the
    /// transaction inserted. Returns row-major values in schema column order.
    /// Auto-commit transactions, empty schemas, and empty tables all
    /// short-circuit to an empty result.
    fn collect_rows_created_by_txn(
        &self,
        table: &ExecutorTable<P>,
        txn_id: TxnId,
    ) -> Result<Vec<Vec<PlanValue>>> {
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(Vec::new());
        }

        if table.schema.columns.is_empty() {
            return Ok(Vec::new());
        }

        // Anchor a full-table scan on the first user column to enumerate
        // candidate row ids. No snapshot filtering is applied here — the
        // MVCC columns are inspected manually below.
        let Some(first_field_id) = table.schema.first_field_id() else {
            return Ok(Vec::new());
        };
        let filter_expr = full_table_scan_filter(first_field_id);

        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Stream the MVCC metadata columns first (created_by, deleted_by),
        // followed by every user column in schema order.
        let table_id = table.table.table_id();
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len() + 2);
        logical_fields.push(LogicalFieldId::for_mvcc_created_by(table_id));
        logical_fields.push(LogicalFieldId::for_mvcc_deleted_by(table_id));
        for column in &table.schema.columns {
            logical_fields.push(LogicalFieldId::for_user(table_id, column.field_id));
        }

        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let mut stream = table.table.stream_columns(
            Arc::clone(&logical_fields),
            row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut rows = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            // Defensive: skip chunks that do not carry the full column set.
            if batch.num_columns() < table.schema.columns.len() + 2 {
                continue;
            }

            // Columns 0 and 1 are the MVCC pair pushed above.
            let created_col = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing created_by column in MVCC data".into()))?;
            let deleted_col = batch
                .column(1)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing deleted_by column in MVCC data".into()))?;

            for row_idx in 0..batch.num_rows() {
                // A null created_by is treated as written by auto-commit.
                let created_by = if created_col.is_null(row_idx) {
                    TXN_ID_AUTO_COMMIT
                } else {
                    created_col.value(row_idx)
                };
                if created_by != txn_id {
                    continue;
                }

                // Skip rows that have already been deleted by any txn.
                let deleted_by = if deleted_col.is_null(row_idx) {
                    TXN_ID_NONE
                } else {
                    deleted_col.value(row_idx)
                };
                if deleted_by != TXN_ID_NONE {
                    continue;
                }

                // User columns start at offset 2, after the MVCC pair.
                let mut row_values = Vec::with_capacity(table.schema.columns.len());
                for col_idx in 0..table.schema.columns.len() {
                    let array = batch.column(col_idx + 2);
                    let value = llkv_plan::plan_value_from_array(array, row_idx)?;
                    row_values.push(value);
                }
                rows.push(row_values);
            }
        }

        Ok(rows)
    }
2590
    /// Re-validate PRIMARY KEY uniqueness for every table `txn_id` inserted
    /// into, as a final check before the transaction commits.
    ///
    /// Consumes (removes) the transaction's tracked table set regardless of
    /// outcome. Existing values are scanned through a snapshot pinned at the
    /// last committed transaction, so conflicts with concurrently committed
    /// writers are detected here.
    fn validate_primary_keys_for_commit(&self, txn_id: TxnId) -> Result<()> {
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(());
        }

        // Take ownership of the pending-table set; it must not survive this
        // commit attempt whether validation succeeds or fails.
        let tables = {
            let mut guard = self.txn_tables_with_new_rows.write().unwrap();
            guard.remove(&txn_id)
        };

        let Some(tables) = tables else {
            return Ok(());
        };

        if tables.is_empty() {
            return Ok(());
        }

        // Auto-commit snapshot at the latest committed id: sees all committed
        // data, none of the in-flight writes.
        let snapshot = TransactionSnapshot {
            txn_id: TXN_ID_AUTO_COMMIT,
            snapshot_id: self.txn_manager.last_committed(),
        };

        for table_name in tables {
            let table = self.lookup_table(&table_name)?;
            let constraint_ctx = self.build_table_constraint_context(table.as_ref())?;
            // Tables without a primary key need no commit-time validation.
            let Some(primary_key) = constraint_ctx.primary_key.as_ref() else {
                continue;
            };

            let new_rows = self.collect_rows_created_by_txn(table.as_ref(), txn_id)?;
            if new_rows.is_empty() {
                continue;
            }

            // New rows carry values in full schema order (identity mapping).
            let column_order: Vec<usize> = (0..table.schema.columns.len()).collect();
            self.constraint_service.validate_primary_key_rows(
                &constraint_ctx.schema_field_ids,
                primary_key,
                &column_order,
                &new_rows,
                |field_ids| self.scan_multi_column_values(table.as_ref(), field_ids, snapshot),
            )?;
        }

        Ok(())
    }
2638
2639    fn clear_transaction_state(&self, txn_id: TxnId) {
2640        if txn_id == TXN_ID_AUTO_COMMIT {
2641            return;
2642        }
2643
2644        let mut guard = self.txn_tables_with_new_rows.write().unwrap();
2645        guard.remove(&txn_id);
2646    }
2647
2648    fn coerce_plan_value_for_column(
2649        &self,
2650        value: PlanValue,
2651        column: &ExecutorColumn,
2652    ) -> Result<PlanValue> {
2653        match value {
2654            PlanValue::Null => Ok(PlanValue::Null),
2655            PlanValue::Integer(v) => match &column.data_type {
2656                DataType::Int64 => Ok(PlanValue::Integer(v)),
2657                DataType::Float64 => Ok(PlanValue::Float(v as f64)),
2658                DataType::Boolean => Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 })),
2659                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
2660                DataType::Date32 => {
2661                    let casted = i32::try_from(v).map_err(|_| {
2662                        Error::InvalidArgumentError(format!(
2663                            "integer literal out of range for DATE column '{}'",
2664                            column.name
2665                        ))
2666                    })?;
2667                    Ok(PlanValue::Integer(casted as i64))
2668                }
2669                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
2670                    "cannot assign integer to STRUCT column '{}'",
2671                    column.name
2672                ))),
2673                _ => Ok(PlanValue::Integer(v)),
2674            },
2675            PlanValue::Float(v) => match &column.data_type {
2676                DataType::Int64 => Ok(PlanValue::Integer(v as i64)),
2677                DataType::Float64 => Ok(PlanValue::Float(v)),
2678                DataType::Boolean => Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 })),
2679                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
2680                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
2681                    "cannot assign floating-point value to DATE column '{}'",
2682                    column.name
2683                ))),
2684                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
2685                    "cannot assign floating-point value to STRUCT column '{}'",
2686                    column.name
2687                ))),
2688                _ => Ok(PlanValue::Float(v)),
2689            },
2690            PlanValue::String(s) => match &column.data_type {
2691                DataType::Boolean => {
2692                    let normalized = s.trim().to_ascii_lowercase();
2693                    match normalized.as_str() {
2694                        "true" | "t" | "1" => Ok(PlanValue::Integer(1)),
2695                        "false" | "f" | "0" => Ok(PlanValue::Integer(0)),
2696                        _ => Err(Error::InvalidArgumentError(format!(
2697                            "cannot assign string '{}' to BOOLEAN column '{}'",
2698                            s, column.name
2699                        ))),
2700                    }
2701                }
2702                DataType::Utf8 => Ok(PlanValue::String(s)),
2703                DataType::Date32 => {
2704                    let days = parse_date32_literal(&s)?;
2705                    Ok(PlanValue::Integer(days as i64))
2706                }
2707                DataType::Int64 | DataType::Float64 => Err(Error::InvalidArgumentError(format!(
2708                    "cannot assign string '{}' to numeric column '{}'",
2709                    s, column.name
2710                ))),
2711                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
2712                    "cannot assign string to STRUCT column '{}'",
2713                    column.name
2714                ))),
2715                _ => Ok(PlanValue::String(s)),
2716            },
2717            PlanValue::Struct(map) => match &column.data_type {
2718                DataType::Struct(_) => Ok(PlanValue::Struct(map)),
2719                _ => Err(Error::InvalidArgumentError(format!(
2720                    "cannot assign struct value to column '{}'",
2721                    column.name
2722                ))),
2723            },
2724        }
2725    }
2726
    /// Scan a single column and materialize values into memory.
    ///
    /// Only rows visible under `snapshot` (per MVCC rules) contribute values.
    /// A column store that does not exist yet ([`Error::NotFound`]) yields an
    /// empty result instead of an error.
    ///
    /// NOTE: Current implementation buffers the entire result set; convert to a
    /// streaming iterator once executor-side consumers support incremental consumption.
    fn scan_column_values(
        &self,
        table: &ExecutorTable<P>,
        field_id: FieldId,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<PlanValue>> {
        let table_id = table.table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        // Create a filter that matches all rows (unbounded range)
        let match_all_filter = Filter {
            field_id,
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        // Get all matching row_ids first
        let row_ids = match table.table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Apply MVCC filtering manually using filter_row_ids_for_snapshot
        let row_ids = filter_row_ids_for_snapshot(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Gather the column values for visible rows
        let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
        let row_count = row_ids.len();
        let mut stream = match table.table.stream_columns(
            vec![logical_field_id],
            row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // TODO: Don't buffer all values; make this streamable
        // NOTE: Values are accumulated eagerly; revisit when `llkv-plan` supports
        // incremental parameter binding.
        let mut values = Vec::with_capacity(row_count);
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() == 0 {
                continue;
            }
            let array = batch.column(0);
            for row_idx in 0..batch.num_rows() {
                // Values that fail to decode are silently skipped here —
                // NOTE(review): this can shorten the result relative to the
                // visible row count; confirm callers tolerate that.
                if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
                    values.push(value);
                }
            }
        }

        Ok(values)
    }
2802
    // TODO: Make streamable; don't buffer all values in memory at once
    /// Scan a set of columns and materialize rows into memory.
    ///
    /// Returns one row (values in `field_ids` order) per row visible under
    /// `snapshot`. Values that fail to decode are replaced with
    /// [`PlanValue::Null`] rather than aborting the scan.
    ///
    /// NOTE: Similar to [`Self::scan_column_values`], this buffers eagerly pending
    /// enhancements to the executor pipeline.
    fn scan_multi_column_values(
        &self,
        table: &ExecutorTable<P>,
        field_ids: &[FieldId],
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<Vec<PlanValue>>> {
        if field_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        // Match-all filter anchored on the first requested column.
        let match_all_filter = Filter {
            field_id: field_ids[0],
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        // A missing column store means no data yet — not an error.
        let row_ids = match table.table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Restrict to rows visible under the caller's snapshot.
        let row_ids = filter_row_ids_for_snapshot(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let logical_field_ids: Vec<_> = field_ids
            .iter()
            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
            .collect();

        let total_rows = row_ids.len();
        let mut stream = match table.table.stream_columns(
            logical_field_ids,
            row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Reassemble column-major stream chunks into row-major output, using
        // each chunk's row offset to address the destination row.
        let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() == 0 {
                continue;
            }

            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < rows.len(),
                        "stream chunk produced out-of-bounds row index"
                    );
                    if let Some(row) = rows.get_mut(target_index) {
                        // Degrade undecodable values to NULL so one bad cell
                        // doesn't abort a constraint scan.
                        match llkv_plan::plan_value_from_array(array, local_idx) {
                            Ok(value) => row.push(value),
                            Err(_) => row.push(PlanValue::Null),
                        }
                    }
                }
            }
        }

        Ok(rows)
    }
2893
2894    fn collect_row_values_for_ids(
2895        &self,
2896        table: &ExecutorTable<P>,
2897        row_ids: &[RowId],
2898        field_ids: &[FieldId],
2899    ) -> Result<Vec<Vec<PlanValue>>> {
2900        if row_ids.is_empty() || field_ids.is_empty() {
2901            return Ok(Vec::new());
2902        }
2903
2904        let table_id = table.table.table_id();
2905        let logical_field_ids: Vec<LogicalFieldId> = field_ids
2906            .iter()
2907            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
2908            .collect();
2909
2910        let mut stream = match table.table.stream_columns(
2911            logical_field_ids.clone(),
2912            row_ids.to_vec(),
2913            GatherNullPolicy::IncludeNulls,
2914        ) {
2915            Ok(stream) => stream,
2916            Err(Error::NotFound) => return Ok(Vec::new()),
2917            Err(e) => return Err(e),
2918        };
2919
2920        let mut rows = vec![Vec::with_capacity(field_ids.len()); row_ids.len()];
2921        while let Some(chunk) = stream.next_batch()? {
2922            let batch = chunk.batch();
2923            let base = chunk.row_offset();
2924            let local_len = batch.num_rows();
2925            for col_idx in 0..batch.num_columns() {
2926                let array = batch.column(col_idx);
2927                for local_idx in 0..local_len {
2928                    let target_index = base + local_idx;
2929                    if let Some(row) = rows.get_mut(target_index) {
2930                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
2931                        row.push(value);
2932                    }
2933                }
2934            }
2935        }
2936
2937        Ok(rows)
2938    }
2939
2940    fn collect_visible_child_rows(
2941        &self,
2942        table: &ExecutorTable<P>,
2943        field_ids: &[FieldId],
2944        snapshot: TransactionSnapshot,
2945    ) -> Result<Vec<(RowId, Vec<PlanValue>)>> {
2946        if field_ids.is_empty() {
2947            return Ok(Vec::new());
2948        }
2949
2950        let anchor_field = field_ids[0];
2951        let filter_expr = full_table_scan_filter(anchor_field);
2952        let raw_row_ids = match table.table.filter_row_ids(&filter_expr) {
2953            Ok(ids) => ids,
2954            Err(Error::NotFound) => return Ok(Vec::new()),
2955            Err(e) => return Err(e),
2956        };
2957
2958        let visible_row_ids = filter_row_ids_for_snapshot(
2959            table.table.as_ref(),
2960            raw_row_ids,
2961            &self.txn_manager,
2962            snapshot,
2963        )?;
2964
2965        if visible_row_ids.is_empty() {
2966            return Ok(Vec::new());
2967        }
2968
2969        let table_id = table.table.table_id();
2970        let logical_field_ids: Vec<LogicalFieldId> = field_ids
2971            .iter()
2972            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
2973            .collect();
2974
2975        let mut stream = match table.table.stream_columns(
2976            logical_field_ids.clone(),
2977            visible_row_ids.clone(),
2978            GatherNullPolicy::IncludeNulls,
2979        ) {
2980            Ok(stream) => stream,
2981            Err(Error::NotFound) => return Ok(Vec::new()),
2982            Err(e) => return Err(e),
2983        };
2984
2985        let mut rows = vec![Vec::with_capacity(field_ids.len()); visible_row_ids.len()];
2986        while let Some(chunk) = stream.next_batch()? {
2987            let batch = chunk.batch();
2988            let base = chunk.row_offset();
2989            let local_len = batch.num_rows();
2990            for col_idx in 0..batch.num_columns() {
2991                let array = batch.column(col_idx);
2992                for local_idx in 0..local_len {
2993                    let target_index = base + local_idx;
2994                    if let Some(row) = rows.get_mut(target_index) {
2995                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
2996                        row.push(value);
2997                    }
2998                }
2999            }
3000        }
3001
3002        Ok(visible_row_ids.into_iter().zip(rows).collect())
3003    }
3004
    /// Assemble the constraint metadata needed to validate writes against
    /// `table`: per-column NOT NULL / CHECK info, single-column UNIQUEs,
    /// multi-column UNIQUEs, and the primary key (modeled as a multi-column
    /// unique over the PK columns).
    fn build_table_constraint_context(
        &self,
        table: &ExecutorTable<P>,
    ) -> Result<TableConstraintContext> {
        // Field ids in schema column order; used to map row values to fields.
        let schema_field_ids: Vec<FieldId> = table
            .schema
            .columns
            .iter()
            .map(|column| column.field_id)
            .collect();

        // Every column contributes type/nullability/CHECK constraints,
        // indexed by its position in the schema.
        let column_constraints: Vec<InsertColumnConstraint> = table
            .schema
            .columns
            .iter()
            .enumerate()
            .map(|(idx, column)| InsertColumnConstraint {
                schema_index: idx,
                column: ConstraintColumnInfo {
                    name: column.name.clone(),
                    field_id: column.field_id,
                    data_type: column.data_type.clone(),
                    nullable: column.nullable,
                    check_expr: column.check_expr.clone(),
                },
            })
            .collect();

        // Single-column UNIQUEs, excluding PK columns (the PK is handled
        // separately below).
        let unique_columns: Vec<InsertUniqueColumn> = table
            .schema
            .columns
            .iter()
            .enumerate()
            .filter(|(_, column)| column.unique && !column.primary_key)
            .map(|(idx, column)| InsertUniqueColumn {
                schema_index: idx,
                field_id: column.field_id,
                name: column.name.clone(),
            })
            .collect();

        // Translate stored multi-column UNIQUE constraints (column indices)
        // into (schema index, field id, name) triples.
        let mut multi_column_uniques: Vec<InsertMultiColumnUnique> = Vec::new();
        for constraint in table.multi_column_uniques() {
            if constraint.column_indices.is_empty() {
                continue;
            }

            let mut schema_indices = Vec::with_capacity(constraint.column_indices.len());
            let mut field_ids = Vec::with_capacity(constraint.column_indices.len());
            let mut column_names = Vec::with_capacity(constraint.column_indices.len());
            for &col_idx in &constraint.column_indices {
                let column = table.schema.columns.get(col_idx).ok_or_else(|| {
                    Error::Internal(format!(
                        "multi-column UNIQUE constraint references invalid column index {}",
                        col_idx
                    ))
                })?;
                schema_indices.push(col_idx);
                field_ids.push(column.field_id);
                column_names.push(column.name.clone());
            }

            multi_column_uniques.push(InsertMultiColumnUnique {
                schema_indices,
                field_ids,
                column_names,
            });
        }

        // The primary key is represented as one multi-column unique over all
        // columns flagged primary_key (None when the table has no PK).
        let primary_indices: Vec<usize> = table
            .schema
            .columns
            .iter()
            .enumerate()
            .filter(|(_, column)| column.primary_key)
            .map(|(idx, _)| idx)
            .collect();

        let primary_key = if primary_indices.is_empty() {
            None
        } else {
            let mut field_ids = Vec::with_capacity(primary_indices.len());
            let mut column_names = Vec::with_capacity(primary_indices.len());
            for &idx in &primary_indices {
                let column = table.schema.columns.get(idx).ok_or_else(|| {
                    Error::Internal(format!(
                        "primary key references invalid column index {}",
                        idx
                    ))
                })?;
                field_ids.push(column.field_id);
                column_names.push(column.name.clone());
            }

            Some(InsertMultiColumnUnique {
                schema_indices: primary_indices.clone(),
                field_ids,
                column_names,
            })
        };

        Ok(TableConstraintContext {
            schema_field_ids,
            column_constraints,
            unique_columns,
            multi_column_uniques,
            primary_key,
        })
    }
3114
3115    fn insert_rows(
3116        &self,
3117        table: &ExecutorTable<P>,
3118        display_name: String,
3119        canonical_name: String,
3120        mut rows: Vec<Vec<PlanValue>>,
3121        columns: Vec<String>,
3122        snapshot: TransactionSnapshot,
3123    ) -> Result<RuntimeStatementResult<P>> {
3124        if rows.is_empty() {
3125            return Err(Error::InvalidArgumentError(
3126                "INSERT requires at least one row".into(),
3127            ));
3128        }
3129
3130        let column_order = resolve_insert_columns(&columns, table.schema.as_ref())?;
3131        let expected_len = column_order.len();
3132        for row in &rows {
3133            if row.len() != expected_len {
3134                return Err(Error::InvalidArgumentError(format!(
3135                    "expected {} values in INSERT row, found {}",
3136                    expected_len,
3137                    row.len()
3138                )));
3139            }
3140        }
3141
3142        for row in rows.iter_mut() {
3143            for (position, value) in row.iter_mut().enumerate() {
3144                let schema_index = column_order
3145                    .get(position)
3146                    .copied()
3147                    .ok_or_else(|| Error::Internal("invalid INSERT column index mapping".into()))?;
3148                let column = table.schema.columns.get(schema_index).ok_or_else(|| {
3149                    Error::Internal(format!(
3150                        "INSERT column index {} out of bounds for table '{}'",
3151                        schema_index, display_name
3152                    ))
3153                })?;
3154                let normalized = normalize_insert_value_for_column(column, value.clone())?;
3155                *value = normalized;
3156            }
3157        }
3158
3159        let constraint_ctx = self.build_table_constraint_context(table)?;
3160        let primary_key_spec = constraint_ctx.primary_key.as_ref();
3161
3162        if display_name == "keys" {
3163            tracing::trace!(
3164                "[KEYS] Validating constraints for {} row(s) before insert",
3165                rows.len()
3166            );
3167            for (i, row) in rows.iter().enumerate() {
3168                tracing::trace!("[KEYS]   row[{}]: {:?}", i, row);
3169            }
3170        }
3171
3172        self.constraint_service.validate_insert_constraints(
3173            &constraint_ctx.schema_field_ids,
3174            &constraint_ctx.column_constraints,
3175            &constraint_ctx.unique_columns,
3176            &constraint_ctx.multi_column_uniques,
3177            primary_key_spec,
3178            &column_order,
3179            &rows,
3180            |field_id| self.scan_column_values(table, field_id, snapshot),
3181            |field_ids| self.scan_multi_column_values(table, field_ids, snapshot),
3182        )?;
3183
3184        self.check_foreign_keys_on_insert(table, &display_name, &rows, &column_order, snapshot)?;
3185
3186        let row_count = rows.len();
3187        let mut column_values: Vec<Vec<PlanValue>> =
3188            vec![Vec::with_capacity(row_count); table.schema.columns.len()];
3189        for row in rows {
3190            for (idx, value) in row.into_iter().enumerate() {
3191                let dest_index = column_order[idx];
3192                column_values[dest_index].push(value);
3193            }
3194        }
3195
3196        let start_row = table.next_row_id.load(Ordering::SeqCst);
3197
3198        // Build MVCC columns using helper
3199        let (row_id_array, created_by_array, deleted_by_array) =
3200            mvcc::build_insert_mvcc_columns(row_count, start_row, snapshot.txn_id, TXN_ID_NONE);
3201
3202        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(column_values.len() + 3);
3203        arrays.push(row_id_array);
3204        arrays.push(created_by_array);
3205        arrays.push(deleted_by_array);
3206
3207        let mut fields: Vec<Field> = Vec::with_capacity(column_values.len() + 3);
3208        fields.extend(mvcc::build_mvcc_fields());
3209
3210        for (column, values) in table.schema.columns.iter().zip(column_values.into_iter()) {
3211            let array = build_array_for_column(&column.data_type, &values)?;
3212            let field = mvcc::build_field_with_metadata(
3213                &column.name,
3214                column.data_type.clone(),
3215                column.nullable,
3216                column.field_id,
3217            );
3218            arrays.push(array);
3219            fields.push(field);
3220        }
3221
3222        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays)?;
3223        tracing::trace!(
3224            table_name = %display_name,
3225            store_ptr = ?std::ptr::addr_of!(*table.table.store()),
3226            "About to call table.append"
3227        );
3228        table.table.append(&batch)?;
3229        table
3230            .next_row_id
3231            .store(start_row + row_count as u64, Ordering::SeqCst);
3232        table
3233            .total_rows
3234            .fetch_add(row_count as u64, Ordering::SeqCst);
3235
3236        self.record_table_with_new_rows(snapshot.txn_id, canonical_name);
3237
3238        Ok(RuntimeStatementResult::Insert {
3239            table_name: display_name,
3240            rows_inserted: row_count,
3241        })
3242    }
3243
3244    fn insert_batches(
3245        &self,
3246        table: &ExecutorTable<P>,
3247        display_name: String,
3248        canonical_name: String,
3249        batches: Vec<RecordBatch>,
3250        columns: Vec<String>,
3251        snapshot: TransactionSnapshot,
3252    ) -> Result<RuntimeStatementResult<P>> {
3253        if batches.is_empty() {
3254            return Ok(RuntimeStatementResult::Insert {
3255                table_name: display_name,
3256                rows_inserted: 0,
3257            });
3258        }
3259
3260        let expected_len = if columns.is_empty() {
3261            table.schema.columns.len()
3262        } else {
3263            columns.len()
3264        };
3265        let mut total_rows_inserted = 0usize;
3266
3267        for batch in batches {
3268            if batch.num_columns() != expected_len {
3269                return Err(Error::InvalidArgumentError(format!(
3270                    "expected {} columns in INSERT batch, found {}",
3271                    expected_len,
3272                    batch.num_columns()
3273                )));
3274            }
3275            let row_count = batch.num_rows();
3276            if row_count == 0 {
3277                continue;
3278            }
3279            let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(row_count);
3280            for row_idx in 0..row_count {
3281                let mut row: Vec<PlanValue> = Vec::with_capacity(expected_len);
3282                for col_idx in 0..expected_len {
3283                    let array = batch.column(col_idx);
3284                    row.push(llkv_plan::plan_value_from_array(array, row_idx)?);
3285                }
3286                rows.push(row);
3287            }
3288
3289            match self.insert_rows(
3290                table,
3291                display_name.clone(),
3292                canonical_name.clone(),
3293                rows,
3294                columns.clone(),
3295                snapshot,
3296            )? {
3297                RuntimeStatementResult::Insert { rows_inserted, .. } => {
3298                    total_rows_inserted += rows_inserted;
3299                }
3300                _ => unreachable!("insert_rows must return Insert result"),
3301            }
3302        }
3303
3304        Ok(RuntimeStatementResult::Insert {
3305            table_name: display_name,
3306            rows_inserted: total_rows_inserted,
3307        })
3308    }
3309
3310    fn update_filtered_rows(
3311        &self,
3312        table: &ExecutorTable<P>,
3313        display_name: String,
3314        canonical_name: String,
3315        assignments: Vec<ColumnAssignment>,
3316        filter: LlkvExpr<'static, String>,
3317        snapshot: TransactionSnapshot,
3318    ) -> Result<RuntimeStatementResult<P>> {
3319        if assignments.is_empty() {
3320            return Err(Error::InvalidArgumentError(
3321                "UPDATE requires at least one assignment".into(),
3322            ));
3323        }
3324
3325        let schema = table.schema.as_ref();
3326        let filter_expr = translate_predicate(filter, schema)?;
3327
3328        let mut seen_columns: FxHashSet<String> =
3329            FxHashSet::with_capacity_and_hasher(assignments.len(), Default::default());
3330        let mut prepared: Vec<(ExecutorColumn, PreparedAssignmentValue)> =
3331            Vec::with_capacity(assignments.len());
3332        let mut scalar_exprs: Vec<ScalarExpr<FieldId>> = Vec::new();
3333
3334        for assignment in assignments {
3335            let normalized = assignment.column.to_ascii_lowercase();
3336            if !seen_columns.insert(normalized.clone()) {
3337                return Err(Error::InvalidArgumentError(format!(
3338                    "duplicate column '{}' in UPDATE assignments",
3339                    assignment.column
3340                )));
3341            }
3342            let column = table.schema.resolve(&assignment.column).ok_or_else(|| {
3343                Error::InvalidArgumentError(format!(
3344                    "unknown column '{}' in UPDATE",
3345                    assignment.column
3346                ))
3347            })?;
3348
3349            match assignment.value {
3350                AssignmentValue::Literal(value) => {
3351                    prepared.push((column.clone(), PreparedAssignmentValue::Literal(value)));
3352                }
3353                AssignmentValue::Expression(expr) => {
3354                    let translated = translate_scalar(&expr, schema)?;
3355                    let expr_index = scalar_exprs.len();
3356                    scalar_exprs.push(translated);
3357                    prepared.push((
3358                        column.clone(),
3359                        PreparedAssignmentValue::Expression { expr_index },
3360                    ));
3361                }
3362            }
3363        }
3364
3365        let (row_ids, mut expr_values) =
3366            self.collect_update_rows(table, &filter_expr, &scalar_exprs, snapshot)?;
3367
3368        if row_ids.is_empty() {
3369            return Ok(RuntimeStatementResult::Update {
3370                table_name: display_name,
3371                rows_updated: 0,
3372            });
3373        }
3374
3375        let row_count = row_ids.len();
3376        let table_id = table.table.table_id();
3377        let logical_fields: Vec<LogicalFieldId> = table
3378            .schema
3379            .columns
3380            .iter()
3381            .map(|column| LogicalFieldId::for_user(table_id, column.field_id))
3382            .collect();
3383
3384        let mut stream = table.table.stream_columns(
3385            logical_fields.clone(),
3386            row_ids.clone(),
3387            GatherNullPolicy::IncludeNulls,
3388        )?;
3389
3390        let mut new_rows: Vec<Vec<PlanValue>> =
3391            vec![Vec::with_capacity(table.schema.columns.len()); row_count];
3392        while let Some(chunk) = stream.next_batch()? {
3393            let batch = chunk.batch();
3394            let base = chunk.row_offset();
3395            let local_len = batch.num_rows();
3396            for col_idx in 0..batch.num_columns() {
3397                let array = batch.column(col_idx);
3398                for local_idx in 0..local_len {
3399                    let target_index = base + local_idx;
3400                    debug_assert!(
3401                        target_index < new_rows.len(),
3402                        "column stream produced out-of-range row index"
3403                    );
3404                    if let Some(row) = new_rows.get_mut(target_index) {
3405                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
3406                        row.push(value);
3407                    }
3408                }
3409            }
3410        }
3411        debug_assert!(
3412            new_rows
3413                .iter()
3414                .all(|row| row.len() == table.schema.columns.len())
3415        );
3416
3417        tracing::trace!(
3418            table = %display_name,
3419            row_count,
3420            rows = ?new_rows,
3421            "update_filtered_rows captured source rows"
3422        );
3423
3424        let constraint_ctx = self.build_table_constraint_context(table)?;
3425        let primary_key_spec = constraint_ctx.primary_key.as_ref();
3426        let mut original_primary_key_keys: Vec<Option<UniqueKey>> = Vec::new();
3427        if let Some(pk) = primary_key_spec {
3428            original_primary_key_keys.reserve(row_count);
3429            for row in &new_rows {
3430                let mut values = Vec::with_capacity(pk.schema_indices.len());
3431                for &idx in &pk.schema_indices {
3432                    let value = row.get(idx).cloned().unwrap_or(PlanValue::Null);
3433                    values.push(value);
3434                }
3435                let key = build_composite_unique_key(&values, &pk.column_names)?;
3436                original_primary_key_keys.push(key);
3437            }
3438        }
3439
3440        let column_positions: FxHashMap<FieldId, usize> = FxHashMap::from_iter(
3441            table
3442                .schema
3443                .columns
3444                .iter()
3445                .enumerate()
3446                .map(|(idx, column)| (column.field_id, idx)),
3447        );
3448
3449        for (column, value) in prepared {
3450            let column_index =
3451                column_positions
3452                    .get(&column.field_id)
3453                    .copied()
3454                    .ok_or_else(|| {
3455                        Error::InvalidArgumentError(format!(
3456                            "column '{}' missing in table schema during UPDATE",
3457                            column.name
3458                        ))
3459                    })?;
3460
3461            let values = match value {
3462                PreparedAssignmentValue::Literal(lit) => vec![lit; row_count],
3463                PreparedAssignmentValue::Expression { expr_index } => {
3464                    let column_values = expr_values.get_mut(expr_index).ok_or_else(|| {
3465                        Error::InvalidArgumentError(
3466                            "expression assignment value missing during UPDATE".into(),
3467                        )
3468                    })?;
3469                    if column_values.len() != row_count {
3470                        return Err(Error::InvalidArgumentError(
3471                            "expression result count did not match targeted row count".into(),
3472                        ));
3473                    }
3474                    mem::take(column_values)
3475                }
3476            };
3477
3478            for (row_idx, new_value) in values.into_iter().enumerate() {
3479                if let Some(row) = new_rows.get_mut(row_idx) {
3480                    let coerced = self.coerce_plan_value_for_column(new_value, &column)?;
3481                    row[column_index] = coerced;
3482                }
3483            }
3484        }
3485
3486        let column_names: Vec<String> = table
3487            .schema
3488            .columns
3489            .iter()
3490            .map(|column| column.name.clone())
3491            .collect();
3492        let column_order = resolve_insert_columns(&column_names, table.schema.as_ref())?;
3493        self.constraint_service.validate_row_level_constraints(
3494            &constraint_ctx.schema_field_ids,
3495            &constraint_ctx.column_constraints,
3496            &column_order,
3497            &new_rows,
3498        )?;
3499
3500        if let Some(pk) = primary_key_spec {
3501            self.constraint_service.validate_update_primary_keys(
3502                &constraint_ctx.schema_field_ids,
3503                pk,
3504                &column_order,
3505                &new_rows,
3506                &original_primary_key_keys,
3507                |field_ids| self.scan_multi_column_values(table, field_ids, snapshot),
3508            )?;
3509        }
3510
3511        let _ = self.apply_delete(
3512            table,
3513            display_name.clone(),
3514            canonical_name.clone(),
3515            row_ids.clone(),
3516            snapshot,
3517            false,
3518        )?;
3519
3520        let _ = self.insert_rows(
3521            table,
3522            display_name.clone(),
3523            canonical_name,
3524            new_rows,
3525            column_names,
3526            snapshot,
3527        )?;
3528
3529        Ok(RuntimeStatementResult::Update {
3530            table_name: display_name,
3531            rows_updated: row_count,
3532        })
3533    }
3534
3535    fn update_all_rows(
3536        &self,
3537        table: &ExecutorTable<P>,
3538        display_name: String,
3539        canonical_name: String,
3540        assignments: Vec<ColumnAssignment>,
3541        snapshot: TransactionSnapshot,
3542    ) -> Result<RuntimeStatementResult<P>> {
3543        if assignments.is_empty() {
3544            return Err(Error::InvalidArgumentError(
3545                "UPDATE requires at least one assignment".into(),
3546            ));
3547        }
3548
3549        let total_rows = table.total_rows.load(Ordering::SeqCst);
3550        let total_rows_usize = usize::try_from(total_rows).map_err(|_| {
3551            Error::InvalidArgumentError("table row count exceeds supported range".into())
3552        })?;
3553        if total_rows_usize == 0 {
3554            return Ok(RuntimeStatementResult::Update {
3555                table_name: display_name,
3556                rows_updated: 0,
3557            });
3558        }
3559
3560        let schema = table.schema.as_ref();
3561
3562        let mut seen_columns: FxHashSet<String> =
3563            FxHashSet::with_capacity_and_hasher(assignments.len(), Default::default());
3564        let mut prepared: Vec<(ExecutorColumn, PreparedAssignmentValue)> =
3565            Vec::with_capacity(assignments.len());
3566        let mut scalar_exprs: Vec<ScalarExpr<FieldId>> = Vec::new();
3567        let mut first_field_id: Option<FieldId> = None;
3568
3569        for assignment in assignments {
3570            let normalized = assignment.column.to_ascii_lowercase();
3571            if !seen_columns.insert(normalized.clone()) {
3572                return Err(Error::InvalidArgumentError(format!(
3573                    "duplicate column '{}' in UPDATE assignments",
3574                    assignment.column
3575                )));
3576            }
3577            let column = table.schema.resolve(&assignment.column).ok_or_else(|| {
3578                Error::InvalidArgumentError(format!(
3579                    "unknown column '{}' in UPDATE",
3580                    assignment.column
3581                ))
3582            })?;
3583            if first_field_id.is_none() {
3584                first_field_id = Some(column.field_id);
3585            }
3586
3587            match assignment.value {
3588                AssignmentValue::Literal(value) => {
3589                    prepared.push((column.clone(), PreparedAssignmentValue::Literal(value)));
3590                }
3591                AssignmentValue::Expression(expr) => {
3592                    let translated = translate_scalar(&expr, schema)?;
3593                    let expr_index = scalar_exprs.len();
3594                    scalar_exprs.push(translated);
3595                    prepared.push((
3596                        column.clone(),
3597                        PreparedAssignmentValue::Expression { expr_index },
3598                    ));
3599                }
3600            }
3601        }
3602
3603        let anchor_field = first_field_id.ok_or_else(|| {
3604            Error::InvalidArgumentError("UPDATE requires at least one target column".into())
3605        })?;
3606
3607        let filter_expr = full_table_scan_filter(anchor_field);
3608        let (row_ids, mut expr_values) =
3609            self.collect_update_rows(table, &filter_expr, &scalar_exprs, snapshot)?;
3610
3611        if row_ids.is_empty() {
3612            return Ok(RuntimeStatementResult::Update {
3613                table_name: display_name,
3614                rows_updated: 0,
3615            });
3616        }
3617
3618        let row_count = row_ids.len();
3619        let table_id = table.table.table_id();
3620        let logical_fields: Vec<LogicalFieldId> = table
3621            .schema
3622            .columns
3623            .iter()
3624            .map(|column| LogicalFieldId::for_user(table_id, column.field_id))
3625            .collect();
3626
3627        let mut stream = table.table.stream_columns(
3628            logical_fields.clone(),
3629            row_ids.clone(),
3630            GatherNullPolicy::IncludeNulls,
3631        )?;
3632
3633        let mut new_rows: Vec<Vec<PlanValue>> =
3634            vec![Vec::with_capacity(table.schema.columns.len()); row_count];
3635        while let Some(chunk) = stream.next_batch()? {
3636            let batch = chunk.batch();
3637            let base = chunk.row_offset();
3638            let local_len = batch.num_rows();
3639            for col_idx in 0..batch.num_columns() {
3640                let array = batch.column(col_idx);
3641                for local_idx in 0..local_len {
3642                    let target_index = base + local_idx;
3643                    debug_assert!(
3644                        target_index < new_rows.len(),
3645                        "column stream produced out-of-range row index"
3646                    );
3647                    if let Some(row) = new_rows.get_mut(target_index) {
3648                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
3649                        row.push(value);
3650                    }
3651                }
3652            }
3653        }
3654        debug_assert!(
3655            new_rows
3656                .iter()
3657                .all(|row| row.len() == table.schema.columns.len())
3658        );
3659
3660        let constraint_ctx = self.build_table_constraint_context(table)?;
3661        let primary_key_spec = constraint_ctx.primary_key.as_ref();
3662        let mut original_primary_key_keys: Vec<Option<UniqueKey>> = Vec::new();
3663        if let Some(pk) = primary_key_spec {
3664            original_primary_key_keys.reserve(row_count);
3665            for row in &new_rows {
3666                let mut values = Vec::with_capacity(pk.schema_indices.len());
3667                for &idx in &pk.schema_indices {
3668                    let value = row.get(idx).cloned().unwrap_or(PlanValue::Null);
3669                    values.push(value);
3670                }
3671                let key = build_composite_unique_key(&values, &pk.column_names)?;
3672                original_primary_key_keys.push(key);
3673            }
3674        }
3675
3676        let column_positions: FxHashMap<FieldId, usize> = FxHashMap::from_iter(
3677            table
3678                .schema
3679                .columns
3680                .iter()
3681                .enumerate()
3682                .map(|(idx, column)| (column.field_id, idx)),
3683        );
3684
3685        for (column, value) in prepared {
3686            let column_index =
3687                column_positions
3688                    .get(&column.field_id)
3689                    .copied()
3690                    .ok_or_else(|| {
3691                        Error::InvalidArgumentError(format!(
3692                            "column '{}' missing in table schema during UPDATE",
3693                            column.name
3694                        ))
3695                    })?;
3696
3697            let values = match value {
3698                PreparedAssignmentValue::Literal(lit) => vec![lit; row_count],
3699                PreparedAssignmentValue::Expression { expr_index } => {
3700                    let column_values = expr_values.get_mut(expr_index).ok_or_else(|| {
3701                        Error::InvalidArgumentError(
3702                            "expression assignment value missing during UPDATE".into(),
3703                        )
3704                    })?;
3705                    if column_values.len() != row_count {
3706                        return Err(Error::InvalidArgumentError(
3707                            "expression result count did not match targeted row count".into(),
3708                        ));
3709                    }
3710                    mem::take(column_values)
3711                }
3712            };
3713
3714            for (row_idx, new_value) in values.into_iter().enumerate() {
3715                if let Some(row) = new_rows.get_mut(row_idx) {
3716                    let coerced = self.coerce_plan_value_for_column(new_value, &column)?;
3717                    row[column_index] = coerced;
3718                }
3719            }
3720        }
3721
3722        let column_names: Vec<String> = table
3723            .schema
3724            .columns
3725            .iter()
3726            .map(|column| column.name.clone())
3727            .collect();
3728        let column_order = resolve_insert_columns(&column_names, table.schema.as_ref())?;
3729        self.constraint_service.validate_row_level_constraints(
3730            &constraint_ctx.schema_field_ids,
3731            &constraint_ctx.column_constraints,
3732            &column_order,
3733            &new_rows,
3734        )?;
3735
3736        if let Some(pk) = primary_key_spec {
3737            self.constraint_service.validate_update_primary_keys(
3738                &constraint_ctx.schema_field_ids,
3739                pk,
3740                &column_order,
3741                &new_rows,
3742                &original_primary_key_keys,
3743                |field_ids| self.scan_multi_column_values(table, field_ids, snapshot),
3744            )?;
3745        }
3746
3747        let _ = self.apply_delete(
3748            table,
3749            display_name.clone(),
3750            canonical_name.clone(),
3751            row_ids.clone(),
3752            snapshot,
3753            false,
3754        )?;
3755
3756        let _ = self.insert_rows(
3757            table,
3758            display_name.clone(),
3759            canonical_name,
3760            new_rows,
3761            column_names,
3762            snapshot,
3763        )?;
3764
3765        Ok(RuntimeStatementResult::Update {
3766            table_name: display_name,
3767            rows_updated: row_count,
3768        })
3769    }
3770
3771    fn delete_filtered_rows(
3772        &self,
3773        table: &ExecutorTable<P>,
3774        display_name: String,
3775        canonical_name: String,
3776        filter: LlkvExpr<'static, String>,
3777        snapshot: TransactionSnapshot,
3778    ) -> Result<RuntimeStatementResult<P>> {
3779        let schema = table.schema.as_ref();
3780        let filter_expr = translate_predicate(filter, schema)?;
3781        let row_ids = table.table.filter_row_ids(&filter_expr)?;
3782        let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
3783        tracing::trace!(
3784            table = %display_name,
3785            rows = row_ids.len(),
3786            "delete_filtered_rows collected row ids"
3787        );
3788        self.apply_delete(table, display_name, canonical_name, row_ids, snapshot, true)
3789    }
3790
3791    fn delete_all_rows(
3792        &self,
3793        table: &ExecutorTable<P>,
3794        display_name: String,
3795        canonical_name: String,
3796        snapshot: TransactionSnapshot,
3797    ) -> Result<RuntimeStatementResult<P>> {
3798        let total_rows = table.total_rows.load(Ordering::SeqCst);
3799        if total_rows == 0 {
3800            return Ok(RuntimeStatementResult::Delete {
3801                table_name: display_name,
3802                rows_deleted: 0,
3803            });
3804        }
3805
3806        let anchor_field = table.schema.first_field_id().ok_or_else(|| {
3807            Error::InvalidArgumentError("DELETE requires a table with at least one column".into())
3808        })?;
3809        let filter_expr = full_table_scan_filter(anchor_field);
3810        let row_ids = table.table.filter_row_ids(&filter_expr)?;
3811        let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
3812        self.apply_delete(table, display_name, canonical_name, row_ids, snapshot, true)
3813    }
3814
3815    fn apply_delete(
3816        &self,
3817        table: &ExecutorTable<P>,
3818        display_name: String,
3819        canonical_name: String,
3820        row_ids: Vec<RowId>,
3821        snapshot: TransactionSnapshot,
3822        enforce_foreign_keys: bool,
3823    ) -> Result<RuntimeStatementResult<P>> {
3824        if row_ids.is_empty() {
3825            return Ok(RuntimeStatementResult::Delete {
3826                table_name: display_name,
3827                rows_deleted: 0,
3828            });
3829        }
3830
3831        if enforce_foreign_keys {
3832            self.check_foreign_keys_on_delete(
3833                table,
3834                &display_name,
3835                &canonical_name,
3836                &row_ids,
3837                snapshot,
3838            )?;
3839        }
3840
3841        self.detect_delete_conflicts(table, &display_name, &row_ids, snapshot)?;
3842
3843        let removed = row_ids.len();
3844
3845        // Build DELETE batch using helper
3846        let batch = mvcc::build_delete_batch(row_ids.clone(), snapshot.txn_id)?;
3847        table.table.append(&batch)?;
3848
3849        let removed_u64 = u64::try_from(removed)
3850            .map_err(|_| Error::InvalidArgumentError("row count exceeds supported range".into()))?;
3851        table.total_rows.fetch_sub(removed_u64, Ordering::SeqCst);
3852
3853        Ok(RuntimeStatementResult::Delete {
3854            table_name: display_name,
3855            rows_deleted: removed,
3856        })
3857    }
3858
3859    fn check_foreign_keys_on_delete(
3860        &self,
3861        table: &ExecutorTable<P>,
3862        _display_name: &str,
3863        _canonical_name: &str,
3864        row_ids: &[RowId],
3865        snapshot: TransactionSnapshot,
3866    ) -> Result<()> {
3867        if row_ids.is_empty() {
3868            return Ok(());
3869        }
3870
3871        self.constraint_service.validate_delete_foreign_keys(
3872            table.table.table_id(),
3873            row_ids,
3874            |request| {
3875                self.collect_row_values_for_ids(
3876                    table,
3877                    request.referenced_row_ids,
3878                    request.referenced_field_ids,
3879                )
3880            },
3881            |request| {
3882                let child_table = self.lookup_table(request.referencing_table_canonical)?;
3883                self.collect_visible_child_rows(
3884                    child_table.as_ref(),
3885                    request.referencing_field_ids,
3886                    snapshot,
3887                )
3888            },
3889        )
3890    }
3891
3892    fn check_foreign_keys_on_insert(
3893        &self,
3894        table: &ExecutorTable<P>,
3895        _display_name: &str,
3896        rows: &[Vec<PlanValue>],
3897        column_order: &[usize],
3898        snapshot: TransactionSnapshot,
3899    ) -> Result<()> {
3900        if rows.is_empty() {
3901            return Ok(());
3902        }
3903
3904        let schema_field_ids: Vec<FieldId> = table
3905            .schema
3906            .columns
3907            .iter()
3908            .map(|column| column.field_id)
3909            .collect();
3910
3911        self.constraint_service.validate_insert_foreign_keys(
3912            table.table.table_id(),
3913            &schema_field_ids,
3914            column_order,
3915            rows,
3916            |request| {
3917                let parent_table = self.lookup_table(request.referenced_table_canonical)?;
3918                self.scan_multi_column_values(
3919                    parent_table.as_ref(),
3920                    request.referenced_field_ids,
3921                    snapshot,
3922                )
3923            },
3924        )
3925    }
3926
    /// Detects write-write conflicts before deleting `row_ids` from `table`.
    ///
    /// A conflict exists when a row's MVCC `deleted_by` stamp belongs to a
    /// *different* transaction that is still active. Rows never deleted, rows
    /// deleted by the current transaction, and rows stamped by transactions
    /// that have already finished are not conflicts.
    ///
    /// Returns `Err(Error::TransactionContextError)` on the first conflicting
    /// row, otherwise `Ok(())`.
    fn detect_delete_conflicts(
        &self,
        table: &ExecutorTable<P>,
        display_name: &str,
        row_ids: &[RowId],
        snapshot: TransactionSnapshot,
    ) -> Result<()> {
        if row_ids.is_empty() {
            return Ok(());
        }

        let table_id = table.table.table_id();
        let deleted_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
        let logical_fields: Arc<[LogicalFieldId]> = Arc::from([deleted_lfid]);

        // If the MVCC `deleted_by` column does not exist yet (the table has
        // never seen a delete), there is nothing to conflict with.
        if let Err(err) = table
            .table
            .store()
            .prepare_gather_context(logical_fields.as_ref())
        {
            match err {
                Error::NotFound => return Ok(()),
                other => return Err(other),
            }
        }

        // Stream the `deleted_by` values for exactly the targeted rows.
        let mut stream = table.table.stream_columns(
            Arc::clone(&logical_fields),
            row_ids.to_vec(),
            GatherNullPolicy::IncludeNulls,
        )?;

        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            let window = chunk.row_ids();
            let deleted_column = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| {
                    Error::Internal(
                        "failed to read MVCC deleted_by column for conflict detection".into(),
                    )
                })?;

            for (idx, row_id) in window.iter().enumerate() {
                // A null `deleted_by` cell is equivalent to "not deleted".
                let deleted_by = if deleted_column.is_null(idx) {
                    TXN_ID_NONE
                } else {
                    deleted_column.value(idx)
                };

                // Undeleted rows and our own deletions never conflict.
                if deleted_by == TXN_ID_NONE || deleted_by == snapshot.txn_id {
                    continue;
                }

                // Only a still-active deleter constitutes a lock; a finished
                // transaction has already resolved one way or the other.
                let status = self.txn_manager.status(deleted_by);
                if !status.is_active() {
                    continue;
                }

                tracing::debug!(
                    "[MVCC] delete conflict: table='{}' row_id={} deleted_by={} status={:?} current_txn={}",
                    display_name,
                    row_id,
                    deleted_by,
                    status,
                    snapshot.txn_id
                );

                return Err(Error::TransactionContextError(format!(
                    "transaction conflict on table '{}' for row {}: row locked by transaction {} ({:?})",
                    display_name, row_id, deleted_by, status
                )));
            }
        }

        Ok(())
    }
4006
    /// Resolves the rows targeted by an UPDATE and evaluates its assignment
    /// expressions against the current row values.
    ///
    /// Returns the snapshot-visible row ids matching `filter_expr` together
    /// with one value-vector per expression (indexed like `expressions`, each
    /// aligned with the returned row ids). With no matching rows the
    /// expression vectors are empty; with no expressions the outer vector is
    /// empty.
    fn collect_update_rows(
        &self,
        table: &ExecutorTable<P>,
        filter_expr: &LlkvExpr<'static, FieldId>,
        expressions: &[ScalarExpr<FieldId>],
        snapshot: TransactionSnapshot,
    ) -> Result<(Vec<RowId>, Vec<Vec<PlanValue>>)> {
        let row_ids = table.table.filter_row_ids(filter_expr)?;
        // Drop rows the current snapshot cannot see (MVCC visibility).
        let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
        if row_ids.is_empty() {
            return Ok((row_ids, vec![Vec::new(); expressions.len()]));
        }

        if expressions.is_empty() {
            return Ok((row_ids, Vec::new()));
        }

        // One computed projection per assignment expression; aliases are
        // synthetic and only used to label scan output columns.
        let mut projections: Vec<ScanProjection> = Vec::with_capacity(expressions.len());
        for (idx, expr) in expressions.iter().enumerate() {
            let alias = format!("__expr_{idx}");
            projections.push(ScanProjection::computed(expr.clone(), alias));
        }

        let mut expr_values: Vec<Vec<PlanValue>> =
            vec![Vec::with_capacity(row_ids.len()); expressions.len()];
        // The scan callback cannot return a Result, so the first error is
        // stashed here and re-raised after the scan completes.
        let mut error: Option<Error> = None;
        // Apply the same MVCC visibility rules during the scan so the value
        // stream stays aligned with `row_ids`.
        let row_filter: Arc<dyn RowIdFilter<P>> = Arc::new(MvccRowIdFilter::new(
            Arc::clone(&self.txn_manager),
            snapshot,
        ));
        let options = ScanStreamOptions {
            include_nulls: true,
            order: None,
            row_id_filter: Some(row_filter),
        };

        table
            .table
            .scan_stream_with_exprs(&projections, filter_expr, options, |batch| {
                // Once an error is recorded, skip remaining batches.
                if error.is_some() {
                    return;
                }
                if let Err(err) = Self::collect_expression_values(&mut expr_values, batch) {
                    error = Some(err);
                }
            })?;

        if let Some(err) = error {
            return Err(err);
        }

        // Sanity check: every expression must have produced exactly one value
        // per targeted row, or the later row/value zip would be misaligned.
        for values in &expr_values {
            if values.len() != row_ids.len() {
                return Err(Error::InvalidArgumentError(
                    "expression result count did not match targeted row count".into(),
                ));
            }
        }

        Ok((row_ids, expr_values))
    }
4068
4069    fn collect_expression_values(
4070        expr_values: &mut [Vec<PlanValue>],
4071        batch: RecordBatch,
4072    ) -> Result<()> {
4073        for row_idx in 0..batch.num_rows() {
4074            for (expr_index, values) in expr_values.iter_mut().enumerate() {
4075                let value = llkv_plan::plan_value_from_array(batch.column(expr_index), row_idx)?;
4076                values.push(value);
4077            }
4078        }
4079
4080        Ok(())
4081    }
4082
    /// Looks up a table in the executor cache, lazily loading it from metadata if not already cached.
    /// This is the primary method for obtaining table references for query execution.
    ///
    /// The fast path is a read-locked cache hit (rejecting names tombstoned in
    /// `dropped_tables`). The slow path rebuilds the [`ExecutorTable`] from
    /// the catalog: column names/types, primary-key and unique constraints
    /// (preferring active constraint records over the field-resolver
    /// fallback), the maximum persisted `row_id`, and the persisted total row
    /// count; the result is cached and its fields registered with the
    /// catalog's field resolver.
    pub fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        // Fast path: check if table is already loaded
        {
            let tables = self.tables.read().unwrap();
            if let Some(table) = tables.get(canonical_name) {
                // Check if table has been dropped
                if self.dropped_tables.read().unwrap().contains(canonical_name) {
                    // Table was dropped - treat as not found
                    return Err(Error::NotFound);
                }
                tracing::trace!(
                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
                    canonical_name,
                    table.table.table_id(),
                    table.schema.columns.len(),
                    &*self.pager
                );
                return Ok(Arc::clone(table));
            }
        } // Release read lock

        // Slow path: load table from catalog (happens once per table)
        // NOTE(review): the dropped_tables tombstone is only consulted on the
        // cached path above; presumably a dropped table has also been removed
        // from the catalog so the lookup below fails — confirm this cannot
        // resurrect a dropped table whose cache entry was evicted.
        tracing::debug!(
            "[LAZY_LOAD] Loading table '{}' from catalog",
            canonical_name
        );

        // Check catalog first for table existence
        let catalog_table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;

        let table_id = catalog_table_id;
        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
        let store = table.store();
        // User columns sorted by field id; `column_metas` below is assumed to
        // be index-aligned with this ordering.
        let mut logical_fields = store.user_field_ids_for_table(table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());
        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let summary = self
            .catalog_service
            .table_constraint_summary(canonical_name)?;
        let TableConstraintSummaryView {
            table_meta,
            column_metas,
            constraint_records,
            multi_column_uniques,
        } = summary;
        // Missing table metadata means the catalog has no such table.
        let _table_meta = table_meta.ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;
        let catalog_field_resolver = self.catalog.field_resolver(catalog_table_id);
        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
        // Track whether constraint records exist at all: when they do, they
        // take precedence over the field-resolver fallback flags below.
        let mut has_primary_key_records = false;
        let mut has_single_unique_records = false;

        for record in constraint_records
            .iter()
            .filter(|record| record.is_active())
        {
            match &record.kind {
                ConstraintKind::PrimaryKey(pk) => {
                    has_primary_key_records = true;
                    for field_id in &pk.field_ids {
                        metadata_primary_keys.insert(*field_id);
                        // A primary-key column is implicitly unique.
                        metadata_unique_fields.insert(*field_id);
                    }
                }
                ConstraintKind::Unique(unique) => {
                    // Multi-column uniques are handled separately via
                    // `multi_column_uniques`; only single-column records mark
                    // a column as unique here.
                    if unique.field_ids.len() == 1 {
                        has_single_unique_records = true;
                        metadata_unique_fields.insert(unique.field_ids[0]);
                    }
                }
                _ => {}
            }
        }

        // Build ExecutorSchema from metadata manager snapshots
        let mut executor_columns = Vec::new();
        let mut lookup = FxHashMap::with_capacity_and_hasher(field_ids.len(), Default::default());

        for (idx, lfid) in logical_fields.iter().enumerate() {
            let field_id = lfid.field_id();
            let normalized_index = executor_columns.len();

            // Fall back to a synthetic name when column metadata is absent.
            let column_name = column_metas
                .get(idx)
                .and_then(|meta| meta.as_ref())
                .and_then(|meta| meta.name.clone())
                .unwrap_or_else(|| format!("col_{}", field_id));

            // Case-insensitive name -> column-index lookup.
            let normalized = column_name.to_ascii_lowercase();
            lookup.insert(normalized, normalized_index);

            let fallback_constraints: FieldConstraints = catalog_field_resolver
                .as_ref()
                .and_then(|resolver| resolver.field_constraints_by_name(&column_name))
                .unwrap_or_default();

            // Prefer constraint records when any exist; otherwise trust the
            // field-resolver fallback flags.
            let metadata_primary = metadata_primary_keys.contains(&field_id);
            let primary_key = if has_primary_key_records {
                metadata_primary
            } else {
                fallback_constraints.primary_key
            };

            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
            let unique = if has_primary_key_records || has_single_unique_records {
                metadata_unique
            } else {
                fallback_constraints.primary_key || fallback_constraints.unique
            };

            let data_type = store.data_type(*lfid)?;
            // Primary-key columns are implicitly NOT NULL.
            let nullable = !primary_key;

            executor_columns.push(ExecutorColumn {
                name: column_name,
                data_type,
                nullable,
                primary_key,
                unique,
                field_id,
                check_expr: fallback_constraints.check_expr.clone(),
            });
        }

        let exec_schema = Arc::new(ExecutorSchema {
            columns: executor_columns,
            lookup,
        });

        // Find the maximum row_id in the table to set next_row_id correctly
        let max_row_id = {
            use arrow::array::UInt64Array;
            use llkv_column_map::store::rowid_fid;
            use llkv_column_map::store::scan::{
                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
            };

            // Running-maximum accumulator over u64 row-id chunks; the other
            // visitor traits are implemented as no-ops.
            struct MaxRowIdVisitor {
                max: RowId,
            }

            impl PrimitiveVisitor for MaxRowIdVisitor {
                fn u64_chunk(&mut self, values: &UInt64Array) {
                    for i in 0..values.len() {
                        let val = values.value(i);
                        if val > self.max {
                            self.max = val;
                        }
                    }
                }
            }

            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}

            // Scan the row_id column for any user field in this table
            let row_id_field = rowid_fid(LogicalFieldId::for_user(table_id, 1));
            let mut visitor = MaxRowIdVisitor { max: 0 };

            match ScanBuilder::new(table.store(), row_id_field)
                .options(ScanOptions::default())
                .run(&mut visitor)
            {
                Ok(_) => visitor.max,
                Err(llkv_result::Error::NotFound) => 0,
                Err(e) => {
                    // Best-effort: a failed scan degrades to 0 rather than
                    // failing the whole table load.
                    tracing::warn!(
                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
                        canonical_name,
                        e
                    );
                    0
                }
            }
        };

        // NOTE(review): an empty (or failed) scan yields next_row_id = 0, and
        // a table whose only row has row_id 0 would also compute 0 here. This
        // assumes row ids are allocated starting above 0 elsewhere — TODO
        // confirm against the insert path.
        let next_row_id = if max_row_id > 0 {
            max_row_id.saturating_add(1)
        } else {
            0
        };

        // Get the actual persisted row count from table metadata
        // This is an O(1) catalog lookup that reads ColumnDescriptor.total_row_count
        // Fallback to 0 for truly empty tables
        let total_rows = table.total_rows().unwrap_or(0);

        let executor_table = Arc::new(ExecutorTable {
            table: Arc::new(table),
            schema: exec_schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        if !multi_column_uniques.is_empty() {
            let executor_uniques =
                Self::build_executor_multi_column_uniques(&executor_table, &multi_column_uniques);
            executor_table.set_multi_column_uniques(executor_uniques);
        }

        // Cache the loaded table
        {
            let mut tables = self.tables.write().unwrap();
            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
        }

        // Register fields in catalog (may already be registered from RuntimeContext::new())
        if let Some(field_resolver) = self.catalog.field_resolver(catalog_table_id) {
            for col in &executor_table.schema.columns {
                let definition = FieldDefinition::new(&col.name)
                    .with_primary_key(col.primary_key)
                    .with_unique(col.unique)
                    .with_check_expr(col.check_expr.clone());
                let _ = field_resolver.register_field(definition); // Ignore "already exists" errors
            }
            tracing::debug!(
                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
                executor_table.schema.columns.len(),
                canonical_name
            );
        }

        tracing::debug!(
            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
            canonical_name,
            table_id,
            field_ids.len(),
            next_row_id
        );

        Ok(executor_table)
    }
4324
4325    fn remove_table_entry(&self, canonical_name: &str) {
4326        let mut tables = self.tables.write().unwrap();
4327        if tables.remove(canonical_name).is_some() {
4328            tracing::trace!(
4329                "remove_table_entry: removed table '{}' from context cache",
4330                canonical_name
4331            );
4332        }
4333    }
4334
4335    pub fn drop_table_immediate(&self, name: &str, if_exists: bool) -> Result<()> {
4336        let (display_name, canonical_name) = canonical_table_name(name)?;
4337        let (table_id, column_field_ids) = {
4338            let tables = self.tables.read().unwrap();
4339            let Some(entry) = tables.get(&canonical_name) else {
4340                if if_exists {
4341                    return Ok(());
4342                } else {
4343                    return Err(Error::CatalogError(format!(
4344                        "Catalog Error: Table '{}' does not exist",
4345                        display_name
4346                    )));
4347                }
4348            };
4349
4350            let field_ids = entry
4351                .schema
4352                .columns
4353                .iter()
4354                .map(|col| col.field_id)
4355                .collect::<Vec<_>>();
4356            (entry.table.table_id(), field_ids)
4357        };
4358
4359        let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
4360
4361        for detail in referencing {
4362            if detail.referencing_table_canonical == canonical_name {
4363                continue;
4364            }
4365
4366            if self.is_table_marked_dropped(&detail.referencing_table_canonical) {
4367                continue;
4368            }
4369
4370            let constraint_label = detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
4371            return Err(Error::ConstraintError(format!(
4372                "Cannot drop table '{}' because it is referenced by foreign key constraint '{}' on table '{}'",
4373                display_name, constraint_label, detail.referencing_table_display
4374            )));
4375        }
4376
4377        self.catalog_service
4378            .drop_table(&canonical_name, table_id, &column_field_ids)?;
4379        tracing::debug!(
4380            "[CATALOG] Unregistered table '{}' (table_id={}) from catalog",
4381            canonical_name,
4382            table_id
4383        );
4384
4385        self.dropped_tables
4386            .write()
4387            .unwrap()
4388            .insert(canonical_name.clone());
4389        Ok(())
4390    }
4391
4392    pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
4393        self.dropped_tables.read().unwrap().contains(canonical_name)
4394    }
4395}
4396
// Implement TransactionContext for ContextWrapper to enable llkv-transaction integration.
// Each method is a thin delegation to the wrapped RuntimeContext; DML methods
// (insert/update/delete) additionally route through the snapshot-aware variants
// when running inside an explicit (non-auto-commit) transaction.
impl<P> TransactionContext for RuntimeContextWrapper<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    type Pager = P;

    // Install the snapshot the wrapper should use for subsequent statements.
    fn set_snapshot(&self, snapshot: TransactionSnapshot) {
        self.update_snapshot(snapshot);
    }

    fn snapshot(&self) -> TransactionSnapshot {
        self.current_snapshot()
    }

    fn table_column_specs(&self, table_name: &str) -> llkv_result::Result<Vec<ColumnSpec>> {
        RuntimeContext::table_column_specs(self.context(), table_name)
    }

    fn export_table_rows(
        &self,
        table_name: &str,
    ) -> llkv_result::Result<llkv_transaction::RowBatch> {
        let batch = RuntimeContext::export_table_rows(self.context(), table_name)?;
        // Convert from llkv_executor::RowBatch to llkv_transaction::RowBatch
        // (structurally identical; only the declaring crate differs).
        Ok(llkv_transaction::RowBatch {
            columns: batch.columns,
            rows: batch.rows,
        })
    }

    // Reads always execute under the wrapper's current snapshot.
    fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
    ) -> llkv_result::Result<Vec<RecordBatch>> {
        RuntimeContext::get_batches_with_row_ids_with_snapshot(
            self.context(),
            table_name,
            filter,
            self.snapshot(),
        )
    }

    fn execute_select(
        &self,
        plan: SelectPlan,
    ) -> llkv_result::Result<SelectExecution<Self::Pager>> {
        RuntimeContext::execute_select_with_snapshot(self.context(), plan, self.snapshot())
    }

    // DDL is not snapshot-scoped; delegate directly.
    fn create_table_plan(
        &self,
        plan: CreateTablePlan,
    ) -> llkv_result::Result<TransactionResult<P>> {
        let result = RuntimeContext::create_table_plan(self.context(), plan)?;
        Ok(convert_statement_result(result))
    }

    // Auto-commit inserts take the plain path; explicit transactions use the
    // snapshot-aware variant so MVCC metadata is stamped with the txn id.
    fn insert(&self, plan: InsertPlan) -> llkv_result::Result<TransactionResult<P>> {
        tracing::trace!(
            "[WRAPPER] TransactionContext::insert called - plan.table='{}', wrapper_context_pager={:p}",
            plan.table,
            &*self.ctx.pager
        );
        let snapshot = self.current_snapshot();
        let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
            self.ctx().insert(plan)?
        } else {
            RuntimeContext::insert_with_snapshot(self.context(), plan, snapshot)?
        };
        Ok(convert_statement_result(result))
    }

    // Same auto-commit vs. explicit-transaction split as insert.
    fn update(&self, plan: UpdatePlan) -> llkv_result::Result<TransactionResult<P>> {
        let snapshot = self.current_snapshot();
        let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
            self.ctx().update(plan)?
        } else {
            RuntimeContext::update_with_snapshot(self.context(), plan, snapshot)?
        };
        Ok(convert_statement_result(result))
    }

    // Same auto-commit vs. explicit-transaction split as insert.
    fn delete(&self, plan: DeletePlan) -> llkv_result::Result<TransactionResult<P>> {
        let snapshot = self.current_snapshot();
        let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
            self.ctx().delete(plan)?
        } else {
            RuntimeContext::delete_with_snapshot(self.context(), plan, snapshot)?
        };
        Ok(convert_statement_result(result))
    }

    fn create_index(&self, plan: CreateIndexPlan) -> llkv_result::Result<TransactionResult<P>> {
        let result = RuntimeContext::create_index(self.context(), plan)?;
        Ok(convert_statement_result(result))
    }

    fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> llkv_result::Result<usize> {
        RuntimeContext::append_batches_with_row_ids(self.context(), table_name, batches)
    }

    fn table_names(&self) -> Vec<String> {
        RuntimeContext::table_names(self.context())
    }

    fn table_id(&self, table_name: &str) -> llkv_result::Result<llkv_table::types::TableId> {
        // Check CURRENT state: if table is marked as dropped, return error
        // This is used by conflict detection to detect if a table was dropped
        let ctx = self.context();
        if ctx.is_table_marked_dropped(table_name) {
            return Err(Error::InvalidArgumentError(format!(
                "table '{}' has been dropped",
                table_name
            )));
        }

        let table = ctx.lookup_table(table_name)?;
        Ok(table.table.table_id())
    }

    fn catalog_snapshot(&self) -> llkv_table::catalog::TableCatalogSnapshot {
        let ctx = self.context();
        ctx.catalog.snapshot()
    }

    // Commit-time constraint validation (e.g. deferred primary-key checks).
    fn validate_commit_constraints(&self, txn_id: TxnId) -> llkv_result::Result<()> {
        self.ctx.validate_primary_keys_for_commit(txn_id)
    }

    // Discards per-transaction bookkeeping after commit or rollback.
    fn clear_transaction_state(&self, txn_id: TxnId) {
        self.ctx.clear_transaction_state(txn_id);
    }
}
4536
4537// Helper to convert StatementResult between types
4538fn convert_statement_result<P>(result: RuntimeStatementResult<P>) -> TransactionResult<P>
4539where
4540    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
4541{
4542    use llkv_transaction::TransactionResult as TxResult;
4543    match result {
4544        RuntimeStatementResult::CreateTable { table_name } => TxResult::CreateTable { table_name },
4545        RuntimeStatementResult::CreateIndex {
4546            table_name,
4547            index_name,
4548        } => TxResult::CreateIndex {
4549            table_name,
4550            index_name,
4551        },
4552        RuntimeStatementResult::Insert { rows_inserted, .. } => TxResult::Insert { rows_inserted },
4553        RuntimeStatementResult::Update { rows_updated, .. } => TxResult::Update {
4554            rows_matched: rows_updated,
4555            rows_updated,
4556        },
4557        RuntimeStatementResult::Delete { rows_deleted, .. } => TxResult::Delete { rows_deleted },
4558        RuntimeStatementResult::Transaction { kind } => TxResult::Transaction { kind },
4559        _ => panic!("unsupported StatementResult conversion"),
4560    }
4561}
4562
/// Filters `row_ids` down to the rows visible under `snapshot`, using the
/// table's MVCC `created_by`/`deleted_by` columns.
///
/// Degrades gracefully: if the MVCC columns are absent (pre-MVCC table) or a
/// batch cannot be read as the expected two `UInt64` columns, the affected
/// rows are treated as visible rather than failing the query. Null
/// `created_by` cells default to `TXN_ID_AUTO_COMMIT` and null `deleted_by`
/// cells to `TXN_ID_NONE` (i.e. not deleted).
fn filter_row_ids_for_snapshot<P>(
    table: &Table<P>,
    row_ids: Vec<RowId>,
    txn_manager: &TxnIdManager,
    snapshot: TransactionSnapshot,
) -> Result<Vec<RowId>>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    tracing::debug!(
        "[FILTER_ROWS] Filtering {} row IDs for snapshot txn_id={}, snapshot_id={}",
        row_ids.len(),
        snapshot.txn_id,
        snapshot.snapshot_id
    );

    if row_ids.is_empty() {
        return Ok(row_ids);
    }

    let table_id = table.table_id();
    let created_lfid = LogicalFieldId::for_mvcc_created_by(table_id);
    let deleted_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
    let logical_fields: Arc<[LogicalFieldId]> = Arc::from([created_lfid, deleted_lfid]);

    // No MVCC columns at all => every requested row is visible.
    if let Err(err) = table
        .store()
        .prepare_gather_context(logical_fields.as_ref())
    {
        match err {
            Error::NotFound => {
                tracing::trace!(
                    "[FILTER_ROWS] MVCC columns not found for table_id={}, treating all rows as visible",
                    table_id
                );
                return Ok(row_ids);
            }
            other => {
                tracing::error!(
                    "[FILTER_ROWS] Failed to prepare gather context: {:?}",
                    other
                );
                return Err(other);
            }
        }
    }

    let total_rows = row_ids.len();
    // `row_ids` is moved into the stream; `total_rows` keeps the count for
    // capacity and logging below.
    let mut stream = match table.stream_columns(
        Arc::clone(&logical_fields),
        row_ids,
        GatherNullPolicy::IncludeNulls,
    ) {
        Ok(stream) => stream,
        Err(err) => {
            tracing::error!("[FILTER_ROWS] stream_columns error: {:?}", err);
            return Err(err);
        }
    };

    let mut visible = Vec::with_capacity(total_rows);

    while let Some(chunk) = stream.next_batch()? {
        let batch = chunk.batch();
        let window = chunk.row_ids();

        // Malformed batch: fall back to treating the whole window as visible.
        if batch.num_columns() < 2 {
            tracing::debug!(
                "[FILTER_ROWS] version_batch has < 2 columns for table_id={}, returning window rows unfiltered",
                table_id
            );
            visible.extend_from_slice(window);
            continue;
        }

        let created_column = batch.column(0).as_any().downcast_ref::<UInt64Array>();
        let deleted_column = batch.column(1).as_any().downcast_ref::<UInt64Array>();

        // Same fallback if either column is not the expected u64 type.
        if created_column.is_none() || deleted_column.is_none() {
            tracing::debug!(
                "[FILTER_ROWS] Failed to downcast MVCC columns for table_id={}, returning window rows unfiltered",
                table_id
            );
            visible.extend_from_slice(window);
            continue;
        }

        let created_column = created_column.unwrap();
        let deleted_column = deleted_column.unwrap();

        for (idx, row_id) in window.iter().enumerate() {
            // Null MVCC cells take the "always existed / never deleted"
            // defaults.
            let created_by = if created_column.is_null(idx) {
                TXN_ID_AUTO_COMMIT
            } else {
                created_column.value(idx)
            };
            let deleted_by = if deleted_column.is_null(idx) {
                TXN_ID_NONE
            } else {
                deleted_column.value(idx)
            };

            // Visibility is delegated to the RowVersion snapshot rules.
            let version = RowVersion {
                created_by,
                deleted_by,
            };
            let is_visible = version.is_visible_for(txn_manager, snapshot);
            tracing::trace!(
                "[FILTER_ROWS] row_id={}: created_by={}, deleted_by={}, is_visible={}",
                row_id,
                created_by,
                deleted_by,
                is_visible
            );
            if is_visible {
                visible.push(*row_id);
            }
        }
    }

    tracing::debug!(
        "[FILTER_ROWS] Filtered from {} to {} visible rows",
        total_rows,
        visible.len()
    );
    Ok(visible)
}
4690
4691struct MvccRowIdFilter<P>
4692where
4693    P: Pager<Blob = EntryHandle> + Send + Sync,
4694{
4695    txn_manager: Arc<TxnIdManager>,
4696    snapshot: TransactionSnapshot,
4697    _marker: PhantomData<fn(P)>,
4698}
4699
4700impl<P> MvccRowIdFilter<P>
4701where
4702    P: Pager<Blob = EntryHandle> + Send + Sync,
4703{
4704    fn new(txn_manager: Arc<TxnIdManager>, snapshot: TransactionSnapshot) -> Self {
4705        Self {
4706            txn_manager,
4707            snapshot,
4708            _marker: PhantomData,
4709        }
4710    }
4711}
4712
4713impl<P> RowIdFilter<P> for MvccRowIdFilter<P>
4714where
4715    P: Pager<Blob = EntryHandle> + Send + Sync,
4716{
4717    fn filter(&self, table: &Table<P>, row_ids: Vec<RowId>) -> Result<Vec<RowId>> {
4718        tracing::trace!(
4719            "[MVCC_FILTER] filter() called with row_ids {:?}, snapshot txn={}, snapshot_id={}",
4720            row_ids,
4721            self.snapshot.txn_id,
4722            self.snapshot.snapshot_id
4723        );
4724        let result = filter_row_ids_for_snapshot(table, row_ids, &self.txn_manager, self.snapshot);
4725        if let Ok(ref visible) = result {
4726            tracing::trace!(
4727                "[MVCC_FILTER] filter() returning visible row_ids: {:?}",
4728                visible
4729            );
4730        }
4731        result
4732    }
4733}
4734
// Wrapper to implement TableProvider for Context.
// Lets the query executor resolve table names through the runtime context's
// cache/lazy-load path without depending on RuntimeContext directly.
struct ContextProvider<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // The runtime context whose table cache backs lookups.
    context: Arc<RuntimeContext<P>>,
}

impl<P> TableProvider<P> for ContextProvider<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Delegates straight to RuntimeContext::lookup_table (cache + lazy load).
    fn get_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        self.context.lookup_table(canonical_name)
    }
}
4751
/// Lazily built logical plan (thin wrapper over SelectPlan).
///
/// Builder-style methods accumulate projections, filters, and aggregates
/// into the plan; nothing executes until `collect*` is called.
pub struct RuntimeLazyFrame<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Context used to execute the plan at collect time.
    context: Arc<RuntimeContext<P>>,
    // The select plan being assembled.
    plan: SelectPlan,
}
4760
4761impl<P> RuntimeLazyFrame<P>
4762where
4763    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
4764{
4765    pub fn scan(context: Arc<RuntimeContext<P>>, table: &str) -> Result<Self> {
4766        let (display, canonical) = canonical_table_name(table)?;
4767        context.lookup_table(&canonical)?;
4768        Ok(Self {
4769            context,
4770            plan: SelectPlan::new(display),
4771        })
4772    }
4773
4774    pub fn filter(mut self, predicate: LlkvExpr<'static, String>) -> Self {
4775        self.plan.filter = Some(predicate);
4776        self
4777    }
4778
4779    pub fn select_all(mut self) -> Self {
4780        self.plan.projections = vec![SelectProjection::AllColumns];
4781        self
4782    }
4783
4784    pub fn select_columns<S>(mut self, columns: impl IntoIterator<Item = S>) -> Self
4785    where
4786        S: AsRef<str>,
4787    {
4788        self.plan.projections = columns
4789            .into_iter()
4790            .map(|name| SelectProjection::Column {
4791                name: name.as_ref().to_string(),
4792                alias: None,
4793            })
4794            .collect();
4795        self
4796    }
4797
4798    pub fn select(mut self, projections: Vec<SelectProjection>) -> Self {
4799        self.plan.projections = projections;
4800        self
4801    }
4802
4803    pub fn aggregate(mut self, aggregates: Vec<AggregateExpr>) -> Self {
4804        self.plan.aggregates = aggregates;
4805        self
4806    }
4807
4808    pub fn collect(self) -> Result<SelectExecution<P>> {
4809        self.context.execute_select(self.plan)
4810    }
4811
4812    pub fn collect_rows(self) -> Result<RowBatch> {
4813        let execution = self.context.execute_select(self.plan)?;
4814        execution.collect_rows()
4815    }
4816
4817    pub fn collect_rows_vec(self) -> Result<Vec<Vec<PlanValue>>> {
4818        Ok(self.collect_rows()?.rows)
4819    }
4820}
4821
/// Wall-clock time as microseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch (the
/// `duration_since` error case), rather than panicking.
fn current_time_micros() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_micros() as u64
}
4828
4829pub fn resolve_insert_columns(columns: &[String], schema: &ExecutorSchema) -> Result<Vec<usize>> {
4830    if columns.is_empty() {
4831        return Ok((0..schema.columns.len()).collect());
4832    }
4833    let mut resolved = Vec::with_capacity(columns.len());
4834    for column in columns {
4835        let normalized = column.to_ascii_lowercase();
4836        let index = schema.lookup.get(&normalized).ok_or_else(|| {
4837            Error::InvalidArgumentError(format!(
4838                "Binder Error: does not have a column named '{}'",
4839                column
4840            ))
4841        })?;
4842        resolved.push(*index);
4843    }
4844    Ok(resolved)
4845}
4846
/// Coerces one INSERT literal to the Arrow type of its destination column.
///
/// Returns the normalized `PlanValue` (booleans become integer 0/1, date
/// strings become epoch-day counts, numbers may be cast or stringified), or
/// an error when the value cannot be represented in the column's type.
/// `Null` is accepted for every column type.
fn normalize_insert_value_for_column(
    column: &ExecutorColumn,
    value: PlanValue,
) -> Result<PlanValue> {
    match (&column.data_type, value) {
        // NULL is allowed regardless of column type; this arm must stay first.
        (_, PlanValue::Null) => Ok(PlanValue::Null),
        (DataType::Int64, PlanValue::Integer(v)) => Ok(PlanValue::Integer(v)),
        // Float-to-int truncates toward zero (`as` cast semantics).
        (DataType::Int64, PlanValue::Float(v)) => Ok(PlanValue::Integer(v as i64)),
        (DataType::Int64, other) => Err(Error::InvalidArgumentError(format!(
            "cannot insert {other:?} into INT column '{}'",
            column.name
        ))),
        // Booleans are normalized to integer 0/1, not stored as PlanValue bools.
        (DataType::Boolean, PlanValue::Integer(v)) => {
            Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 }))
        }
        (DataType::Boolean, PlanValue::Float(v)) => {
            Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 }))
        }
        // Accepts the usual textual spellings, case-insensitively and trimmed.
        (DataType::Boolean, PlanValue::String(s)) => {
            let normalized = s.trim().to_ascii_lowercase();
            let value = match normalized.as_str() {
                "true" | "t" | "1" => 1,
                "false" | "f" | "0" => 0,
                _ => {
                    return Err(Error::InvalidArgumentError(format!(
                        "cannot insert string '{}' into BOOLEAN column '{}'",
                        s, column.name
                    )));
                }
            };
            Ok(PlanValue::Integer(value))
        }
        (DataType::Boolean, PlanValue::Struct(_)) => Err(Error::InvalidArgumentError(format!(
            "cannot insert struct into BOOLEAN column '{}'",
            column.name
        ))),
        (DataType::Float64, PlanValue::Integer(v)) => Ok(PlanValue::Float(v as f64)),
        (DataType::Float64, PlanValue::Float(v)) => Ok(PlanValue::Float(v)),
        (DataType::Float64, other) => Err(Error::InvalidArgumentError(format!(
            "cannot insert {other:?} into DOUBLE column '{}'",
            column.name
        ))),
        // Numbers inserted into string columns are stringified, not rejected.
        (DataType::Utf8, PlanValue::Integer(v)) => Ok(PlanValue::String(v.to_string())),
        (DataType::Utf8, PlanValue::Float(v)) => Ok(PlanValue::String(v.to_string())),
        (DataType::Utf8, PlanValue::String(s)) => Ok(PlanValue::String(s)),
        (DataType::Utf8, PlanValue::Struct(_)) => Err(Error::InvalidArgumentError(format!(
            "cannot insert struct into STRING column '{}'",
            column.name
        ))),
        // Integer DATE values are days since the Unix epoch; must fit in i32.
        (DataType::Date32, PlanValue::Integer(days)) => {
            let casted = i32::try_from(days).map_err(|_| {
                Error::InvalidArgumentError(format!(
                    "integer literal out of range for DATE column '{}'",
                    column.name
                ))
            })?;
            Ok(PlanValue::Integer(casted as i64))
        }
        // String DATE values are parsed as 'YYYY-MM-DD' into epoch days.
        (DataType::Date32, PlanValue::String(text)) => {
            let days = parse_date32_literal(&text)?;
            Ok(PlanValue::Integer(days as i64))
        }
        (DataType::Date32, other) => Err(Error::InvalidArgumentError(format!(
            "cannot insert {other:?} into DATE column '{}'",
            column.name
        ))),
        (DataType::Struct(_), PlanValue::Struct(map)) => Ok(PlanValue::Struct(map)),
        (DataType::Struct(_), other) => Err(Error::InvalidArgumentError(format!(
            "expected struct value for struct column '{}', got {other:?}",
            column.name
        ))),
        // Any column type not matched above is not an INSERT target.
        (other_type, other_value) => Err(Error::InvalidArgumentError(format!(
            "unsupported Arrow data type {:?} for INSERT value {:?} in column '{}'",
            other_type, other_value, column.name
        ))),
    }
}
4924
/// Builds an Arrow array of `dtype` from a column of `PlanValue` literals.
///
/// Applies per-element coercions mirroring `normalize_insert_value_for_column`
/// (ints to floats, numbers to strings, date strings to day counts);
/// `PlanValue::Null` always becomes a null slot. Returns an error when a
/// value cannot be represented in `dtype`, or when `dtype` itself is not a
/// supported INSERT target.
pub fn build_array_for_column(dtype: &DataType, values: &[PlanValue]) -> Result<ArrayRef> {
    match dtype {
        DataType::Int64 => {
            let mut builder = Int64Builder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    PlanValue::Integer(v) => builder.append_value(*v),
                    // Truncates toward zero (`as` cast semantics).
                    PlanValue::Float(v) => builder.append_value(*v as i64),
                    PlanValue::String(_) | PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert non-integer into INT column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Boolean => {
            let mut builder = BooleanBuilder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    PlanValue::Integer(v) => builder.append_value(*v != 0),
                    PlanValue::Float(v) => builder.append_value(*v != 0.0),
                    // Accepts the usual textual spellings, trimmed and
                    // case-insensitive.
                    PlanValue::String(s) => {
                        let normalized = s.trim().to_ascii_lowercase();
                        match normalized.as_str() {
                            "true" | "t" | "1" => builder.append_value(true),
                            "false" | "f" | "0" => builder.append_value(false),
                            _ => {
                                return Err(Error::InvalidArgumentError(format!(
                                    "cannot insert string '{}' into BOOLEAN column",
                                    s
                                )));
                            }
                        }
                    }
                    PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert struct into BOOLEAN column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Float64 => {
            let mut builder = Float64Builder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    PlanValue::Integer(v) => builder.append_value(*v as f64),
                    PlanValue::Float(v) => builder.append_value(*v),
                    PlanValue::String(_) | PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert non-numeric into DOUBLE column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Utf8 => {
            // Rough data-capacity guess: 8 bytes per value.
            let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 8);
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    // Numbers are stringified rather than rejected.
                    PlanValue::Integer(v) => builder.append_value(v.to_string()),
                    PlanValue::Float(v) => builder.append_value(v.to_string()),
                    PlanValue::String(s) => builder.append_value(s),
                    PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert struct into STRING column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Date32 => {
            let mut builder = Date32Builder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    // Integers are interpreted as days since the Unix epoch
                    // and must fit in i32.
                    PlanValue::Integer(days) => {
                        let casted = i32::try_from(*days).map_err(|_| {
                            Error::InvalidArgumentError(
                                "integer literal out of range for DATE column".into(),
                            )
                        })?;
                        builder.append_value(casted);
                    }
                    PlanValue::Float(_) | PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert non-date value into DATE column".into(),
                        ));
                    }
                    // Strings must be 'YYYY-MM-DD' literals.
                    PlanValue::String(text) => {
                        let days = parse_date32_literal(text)?;
                        builder.append_value(days);
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Struct(fields) => {
            use arrow::array::StructArray;
            let mut field_arrays: Vec<(FieldRef, ArrayRef)> = Vec::with_capacity(fields.len());

            // Build each child column independently, then assemble the struct.
            for field in fields.iter() {
                let field_name = field.name();
                let field_type = field.data_type();
                let mut field_values = Vec::with_capacity(values.len());

                for value in values {
                    match value {
                        // A top-level NULL contributes a null to every child
                        // column.
                        PlanValue::Null => field_values.push(PlanValue::Null),
                        PlanValue::Struct(map) => {
                            // Keys missing from the struct literal become
                            // NULL rather than erroring.
                            let field_value =
                                map.get(field_name).cloned().unwrap_or(PlanValue::Null);
                            field_values.push(field_value);
                        }
                        _ => {
                            return Err(Error::InvalidArgumentError(format!(
                                "expected struct value for struct column, got {:?}",
                                value
                            )));
                        }
                    }
                }

                // Recurse so child columns get the same coercion rules.
                let field_array = build_array_for_column(field_type, &field_values)?;
                field_arrays.push((Arc::clone(field), field_array));
            }

            Ok(Arc::new(StructArray::from(field_arrays)))
        }
        other => Err(Error::InvalidArgumentError(format!(
            "unsupported Arrow data type for INSERT: {other:?}"
        ))),
    }
}
5068
5069fn parse_date32_literal(text: &str) -> Result<i32> {
5070    let mut parts = text.split('-');
5071    let year_str = parts
5072        .next()
5073        .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
5074    let month_str = parts
5075        .next()
5076        .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
5077    let day_str = parts
5078        .next()
5079        .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
5080    if parts.next().is_some() {
5081        return Err(Error::InvalidArgumentError(format!(
5082            "invalid DATE literal '{text}'"
5083        )));
5084    }
5085
5086    let year = year_str.parse::<i32>().map_err(|_| {
5087        Error::InvalidArgumentError(format!("invalid year in DATE literal '{text}'"))
5088    })?;
5089    let month_num = month_str.parse::<u8>().map_err(|_| {
5090        Error::InvalidArgumentError(format!("invalid month in DATE literal '{text}'"))
5091    })?;
5092    let day = day_str.parse::<u8>().map_err(|_| {
5093        Error::InvalidArgumentError(format!("invalid day in DATE literal '{text}'"))
5094    })?;
5095
5096    let month = Month::try_from(month_num).map_err(|_| {
5097        Error::InvalidArgumentError(format!("invalid month in DATE literal '{text}'"))
5098    })?;
5099
5100    let date = Date::from_calendar_date(year, month, day).map_err(|err| {
5101        Error::InvalidArgumentError(format!("invalid DATE literal '{text}': {err}"))
5102    })?;
5103    let days = date.to_julian_day() - epoch_julian_day();
5104    Ok(days)
5105}
5106
/// Julian day number of the Unix epoch (1970-01-01), the zero point for
/// Arrow `Date32` day counts produced by `parse_date32_literal`.
fn epoch_julian_day() -> i32 {
    Date::from_calendar_date(1970, Month::January, 1)
        .expect("1970-01-01 is a valid date")
        .to_julian_day()
}
5112
5113// Array -> PlanValue conversion is provided by llkv-plan::plan_value_from_array
5114
/// Builds a predicate matching every row of `field_id`'s column: an
/// unbounded range scan, used when a statement has no usable WHERE filter.
fn full_table_scan_filter(field_id: FieldId) -> LlkvExpr<'static, FieldId> {
    LlkvExpr::Pred(Filter {
        field_id,
        op: Operator::Range {
            lower: Bound::Unbounded,
            upper: Bound::Unbounded,
        },
    })
}
5124
5125fn resolve_field_id_from_schema(schema: &ExecutorSchema, name: &str) -> Result<FieldId> {
5126    if name.eq_ignore_ascii_case(ROW_ID_COLUMN_NAME) {
5127        return Ok(ROW_ID_FIELD_ID);
5128    }
5129
5130    schema
5131        .resolve(name)
5132        .map(|column| column.field_id)
5133        .ok_or_else(|| {
5134            Error::InvalidArgumentError(format!(
5135                "Binder Error: does not have a column named '{name}'"
5136            ))
5137        })
5138}
5139
5140fn translate_predicate(
5141    expr: LlkvExpr<'static, String>,
5142    schema: &ExecutorSchema,
5143) -> Result<LlkvExpr<'static, FieldId>> {
5144    match expr {
5145        LlkvExpr::And(list) => {
5146            let mut converted = Vec::with_capacity(list.len());
5147            for item in list {
5148                converted.push(translate_predicate(item, schema)?);
5149            }
5150            Ok(LlkvExpr::And(converted))
5151        }
5152        LlkvExpr::Or(list) => {
5153            let mut converted = Vec::with_capacity(list.len());
5154            for item in list {
5155                converted.push(translate_predicate(item, schema)?);
5156            }
5157            Ok(LlkvExpr::Or(converted))
5158        }
5159        LlkvExpr::Not(inner) => Ok(LlkvExpr::Not(Box::new(translate_predicate(
5160            *inner, schema,
5161        )?))),
5162        LlkvExpr::Pred(Filter { field_id, op }) => {
5163            let resolved = resolve_field_id_from_schema(schema, &field_id)?;
5164            Ok(LlkvExpr::Pred(Filter {
5165                field_id: resolved,
5166                op,
5167            }))
5168        }
5169        LlkvExpr::Compare { left, op, right } => {
5170            let left = translate_scalar(&left, schema)?;
5171            let right = translate_scalar(&right, schema)?;
5172            Ok(LlkvExpr::Compare { left, op, right })
5173        }
5174    }
5175}
5176
5177fn translate_scalar(
5178    expr: &ScalarExpr<String>,
5179    schema: &ExecutorSchema,
5180) -> Result<ScalarExpr<FieldId>> {
5181    match expr {
5182        ScalarExpr::Column(name) => {
5183            let field_id = resolve_field_id_from_schema(schema, name)?;
5184            Ok(ScalarExpr::column(field_id))
5185        }
5186        ScalarExpr::Literal(lit) => Ok(ScalarExpr::Literal(lit.clone())),
5187        ScalarExpr::Binary { left, op, right } => {
5188            let left_expr = translate_scalar(left, schema)?;
5189            let right_expr = translate_scalar(right, schema)?;
5190            Ok(ScalarExpr::Binary {
5191                left: Box::new(left_expr),
5192                op: *op,
5193                right: Box::new(right_expr),
5194            })
5195        }
5196        ScalarExpr::Aggregate(agg) => {
5197            // Translate column names in aggregate calls to field IDs
5198            use llkv_expr::expr::AggregateCall;
5199            let translated_agg = match agg {
5200                AggregateCall::CountStar => AggregateCall::CountStar,
5201                AggregateCall::Count(name) => {
5202                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5203                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5204                    })?;
5205                    AggregateCall::Count(field_id)
5206                }
5207                AggregateCall::Sum(name) => {
5208                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5209                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5210                    })?;
5211                    AggregateCall::Sum(field_id)
5212                }
5213                AggregateCall::Min(name) => {
5214                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5215                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5216                    })?;
5217                    AggregateCall::Min(field_id)
5218                }
5219                AggregateCall::Max(name) => {
5220                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5221                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5222                    })?;
5223                    AggregateCall::Max(field_id)
5224                }
5225                AggregateCall::CountNulls(name) => {
5226                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5227                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5228                    })?;
5229                    AggregateCall::CountNulls(field_id)
5230                }
5231            };
5232            Ok(ScalarExpr::Aggregate(translated_agg))
5233        }
5234        ScalarExpr::GetField { base, field_name } => {
5235            let base_expr = translate_scalar(base, schema)?;
5236            Ok(ScalarExpr::GetField {
5237                base: Box::new(base_expr),
5238                field_name: field_name.clone(),
5239            })
5240        }
5241    }
5242}
5243
5244fn plan_value_from_sql_expr(expr: &SqlExpr) -> Result<PlanValue> {
5245    match expr {
5246        SqlExpr::Value(value) => plan_value_from_sql_value(value),
5247        SqlExpr::UnaryOp {
5248            op: UnaryOperator::Minus,
5249            expr,
5250        } => match plan_value_from_sql_expr(expr)? {
5251            PlanValue::Integer(v) => Ok(PlanValue::Integer(-v)),
5252            PlanValue::Float(v) => Ok(PlanValue::Float(-v)),
5253            PlanValue::Null | PlanValue::String(_) | PlanValue::Struct(_) => Err(
5254                Error::InvalidArgumentError("cannot negate non-numeric literal".into()),
5255            ),
5256        },
5257        SqlExpr::UnaryOp {
5258            op: UnaryOperator::Plus,
5259            expr,
5260        } => plan_value_from_sql_expr(expr),
5261        SqlExpr::Nested(inner) => plan_value_from_sql_expr(inner),
5262        SqlExpr::Dictionary(fields) => {
5263            let mut map = std::collections::HashMap::new();
5264            for field in fields {
5265                let key = field.key.value.clone();
5266                let value = plan_value_from_sql_expr(&field.value)?;
5267                map.insert(key, value);
5268            }
5269            Ok(PlanValue::Struct(map))
5270        }
5271        other => Err(Error::InvalidArgumentError(format!(
5272            "unsupported literal expression: {other:?}"
5273        ))),
5274    }
5275}
5276
5277fn plan_value_from_sql_value(value: &ValueWithSpan) -> Result<PlanValue> {
5278    match &value.value {
5279        Value::Null => Ok(PlanValue::Null),
5280        Value::Number(text, _) => {
5281            if text.contains(['.', 'e', 'E']) {
5282                let parsed = text.parse::<f64>().map_err(|err| {
5283                    Error::InvalidArgumentError(format!("invalid float literal: {err}"))
5284                })?;
5285                Ok(PlanValue::Float(parsed))
5286            } else {
5287                let parsed = text.parse::<i64>().map_err(|err| {
5288                    Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
5289                })?;
5290                Ok(PlanValue::Integer(parsed))
5291            }
5292        }
5293        Value::Boolean(_) => Err(Error::InvalidArgumentError(
5294            "BOOLEAN literals are not supported yet".into(),
5295        )),
5296        other => {
5297            if let Some(text) = other.clone().into_string() {
5298                Ok(PlanValue::String(text))
5299            } else {
5300                Err(Error::InvalidArgumentError(format!(
5301                    "unsupported literal: {other:?}"
5302                )))
5303            }
5304        }
5305    }
5306}
5307
5308fn group_by_is_empty(expr: &GroupByExpr) -> bool {
5309    matches!(
5310        expr,
5311        GroupByExpr::Expressions(exprs, modifiers)
5312            if exprs.is_empty() && modifiers.is_empty()
5313    )
5314}
5315
/// Materialized result rows of a `SELECT ... FROM range(...)` statement.
#[derive(Clone)]
pub struct RuntimeRangeSelectRows {
    rows: Vec<Vec<PlanValue>>,
}

impl RuntimeRangeSelectRows {
    /// Consumes the wrapper and yields the generated rows.
    pub fn into_rows(self) -> Vec<Vec<PlanValue>> {
        self.rows
    }
}
5326
/// How one projection item of a range() SELECT is produced for each row.
#[derive(Clone)]
enum RangeProjection {
    /// Emit the generated range value for the current row.
    Column,
    /// Emit this constant, evaluated once when the plan was built.
    Literal(PlanValue),
}
5332
/// Parsed description of a `range(...)` table-function reference in FROM.
#[derive(Clone)]
pub struct RuntimeRangeSpec {
    /// First generated value (inclusive).
    start: i64,
    #[allow(dead_code)] // Used for validation, computed into row_count
    end: i64,
    /// Number of rows the range generates.
    row_count: usize,
    /// Lower-cased name of the generated column.
    column_name_lower: String,
    /// Lower-cased table alias from the FROM clause, if one was given.
    table_alias_lower: Option<String>,
}
5342
5343impl RuntimeRangeSpec {
5344    fn matches_identifier(&self, ident: &str) -> bool {
5345        let lower = ident.to_ascii_lowercase();
5346        lower == self.column_name_lower || lower == "range"
5347    }
5348
5349    fn matches_table_alias(&self, ident: &str) -> bool {
5350        let lower = ident.to_ascii_lowercase();
5351        match &self.table_alias_lower {
5352            Some(alias) => lower == *alias,
5353            None => lower == "range",
5354        }
5355    }
5356
5357    fn matches_object_name(&self, name: &ObjectName) -> bool {
5358        if name.0.len() != 1 {
5359            return false;
5360        }
5361        match &name.0[0] {
5362            ObjectNamePart::Identifier(ident) => self.matches_table_alias(&ident.value),
5363            _ => false,
5364        }
5365    }
5366}
5367
/// Evaluates a `SELECT ... FROM range(...)` statement directly to rows.
///
/// Returns `Ok(None)` when the statement's FROM clause is not a range()
/// table function (the caller should plan it normally), `Ok(Some(rows))`
/// with the generated rows otherwise, and an error when the statement uses
/// SELECT features not supported for range() (WHERE, HAVING, DISTINCT, ...).
pub fn extract_rows_from_range(select: &Select) -> Result<Option<RuntimeRangeSelectRows>> {
    let spec = match parse_range_spec(select)? {
        Some(spec) => spec,
        None => return Ok(None),
    };

    // range() rows are generated eagerly, so filtering is not supported.
    if select.selection.is_some() {
        return Err(Error::InvalidArgumentError(
            "WHERE clauses are not supported for range() SELECT statements".into(),
        ));
    }
    // Reject every advanced SELECT feature in one pass; only a bare
    // projection list over the range source is supported.
    if select.having.is_some()
        || !select.named_window.is_empty()
        || select.qualify.is_some()
        || select.distinct.is_some()
        || select.top.is_some()
        || select.into.is_some()
        || select.prewhere.is_some()
        || !select.lateral_views.is_empty()
        || select.value_table_mode.is_some()
        || !group_by_is_empty(&select.group_by)
    {
        return Err(Error::InvalidArgumentError(
            "advanced SELECT clauses are not supported for range() SELECT statements".into(),
        ));
    }

    let mut projections: Vec<RangeProjection> = Vec::with_capacity(select.projection.len());

    // If projection is empty, treat it as SELECT * (implicit wildcard)
    if select.projection.is_empty() {
        projections.push(RangeProjection::Column);
    } else {
        for item in &select.projection {
            let projection = match item {
                SelectItem::Wildcard(_) => RangeProjection::Column,
                // `alias.*` is allowed only when it names the range source.
                SelectItem::QualifiedWildcard(kind, _) => match kind {
                    SelectItemQualifiedWildcardKind::ObjectName(object_name) => {
                        if spec.matches_object_name(object_name) {
                            RangeProjection::Column
                        } else {
                            return Err(Error::InvalidArgumentError(
                                "qualified wildcard must reference the range() source".into(),
                            ));
                        }
                    }
                    SelectItemQualifiedWildcardKind::Expr(_) => {
                        return Err(Error::InvalidArgumentError(
                            "expression-qualified wildcards are not supported for range() SELECT statements".into(),
                        ));
                    }
                },
                // Aliases don't affect the produced values, so both expression
                // forms share the same handling.
                SelectItem::UnnamedExpr(expr) => build_range_projection_expr(expr, &spec)?,
                SelectItem::ExprWithAlias { expr, .. } => build_range_projection_expr(expr, &spec)?,
            };
            projections.push(projection);
        }
    }

    // Generate one row per range value, evaluating each projection per row.
    let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(spec.row_count);
    for idx in 0..spec.row_count {
        let mut row: Vec<PlanValue> = Vec::with_capacity(projections.len());
        let value = spec.start + (idx as i64);
        for projection in &projections {
            match projection {
                RangeProjection::Column => row.push(PlanValue::Integer(value)),
                RangeProjection::Literal(value) => row.push(value.clone()),
            }
        }
        rows.push(row);
    }

    Ok(Some(RuntimeRangeSelectRows { rows }))
}
5442
5443fn build_range_projection_expr(expr: &SqlExpr, spec: &RuntimeRangeSpec) -> Result<RangeProjection> {
5444    match expr {
5445        SqlExpr::Identifier(ident) => {
5446            if spec.matches_identifier(&ident.value) {
5447                Ok(RangeProjection::Column)
5448            } else {
5449                Err(Error::InvalidArgumentError(format!(
5450                    "unknown column '{}' in range() SELECT",
5451                    ident.value
5452                )))
5453            }
5454        }
5455        SqlExpr::CompoundIdentifier(parts) => {
5456            if parts.len() == 2
5457                && spec.matches_table_alias(&parts[0].value)
5458                && spec.matches_identifier(&parts[1].value)
5459            {
5460                Ok(RangeProjection::Column)
5461            } else {
5462                Err(Error::InvalidArgumentError(
5463                    "compound identifiers must reference the range() source".into(),
5464                ))
5465            }
5466        }
5467        SqlExpr::Wildcard(_) | SqlExpr::QualifiedWildcard(_, _) => unreachable!(),
5468        other => Ok(RangeProjection::Literal(plan_value_from_sql_expr(other)?)),
5469    }
5470}
5471
5472fn parse_range_spec(select: &Select) -> Result<Option<RuntimeRangeSpec>> {
5473    if select.from.len() != 1 {
5474        return Ok(None);
5475    }
5476    let item = &select.from[0];
5477    if !item.joins.is_empty() {
5478        return Err(Error::InvalidArgumentError(
5479            "JOIN clauses are not supported for range() SELECT statements".into(),
5480        ));
5481    }
5482
5483    match &item.relation {
5484        TableFactor::Function {
5485            lateral,
5486            name,
5487            args,
5488            alias,
5489        } => {
5490            if *lateral {
5491                return Err(Error::InvalidArgumentError(
5492                    "LATERAL range() is not supported".into(),
5493                ));
5494            }
5495            parse_range_spec_from_args(name, args, alias)
5496        }
5497        TableFactor::Table {
5498            name,
5499            alias,
5500            args: Some(table_args),
5501            with_ordinality,
5502            ..
5503        } => {
5504            if *with_ordinality {
5505                return Err(Error::InvalidArgumentError(
5506                    "WITH ORDINALITY is not supported for range()".into(),
5507                ));
5508            }
5509            if table_args.settings.is_some() {
5510                return Err(Error::InvalidArgumentError(
5511                    "range() SETTINGS clause is not supported".into(),
5512                ));
5513            }
5514            parse_range_spec_from_args(name, &table_args.args, alias)
5515        }
5516        _ => Ok(None),
5517    }
5518}
5519
5520fn parse_range_spec_from_args(
5521    name: &ObjectName,
5522    args: &[FunctionArg],
5523    alias: &Option<TableAlias>,
5524) -> Result<Option<RuntimeRangeSpec>> {
5525    if name.0.len() != 1 {
5526        return Ok(None);
5527    }
5528    let func_name = match &name.0[0] {
5529        ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
5530        _ => return Ok(None),
5531    };
5532    if func_name != "range" {
5533        return Ok(None);
5534    }
5535
5536    if args.is_empty() || args.len() > 2 {
5537        return Err(Error::InvalidArgumentError(
5538            "range() requires one or two arguments".into(),
5539        ));
5540    }
5541
5542    // Helper to extract integer from argument
5543    let extract_int = |arg: &FunctionArg| -> Result<i64> {
5544        let arg_expr = match arg {
5545            FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
5546            FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_))
5547            | FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
5548                return Err(Error::InvalidArgumentError(
5549                    "range() argument must be an integer literal".into(),
5550                ));
5551            }
5552            FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
5553                return Err(Error::InvalidArgumentError(
5554                    "named arguments are not supported for range()".into(),
5555                ));
5556            }
5557        };
5558
5559        let value = plan_value_from_sql_expr(arg_expr)?;
5560        match value {
5561            PlanValue::Integer(v) => Ok(v),
5562            _ => Err(Error::InvalidArgumentError(
5563                "range() argument must be an integer literal".into(),
5564            )),
5565        }
5566    };
5567
5568    let (start, end, row_count) = if args.len() == 1 {
5569        // range(count) - generate [0, count)
5570        let count = extract_int(&args[0])?;
5571        if count < 0 {
5572            return Err(Error::InvalidArgumentError(
5573                "range() argument must be non-negative".into(),
5574            ));
5575        }
5576        (0, count, count as usize)
5577    } else {
5578        // range(start, end) - generate [start, end)
5579        let start = extract_int(&args[0])?;
5580        let end = extract_int(&args[1])?;
5581        if end < start {
5582            return Err(Error::InvalidArgumentError(
5583                "range() end must be >= start".into(),
5584            ));
5585        }
5586        let row_count = (end - start) as usize;
5587        (start, end, row_count)
5588    };
5589
5590    let column_name_lower = alias
5591        .as_ref()
5592        .and_then(|a| {
5593            a.columns
5594                .first()
5595                .map(|col| col.name.value.to_ascii_lowercase())
5596        })
5597        .unwrap_or_else(|| "range".to_string());
5598    let table_alias_lower = alias.as_ref().map(|a| a.name.value.to_ascii_lowercase());
5599
5600    Ok(Some(RuntimeRangeSpec {
5601        start,
5602        end,
5603        row_count,
5604        column_name_lower,
5605        table_alias_lower,
5606    }))
5607}
5608
/// Fluent builder for `CREATE TABLE` statements executed against a
/// [`RuntimeContext`]. Construct, chain `with_*` methods, then call `finish`.
pub struct RuntimeCreateTableBuilder<'ctx, P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Context the finished plan is executed on (via `execute_create_table`).
    ctx: &'ctx RuntimeContext<P>,
    // Plan accumulated by the builder methods.
    plan: CreateTablePlan,
}
5616
5617impl<'ctx, P> RuntimeCreateTableBuilder<'ctx, P>
5618where
5619    P: Pager<Blob = EntryHandle> + Send + Sync,
5620{
5621    pub fn if_not_exists(mut self) -> Self {
5622        self.plan.if_not_exists = true;
5623        self
5624    }
5625
5626    pub fn or_replace(mut self) -> Self {
5627        self.plan.or_replace = true;
5628        self
5629    }
5630
5631    pub fn with_column(mut self, name: impl Into<String>, data_type: DataType) -> Self {
5632        self.plan
5633            .columns
5634            .push(ColumnSpec::new(name.into(), data_type, true));
5635        self
5636    }
5637
5638    pub fn with_not_null_column(mut self, name: impl Into<String>, data_type: DataType) -> Self {
5639        self.plan
5640            .columns
5641            .push(ColumnSpec::new(name.into(), data_type, false));
5642        self
5643    }
5644
5645    pub fn with_column_spec(mut self, spec: ColumnSpec) -> Self {
5646        self.plan.columns.push(spec);
5647        self
5648    }
5649
5650    pub fn finish(self) -> Result<RuntimeStatementResult<P>> {
5651        self.ctx.execute_create_table(self.plan)
5652    }
5653}
5654
/// An ordered set of `(column name, value)` pairs used to build named insert
/// rows for [`IntoInsertRow`].
#[derive(Clone, Debug, Default)]
pub struct RuntimeRow {
    // Insertion-ordered column/value pairs; `set` overwrites an existing
    // entry with the same name rather than appending a duplicate.
    values: Vec<(String, PlanValue)>,
}
5659
5660impl RuntimeRow {
5661    pub fn new() -> Self {
5662        Self { values: Vec::new() }
5663    }
5664
5665    pub fn with(mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> Self {
5666        self.set(name, value);
5667        self
5668    }
5669
5670    pub fn set(&mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> &mut Self {
5671        let name = name.into();
5672        let value = value.into();
5673        if let Some((_, existing)) = self.values.iter_mut().find(|(n, _)| *n == name) {
5674            *existing = value;
5675        } else {
5676            self.values.push((name, value));
5677        }
5678        self
5679    }
5680
5681    fn columns(&self) -> Vec<String> {
5682        self.values.iter().map(|(n, _)| n.clone()).collect()
5683    }
5684
5685    fn values_for_columns(&self, columns: &[String]) -> Result<Vec<PlanValue>> {
5686        let mut out = Vec::with_capacity(columns.len());
5687        for column in columns {
5688            let value = self
5689                .values
5690                .iter()
5691                .find(|(name, _)| name == column)
5692                .ok_or_else(|| {
5693                    Error::InvalidArgumentError(format!(
5694                        "insert row missing value for column '{}'",
5695                        column
5696                    ))
5697                })?;
5698            out.push(value.1.clone());
5699        }
5700        Ok(out)
5701    }
5702}
5703
5704pub fn row() -> RuntimeRow {
5705    RuntimeRow::new()
5706}
5707
/// Normalized form of a single insert row produced by [`IntoInsertRow`].
#[doc(hidden)]
pub enum RuntimeInsertRowKind {
    /// Values paired with explicit column names (e.g. from [`RuntimeRow`]).
    Named {
        columns: Vec<String>,
        values: Vec<PlanValue>,
    },
    /// Values matched positionally against the table schema's column order.
    Positional(Vec<PlanValue>),
}
5716
/// Conversion of caller-supplied row shapes (tuples, arrays, `Vec`s,
/// [`RuntimeRow`]) into the normalized [`RuntimeInsertRowKind`].
pub trait IntoInsertRow {
    /// Convert `self` into a normalized insert row; implementations reject
    /// rows with zero columns.
    fn into_insert_row(self) -> Result<RuntimeInsertRowKind>;
}
5720
5721impl IntoInsertRow for RuntimeRow {
5722    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
5723        let row = self;
5724        if row.values.is_empty() {
5725            return Err(Error::InvalidArgumentError(
5726                "insert requires at least one column".into(),
5727            ));
5728        }
5729        let columns = row.columns();
5730        let values = row.values_for_columns(&columns)?;
5731        Ok(RuntimeInsertRowKind::Named { columns, values })
5732    }
5733}
5734
// No blanket `impl IntoInsertRow for &T`: such an impl previously triggered
// clippy's unconditional-recursion and noop-clone warnings. Callers should
// pass owned values or use the tuple/array/`Vec` implementations below.
5738
5739impl<T> IntoInsertRow for Vec<T>
5740where
5741    T: Into<PlanValue>,
5742{
5743    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
5744        if self.is_empty() {
5745            return Err(Error::InvalidArgumentError(
5746                "insert requires at least one column".into(),
5747            ));
5748        }
5749        Ok(RuntimeInsertRowKind::Positional(
5750            self.into_iter().map(Into::into).collect(),
5751        ))
5752    }
5753}
5754
5755impl<T, const N: usize> IntoInsertRow for [T; N]
5756where
5757    T: Into<PlanValue>,
5758{
5759    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
5760        if N == 0 {
5761            return Err(Error::InvalidArgumentError(
5762                "insert requires at least one column".into(),
5763            ));
5764        }
5765        Ok(RuntimeInsertRowKind::Positional(
5766            self.into_iter().map(Into::into).collect(),
5767        ))
5768    }
5769}
5770
// Implements `IntoInsertRow` for non-empty tuples: each tuple field becomes
// one positional value. Tuples cannot have zero elements, so no length check
// is needed here.
macro_rules! impl_into_insert_row_tuple {
    ($($type:ident => $value:ident),+) => {
        impl<$($type,)+> IntoInsertRow for ($($type,)+)
        where
            $($type: Into<PlanValue>,)+
        {
            fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
                let ($($value,)+) = self;
                Ok(RuntimeInsertRowKind::Positional(vec![$($value.into(),)+]))
            }
        }
    };
}
5784
// Tuple arities 1 through 8 are supported as positional insert rows.
impl_into_insert_row_tuple!(T1 => v1);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5, T6 => v6);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7
);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7,
    T8 => v8
);
5810
/// Handle to a named table resolved against a [`RuntimeContext`], providing
/// insert and lazy-scan entry points.
pub struct RuntimeTableHandle<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Shared runtime context used for lookups and plan execution.
    context: Arc<RuntimeContext<P>>,
    // Name used when building plans (`InsertPlan.table`, scans).
    display_name: String,
    // Canonicalized name used for catalog lookups (`lookup_table`).
    _canonical_name: String,
}
5819
impl<P> RuntimeTableHandle<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    /// Resolve `name` and return a handle; errors if the table is unknown.
    pub fn new(context: Arc<RuntimeContext<P>>, name: &str) -> Result<Self> {
        let (display_name, canonical_name) = canonical_table_name(name)?;
        // Fail fast: surface a missing-table error at handle creation rather
        // than on first use.
        context.lookup_table(&canonical_name)?;
        Ok(Self {
            context,
            display_name,
            _canonical_name: canonical_name,
        })
    }

    /// Begin a lazy scan over this table.
    pub fn lazy(&self) -> Result<RuntimeLazyFrame<P>> {
        RuntimeLazyFrame::scan(Arc::clone(&self.context), &self.display_name)
    }

    /// Insert a collection of rows, each convertible via [`IntoInsertRow`].
    ///
    /// All rows must share one mode, fixed by the first row: either named
    /// (explicit column lists, identical across rows) or positional (value
    /// count matching the table schema). Mixing modes, inconsistent columns,
    /// wrong value counts, or an empty iterator all yield
    /// `InvalidArgumentError`.
    pub fn insert_rows<R>(
        &self,
        rows: impl IntoIterator<Item = R>,
    ) -> Result<RuntimeStatementResult<P>>
    where
        R: IntoInsertRow,
    {
        // Tracks which row kind the first row committed this batch to.
        enum InsertMode {
            Named,
            Positional,
        }

        let table = self.context.lookup_table(&self._canonical_name)?;
        let schema = table.schema.as_ref();
        let schema_column_names: Vec<String> =
            schema.columns.iter().map(|col| col.name.clone()).collect();
        let mut normalized_rows: Vec<Vec<PlanValue>> = Vec::new();
        let mut mode: Option<InsertMode> = None;
        // Column list shared by every row: the first named row's columns, or
        // the schema's columns for positional mode. Set on the first row.
        let mut column_names: Option<Vec<String>> = None;
        let mut row_count = 0usize;

        for row in rows.into_iter() {
            row_count += 1;
            match row.into_insert_row()? {
                RuntimeInsertRowKind::Named { columns, values } => {
                    if let Some(existing) = &mode {
                        if !matches!(existing, InsertMode::Named) {
                            return Err(Error::InvalidArgumentError(
                                "cannot mix positional and named insert rows".into(),
                            ));
                        }
                    } else {
                        // First row: lock in named mode and reject duplicate
                        // column names.
                        mode = Some(InsertMode::Named);
                        let mut seen =
                            FxHashSet::with_capacity_and_hasher(columns.len(), Default::default());
                        for column in &columns {
                            if !seen.insert(column.clone()) {
                                return Err(Error::InvalidArgumentError(format!(
                                    "duplicate column '{}' in insert row",
                                    column
                                )));
                            }
                        }
                        column_names = Some(columns.clone());
                    }

                    let expected = column_names
                        .as_ref()
                        .expect("column names must be initialized for named insert");
                    // Later rows must repeat the first row's columns exactly
                    // (same names, same order).
                    if columns != *expected {
                        return Err(Error::InvalidArgumentError(
                            "insert rows must specify the same columns".into(),
                        ));
                    }
                    if values.len() != expected.len() {
                        return Err(Error::InvalidArgumentError(format!(
                            "insert row expected {} values, found {}",
                            expected.len(),
                            values.len()
                        )));
                    }
                    normalized_rows.push(values);
                }
                RuntimeInsertRowKind::Positional(values) => {
                    if let Some(existing) = &mode {
                        if !matches!(existing, InsertMode::Positional) {
                            return Err(Error::InvalidArgumentError(
                                "cannot mix positional and named insert rows".into(),
                            ));
                        }
                    } else {
                        mode = Some(InsertMode::Positional);
                        column_names = Some(schema_column_names.clone());
                    }

                    // Positional rows must cover every schema column.
                    if values.len() != schema.columns.len() {
                        return Err(Error::InvalidArgumentError(format!(
                            "insert row expected {} values, found {}",
                            schema.columns.len(),
                            values.len()
                        )));
                    }
                    normalized_rows.push(values);
                }
            }
        }

        if row_count == 0 {
            return Err(Error::InvalidArgumentError(
                "insert requires at least one row".into(),
            ));
        }

        let columns = column_names.unwrap_or_else(|| schema_column_names.clone());
        self.insert_row_batch(RowBatch {
            columns,
            rows: normalized_rows,
        })
    }

    /// Insert a pre-built [`RowBatch`], validating its shape before planning.
    pub fn insert_row_batch(&self, batch: RowBatch) -> Result<RuntimeStatementResult<P>> {
        if batch.rows.is_empty() {
            return Err(Error::InvalidArgumentError(
                "insert requires at least one row".into(),
            ));
        }
        if batch.columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "insert requires at least one column".into(),
            ));
        }
        // Every row must be exactly as wide as the column list.
        for row in &batch.rows {
            if row.len() != batch.columns.len() {
                return Err(Error::InvalidArgumentError(
                    "insert rows must have values for every column".into(),
                ));
            }
        }

        let plan = InsertPlan {
            table: self.display_name.clone(),
            columns: batch.columns,
            source: InsertSource::Rows(batch.rows),
        };
        self.context.insert(plan)
    }

    /// Insert Arrow record batches directly. No explicit column list is
    /// supplied here; column resolution happens downstream in `insert`.
    pub fn insert_batches(&self, batches: Vec<RecordBatch>) -> Result<RuntimeStatementResult<P>> {
        let plan = InsertPlan {
            table: self.display_name.clone(),
            columns: Vec::new(),
            source: InsertSource::Batches(batches),
        };
        self.context.insert(plan)
    }

    /// Materialize a lazy frame and insert its rows into this table.
    pub fn insert_lazy(&self, frame: RuntimeLazyFrame<P>) -> Result<RuntimeStatementResult<P>> {
        let RowBatch { columns, rows } = frame.collect_rows()?;
        self.insert_row_batch(RowBatch { columns, rows })
    }

    /// The table's display name.
    pub fn name(&self) -> &str {
        &self.display_name
    }
}
5983
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use llkv_storage::pager::MemPager;
    use std::sync::Arc;

    // End-to-end smoke test: create a table, insert positional tuple rows,
    // and read the string column back through a lazy scan.
    #[test]
    fn create_insert_select_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let context = Arc::new(RuntimeContext::new(pager));

        let table = context
            .create_table(
                "people",
                [
                    ("id", DataType::Int64, NotNull),
                    ("name", DataType::Utf8, Nullable),
                ],
            )
            .expect("create table");
        table
            .insert_rows([(1_i64, "alice"), (2_i64, "bob")])
            .expect("insert rows");

        let execution = table.lazy().expect("lazy scan");
        let select = execution.collect().expect("build select execution");
        let batches = select.collect().expect("collect batches");
        assert_eq!(batches.len(), 1);
        // Column 1 is `name`; both inserted rows should come back.
        let column = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(column.len(), 2);
    }

    // count_nulls aggregate: two of the three inserted values are NULL, so
    // the aggregate result should be 2.
    #[test]
    fn aggregate_count_nulls() {
        let pager = Arc::new(MemPager::default());
        let context = Arc::new(RuntimeContext::new(pager));

        let table = context
            .create_table("ints", [("i", DataType::Int64)])
            .expect("create table");
        table
            .insert_rows([
                (PlanValue::Null,),
                (PlanValue::Integer(1),),
                (PlanValue::Null,),
            ])
            .expect("insert rows");

        let plan =
            SelectPlan::new("ints").with_aggregates(vec![AggregateExpr::count_nulls("i", "nulls")]);
        let execution = context.execute_select(plan).expect("select");
        let batches = execution.collect().expect("collect batches");
        let column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int column");
        assert_eq!(column.value(0), 2);
    }
}