// llkv_runtime/lib.rs

1//! Query execution runtime for LLKV.
2//!
3//! This crate provides the runtime API (see [`RuntimeEngine`]) for executing SQL plans with full
4//! transaction support. It coordinates between the transaction layer, storage layer,
5//! and query executor to provide a complete database runtime.
6//!
7//! # Key Components
8//!
9//! - **[`RuntimeEngine`]**: Main execution engine for SQL operations
10//! - **[`RuntimeSession`]**: Session-level interface with transaction management
11//! - **[`TransactionContext`]**: Single-transaction execution context
12//! - **Table Provider**: Integration with the query executor for table access
13//!
14//! # Transaction Support
15//!
16//! The runtime supports both:
17//! - **Auto-commit**: Single-statement transactions (uses `TXN_ID_AUTO_COMMIT`)
18//! - **Multi-statement**: Explicit BEGIN/COMMIT/ROLLBACK transactions
19//!
20//! # MVCC Integration
21//!
22//! All data modifications automatically include MVCC metadata:
23//! - `row_id`: Unique row identifier
24//! - `created_by`: Transaction ID that created the row
25//! - `deleted_by`: Transaction ID that deleted the row (or `TXN_ID_NONE`)
26//!
27//! The runtime ensures these columns are injected and managed consistently.
28
29#![forbid(unsafe_code)]
30
31pub mod storage_namespace;
32
33use std::fmt;
34use std::marker::PhantomData;
35use std::mem;
36use std::ops::Bound;
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::sync::{Arc, RwLock};
39use std::time::{SystemTime, UNIX_EPOCH};
40
41use rustc_hash::{FxHashMap, FxHashSet};
42
43use arrow::array::{
44    Array, ArrayRef, BooleanBuilder, Date32Builder, Float64Builder, Int64Builder, StringBuilder,
45    UInt64Array, UInt64Builder,
46};
47use arrow::datatypes::{DataType, Field, FieldRef, Schema};
48use arrow::record_batch::RecordBatch;
49use llkv_column_map::ColumnStore;
50use llkv_column_map::store::{GatherNullPolicy, ROW_ID_COLUMN_NAME};
51use llkv_column_map::types::LogicalFieldId;
52use llkv_expr::expr::{Expr as LlkvExpr, Filter, Operator, ScalarExpr};
53use llkv_result::Error;
54use llkv_storage::pager::{BoxedPager, MemPager, Pager};
55use llkv_table::catalog::{FieldConstraints, FieldDefinition, TableCatalog};
56use llkv_table::table::{RowIdFilter, ScanProjection, ScanStreamOptions, Table};
57use llkv_table::types::{FieldId, ROW_ID_FIELD_ID, RowId};
58use llkv_table::{
59    CatalogManager, ConstraintColumnInfo, ConstraintService, CreateTableResult, ForeignKeyColumn,
60    ForeignKeyTableInfo, ForeignKeyView, InsertColumnConstraint, InsertMultiColumnUnique,
61    InsertUniqueColumn, MetadataManager, MultiColumnUniqueEntryMeta, MultiColumnUniqueRegistration,
62    SysCatalog, TableConstraintSummaryView, TableView, UniqueKey, build_composite_unique_key,
63    canonical_table_name, constraints::ConstraintKind, ensure_multi_column_unique,
64    ensure_single_column_unique,
65};
66use simd_r_drive_entry_handle::EntryHandle;
67use sqlparser::ast::{
68    Expr as SqlExpr, FunctionArg, FunctionArgExpr, GroupByExpr, ObjectName, ObjectNamePart, Select,
69    SelectItem, SelectItemQualifiedWildcardKind, TableAlias, TableFactor, UnaryOperator, Value,
70    ValueWithSpan,
71};
72use time::{Date, Month};
73
74pub type Result<T> = llkv_result::Result<T>;
75
76// Re-export plan structures from llkv-plan
77pub use llkv_plan::{
78    AggregateExpr, AggregateFunction, AssignmentValue, ColumnAssignment, ColumnNullability,
79    ColumnSpec, CreateIndexPlan, CreateTablePlan, CreateTableSource, DeletePlan, ForeignKeyAction,
80    ForeignKeySpec, IndexColumnPlan, InsertPlan, InsertSource, IntoColumnSpec, NotNull, Nullable,
81    OrderByPlan, OrderSortType, OrderTarget, PlanOperation, PlanStatement, PlanValue, SelectPlan,
82    SelectProjection, UpdatePlan,
83};
84
85// Execution structures from llkv-executor
86use llkv_executor::{ExecutorColumn, ExecutorMultiColumnUnique, ExecutorSchema, ExecutorTable};
87pub use llkv_executor::{QueryExecutor, RowBatch, SelectExecution, TableProvider};
88
89use crate::storage_namespace::{
90    PersistentNamespace, StorageNamespace, StorageNamespaceRegistry, TemporaryNamespace,
91};
92
93// Import transaction structures from llkv-transaction for internal use.
94pub use llkv_transaction::TransactionKind;
95use llkv_transaction::{
96    RowVersion, TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionContext, TransactionManager,
97    TransactionResult, TxnId, TxnIdManager, mvcc::TransactionSnapshot,
98};
99
100// Internal low-level transaction session type (from llkv-transaction)
101use llkv_transaction::TransactionSession;
102
103// Note: RuntimeSession is the high-level wrapper that users should use instead of the lower-level TransactionSession API
104
105use llkv_table::mvcc;
106
/// Pre-resolved constraint metadata for a single table, bundled so insert
/// paths can validate rows without re-querying the catalog.
///
/// NOTE(review): field semantics inferred from the `Insert*` constraint types
/// re-exported from `llkv_table` — confirm against their definitions.
struct TableConstraintContext {
    // Field ids in schema order, used to align incoming values with constraints.
    schema_field_ids: Vec<FieldId>,
    // Per-column constraints enforced on insert.
    column_constraints: Vec<InsertColumnConstraint>,
    // Single-column UNIQUE constraints.
    unique_columns: Vec<InsertUniqueColumn>,
    // Multi-column UNIQUE constraints.
    multi_column_uniques: Vec<InsertMultiColumnUnique>,
    // PRIMARY KEY, represented as a (possibly single-column) composite unique.
    primary_key: Option<InsertMultiColumnUnique>,
}
114
/// Result of running a plan statement.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum RuntimeStatementResult<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// A table was created.
    CreateTable {
        table_name: String,
    },
    /// An index was created; `index_name` is `None` for unnamed indexes.
    CreateIndex {
        table_name: String,
        index_name: Option<String>,
    },
    /// The statement had no observable effect.
    NoOp,
    /// Rows were inserted into `table_name`.
    Insert {
        table_name: String,
        rows_inserted: usize,
    },
    /// Rows were updated in `table_name`.
    Update {
        table_name: String,
        rows_updated: usize,
    },
    /// Rows were deleted from `table_name`.
    Delete {
        table_name: String,
        rows_deleted: usize,
    },
    /// A SELECT completed; `execution` is the handle that yields result batches.
    Select {
        table_name: String,
        schema: Arc<Schema>,
        execution: SelectExecution<P>,
    },
    /// A transaction-control statement (BEGIN/COMMIT/ROLLBACK) completed.
    Transaction {
        kind: TransactionKind,
    },
}
151
152impl<P> fmt::Debug for RuntimeStatementResult<P>
153where
154    P: Pager<Blob = EntryHandle> + Send + Sync,
155{
156    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157        match self {
158            RuntimeStatementResult::CreateTable { table_name } => f
159                .debug_struct("CreateTable")
160                .field("table_name", table_name)
161                .finish(),
162            RuntimeStatementResult::CreateIndex {
163                table_name,
164                index_name,
165            } => f
166                .debug_struct("CreateIndex")
167                .field("table_name", table_name)
168                .field("index_name", index_name)
169                .finish(),
170            RuntimeStatementResult::NoOp => f.debug_struct("NoOp").finish(),
171            RuntimeStatementResult::Insert {
172                table_name,
173                rows_inserted,
174            } => f
175                .debug_struct("Insert")
176                .field("table_name", table_name)
177                .field("rows_inserted", rows_inserted)
178                .finish(),
179            RuntimeStatementResult::Update {
180                table_name,
181                rows_updated,
182            } => f
183                .debug_struct("Update")
184                .field("table_name", table_name)
185                .field("rows_updated", rows_updated)
186                .finish(),
187            RuntimeStatementResult::Delete {
188                table_name,
189                rows_deleted,
190            } => f
191                .debug_struct("Delete")
192                .field("table_name", table_name)
193                .field("rows_deleted", rows_deleted)
194                .finish(),
195            RuntimeStatementResult::Select {
196                table_name, schema, ..
197            } => f
198                .debug_struct("Select")
199                .field("table_name", table_name)
200                .field("schema", schema)
201                .finish(),
202            RuntimeStatementResult::Transaction { kind } => {
203                f.debug_struct("Transaction").field("kind", kind).finish()
204            }
205        }
206    }
207}
208
209impl<P> RuntimeStatementResult<P>
210where
211    P: Pager<Blob = EntryHandle> + Send + Sync,
212{
213    /// Convert a StatementResult from one pager type to another.
214    /// Only works for non-SELECT results (CreateTable, Insert, Update, Delete, NoOp, Transaction).
215    #[allow(dead_code)]
216    pub(crate) fn convert_pager_type<Q>(self) -> Result<RuntimeStatementResult<Q>>
217    where
218        Q: Pager<Blob = EntryHandle> + Send + Sync,
219    {
220        match self {
221            RuntimeStatementResult::CreateTable { table_name } => {
222                Ok(RuntimeStatementResult::CreateTable { table_name })
223            }
224            RuntimeStatementResult::CreateIndex {
225                table_name,
226                index_name,
227            } => Ok(RuntimeStatementResult::CreateIndex {
228                table_name,
229                index_name,
230            }),
231            RuntimeStatementResult::NoOp => Ok(RuntimeStatementResult::NoOp),
232            RuntimeStatementResult::Insert {
233                table_name,
234                rows_inserted,
235            } => Ok(RuntimeStatementResult::Insert {
236                table_name,
237                rows_inserted,
238            }),
239            RuntimeStatementResult::Update {
240                table_name,
241                rows_updated,
242            } => Ok(RuntimeStatementResult::Update {
243                table_name,
244                rows_updated,
245            }),
246            RuntimeStatementResult::Delete {
247                table_name,
248                rows_deleted,
249            } => Ok(RuntimeStatementResult::Delete {
250                table_name,
251                rows_deleted,
252            }),
253            RuntimeStatementResult::Transaction { kind } => {
254                Ok(RuntimeStatementResult::Transaction { kind })
255            }
256            RuntimeStatementResult::Select { .. } => Err(Error::Internal(
257                "Cannot convert SELECT result between pager types in transaction".into(),
258            )),
259        }
260    }
261}
262
263/// Return the table name referenced by a plan statement, if any.
264///
265/// This is a small helper used by higher-level engines (for example the
266/// SQL front-end) to provide better error messages when a statement fails
267/// with a table-related error. It intentionally returns an `Option<&str>` so
268/// callers can decide how to report missing table context.
269pub fn statement_table_name(statement: &PlanStatement) -> Option<&str> {
270    match statement {
271        PlanStatement::CreateTable(plan) => Some(&plan.name),
272        PlanStatement::CreateIndex(plan) => Some(&plan.table),
273        PlanStatement::Insert(plan) => Some(&plan.table),
274        PlanStatement::Update(plan) => Some(&plan.table),
275        PlanStatement::Delete(plan) => Some(&plan.table),
276        PlanStatement::Select(plan) => {
277            // Return Some only for single-table queries
278            if plan.tables.len() == 1 {
279                Some(&plan.tables[0].table)
280            } else {
281                None
282            }
283        }
284        PlanStatement::BeginTransaction
285        | PlanStatement::CommitTransaction
286        | PlanStatement::RollbackTransaction => None,
287    }
288}
289
290// ============================================================================
291// Plan Structures (now in llkv-plan and re-exported above)
292// ============================================================================
293//
294// The following types are defined in llkv-plan and re-exported:
295// - plan values, CreateTablePlan, ColumnSpec, IntoColumnSpec
296// - InsertPlan, InsertSource, UpdatePlan, DeletePlan
297// - SelectPlan, SelectProjection, AggregateExpr, AggregateFunction
298// - OrderByPlan, OrderSortType, OrderTarget
299// - PlanOperation
300//
301// This separation allows plans to be used independently of execution logic.
302// ============================================================================
303
304// Transaction management is now handled by llkv-transaction crate
305// The SessionTransaction and TableDeltaState types are re-exported from there
306
/// Wrapper for Context that implements TransactionContext.
///
/// Pairs a shared [`RuntimeContext`] with the MVCC snapshot that governs row
/// visibility for callers going through this wrapper.
pub struct RuntimeContextWrapper<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // The wrapped runtime context (shared ownership).
    ctx: Arc<RuntimeContext<P>>,
    // Snapshot swapped as transactions begin/commit; guarded for interior
    // mutability behind the shared wrapper.
    snapshot: RwLock<TransactionSnapshot>,
}
315
316impl<P> RuntimeContextWrapper<P>
317where
318    P: Pager<Blob = EntryHandle> + Send + Sync,
319{
320    fn new(ctx: Arc<RuntimeContext<P>>) -> Self {
321        let snapshot = ctx.default_snapshot();
322        Self {
323            ctx,
324            snapshot: RwLock::new(snapshot),
325        }
326    }
327
328    fn update_snapshot(&self, snapshot: TransactionSnapshot) {
329        let mut guard = self.snapshot.write().expect("snapshot lock poisoned");
330        *guard = snapshot;
331    }
332
333    fn current_snapshot(&self) -> TransactionSnapshot {
334        *self.snapshot.read().expect("snapshot lock poisoned")
335    }
336
337    fn context(&self) -> &Arc<RuntimeContext<P>> {
338        &self.ctx
339    }
340
341    fn ctx(&self) -> &RuntimeContext<P> {
342        &self.ctx
343    }
344}
345
/// Per-session bundle of storage namespaces.
///
/// NOTE(review): `temporary` is an `Option` even though `new` always fills it
/// — presumably so `Drop` can consume it; confirm no other constructor exists.
struct SessionNamespaces<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    // Durable namespace backed by the engine's primary pager.
    persistent: Arc<PersistentNamespace<P>>,
    // Session-private namespace backed by an in-memory pager; cleared on drop.
    temporary: Option<Arc<TemporaryNamespace<BoxedPager>>>,
    // Maps table names to the namespace that owns them.
    registry: Arc<RwLock<StorageNamespaceRegistry>>,
}
354
355impl<P> SessionNamespaces<P>
356where
357    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
358{
359    fn new(base_context: Arc<RuntimeContext<P>>) -> Self {
360        let persistent = Arc::new(PersistentNamespace::new(
361            storage_namespace::PERSISTENT_NAMESPACE_ID.to_string(),
362            Arc::clone(&base_context),
363        ));
364
365        let mut registry = StorageNamespaceRegistry::new(persistent.namespace_id().clone());
366        registry.register_namespace(Arc::clone(&persistent), Vec::<String>::new(), false);
367
368        let temporary = {
369            let temp_pager = Arc::new(BoxedPager::from_arc(Arc::new(MemPager::default())));
370            let temp_context = Arc::new(RuntimeContext::new(temp_pager));
371            let namespace = Arc::new(TemporaryNamespace::new(
372                storage_namespace::TEMPORARY_NAMESPACE_ID.to_string(),
373                temp_context,
374            ));
375            registry.register_namespace(
376                Arc::clone(&namespace),
377                vec![storage_namespace::TEMPORARY_NAMESPACE_ID.to_string()],
378                true,
379            );
380            namespace
381        };
382
383        Self {
384            persistent,
385            temporary: Some(temporary),
386            registry: Arc::new(RwLock::new(registry)),
387        }
388    }
389
390    fn persistent(&self) -> Arc<PersistentNamespace<P>> {
391        Arc::clone(&self.persistent)
392    }
393
394    fn temporary(&self) -> Option<Arc<TemporaryNamespace<BoxedPager>>> {
395        self.temporary.as_ref().map(Arc::clone)
396    }
397
398    fn registry(&self) -> Arc<RwLock<StorageNamespaceRegistry>> {
399        Arc::clone(&self.registry)
400    }
401}
402
403impl<P> Drop for SessionNamespaces<P>
404where
405    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
406{
407    fn drop(&mut self) {
408        if let Some(temp) = &self.temporary {
409            temp.clear_tables();
410        }
411    }
412}
413
/// A session for executing operations with optional transaction support.
///
/// This is a high-level wrapper around the transaction machinery that provides
/// a clean API for users. Operations can be executed directly or within a transaction.
pub struct RuntimeSession<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    // TODO: Allow generic pager type
    // Low-level transaction session: base context over `P`, staging context
    // over an in-memory `MemPager`.
    inner: TransactionSession<RuntimeContextWrapper<P>, RuntimeContextWrapper<MemPager>>,
    // Storage namespaces (persistent + temporary) scoped to this session;
    // shared so `clone_session` keeps all clones on the same namespaces.
    namespaces: Arc<SessionNamespaces<P>>,
}
426
427impl<P> RuntimeSession<P>
428where
429    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
430{
431    /// Clone this session (reuses the same underlying TransactionSession).
432    /// This is necessary to maintain transaction state across Engine clones.
433    pub(crate) fn clone_session(&self) -> Self {
434        Self {
435            inner: self.inner.clone_session(),
436            namespaces: self.namespaces.clone(),
437        }
438    }
439
440    pub fn namespace_registry(&self) -> Arc<RwLock<StorageNamespaceRegistry>> {
441        self.namespaces.registry()
442    }
443
444    fn resolve_namespace_for_table(&self, canonical: &str) -> storage_namespace::NamespaceId {
445        self.namespace_registry()
446            .read()
447            .expect("namespace registry poisoned")
448            .namespace_for_table(canonical)
449    }
450
451    fn namespace_for_select_plan(
452        &self,
453        plan: &SelectPlan,
454    ) -> Option<storage_namespace::NamespaceId> {
455        if plan.tables.len() != 1 {
456            return None;
457        }
458
459        let qualified = plan.tables[0].qualified_name();
460        let (_, canonical) = canonical_table_name(&qualified).ok()?;
461        Some(self.resolve_namespace_for_table(&canonical))
462    }
463
464    fn select_from_temporary(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
465        let temp_namespace = self
466            .temporary_namespace()
467            .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
468
469        let table_name = if plan.tables.len() == 1 {
470            plan.tables[0].qualified_name()
471        } else {
472            String::new()
473        };
474
475        let execution = temp_namespace.context().execute_select(plan.clone())?;
476        let schema = execution.schema();
477        let batches = execution.collect()?;
478
479        let combined = if batches.is_empty() {
480            RecordBatch::new_empty(Arc::clone(&schema))
481        } else if batches.len() == 1 {
482            batches.into_iter().next().unwrap()
483        } else {
484            let refs: Vec<&RecordBatch> = batches.iter().collect();
485            arrow::compute::concat_batches(&schema, refs)?
486        };
487
488        let execution =
489            SelectExecution::from_batch(table_name.clone(), Arc::clone(&schema), combined);
490
491        Ok(RuntimeStatementResult::Select {
492            execution,
493            table_name,
494            schema,
495        })
496    }
497
    /// Shared handle to the session's persistent (durable) namespace.
    fn persistent_namespace(&self) -> Arc<PersistentNamespace<P>> {
        self.namespaces.persistent()
    }

    /// Shared handle to the session's temporary namespace, if one was created.
    #[allow(dead_code)]
    fn temporary_namespace(&self) -> Option<Arc<TemporaryNamespace<BoxedPager>>> {
        self.namespaces.temporary()
    }
506
507    /// Begin a transaction in this session.
508    /// Creates an empty staging context for new tables created within the transaction.
509    /// Existing tables are accessed via MVCC visibility filtering - NO data copying occurs.
510    pub fn begin_transaction(&self) -> Result<RuntimeStatementResult<P>> {
511        let staging_pager = Arc::new(MemPager::default());
512        tracing::trace!(
513            "BEGIN_TRANSACTION: Created staging pager at {:p}",
514            &*staging_pager
515        );
516        let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));
517
518        // Staging context is EMPTY - used only for tables created within the transaction.
519        // Existing tables are read from base context with MVCC visibility filtering.
520        // No data copying occurs at BEGIN - this is pure MVCC.
521
522        let staging_wrapper = Arc::new(RuntimeContextWrapper::new(staging_ctx));
523
524        self.inner.begin_transaction(staging_wrapper)?;
525        Ok(RuntimeStatementResult::Transaction {
526            kind: TransactionKind::Begin,
527        })
528    }
529
530    /// Mark the current transaction as aborted due to an error.
531    /// This should be called when any error occurs during a transaction.
532    pub fn abort_transaction(&self) {
533        self.inner.abort_transaction();
534    }
535
536    /// Check if this session has an active transaction.
537    pub fn has_active_transaction(&self) -> bool {
538        let result = self.inner.has_active_transaction();
539        tracing::trace!("SESSION: has_active_transaction() = {}", result);
540        result
541    }
542
543    /// Check if the current transaction has been aborted due to an error.
544    pub fn is_aborted(&self) -> bool {
545        self.inner.is_aborted()
546    }
547
    /// Commit the current transaction and apply changes to the base context.
    /// If the transaction was aborted, this acts as a ROLLBACK instead.
    ///
    /// Steps: (1) finish the inner transaction and collect its buffered
    /// operations, (2) fail the commit if another transaction dropped a table
    /// we wrote to, (3) replay the operations onto the base context, (4)
    /// restore the base context's default snapshot, (5) persist the next
    /// transaction id on a real COMMIT.
    pub fn commit_transaction(&self) -> Result<RuntimeStatementResult<P>> {
        tracing::trace!("Session::commit_transaction called");
        let (tx_result, operations) = self.inner.commit_transaction()?;
        tracing::trace!(
            "Session::commit_transaction got {} operations",
            operations.len()
        );

        // Conflict check: refuse to replay writes against tables that some
        // other transaction dropped while this one was open.
        if !operations.is_empty() {
            let dropped_tables = self
                .inner
                .context()
                .ctx()
                .dropped_tables
                .read()
                .unwrap()
                .clone();
            if !dropped_tables.is_empty() {
                for operation in &operations {
                    // Only data-modifying operations name a table to check.
                    let table_name_opt = match operation {
                        PlanOperation::Insert(plan) => Some(plan.table.as_str()),
                        PlanOperation::Update(plan) => Some(plan.table.as_str()),
                        PlanOperation::Delete(plan) => Some(plan.table.as_str()),
                        _ => None,
                    };
                    if let Some(table_name) = table_name_opt {
                        let (_, canonical) = canonical_table_name(table_name)?;
                        if dropped_tables.contains(&canonical) {
                            self.abort_transaction();
                            return Err(Error::TransactionContextError(
                                "another transaction has dropped this table".into(),
                            ));
                        }
                    }
                }
            }
        }

        // Extract the transaction kind from the transaction module's result
        let kind = match tx_result {
            TransactionResult::Transaction { kind } => kind,
            _ => {
                return Err(Error::Internal(
                    "commit_transaction returned non-transaction result".into(),
                ));
            }
        };
        tracing::trace!("Session::commit_transaction kind={:?}", kind);

        // Only replay operations if there are any (empty if transaction was aborted)
        for operation in operations {
            match operation {
                PlanOperation::CreateTable(plan) => {
                    TransactionContext::create_table_plan(&**self.inner.context(), plan)?;
                }
                PlanOperation::Insert(plan) => {
                    TransactionContext::insert(&**self.inner.context(), plan)?;
                }
                PlanOperation::Update(plan) => {
                    TransactionContext::update(&**self.inner.context(), plan)?;
                }
                PlanOperation::Delete(plan) => {
                    TransactionContext::delete(&**self.inner.context(), plan)?;
                }
                _ => {}
            }
        }

        // Reset the base context snapshot to the default auto-commit view now that
        // the transaction has been replayed onto the base tables.
        let base_ctx = self.inner.context();
        let default_snapshot = base_ctx.ctx().default_snapshot();
        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);

        // Persist the next_txn_id to the catalog after a successful commit
        // (best-effort: a failure here is logged, not surfaced to the caller).
        if matches!(kind, TransactionKind::Commit) {
            let ctx = base_ctx.ctx();
            let next_txn_id = ctx.txn_manager().current_next_txn_id();
            if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
                tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
            }
        }

        // Return a StatementResult with the correct kind (Commit or Rollback)
        Ok(RuntimeStatementResult::Transaction { kind })
    }
636
637    /// Rollback the current transaction, discarding all changes.
638    pub fn rollback_transaction(&self) -> Result<RuntimeStatementResult<P>> {
639        self.inner.rollback_transaction()?;
640        let base_ctx = self.inner.context();
641        let default_snapshot = base_ctx.ctx().default_snapshot();
642        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
643        Ok(RuntimeStatementResult::Transaction {
644            kind: TransactionKind::Rollback,
645        })
646    }
647
648    fn materialize_create_table_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
649        if let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take() {
650            let select_result = self.select(*select_plan)?;
651            let (schema, batches) = match select_result {
652                RuntimeStatementResult::Select {
653                    schema, execution, ..
654                } => {
655                    let batches = execution.collect()?;
656                    (schema, batches)
657                }
658                _ => {
659                    return Err(Error::Internal(
660                        "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
661                    ));
662                }
663            };
664            plan.source = Some(CreateTableSource::Batches { schema, batches });
665        }
666        Ok(plan)
667    }
668
    /// Create a table (outside or inside transaction).
    ///
    /// Routes by the plan's namespace (defaulting to the persistent one):
    /// temporary tables delegate to the session-local namespace; persistent
    /// tables either go through the active transaction or straight to the
    /// persistent namespace.
    pub fn create_table_plan(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
        // CREATE TABLE AS SELECT is resolved to batches before routing.
        let mut plan = self.materialize_create_table_plan(plan)?;
        let namespace_id = plan
            .namespace
            .clone()
            .unwrap_or_else(|| storage_namespace::PERSISTENT_NAMESPACE_ID.to_string());
        plan.namespace = Some(namespace_id.clone());

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace.create_table(plan)?.convert_pager_type::<P>()
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                if self.has_active_transaction() {
                    let table_name = plan.name.clone();
                    match self
                        .inner
                        .execute_operation(PlanOperation::CreateTable(plan))
                    {
                        Ok(_) => Ok(RuntimeStatementResult::CreateTable { table_name }),
                        Err(e) => {
                            // If an error occurs during a transaction, abort it
                            self.abort_transaction();
                            Err(e)
                        }
                    }
                } else {
                    self.persistent_namespace().create_table(plan)
                }
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }
709
710    pub fn drop_table(&self, name: &str, if_exists: bool) -> Result<()> {
711        let (_, canonical_table) = canonical_table_name(name)?;
712        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
713
714        match namespace_id.as_str() {
715            storage_namespace::TEMPORARY_NAMESPACE_ID => {
716                let temp_namespace = self
717                    .temporary_namespace()
718                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
719                temp_namespace.drop_table(name, if_exists)
720            }
721            storage_namespace::PERSISTENT_NAMESPACE_ID => {
722                self.persistent_namespace().drop_table(name, if_exists)
723            }
724            other => Err(Error::InvalidArgumentError(format!(
725                "Unknown storage namespace '{}'",
726                other
727            ))),
728        }
729    }
730    /// Create an index (auto-commit only for now).
731    pub fn create_index(&self, plan: CreateIndexPlan) -> Result<RuntimeStatementResult<P>> {
732        let (_, canonical_table) = canonical_table_name(&plan.table)?;
733        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
734
735        match namespace_id.as_str() {
736            storage_namespace::TEMPORARY_NAMESPACE_ID => {
737                let temp_namespace = self
738                    .temporary_namespace()
739                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
740                temp_namespace.create_index(plan)?.convert_pager_type::<P>()
741            }
742            storage_namespace::PERSISTENT_NAMESPACE_ID => {
743                if self.has_active_transaction() {
744                    return Err(Error::InvalidArgumentError(
745                        "CREATE INDEX is not supported inside an active transaction".into(),
746                    ));
747                }
748
749                self.persistent_namespace().create_index(plan)
750            }
751            other => Err(Error::InvalidArgumentError(format!(
752                "Unknown storage namespace '{}'",
753                other
754            ))),
755        }
756    }
757
758    fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
759        let InsertPlan {
760            table,
761            columns,
762            source,
763        } = plan;
764
765        match source {
766            InsertSource::Rows(rows) => {
767                let count = rows.len();
768                Ok((
769                    InsertPlan {
770                        table,
771                        columns,
772                        source: InsertSource::Rows(rows),
773                    },
774                    count,
775                ))
776            }
777            InsertSource::Batches(batches) => {
778                let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
779                Ok((
780                    InsertPlan {
781                        table,
782                        columns,
783                        source: InsertSource::Batches(batches),
784                    },
785                    count,
786                ))
787            }
788            InsertSource::Select { plan: select_plan } => {
789                let select_result = self.select(*select_plan)?;
790                let rows = match select_result {
791                    RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
792                    _ => {
793                        return Err(Error::Internal(
794                            "expected Select result when executing INSERT ... SELECT".into(),
795                        ));
796                    }
797                };
798                let count = rows.len();
799                Ok((
800                    InsertPlan {
801                        table,
802                        columns,
803                        source: InsertSource::Rows(rows),
804                    },
805                    count,
806                ))
807            }
808        }
809    }
810
811    /// Insert rows (outside or inside transaction).
812    pub fn insert(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
813        tracing::trace!("Session::insert called for table={}", plan.table);
814        let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
815        let table_name = plan.table.clone();
816        let (_, canonical_table) = canonical_table_name(&plan.table)?;
817        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
818
819        match namespace_id.as_str() {
820            storage_namespace::TEMPORARY_NAMESPACE_ID => {
821                let temp_namespace = self
822                    .temporary_namespace()
823                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
824                temp_namespace
825                    .context()
826                    .insert(plan)?
827                    .convert_pager_type::<P>()?;
828                Ok(RuntimeStatementResult::Insert {
829                    rows_inserted,
830                    table_name,
831                })
832            }
833            storage_namespace::PERSISTENT_NAMESPACE_ID => {
834                if self.has_active_transaction() {
835                    match self.inner.execute_operation(PlanOperation::Insert(plan)) {
836                        Ok(_) => {
837                            tracing::trace!("Session::insert succeeded for table={}", table_name);
838                            Ok(RuntimeStatementResult::Insert {
839                                rows_inserted,
840                                table_name,
841                            })
842                        }
843                        Err(e) => {
844                            tracing::trace!(
845                                "Session::insert failed for table={}, error={:?}",
846                                table_name,
847                                e
848                            );
849                            if matches!(e, Error::ConstraintError(_)) {
850                                tracing::trace!("Transaction is_aborted=true");
851                                self.abort_transaction();
852                            }
853                            Err(e)
854                        }
855                    }
856                } else {
857                    let context = self.inner.context();
858                    let default_snapshot = context.ctx().default_snapshot();
859                    TransactionContext::set_snapshot(&**context, default_snapshot);
860                    TransactionContext::insert(&**context, plan)?;
861                    Ok(RuntimeStatementResult::Insert {
862                        rows_inserted,
863                        table_name,
864                    })
865                }
866            }
867            other => Err(Error::InvalidArgumentError(format!(
868                "Unknown storage namespace '{}'",
869                other
870            ))),
871        }
872    }
873
874    /// Select rows (outside or inside transaction).
875    pub fn select(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
876        if let Some(namespace_id) = self.namespace_for_select_plan(&plan)
877            && namespace_id == storage_namespace::TEMPORARY_NAMESPACE_ID
878        {
879            return self.select_from_temporary(plan);
880        }
881
882        if self.has_active_transaction() {
883            let tx_result = match self
884                .inner
885                .execute_operation(PlanOperation::Select(plan.clone()))
886            {
887                Ok(result) => result,
888                Err(e) => {
889                    // Only abort transaction on specific errors (constraint violations, etc.)
890                    // Don't abort on catalog errors (table doesn't exist) or similar
891                    if matches!(e, Error::ConstraintError(_)) {
892                        self.abort_transaction();
893                    }
894                    return Err(e);
895                }
896            };
897            match tx_result {
898                TransactionResult::Select {
899                    table_name,
900                    schema,
901                    execution: staging_execution,
902                } => {
903                    // Convert from staging (MemPager) execution to base pager execution
904                    // by collecting batches and rebuilding
905                    let batches = staging_execution.collect().unwrap_or_default();
906                    let combined = if batches.is_empty() {
907                        RecordBatch::new_empty(Arc::clone(&schema))
908                    } else if batches.len() == 1 {
909                        batches.into_iter().next().unwrap()
910                    } else {
911                        let refs: Vec<&RecordBatch> = batches.iter().collect();
912                        arrow::compute::concat_batches(&schema, refs)?
913                    };
914
915                    let execution = SelectExecution::from_batch(
916                        table_name.clone(),
917                        Arc::clone(&schema),
918                        combined,
919                    );
920
921                    Ok(RuntimeStatementResult::Select {
922                        execution,
923                        table_name,
924                        schema,
925                    })
926                }
927                _ => Err(Error::Internal("expected Select result".into())),
928            }
929        } else {
930            // Call via TransactionContext trait
931            let context = self.inner.context();
932            let default_snapshot = context.ctx().default_snapshot();
933            TransactionContext::set_snapshot(&**context, default_snapshot);
934            let table_name = if plan.tables.len() == 1 {
935                plan.tables[0].qualified_name()
936            } else {
937                String::new()
938            };
939            let execution = TransactionContext::execute_select(&**context, plan)?;
940            let schema = execution.schema();
941            Ok(RuntimeStatementResult::Select {
942                execution,
943                table_name,
944                schema,
945            })
946        }
947    }
948
949    /// Convenience helper to fetch all rows from a table within this session.
950    pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
951        let plan =
952            SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
953        match self.select(plan)? {
954            RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
955            other => Err(Error::Internal(format!(
956                "expected Select result when reading table '{table}', got {:?}",
957                other
958            ))),
959        }
960    }
961
962    /// Update rows (outside or inside transaction).
963    pub fn update(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
964        let (_, canonical_table) = canonical_table_name(&plan.table)?;
965        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
966
967        match namespace_id.as_str() {
968            storage_namespace::TEMPORARY_NAMESPACE_ID => {
969                let temp_namespace = self
970                    .temporary_namespace()
971                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
972                temp_namespace
973                    .context()
974                    .update(plan)?
975                    .convert_pager_type::<P>()
976            }
977            storage_namespace::PERSISTENT_NAMESPACE_ID => {
978                if self.has_active_transaction() {
979                    let table_name = plan.table.clone();
980                    let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
981                        Ok(result) => result,
982                        Err(e) => {
983                            // If an error occurs during a transaction, abort it
984                            self.abort_transaction();
985                            return Err(e);
986                        }
987                    };
988                    match result {
989                        TransactionResult::Update {
990                            rows_matched: _,
991                            rows_updated,
992                        } => Ok(RuntimeStatementResult::Update {
993                            rows_updated,
994                            table_name,
995                        }),
996                        _ => Err(Error::Internal("expected Update result".into())),
997                    }
998                } else {
999                    // Call via TransactionContext trait
1000                    let context = self.inner.context();
1001                    let default_snapshot = context.ctx().default_snapshot();
1002                    TransactionContext::set_snapshot(&**context, default_snapshot);
1003                    let table_name = plan.table.clone();
1004                    let result = TransactionContext::update(&**context, plan)?;
1005                    match result {
1006                        TransactionResult::Update {
1007                            rows_matched: _,
1008                            rows_updated,
1009                        } => Ok(RuntimeStatementResult::Update {
1010                            rows_updated,
1011                            table_name,
1012                        }),
1013                        _ => Err(Error::Internal("expected Update result".into())),
1014                    }
1015                }
1016            }
1017            other => Err(Error::InvalidArgumentError(format!(
1018                "Unknown storage namespace '{}'",
1019                other
1020            ))),
1021        }
1022    }
1023
1024    /// Delete rows (outside or inside transaction).
1025    pub fn delete(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
1026        let (_, canonical_table) = canonical_table_name(&plan.table)?;
1027        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1028
1029        match namespace_id.as_str() {
1030            storage_namespace::TEMPORARY_NAMESPACE_ID => {
1031                let temp_namespace = self
1032                    .temporary_namespace()
1033                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1034                temp_namespace
1035                    .context()
1036                    .delete(plan)?
1037                    .convert_pager_type::<P>()
1038            }
1039            storage_namespace::PERSISTENT_NAMESPACE_ID => {
1040                if self.has_active_transaction() {
1041                    let table_name = plan.table.clone();
1042                    let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
1043                        Ok(result) => result,
1044                        Err(e) => {
1045                            // If an error occurs during a transaction, abort it
1046                            self.abort_transaction();
1047                            return Err(e);
1048                        }
1049                    };
1050                    match result {
1051                        TransactionResult::Delete { rows_deleted } => {
1052                            Ok(RuntimeStatementResult::Delete {
1053                                rows_deleted,
1054                                table_name,
1055                            })
1056                        }
1057                        _ => Err(Error::Internal("expected Delete result".into())),
1058                    }
1059                } else {
1060                    // Call via TransactionContext trait
1061                    let context = self.inner.context();
1062                    let default_snapshot = context.ctx().default_snapshot();
1063                    TransactionContext::set_snapshot(&**context, default_snapshot);
1064                    let table_name = plan.table.clone();
1065                    let result = TransactionContext::delete(&**context, plan)?;
1066                    match result {
1067                        TransactionResult::Delete { rows_deleted } => {
1068                            Ok(RuntimeStatementResult::Delete {
1069                                rows_deleted,
1070                                table_name,
1071                            })
1072                        }
1073                        _ => Err(Error::Internal("expected Delete result".into())),
1074                    }
1075                }
1076            }
1077            other => Err(Error::InvalidArgumentError(format!(
1078                "Unknown storage namespace '{}'",
1079                other
1080            ))),
1081        }
1082    }
1083}
1084
/// Top-level execution engine pairing a shared [`RuntimeContext`] with a
/// single [`RuntimeSession`].
pub struct RuntimeEngine<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    // Shared runtime state (catalog, store, transaction manager).
    context: Arc<RuntimeContext<P>>,
    // Session carrying this engine's transaction state; shared across clones.
    session: RuntimeSession<P>,
}
1092
1093impl<P> Clone for RuntimeEngine<P>
1094where
1095    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1096{
1097    fn clone(&self) -> Self {
1098        // IMPORTANT: Reuse the same session to maintain transaction state!
1099        // Creating a new session would break multi-statement transactions.
1100        tracing::debug!("[ENGINE] RuntimeEngine::clone() called - reusing same session");
1101        Self {
1102            context: Arc::clone(&self.context),
1103            session: self.session.clone_session(),
1104        }
1105    }
1106}
1107
1108impl<P> RuntimeEngine<P>
1109where
1110    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1111{
1112    pub fn new(pager: Arc<P>) -> Self {
1113        let context = Arc::new(RuntimeContext::new(pager));
1114        Self::from_context(context)
1115    }
1116
1117    pub fn from_context(context: Arc<RuntimeContext<P>>) -> Self {
1118        tracing::debug!("[ENGINE] RuntimeEngine::from_context - creating new session");
1119        let session = context.create_session();
1120        tracing::debug!("[ENGINE] RuntimeEngine::from_context - created session");
1121        Self { context, session }
1122    }
1123
1124    pub fn context(&self) -> Arc<RuntimeContext<P>> {
1125        Arc::clone(&self.context)
1126    }
1127
1128    pub fn session(&self) -> &RuntimeSession<P> {
1129        &self.session
1130    }
1131
1132    pub fn execute_statement(&self, statement: PlanStatement) -> Result<RuntimeStatementResult<P>> {
1133        match statement {
1134            PlanStatement::BeginTransaction => self.session.begin_transaction(),
1135            PlanStatement::CommitTransaction => self.session.commit_transaction(),
1136            PlanStatement::RollbackTransaction => self.session.rollback_transaction(),
1137            PlanStatement::CreateTable(plan) => self.session.create_table_plan(plan),
1138            PlanStatement::CreateIndex(plan) => self.session.create_index(plan),
1139            PlanStatement::Insert(plan) => self.session.insert(plan),
1140            PlanStatement::Update(plan) => self.session.update(plan),
1141            PlanStatement::Delete(plan) => self.session.delete(plan),
1142            PlanStatement::Select(plan) => self.session.select(plan),
1143        }
1144    }
1145
1146    pub fn execute_all<I>(&self, statements: I) -> Result<Vec<RuntimeStatementResult<P>>>
1147    where
1148        I: IntoIterator<Item = PlanStatement>,
1149    {
1150        let mut results = Vec::new();
1151        for statement in statements {
1152            results.push(self.execute_statement(statement)?);
1153        }
1154        Ok(results)
1155    }
1156}
1157
/// In-memory execution context shared by plan-based queries.
///
/// Important: "lazy loading" here refers to *table metadata only* (schema,
/// executor-side column descriptors, and a small next-row-id counter). We do
/// NOT eagerly load or materialize the table's row data into memory. All
/// row/column data remains on the ColumnStore and is streamed in chunks during
/// query execution. This keeps the memory footprint low even for very large
/// tables.
///
/// Typical resource usage:
/// - Metadata per table: ~100s of bytes to a few KB (schema + field ids)
/// - ExecutorTable struct: small (handles + counters)
/// - Actual table rows: streamed from disk in chunks (never fully resident)
pub struct RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Underlying pager, also shared with the column store.
    pager: Arc<P>,
    // Lazily-populated cache of executor tables keyed by canonical name.
    tables: RwLock<FxHashMap<String, Arc<ExecutorTable<P>>>>,
    // Canonical names of tables marked dropped (checked alongside `tables`).
    dropped_tables: RwLock<FxHashSet<String>>,
    // Table/column metadata manager backed by the column store.
    metadata: Arc<MetadataManager<P>>,
    // Constraint validation built over `metadata` and `catalog`.
    constraint_service: ConstraintService<P>,
    // DDL-level catalog operations over `metadata`, `catalog`, and `store`.
    catalog_service: CatalogManager<P>,
    // Centralized catalog for table/field name resolution
    catalog: Arc<TableCatalog>,
    // Shared column store for all tables in this context
    // This ensures catalog state is synchronized across all tables
    store: Arc<ColumnStore<P>>,
    // Transaction manager for session-based transactions
    transaction_manager:
        TransactionManager<RuntimeContextWrapper<P>, RuntimeContextWrapper<MemPager>>,
    // Transaction-id allocator shared with sessions (see `txn_manager()`).
    txn_manager: Arc<TxnIdManager>,
    // NOTE(review): presumably tracks which tables gained new rows per
    // transaction id; its readers/writers are outside this view — confirm.
    txn_tables_with_new_rows: RwLock<FxHashMap<TxnId, FxHashSet<String>>>,
}
1192
1193impl<P> RuntimeContext<P>
1194where
1195    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1196{
    /// Create a context with a fresh, private [`TableCatalog`].
    pub fn new(pager: Arc<P>) -> Self {
        Self::new_with_catalog_inner(pager, None)
    }
1200
    /// Create a context that shares an existing [`TableCatalog`] (e.g. with
    /// another context over the same storage).
    pub fn new_with_catalog(pager: Arc<P>, catalog: Arc<TableCatalog>) -> Self {
        Self::new_with_catalog_inner(pager, Some(catalog))
    }
1204
    /// Shared constructor: open the column store, restore transaction
    /// counters and table registrations from the persisted catalog, and wire
    /// up the metadata/constraint/catalog services.
    ///
    /// When `shared_catalog` is `Some`, "already exists" registration errors
    /// are tolerated (another context may have registered the same tables);
    /// otherwise a fresh private catalog is created.
    fn new_with_catalog_inner(pager: Arc<P>, shared_catalog: Option<Arc<TableCatalog>>) -> Self {
        tracing::trace!("RuntimeContext::new called, pager={:p}", &*pager);

        let store = ColumnStore::open(Arc::clone(&pager)).expect("failed to open ColumnStore");
        let catalog = SysCatalog::new(&store);

        // Restore the transaction-id counter; fall back to the first id after
        // the reserved auto-commit id when nothing was persisted or loading
        // failed.
        let next_txn_id = match catalog.get_next_txn_id() {
            Ok(Some(id)) => {
                tracing::debug!("[CONTEXT] Loaded next_txn_id={} from catalog", id);
                id
            }
            Ok(None) => {
                tracing::debug!("[CONTEXT] No persisted next_txn_id found, starting from default");
                TXN_ID_AUTO_COMMIT + 1
            }
            Err(e) => {
                tracing::warn!("[CONTEXT] Failed to load next_txn_id: {}, using default", e);
                TXN_ID_AUTO_COMMIT + 1
            }
        };

        // Restore the last-committed watermark; defaults to the auto-commit id.
        let last_committed = match catalog.get_last_committed_txn_id() {
            Ok(Some(id)) => {
                tracing::debug!("[CONTEXT] Loaded last_committed={} from catalog", id);
                id
            }
            Ok(None) => {
                tracing::debug!(
                    "[CONTEXT] No persisted last_committed found, starting from default"
                );
                TXN_ID_AUTO_COMMIT
            }
            Err(e) => {
                tracing::warn!(
                    "[CONTEXT] Failed to load last_committed: {}, using default",
                    e
                );
                TXN_ID_AUTO_COMMIT
            }
        };

        let store_arc = Arc::new(store);
        let metadata = Arc::new(MetadataManager::new(Arc::clone(&store_arc)));

        // Discover which tables exist; a load failure degrades to an empty
        // registry rather than aborting startup.
        let loaded_tables = match metadata.all_table_metas() {
            Ok(metas) => {
                tracing::debug!("[CONTEXT] Loaded {} table(s) from catalog", metas.len());
                metas
            }
            Err(e) => {
                tracing::warn!(
                    "[CONTEXT] Failed to load table metas: {}, starting with empty registry",
                    e
                );
                Vec::new()
            }
        };

        let transaction_manager =
            TransactionManager::new_with_initial_state(next_txn_id, last_committed);
        let txn_manager = transaction_manager.txn_manager();

        // LAZY LOADING: Only load table metadata at first access. We intentionally
        // avoid loading any row/column data into memory here. The executor
        // performs streaming reads from the ColumnStore when a query runs, so
        // large tables are never fully materialized.
        //
        // Benefits of this approach:
        // - Instant database open (no upfront I/O for table data)
        // - Lower memory footprint (only metadata cached)
        // - Natural parallelism: if multiple threads request different tables
        //   concurrently, those tables will be loaded concurrently by the
        //   caller threads (no global preload required).
        //
        // Future Optimizations (if profiling shows need):
        // 1. Eager parallel preload of a short "hot" list of tables (rayon)
        // 2. Background preload of catalog entries after startup
        // 3. LRU-based eviction for extremely large deployments
        // 4. Cache compact representations of schemas to reduce per-table RAM
        //
        // Note: `loaded_tables` holds catalog metadata that helped us discover
        // which tables exist; we discard it here because metadata will be
        // fetched on-demand during lazy loads.
        tracing::debug!(
            "[CONTEXT] Initialized with lazy loading for {} table(s)",
            loaded_tables.len()
        );

        // Initialize catalog and populate with existing tables
        // (this `catalog` shadows the `SysCatalog` handle above, which is no
        // longer needed).
        let (catalog, is_shared_catalog) = match shared_catalog {
            Some(existing) => (existing, true),
            None => (Arc::new(TableCatalog::new()), false),
        };
        for (table_id, table_meta) in &loaded_tables {
            if let Some(ref table_name) = table_meta.name
                && let Err(e) = catalog.register_table(table_name.as_str(), *table_id)
            {
                match e {
                    // A shared catalog legitimately already knows this table.
                    Error::CatalogError(ref msg)
                        if is_shared_catalog && msg.contains("already exists") =>
                    {
                        tracing::debug!(
                            "[CONTEXT] Shared catalog already contains table '{}' with id={}",
                            table_name,
                            table_id
                        );
                    }
                    other => {
                        tracing::warn!(
                            "[CONTEXT] Failed to register table '{}' (id={}) in catalog: {}",
                            table_name,
                            table_id,
                            other
                        );
                    }
                }
            }
        }
        tracing::debug!(
            "[CONTEXT] Catalog initialized with {} table(s)",
            catalog.table_count()
        );

        let constraint_service =
            ConstraintService::new(Arc::clone(&metadata), Arc::clone(&catalog));
        let catalog_service = CatalogManager::new(
            Arc::clone(&metadata),
            Arc::clone(&catalog),
            Arc::clone(&store_arc),
        );

        Self {
            pager,
            tables: RwLock::new(FxHashMap::default()), // Start with empty table cache
            dropped_tables: RwLock::new(FxHashSet::default()),
            metadata,
            constraint_service,
            catalog_service,
            catalog,
            store: store_arc,
            transaction_manager,
            txn_manager,
            txn_tables_with_new_rows: RwLock::new(FxHashMap::default()),
        }
    }
1350
    /// Return the transaction ID manager shared with sessions.
    ///
    /// This is the same allocator owned by the transaction manager, so ids
    /// obtained through it are consistent across all sessions of this context.
    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
        Arc::clone(&self.txn_manager)
    }
1355
1356    /// Persist the next_txn_id to the catalog.
1357    pub fn persist_next_txn_id(&self, next_txn_id: TxnId) -> Result<()> {
1358        let catalog = SysCatalog::new(&self.store);
1359        catalog.put_next_txn_id(next_txn_id)?;
1360        let last_committed = self.txn_manager.last_committed();
1361        catalog.put_last_committed_txn_id(last_committed)?;
1362        tracing::debug!(
1363            "[CONTEXT] Persisted next_txn_id={}, last_committed={}",
1364            next_txn_id,
1365            last_committed
1366        );
1367        Ok(())
1368    }
1369
1370    fn build_executor_multi_column_uniques(
1371        table: &ExecutorTable<P>,
1372        stored: &[MultiColumnUniqueEntryMeta],
1373    ) -> Vec<ExecutorMultiColumnUnique> {
1374        let mut results = Vec::with_capacity(stored.len());
1375
1376        'outer: for entry in stored {
1377            if entry.column_ids.is_empty() {
1378                continue;
1379            }
1380
1381            let mut column_indices = Vec::with_capacity(entry.column_ids.len());
1382            for field_id in &entry.column_ids {
1383                if let Some((idx, _)) = table
1384                    .schema
1385                    .columns
1386                    .iter()
1387                    .enumerate()
1388                    .find(|(_, col)| &col.field_id == field_id)
1389                {
1390                    column_indices.push(idx);
1391                } else {
1392                    tracing::warn!(
1393                        "[CATALOG] Skipping persisted multi-column UNIQUE {:?} for table_id={} missing field_id {}",
1394                        entry.index_name,
1395                        table.table.table_id(),
1396                        field_id
1397                    );
1398                    continue 'outer;
1399                }
1400            }
1401
1402            results.push(ExecutorMultiColumnUnique {
1403                index_name: entry.index_name.clone(),
1404                column_indices,
1405            });
1406        }
1407
1408        results
1409    }
1410
    /// Construct the default snapshot for auto-commit operations.
    ///
    /// Uses the reserved `TXN_ID_AUTO_COMMIT` transaction id with the most
    /// recently committed transaction id as the visibility horizon.
    pub fn default_snapshot(&self) -> TransactionSnapshot {
        TransactionSnapshot {
            txn_id: TXN_ID_AUTO_COMMIT,
            snapshot_id: self.txn_manager.last_committed(),
        }
    }
1418
    /// Get the table catalog for schema and table name management.
    ///
    /// The catalog may be private to this context or shared with others (see
    /// [`RuntimeContext::new_with_catalog`]).
    pub fn table_catalog(&self) -> Arc<TableCatalog> {
        Arc::clone(&self.catalog)
    }
1423
1424    /// Create a new session for transaction management.
1425    /// Each session can have its own independent transaction.
1426    pub fn create_session(self: &Arc<Self>) -> RuntimeSession<P> {
1427        tracing::debug!("[SESSION] RuntimeContext::create_session called");
1428        let namespaces = Arc::new(SessionNamespaces::new(Arc::clone(self)));
1429        let wrapper = RuntimeContextWrapper::new(Arc::clone(self));
1430        let inner = self.transaction_manager.create_session(Arc::new(wrapper));
1431        tracing::debug!(
1432            "[SESSION] Created TransactionSession with session_id (will be logged by transaction manager)"
1433        );
1434        RuntimeSession { inner, namespaces }
1435    }
1436
    /// Get a handle to an existing table by name.
    ///
    /// Resolution and any failure semantics are delegated to
    /// [`RuntimeTableHandle::new`].
    pub fn table(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
        RuntimeTableHandle::new(Arc::clone(self), name)
    }
1441
    /// Check if there's an active transaction (checks if ANY session has a transaction).
    ///
    /// Prefer per-session transaction state; this context-wide check cannot
    /// tell sessions apart.
    #[deprecated(note = "Use session-based transactions instead")]
    pub fn has_active_transaction(&self) -> bool {
        self.transaction_manager.has_active_transaction()
    }
1447
    /// Create a table from column specs, failing if it already exists.
    ///
    /// Delegates to `create_table_with_options` with `if_not_exists = false`.
    pub fn create_table<C, I>(
        self: &Arc<Self>,
        name: &str,
        columns: I,
    ) -> Result<RuntimeTableHandle<P>>
    where
        C: IntoColumnSpec,
        I: IntoIterator<Item = C>,
    {
        self.create_table_with_options(name, columns, false)
    }
1459
    /// Create a table from column specs, succeeding silently if it exists.
    ///
    /// Delegates to `create_table_with_options` with `if_not_exists = true`.
    pub fn create_table_if_not_exists<C, I>(
        self: &Arc<Self>,
        name: &str,
        columns: I,
    ) -> Result<RuntimeTableHandle<P>>
    where
        C: IntoColumnSpec,
        I: IntoIterator<Item = C>,
    {
        self.create_table_with_options(name, columns, true)
    }
1471
    /// Create a table from an explicit column list or a pre-materialized
    /// batch source.
    ///
    /// Honors `if_not_exists` (return success without creating) and
    /// `or_replace` (remove the existing table first).
    /// `CreateTableSource::Select` must be materialized by the caller before
    /// reaching this method.
    ///
    /// # Errors
    /// - `InvalidArgumentError` when neither columns nor a source is given.
    /// - `CatalogError` when the table exists and neither flag is set.
    pub fn create_table_plan(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
        if plan.columns.is_empty() && plan.source.is_none() {
            return Err(Error::InvalidArgumentError(
                "CREATE TABLE requires explicit columns or a source".into(),
            ));
        }

        let (display_name, canonical_name) = canonical_table_name(&plan.name)?;
        let CreateTablePlan {
            name: _,
            if_not_exists,
            or_replace,
            columns,
            source,
            namespace: _,
            foreign_keys,
        } = plan;

        tracing::trace!(
            "DEBUG create_table_plan: table='{}' if_not_exists={} columns={}",
            display_name,
            if_not_exists,
            columns.len()
        );
        for (idx, col) in columns.iter().enumerate() {
            tracing::trace!(
                "  plan column[{}]: name='{}' primary_key={}",
                idx,
                col.name,
                col.primary_key
            );
        }
        // NOTE(review): `exists` only consults the in-memory cache, while
        // tables are loaded lazily — a persisted-but-uncached table would read
        // as absent here. Presumably duplicate detection also happens in the
        // downstream create path; confirm.
        let (exists, is_dropped) = {
            let tables = self.tables.read().unwrap();
            let in_cache = tables.contains_key(&canonical_name);
            let is_dropped = self
                .dropped_tables
                .read()
                .unwrap()
                .contains(&canonical_name);
            // Table exists if it's in cache and NOT marked as dropped
            (in_cache && !is_dropped, is_dropped)
        };
        tracing::trace!(
            "DEBUG create_table_plan: exists={}, is_dropped={}",
            exists,
            is_dropped
        );

        // If table was dropped, remove it from cache before creating new one
        if is_dropped {
            self.remove_table_entry(&canonical_name);
            self.dropped_tables.write().unwrap().remove(&canonical_name);
        }

        if exists {
            if or_replace {
                tracing::trace!(
                    "DEBUG create_table_plan: table '{}' exists and or_replace=true, removing existing table before recreation",
                    display_name
                );
                self.remove_table_entry(&canonical_name);
            } else if if_not_exists {
                tracing::trace!(
                    "DEBUG create_table_plan: table '{}' exists and if_not_exists=true, returning early WITHOUT creating",
                    display_name
                );
                return Ok(RuntimeStatementResult::CreateTable {
                    table_name: display_name,
                });
            } else {
                return Err(Error::CatalogError(format!(
                    "Catalog Error: Table '{}' already exists",
                    display_name
                )));
            }
        }

        // Dispatch on the source kind: batches, (unsupported here) SELECT,
        // or plain column definitions.
        match source {
            Some(CreateTableSource::Batches { schema, batches }) => self.create_table_from_batches(
                display_name,
                canonical_name,
                schema,
                batches,
                if_not_exists,
            ),
            Some(CreateTableSource::Select { .. }) => Err(Error::Internal(
                "CreateTableSource::Select should be materialized before reaching RuntimeContext::create_table_plan"
                    .into(),
            )),
            None => self.create_table_from_columns(
                display_name,
                canonical_name,
                columns,
                foreign_keys,
                if_not_exists,
            ),
        }
    }
1571
    /// Create a secondary index described by `plan`.
    ///
    /// Two shapes are supported:
    /// - single-column indexes (optionally UNIQUE), registered via the
    ///   catalog service and reflected into the cached executor table;
    /// - multi-column indexes, currently restricted to UNIQUE.
    ///
    /// Only ASC / NULLS LAST orderings are accepted. For UNIQUE indexes the
    /// existing column data is scanned under the default snapshot first, so a
    /// uniqueness violation is reported before anything is registered in the
    /// catalog.
    ///
    /// # Errors
    ///
    /// Returns `InvalidArgumentError` for empty/duplicate/unknown columns or
    /// unsupported orderings, and `CatalogError` when the index already
    /// exists and `if_not_exists` is false.
    pub fn create_index(&self, plan: CreateIndexPlan) -> Result<RuntimeStatementResult<P>> {
        if plan.columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE INDEX requires at least one column".into(),
            ));
        }

        // Reject orderings the storage layer cannot serve yet.
        for column_plan in &plan.columns {
            if !column_plan.ascending || column_plan.nulls_first {
                return Err(Error::InvalidArgumentError(
                    "only ASC indexes with NULLS LAST are supported".into(),
                ));
            }
        }

        let index_name = plan.name.clone();
        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
        let table = self.lookup_table(&canonical_name)?;

        // Resolve every referenced column to its schema slot and field id,
        // rejecting duplicates within the same index definition.
        let mut column_indices = Vec::with_capacity(plan.columns.len());
        let mut field_ids = Vec::with_capacity(plan.columns.len());
        let mut column_names = Vec::with_capacity(plan.columns.len());
        let mut seen_column_indices = FxHashSet::default();

        for column_plan in &plan.columns {
            // Schema lookup keys are lowercase; normalize before resolving.
            let normalized = column_plan.name.to_ascii_lowercase();
            let col_idx = table
                .schema
                .lookup
                .get(&normalized)
                .copied()
                .ok_or_else(|| {
                    Error::InvalidArgumentError(format!(
                        "column '{}' does not exist in table '{}'",
                        column_plan.name, display_name
                    ))
                })?;
            if !seen_column_indices.insert(col_idx) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate column '{}' in CREATE INDEX",
                    column_plan.name
                )));
            }

            let column = &table.schema.columns[col_idx];
            column_indices.push(col_idx);
            field_ids.push(column.field_id);
            column_names.push(column.name.clone());
        }

        // --- Single-column path ---------------------------------------------
        if plan.columns.len() == 1 {
            let field_id = field_ids[0];
            let column_name = column_names[0].clone();

            // Verify uniqueness against existing data before touching the
            // catalog, so failures leave no partial registration behind.
            if plan.unique {
                let snapshot = self.default_snapshot();
                let existing_values =
                    self.scan_column_values(table.as_ref(), field_id, snapshot)?;
                ensure_single_column_unique(&existing_values, &[], &column_name)?;
            }

            let created = self.catalog_service.register_single_column_index(
                &display_name,
                &canonical_name,
                &table.table,
                field_id,
                &column_name,
                plan.unique,
                plan.if_not_exists,
            )?;

            if !created {
                // Index already existed and if_not_exists was true
                return Ok(RuntimeStatementResult::CreateIndex {
                    table_name: display_name,
                    index_name,
                });
            }

            // Refresh the cached executor table so the unique flag is visible
            // to subsequent statements; fall back to eviction if the rebuild
            // is not possible.
            if let Some(updated_table) =
                Self::rebuild_executor_table_with_unique(table.as_ref(), field_id)
            {
                self.tables
                    .write()
                    .unwrap()
                    .insert(canonical_name.clone(), Arc::clone(&updated_table));
            } else {
                self.remove_table_entry(&canonical_name);
            }

            drop(table);

            return Ok(RuntimeStatementResult::CreateIndex {
                table_name: display_name,
                index_name,
            });
        }

        // --- Multi-column path ----------------------------------------------
        if !plan.unique {
            return Err(Error::InvalidArgumentError(
                "multi-column CREATE INDEX currently supports UNIQUE indexes only".into(),
            ));
        }

        let table_id = table.table.table_id();

        // As above: validate existing rows before registering anything.
        let snapshot = self.default_snapshot();
        let existing_rows = self.scan_multi_column_values(table.as_ref(), &field_ids, snapshot)?;
        ensure_multi_column_unique(&existing_rows, &[], &column_names)?;

        let executor_entry = ExecutorMultiColumnUnique {
            index_name: index_name.clone(),
            column_indices: column_indices.clone(),
        };

        let registration = self.catalog_service.register_multi_column_unique_index(
            table_id,
            &field_ids,
            index_name.clone(),
        )?;

        match registration {
            MultiColumnUniqueRegistration::Created => {
                table.add_multi_column_unique(executor_entry);
            }
            MultiColumnUniqueRegistration::AlreadyExists {
                index_name: existing,
            } => {
                if plan.if_not_exists {
                    drop(table);
                    return Ok(RuntimeStatementResult::CreateIndex {
                        table_name: display_name,
                        index_name: existing,
                    });
                }
                return Err(Error::CatalogError(format!(
                    "Index already exists on columns '{}'",
                    column_names.join(", ")
                )));
            }
        }

        Ok(RuntimeStatementResult::CreateIndex {
            table_name: display_name,
            index_name,
        })
    }
1719
    /// List every table name known to the catalog.
    ///
    /// The catalog is the single source of truth; the executor-table cache is
    /// deliberately not consulted here.
    pub fn table_names(self: &Arc<Self>) -> Vec<String> {
        // Use catalog for table names (single source of truth)
        self.catalog.table_names()
    }
1724
    /// Fetch the catalog view for the table with the given canonical name.
    pub fn table_view(&self, canonical_name: &str) -> Result<TableView> {
        self.catalog_service.table_view(canonical_name)
    }
1728
    /// Drop row ids that are not visible under `snapshot` (MVCC visibility
    /// check delegated to `filter_row_ids_for_snapshot`).
    fn filter_visible_row_ids(
        &self,
        table: &ExecutorTable<P>,
        row_ids: Vec<RowId>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RowId>> {
        filter_row_ids_for_snapshot(table.table.as_ref(), row_ids, &self.txn_manager, snapshot)
    }
1737
    /// Start a fluent builder for a `CREATE TABLE` plan targeting `name`.
    pub fn create_table_builder(&self, name: &str) -> RuntimeCreateTableBuilder<'_, P> {
        RuntimeCreateTableBuilder {
            ctx: self,
            plan: CreateTablePlan::new(name),
        }
    }
1744
    /// Return the column specifications for table `name`.
    ///
    /// The name is canonicalized before the catalog lookup.
    pub fn table_column_specs(self: &Arc<Self>, name: &str) -> Result<Vec<ColumnSpec>> {
        let (_, canonical_name) = canonical_table_name(name)?;
        self.catalog_service.table_column_specs(&canonical_name)
    }
1749
    /// Return the foreign-key views registered for table `name`.
    ///
    /// The name is canonicalized before the catalog lookup.
    pub fn foreign_key_views(self: &Arc<Self>, name: &str) -> Result<Vec<ForeignKeyView>> {
        let (_, canonical_name) = canonical_table_name(name)?;
        self.catalog_service.foreign_key_views(&canonical_name)
    }
1754
    /// Materialize every row of table `name` into a single `RowBatch` via a
    /// lazy table-handle scan.
    pub fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<RowBatch> {
        let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
        handle.lazy()?.collect_rows()
    }
1759
    /// Thin dispatch wrapper so statement execution can route `CREATE TABLE`
    /// plans uniformly to `create_table_plan`.
    fn execute_create_table(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
        self.create_table_plan(plan)
    }
1763
1764    fn create_table_with_options<C, I>(
1765        self: &Arc<Self>,
1766        name: &str,
1767        columns: I,
1768        if_not_exists: bool,
1769    ) -> Result<RuntimeTableHandle<P>>
1770    where
1771        C: IntoColumnSpec,
1772        I: IntoIterator<Item = C>,
1773    {
1774        let mut plan = CreateTablePlan::new(name);
1775        plan.if_not_exists = if_not_exists;
1776        plan.columns = columns
1777            .into_iter()
1778            .map(|column| column.into_column_spec())
1779            .collect();
1780        let result = self.create_table_plan(plan)?;
1781        match result {
1782            RuntimeStatementResult::CreateTable { .. } => {
1783                RuntimeTableHandle::new(Arc::clone(self), name)
1784            }
1785            other => Err(Error::InvalidArgumentError(format!(
1786                "unexpected statement result {other:?} when creating table"
1787            ))),
1788        }
1789    }
1790
1791    pub fn insert(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
1792        // For non-transactional inserts, use TXN_ID_AUTO_COMMIT directly
1793        // instead of creating a temporary transaction
1794        let snapshot = TransactionSnapshot {
1795            txn_id: TXN_ID_AUTO_COMMIT,
1796            snapshot_id: self.txn_manager.last_committed(),
1797        };
1798        self.insert_with_snapshot(plan, snapshot)
1799    }
1800
1801    pub fn insert_with_snapshot(
1802        &self,
1803        plan: InsertPlan,
1804        snapshot: TransactionSnapshot,
1805    ) -> Result<RuntimeStatementResult<P>> {
1806        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
1807        let table = self.lookup_table(&canonical_name)?;
1808
1809        // Targeted debug for 'keys' table only
1810        if display_name == "keys" {
1811            tracing::trace!(
1812                "\n[KEYS] INSERT starting - table_id={}, context_pager={:p}",
1813                table.table.table_id(),
1814                &*self.pager
1815            );
1816            tracing::trace!(
1817                "[KEYS] Table has {} columns, primary_key columns: {:?}",
1818                table.schema.columns.len(),
1819                table
1820                    .schema
1821                    .columns
1822                    .iter()
1823                    .filter(|c| c.primary_key)
1824                    .map(|c| &c.name)
1825                    .collect::<Vec<_>>()
1826            );
1827        }
1828
1829        let result = match plan.source {
1830            InsertSource::Rows(rows) => self.insert_rows(
1831                table.as_ref(),
1832                display_name.clone(),
1833                canonical_name.clone(),
1834                rows,
1835                plan.columns,
1836                snapshot,
1837            ),
1838            InsertSource::Batches(batches) => self.insert_batches(
1839                table.as_ref(),
1840                display_name.clone(),
1841                canonical_name.clone(),
1842                batches,
1843                plan.columns,
1844                snapshot,
1845            ),
1846            InsertSource::Select { .. } => Err(Error::Internal(
1847                "InsertSource::Select should be materialized before reaching RuntimeContext::insert"
1848                    .into(),
1849            )),
1850        };
1851
1852        if display_name == "keys" {
1853            tracing::trace!(
1854                "[KEYS] INSERT completed: {:?}",
1855                result
1856                    .as_ref()
1857                    .map(|_| "OK")
1858                    .map_err(|e| format!("{:?}", e))
1859            );
1860        }
1861
1862        if matches!(result, Err(Error::NotFound)) {
1863            panic!(
1864                "BUG: insert yielded Error::NotFound for table '{}'. \
1865                 This should never happen: insert should never return NotFound after successful table lookup. \
1866                 This indicates a logic error in the runtime.",
1867                display_name
1868            );
1869        }
1870
1871        result
1872    }
1873
    /// Get raw batches from a table including row_ids, optionally filtered.
    /// This is used for transaction seeding where we need to preserve existing row_ids.
    ///
    /// Convenience wrapper around
    /// [`Self::get_batches_with_row_ids_with_snapshot`] using the default
    /// (latest committed) snapshot.
    pub fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
    ) -> Result<Vec<RecordBatch>> {
        self.get_batches_with_row_ids_with_snapshot(table_name, filter, self.default_snapshot())
    }
1883
    /// Get raw batches (row_id column first, then every user column) for the
    /// rows of `table_name` that match `filter` and are visible under
    /// `snapshot`.
    ///
    /// When `filter` is `None`, a full-table-scan predicate over the table's
    /// first column is synthesized; tables with no columns at all are an
    /// error in that case. Tables whose schema has columns but whose scan
    /// yields no user fields still produce row_id-only batches.
    ///
    /// # Errors
    ///
    /// Returns an error if the table cannot be resolved, the predicate cannot
    /// be translated, or the underlying scan/stream fails.
    pub fn get_batches_with_row_ids_with_snapshot(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RecordBatch>> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

        // Translate the caller's predicate, or synthesize a wildcard scan
        // over the first column when no filter was given.
        let filter_expr = match filter {
            Some(expr) => translate_predicate(expr, table.schema.as_ref())?,
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                full_table_scan_filter(field_id)
            }
        };

        // First, get the row_ids that match the filter
        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Then restrict to rows visible under the caller's snapshot (MVCC).
        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
        if visible_row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Scan to get the column data without materializing full columns
        let table_id = table.table.table_id();

        // Output schema: row_id first, then each user column with its
        // field-id metadata attached.
        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len());

        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for column in &table.schema.columns {
            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
            logical_fields.push(logical_field_id);
            let field = mvcc::build_field_with_metadata(
                &column.name,
                column.data_type.clone(),
                column.nullable,
                column.field_id,
            );
            fields.push(field);
        }

        let schema = Arc::new(Schema::new(fields));

        if logical_fields.is_empty() {
            // Tables without user columns should still return row_id batches.
            let mut row_id_builder = UInt64Builder::with_capacity(visible_row_ids.len());
            for row_id in &visible_row_ids {
                row_id_builder.append_value(*row_id);
            }
            let arrays: Vec<ArrayRef> = vec![Arc::new(row_id_builder.finish()) as ArrayRef];
            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            return Ok(vec![batch]);
        }

        // Stream column chunks for the visible rows rather than gathering
        // entire columns into memory at once.
        let mut stream = table.table.stream_columns(
            Arc::from(logical_fields),
            visible_row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut batches = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(chunk.batch().num_columns() + 1);

            // Rebuild the row_id column for this chunk, then append the
            // gathered user columns in schema order.
            let mut row_id_builder = UInt64Builder::with_capacity(chunk.len());
            for row_id in chunk.row_ids() {
                row_id_builder.append_value(*row_id);
            }
            arrays.push(Arc::new(row_id_builder.finish()) as ArrayRef);

            let chunk_batch = chunk.into_batch();
            for column_array in chunk_batch.columns() {
                arrays.push(column_array.clone());
            }

            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            batches.push(batch);
        }

        Ok(batches)
    }
1977
1978    /// Append batches directly to a table, preserving row_ids from the batches.
1979    /// This is used for transaction seeding where we need to preserve existing row_ids.
1980    pub fn append_batches_with_row_ids(
1981        &self,
1982        table_name: &str,
1983        batches: Vec<RecordBatch>,
1984    ) -> Result<usize> {
1985        let (_, canonical_name) = canonical_table_name(table_name)?;
1986        let table = self.lookup_table(&canonical_name)?;
1987
1988        let mut total_rows = 0;
1989        for batch in batches {
1990            if batch.num_rows() == 0 {
1991                continue;
1992            }
1993
1994            // Verify the batch has a row_id column
1995            let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
1996                Error::InvalidArgumentError(
1997                    "batch must contain row_id column for direct append".into(),
1998                )
1999            })?;
2000
2001            // Append the batch directly to the underlying table
2002            table.table.append(&batch)?;
2003            total_rows += batch.num_rows();
2004        }
2005
2006        Ok(total_rows)
2007    }
2008
2009    pub fn update(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
2010        let snapshot = self.txn_manager.begin_transaction();
2011        match self.update_with_snapshot(plan, snapshot) {
2012            Ok(result) => {
2013                self.txn_manager.mark_committed(snapshot.txn_id);
2014                Ok(result)
2015            }
2016            Err(err) => {
2017                self.txn_manager.mark_aborted(snapshot.txn_id);
2018                Err(err)
2019            }
2020        }
2021    }
2022
2023    pub fn update_with_snapshot(
2024        &self,
2025        plan: UpdatePlan,
2026        snapshot: TransactionSnapshot,
2027    ) -> Result<RuntimeStatementResult<P>> {
2028        let UpdatePlan {
2029            table,
2030            assignments,
2031            filter,
2032        } = plan;
2033        let (display_name, canonical_name) = canonical_table_name(&table)?;
2034        let table = self.lookup_table(&canonical_name)?;
2035        if let Some(filter) = filter {
2036            self.update_filtered_rows(
2037                table.as_ref(),
2038                display_name,
2039                canonical_name,
2040                assignments,
2041                filter,
2042                snapshot,
2043            )
2044        } else {
2045            self.update_all_rows(
2046                table.as_ref(),
2047                display_name,
2048                canonical_name,
2049                assignments,
2050                snapshot,
2051            )
2052        }
2053    }
2054
2055    pub fn delete(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
2056        let snapshot = self.txn_manager.begin_transaction();
2057        match self.delete_with_snapshot(plan, snapshot) {
2058            Ok(result) => {
2059                self.txn_manager.mark_committed(snapshot.txn_id);
2060                Ok(result)
2061            }
2062            Err(err) => {
2063                self.txn_manager.mark_aborted(snapshot.txn_id);
2064                Err(err)
2065            }
2066        }
2067    }
2068
    /// Execute a DELETE against the provided MVCC snapshot.
    ///
    /// With a filter, only matching rows are deleted; without one, all rows
    /// in the table are deleted.
    pub fn delete_with_snapshot(
        &self,
        plan: DeletePlan,
        snapshot: TransactionSnapshot,
    ) -> Result<RuntimeStatementResult<P>> {
        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
        // Get table - will be checked against snapshot during actual deletion
        // NOTE(review): this reads the `tables` cache directly and maps a
        // miss to `Error::NotFound`, unlike update/insert which go through
        // `lookup_table` — confirm whether the two lookup/error paths should
        // be unified.
        let table = match self.tables.read().unwrap().get(&canonical_name) {
            Some(t) => Arc::clone(t),
            None => return Err(Error::NotFound),
        };
        match plan.filter {
            Some(filter) => self.delete_filtered_rows(
                table.as_ref(),
                display_name,
                canonical_name.clone(),
                filter,
                snapshot,
            ),
            None => self.delete_all_rows(table.as_ref(), display_name, canonical_name, snapshot),
        }
    }
2091
    /// Resolve a runtime handle for table `name`, for fluent per-table
    /// operations.
    pub fn table_handle(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
        RuntimeTableHandle::new(Arc::clone(self), name)
    }
2095
    /// Execute a SELECT using the default (latest committed) snapshot.
    pub fn execute_select(self: &Arc<Self>, plan: SelectPlan) -> Result<SelectExecution<P>> {
        let snapshot = self.default_snapshot();
        self.execute_select_with_snapshot(plan, snapshot)
    }
2100
2101    pub fn execute_select_with_snapshot(
2102        self: &Arc<Self>,
2103        plan: SelectPlan,
2104        snapshot: TransactionSnapshot,
2105    ) -> Result<SelectExecution<P>> {
2106        // Handle SELECT without FROM clause (e.g., SELECT 42, SELECT {'a': 1})
2107        if plan.tables.is_empty() {
2108            let provider: Arc<dyn TableProvider<P>> = Arc::new(ContextProvider {
2109                context: Arc::clone(self),
2110            });
2111            let executor = QueryExecutor::new(provider);
2112            // No row filter needed since there's no table
2113            return executor.execute_select_with_filter(plan, None);
2114        }
2115
2116        // Resolve canonical names for all tables (don't verify existence - executor will handle it with snapshot)
2117        let mut canonical_tables = Vec::new();
2118        for table_ref in &plan.tables {
2119            let qualified = table_ref.qualified_name();
2120            let (_display, canonical) = canonical_table_name(&qualified)?;
2121            // Parse canonical back into schema.table
2122            let parts: Vec<&str> = canonical.split('.').collect();
2123            let canon_ref = if parts.len() >= 2 {
2124                llkv_plan::TableRef::new(parts[0], parts[1])
2125            } else {
2126                llkv_plan::TableRef::new("", &canonical)
2127            };
2128            canonical_tables.push(canon_ref);
2129        }
2130
2131        let mut canonical_plan = plan.clone();
2132        canonical_plan.tables = canonical_tables;
2133
2134        let provider: Arc<dyn TableProvider<P>> = Arc::new(ContextProvider {
2135            context: Arc::clone(self),
2136        });
2137        let executor = QueryExecutor::new(provider);
2138
2139        // For single-table queries, apply MVCC filtering
2140        let row_filter: Option<Arc<dyn RowIdFilter<P>>> = if canonical_plan.tables.len() == 1 {
2141            Some(Arc::new(MvccRowIdFilter::new(
2142                Arc::clone(&self.txn_manager),
2143                snapshot,
2144            )))
2145        } else {
2146            // For multi-table queries, we don't apply MVCC filtering yet (TODO)
2147            None
2148        };
2149
2150        executor.execute_select_with_filter(canonical_plan, row_filter)
2151    }
2152
    /// Create a new table from explicit column specifications (the plain
    /// `CREATE TABLE (...)` path, as opposed to CTAS).
    ///
    /// Flow: pre-check the executor-table cache, create the table through
    /// the catalog, build the cached `ExecutorTable`, re-check the cache
    /// under the write lock (racing creators roll back via `drop_table`),
    /// then register any foreign keys — rolling back the whole table if FK
    /// registration fails.
    ///
    /// # Errors
    ///
    /// Returns `InvalidArgumentError` for an empty column list, and
    /// `CatalogError` when the table already exists and `if_not_exists` is
    /// false; FK registration errors are propagated after rollback.
    fn create_table_from_columns(
        &self,
        display_name: String,
        canonical_name: String,
        columns: Vec<ColumnSpec>,
        foreign_keys: Vec<ForeignKeySpec>,
        if_not_exists: bool,
    ) -> Result<RuntimeStatementResult<P>> {
        tracing::trace!(
            "\n=== CREATE_TABLE_FROM_COLUMNS: table='{}' columns={} ===",
            display_name,
            columns.len()
        );
        for (idx, col) in columns.iter().enumerate() {
            tracing::trace!(
                "  input column[{}]: name='{}' primary_key={}",
                idx,
                col.name,
                col.primary_key
            );
        }
        if columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE TABLE requires at least one column".into(),
            ));
        }

        // Avoid repeating catalog work if the table already exists.
        {
            let tables = self.tables.read().unwrap();
            if tables.contains_key(&canonical_name) {
                if if_not_exists {
                    return Ok(RuntimeStatementResult::CreateTable {
                        table_name: display_name,
                    });
                }

                return Err(Error::CatalogError(format!(
                    "Catalog Error: Table '{}' already exists",
                    display_name
                )));
            }
        }

        // Persist the table definition through the catalog/storage layers.
        let CreateTableResult {
            table_id,
            table,
            table_columns,
            column_lookup,
        } = Table::create_from_columns(
            &display_name,
            &canonical_name,
            &columns,
            self.metadata.clone(),
            self.catalog.clone(),
            self.store.clone(),
        )?;

        tracing::trace!(
            "=== TABLE '{}' CREATED WITH table_id={} pager={:p} ===",
            display_name,
            table_id,
            &*self.pager
        );

        // Mirror the catalog's column definitions into executor-side columns.
        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
        for (idx, column) in table_columns.iter().enumerate() {
            tracing::trace!(
                "DEBUG create_table_from_columns[{}]: name='{}' data_type={:?} nullable={} primary_key={} unique={}",
                idx,
                column.name,
                column.data_type,
                column.nullable,
                column.primary_key,
                column.unique
            );
            column_defs.push(ExecutorColumn {
                name: column.name.clone(),
                data_type: column.data_type.clone(),
                nullable: column.nullable,
                primary_key: column.primary_key,
                unique: column.unique,
                field_id: column.field_id,
                check_expr: column.check_expr.clone(),
            });
        }

        let schema = Arc::new(ExecutorSchema {
            columns: column_defs.clone(),
            lookup: column_lookup,
        });
        let table_entry = Arc::new(ExecutorTable {
            table: Arc::clone(&table),
            schema,
            next_row_id: AtomicU64::new(0),
            total_rows: AtomicU64::new(0),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        // Re-check under the write lock: another creator may have won the
        // race since the read-lock check above. If so, undo our catalog work.
        let mut tables = self.tables.write().unwrap();
        if tables.contains_key(&canonical_name) {
            drop(tables);
            let field_ids: Vec<FieldId> =
                table_columns.iter().map(|column| column.field_id).collect();
            let _ = self
                .catalog_service
                .drop_table(&canonical_name, table_id, &field_ids);
            if if_not_exists {
                return Ok(RuntimeStatementResult::CreateTable {
                    table_name: display_name,
                });
            }
            return Err(Error::CatalogError(format!(
                "Catalog Error: Table '{}' already exists",
                display_name
            )));
        }
        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
        drop(tables);

        // Register foreign keys last; on failure the freshly created table is
        // dropped again so the statement has no lasting effect.
        if !foreign_keys.is_empty() {
            let fk_result = self.catalog_service.register_foreign_keys_for_new_table(
                table_id,
                &display_name,
                &canonical_name,
                &table_columns,
                &foreign_keys,
                |table_name| {
                    // Resolver closure: describe the referenced table for FK
                    // validation, or fail if it does not exist.
                    let (display, canonical) = canonical_table_name(table_name)?;
                    let referenced_table = self.lookup_table(&canonical).map_err(|_| {
                        Error::InvalidArgumentError(format!(
                            "referenced table '{}' does not exist",
                            table_name
                        ))
                    })?;

                    let columns = referenced_table
                        .schema
                        .columns
                        .iter()
                        .map(|column| ForeignKeyColumn {
                            name: column.name.clone(),
                            data_type: column.data_type.clone(),
                            nullable: column.nullable,
                            primary_key: column.primary_key,
                            unique: column.unique,
                            field_id: column.field_id,
                        })
                        .collect();

                    Ok(ForeignKeyTableInfo {
                        display_name: display,
                        canonical_name: canonical,
                        table_id: referenced_table.table.table_id(),
                        columns,
                    })
                },
                current_time_micros(),
            );

            if let Err(err) = fk_result {
                let field_ids: Vec<FieldId> =
                    table_columns.iter().map(|column| column.field_id).collect();
                let _ = self
                    .catalog_service
                    .drop_table(&canonical_name, table_id, &field_ids);
                self.remove_table_entry(&canonical_name);
                return Err(err);
            }
        }

        Ok(RuntimeStatementResult::CreateTable {
            table_name: display_name,
        })
    }
2328
2329    fn create_table_from_batches(
2330        &self,
2331        display_name: String,
2332        canonical_name: String,
2333        schema: Arc<Schema>,
2334        batches: Vec<RecordBatch>,
2335        if_not_exists: bool,
2336    ) -> Result<RuntimeStatementResult<P>> {
2337        {
2338            let tables = self.tables.read().unwrap();
2339            if tables.contains_key(&canonical_name) {
2340                if if_not_exists {
2341                    return Ok(RuntimeStatementResult::CreateTable {
2342                        table_name: display_name,
2343                    });
2344                }
2345                return Err(Error::CatalogError(format!(
2346                    "Catalog Error: Table '{}' already exists",
2347                    display_name
2348                )));
2349            }
2350        }
2351
2352        let CreateTableResult {
2353            table_id,
2354            table,
2355            table_columns,
2356            column_lookup,
2357        } = self.catalog_service.create_table_from_schema(
2358            &display_name,
2359            &canonical_name,
2360            &schema,
2361        )?;
2362
2363        tracing::trace!(
2364            "=== CTAS table '{}' created with table_id={} pager={:p} ===",
2365            display_name,
2366            table_id,
2367            &*self.pager
2368        );
2369
2370        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
2371        for column in &table_columns {
2372            column_defs.push(ExecutorColumn {
2373                name: column.name.clone(),
2374                data_type: column.data_type.clone(),
2375                nullable: column.nullable,
2376                primary_key: column.primary_key,
2377                unique: column.unique,
2378                field_id: column.field_id,
2379                check_expr: column.check_expr.clone(),
2380            });
2381        }
2382
2383        let schema_arc = Arc::new(ExecutorSchema {
2384            columns: column_defs.clone(),
2385            lookup: column_lookup,
2386        });
2387        let table_entry = Arc::new(ExecutorTable {
2388            table: Arc::clone(&table),
2389            schema: schema_arc,
2390            next_row_id: AtomicU64::new(0),
2391            total_rows: AtomicU64::new(0),
2392            multi_column_uniques: RwLock::new(Vec::new()),
2393        });
2394
2395        let creator_snapshot = self.txn_manager.begin_transaction();
2396        let creator_txn_id = creator_snapshot.txn_id;
2397        let (next_row_id, total_rows) = match self.catalog_service.append_batches_with_mvcc(
2398            table.as_ref(),
2399            &table_columns,
2400            &batches,
2401            creator_txn_id,
2402            TXN_ID_NONE,
2403            0,
2404        ) {
2405            Ok(result) => {
2406                self.txn_manager.mark_committed(creator_txn_id);
2407                result
2408            }
2409            Err(err) => {
2410                self.txn_manager.mark_aborted(creator_txn_id);
2411                let field_ids: Vec<FieldId> =
2412                    table_columns.iter().map(|column| column.field_id).collect();
2413                let _ = self
2414                    .catalog_service
2415                    .drop_table(&canonical_name, table_id, &field_ids);
2416                return Err(err);
2417            }
2418        };
2419
2420        table_entry.next_row_id.store(next_row_id, Ordering::SeqCst);
2421        table_entry.total_rows.store(total_rows, Ordering::SeqCst);
2422
2423        let mut tables = self.tables.write().unwrap();
2424        if tables.contains_key(&canonical_name) {
2425            if if_not_exists {
2426                return Ok(RuntimeStatementResult::CreateTable {
2427                    table_name: display_name,
2428                });
2429            }
2430            return Err(Error::CatalogError(format!(
2431                "Catalog Error: Table '{}' already exists",
2432                display_name
2433            )));
2434        }
2435        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
2436        drop(tables); // Release write lock before catalog operations
2437
2438        Ok(RuntimeStatementResult::CreateTable {
2439            table_name: display_name,
2440        })
2441    }
2442
2443    fn rebuild_executor_table_with_unique(
2444        table: &ExecutorTable<P>,
2445        field_id: FieldId,
2446    ) -> Option<Arc<ExecutorTable<P>>> {
2447        let mut columns = table.schema.columns.clone();
2448        let mut found = false;
2449        for column in &mut columns {
2450            if column.field_id == field_id {
2451                column.unique = true;
2452                found = true;
2453                break;
2454            }
2455        }
2456        if !found {
2457            return None;
2458        }
2459
2460        let schema = Arc::new(ExecutorSchema {
2461            columns,
2462            lookup: table.schema.lookup.clone(),
2463        });
2464
2465        let next_row_id = table.next_row_id.load(Ordering::SeqCst);
2466        let total_rows = table.total_rows.load(Ordering::SeqCst);
2467        let uniques = table.multi_column_uniques();
2468
2469        Some(Arc::new(ExecutorTable {
2470            table: Arc::clone(&table.table),
2471            schema,
2472            next_row_id: AtomicU64::new(next_row_id),
2473            total_rows: AtomicU64::new(total_rows),
2474            multi_column_uniques: RwLock::new(uniques),
2475        }))
2476    }
2477
2478    fn record_table_with_new_rows(&self, txn_id: TxnId, canonical_name: String) {
2479        if txn_id == TXN_ID_AUTO_COMMIT {
2480            return;
2481        }
2482
2483        let mut guard = self.txn_tables_with_new_rows.write().unwrap();
2484        guard.entry(txn_id).or_default().insert(canonical_name);
2485    }
2486
    /// Collect the live rows that transaction `txn_id` inserted into `table`.
    ///
    /// Scans the whole table, then keeps only rows whose MVCC `created_by`
    /// equals `txn_id` and whose `deleted_by` is still `TXN_ID_NONE` (i.e.
    /// the row was not deleted again inside the same transaction). Returns
    /// one `Vec<PlanValue>` per surviving row, in schema column order.
    ///
    /// Auto-commit transactions and tables without columns yield an empty
    /// result, as there is nothing to re-validate at commit time.
    fn collect_rows_created_by_txn(
        &self,
        table: &ExecutorTable<P>,
        txn_id: TxnId,
    ) -> Result<Vec<Vec<PlanValue>>> {
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(Vec::new());
        }

        if table.schema.columns.is_empty() {
            return Ok(Vec::new());
        }

        // Anchor the full-table scan on the first user column.
        let Some(first_field_id) = table.schema.first_field_id() else {
            return Ok(Vec::new());
        };
        let filter_expr = full_table_scan_filter(first_field_id);

        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Stream MVCC metadata first (created_by, deleted_by), then every
        // user column; batch columns 0 and 1 below rely on this ordering.
        let table_id = table.table.table_id();
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len() + 2);
        logical_fields.push(LogicalFieldId::for_mvcc_created_by(table_id));
        logical_fields.push(LogicalFieldId::for_mvcc_deleted_by(table_id));
        for column in &table.schema.columns {
            logical_fields.push(LogicalFieldId::for_user(table_id, column.field_id));
        }

        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let mut stream = table.table.stream_columns(
            Arc::clone(&logical_fields),
            row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut rows = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            // Defensive: skip chunks that are missing expected columns.
            if batch.num_columns() < table.schema.columns.len() + 2 {
                continue;
            }

            let created_col = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing created_by column in MVCC data".into()))?;
            let deleted_col = batch
                .column(1)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing deleted_by column in MVCC data".into()))?;

            for row_idx in 0..batch.num_rows() {
                // A null created_by is treated as auto-commit provenance.
                let created_by = if created_col.is_null(row_idx) {
                    TXN_ID_AUTO_COMMIT
                } else {
                    created_col.value(row_idx)
                };
                if created_by != txn_id {
                    continue;
                }

                // A null deleted_by means the row is still live.
                let deleted_by = if deleted_col.is_null(row_idx) {
                    TXN_ID_NONE
                } else {
                    deleted_col.value(row_idx)
                };
                if deleted_by != TXN_ID_NONE {
                    continue;
                }

                // User columns start at batch index 2 (after MVCC metadata).
                let mut row_values = Vec::with_capacity(table.schema.columns.len());
                for col_idx in 0..table.schema.columns.len() {
                    let array = batch.column(col_idx + 2);
                    let value = llkv_plan::plan_value_from_array(array, row_idx)?;
                    row_values.push(value);
                }
                rows.push(row_values);
            }
        }

        Ok(rows)
    }
2575
2576    fn validate_primary_keys_for_commit(&self, txn_id: TxnId) -> Result<()> {
2577        if txn_id == TXN_ID_AUTO_COMMIT {
2578            return Ok(());
2579        }
2580
2581        let tables = {
2582            let mut guard = self.txn_tables_with_new_rows.write().unwrap();
2583            guard.remove(&txn_id)
2584        };
2585
2586        let Some(tables) = tables else {
2587            return Ok(());
2588        };
2589
2590        if tables.is_empty() {
2591            return Ok(());
2592        }
2593
2594        let snapshot = TransactionSnapshot {
2595            txn_id: TXN_ID_AUTO_COMMIT,
2596            snapshot_id: self.txn_manager.last_committed(),
2597        };
2598
2599        for table_name in tables {
2600            let table = self.lookup_table(&table_name)?;
2601            let constraint_ctx = self.build_table_constraint_context(table.as_ref())?;
2602            let Some(primary_key) = constraint_ctx.primary_key.as_ref() else {
2603                continue;
2604            };
2605
2606            let new_rows = self.collect_rows_created_by_txn(table.as_ref(), txn_id)?;
2607            if new_rows.is_empty() {
2608                continue;
2609            }
2610
2611            let column_order: Vec<usize> = (0..table.schema.columns.len()).collect();
2612            self.constraint_service.validate_primary_key_rows(
2613                &constraint_ctx.schema_field_ids,
2614                primary_key,
2615                &column_order,
2616                &new_rows,
2617                |field_ids| self.scan_multi_column_values(table.as_ref(), field_ids, snapshot),
2618            )?;
2619        }
2620
2621        Ok(())
2622    }
2623
2624    fn clear_transaction_state(&self, txn_id: TxnId) {
2625        if txn_id == TXN_ID_AUTO_COMMIT {
2626            return;
2627        }
2628
2629        let mut guard = self.txn_tables_with_new_rows.write().unwrap();
2630        guard.remove(&txn_id);
2631    }
2632
2633    fn coerce_plan_value_for_column(
2634        &self,
2635        value: PlanValue,
2636        column: &ExecutorColumn,
2637    ) -> Result<PlanValue> {
2638        match value {
2639            PlanValue::Null => Ok(PlanValue::Null),
2640            PlanValue::Integer(v) => match &column.data_type {
2641                DataType::Int64 => Ok(PlanValue::Integer(v)),
2642                DataType::Float64 => Ok(PlanValue::Float(v as f64)),
2643                DataType::Boolean => Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 })),
2644                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
2645                DataType::Date32 => {
2646                    let casted = i32::try_from(v).map_err(|_| {
2647                        Error::InvalidArgumentError(format!(
2648                            "integer literal out of range for DATE column '{}'",
2649                            column.name
2650                        ))
2651                    })?;
2652                    Ok(PlanValue::Integer(casted as i64))
2653                }
2654                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
2655                    "cannot assign integer to STRUCT column '{}'",
2656                    column.name
2657                ))),
2658                _ => Ok(PlanValue::Integer(v)),
2659            },
2660            PlanValue::Float(v) => match &column.data_type {
2661                DataType::Int64 => Ok(PlanValue::Integer(v as i64)),
2662                DataType::Float64 => Ok(PlanValue::Float(v)),
2663                DataType::Boolean => Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 })),
2664                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
2665                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
2666                    "cannot assign floating-point value to DATE column '{}'",
2667                    column.name
2668                ))),
2669                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
2670                    "cannot assign floating-point value to STRUCT column '{}'",
2671                    column.name
2672                ))),
2673                _ => Ok(PlanValue::Float(v)),
2674            },
2675            PlanValue::String(s) => match &column.data_type {
2676                DataType::Boolean => {
2677                    let normalized = s.trim().to_ascii_lowercase();
2678                    match normalized.as_str() {
2679                        "true" | "t" | "1" => Ok(PlanValue::Integer(1)),
2680                        "false" | "f" | "0" => Ok(PlanValue::Integer(0)),
2681                        _ => Err(Error::InvalidArgumentError(format!(
2682                            "cannot assign string '{}' to BOOLEAN column '{}'",
2683                            s, column.name
2684                        ))),
2685                    }
2686                }
2687                DataType::Utf8 => Ok(PlanValue::String(s)),
2688                DataType::Date32 => {
2689                    let days = parse_date32_literal(&s)?;
2690                    Ok(PlanValue::Integer(days as i64))
2691                }
2692                DataType::Int64 | DataType::Float64 => Err(Error::InvalidArgumentError(format!(
2693                    "cannot assign string '{}' to numeric column '{}'",
2694                    s, column.name
2695                ))),
2696                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
2697                    "cannot assign string to STRUCT column '{}'",
2698                    column.name
2699                ))),
2700                _ => Ok(PlanValue::String(s)),
2701            },
2702            PlanValue::Struct(map) => match &column.data_type {
2703                DataType::Struct(_) => Ok(PlanValue::Struct(map)),
2704                _ => Err(Error::InvalidArgumentError(format!(
2705                    "cannot assign struct value to column '{}'",
2706                    column.name
2707                ))),
2708            },
2709        }
2710    }
2711
    // TODO: Make streamable; don't buffer all values in memory at once
    /// Return every value of `field_id` that is visible under `snapshot`.
    ///
    /// Used by constraint validation (e.g. single-column UNIQUE checks) to
    /// obtain existing column values. Row visibility is enforced via
    /// `filter_row_ids_for_snapshot`, so rows from uncommitted transactions
    /// are excluded.
    ///
    /// NOTE(review): values that fail `plan_value_from_array` are silently
    /// skipped here, whereas `scan_multi_column_values` substitutes Null —
    /// confirm the asymmetry is intentional.
    fn scan_column_values(
        &self,
        table: &ExecutorTable<P>,
        field_id: FieldId,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<PlanValue>> {
        let table_id = table.table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        // Create a filter that matches all rows (unbounded range)
        let match_all_filter = Filter {
            field_id,
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        // Get all matching row_ids first
        let row_ids = match table.table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            // NotFound means the column has no stored data yet: no values.
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Apply MVCC filtering manually using filter_row_ids_for_snapshot
        let row_ids = filter_row_ids_for_snapshot(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Gather the column values for visible rows
        let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
        let row_count = row_ids.len();
        let mut stream = match table.table.stream_columns(
            vec![logical_field_id],
            row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // TODO: Don't buffer all values; make this streamable
        let mut values = Vec::with_capacity(row_count);
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() == 0 {
                continue;
            }
            let array = batch.column(0);
            for row_idx in 0..batch.num_rows() {
                // Conversion failures are skipped (see NOTE above).
                if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
                    values.push(value);
                }
            }
        }

        Ok(values)
    }
2782
    // TODO: Make streamable; don't buffer all values in memory at once
    /// Return, for every row visible under `snapshot`, the tuple of values
    /// for `field_ids` (used by multi-column UNIQUE / primary-key checks).
    ///
    /// The scan is anchored on `field_ids[0]` with an unbounded range filter,
    /// then MVCC-filtered. Output rows are assembled column-by-column from
    /// the streamed chunks, addressed via each chunk's `row_offset`. Values
    /// that fail conversion are recorded as `PlanValue::Null`.
    fn scan_multi_column_values(
        &self,
        table: &ExecutorTable<P>,
        field_ids: &[FieldId],
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<Vec<PlanValue>>> {
        if field_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        // Match-all filter anchored on the first requested column.
        let match_all_filter = Filter {
            field_id: field_ids[0],
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        let row_ids = match table.table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            // NotFound means the anchor column has no stored data yet.
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Keep only rows visible to this snapshot.
        let row_ids = filter_row_ids_for_snapshot(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let logical_field_ids: Vec<_> = field_ids
            .iter()
            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
            .collect();

        let total_rows = row_ids.len();
        let mut stream = match table.table.stream_columns(
            logical_field_ids,
            row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Pre-size one output row per visible row id; chunks fill them in.
        let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() == 0 {
                continue;
            }

            // `row_offset` maps this chunk's local rows onto output indices.
            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < rows.len(),
                        "stream chunk produced out-of-bounds row index"
                    );
                    if let Some(row) = rows.get_mut(target_index) {
                        match llkv_plan::plan_value_from_array(array, local_idx) {
                            Ok(value) => row.push(value),
                            // Unconvertible values degrade to Null here.
                            Err(_) => row.push(PlanValue::Null),
                        }
                    }
                }
            }
        }

        Ok(rows)
    }
2869
2870    fn collect_row_values_for_ids(
2871        &self,
2872        table: &ExecutorTable<P>,
2873        row_ids: &[RowId],
2874        field_ids: &[FieldId],
2875    ) -> Result<Vec<Vec<PlanValue>>> {
2876        if row_ids.is_empty() || field_ids.is_empty() {
2877            return Ok(Vec::new());
2878        }
2879
2880        let table_id = table.table.table_id();
2881        let logical_field_ids: Vec<LogicalFieldId> = field_ids
2882            .iter()
2883            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
2884            .collect();
2885
2886        let mut stream = match table.table.stream_columns(
2887            logical_field_ids.clone(),
2888            row_ids.to_vec(),
2889            GatherNullPolicy::IncludeNulls,
2890        ) {
2891            Ok(stream) => stream,
2892            Err(Error::NotFound) => return Ok(Vec::new()),
2893            Err(e) => return Err(e),
2894        };
2895
2896        let mut rows = vec![Vec::with_capacity(field_ids.len()); row_ids.len()];
2897        while let Some(chunk) = stream.next_batch()? {
2898            let batch = chunk.batch();
2899            let base = chunk.row_offset();
2900            let local_len = batch.num_rows();
2901            for col_idx in 0..batch.num_columns() {
2902                let array = batch.column(col_idx);
2903                for local_idx in 0..local_len {
2904                    let target_index = base + local_idx;
2905                    if let Some(row) = rows.get_mut(target_index) {
2906                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
2907                        row.push(value);
2908                    }
2909                }
2910            }
2911        }
2912
2913        Ok(rows)
2914    }
2915
2916    fn collect_visible_child_rows(
2917        &self,
2918        table: &ExecutorTable<P>,
2919        field_ids: &[FieldId],
2920        snapshot: TransactionSnapshot,
2921    ) -> Result<Vec<(RowId, Vec<PlanValue>)>> {
2922        if field_ids.is_empty() {
2923            return Ok(Vec::new());
2924        }
2925
2926        let anchor_field = field_ids[0];
2927        let filter_expr = full_table_scan_filter(anchor_field);
2928        let raw_row_ids = match table.table.filter_row_ids(&filter_expr) {
2929            Ok(ids) => ids,
2930            Err(Error::NotFound) => return Ok(Vec::new()),
2931            Err(e) => return Err(e),
2932        };
2933
2934        let visible_row_ids = filter_row_ids_for_snapshot(
2935            table.table.as_ref(),
2936            raw_row_ids,
2937            &self.txn_manager,
2938            snapshot,
2939        )?;
2940
2941        if visible_row_ids.is_empty() {
2942            return Ok(Vec::new());
2943        }
2944
2945        let table_id = table.table.table_id();
2946        let logical_field_ids: Vec<LogicalFieldId> = field_ids
2947            .iter()
2948            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
2949            .collect();
2950
2951        let mut stream = match table.table.stream_columns(
2952            logical_field_ids.clone(),
2953            visible_row_ids.clone(),
2954            GatherNullPolicy::IncludeNulls,
2955        ) {
2956            Ok(stream) => stream,
2957            Err(Error::NotFound) => return Ok(Vec::new()),
2958            Err(e) => return Err(e),
2959        };
2960
2961        let mut rows = vec![Vec::with_capacity(field_ids.len()); visible_row_ids.len()];
2962        while let Some(chunk) = stream.next_batch()? {
2963            let batch = chunk.batch();
2964            let base = chunk.row_offset();
2965            let local_len = batch.num_rows();
2966            for col_idx in 0..batch.num_columns() {
2967                let array = batch.column(col_idx);
2968                for local_idx in 0..local_len {
2969                    let target_index = base + local_idx;
2970                    if let Some(row) = rows.get_mut(target_index) {
2971                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
2972                        row.push(value);
2973                    }
2974                }
2975            }
2976        }
2977
2978        Ok(visible_row_ids.into_iter().zip(rows).collect())
2979    }
2980
    /// Assemble the constraint metadata needed to validate writes to `table`.
    ///
    /// Produces, from the executor schema:
    /// - `schema_field_ids`: field ids in schema column order;
    /// - `column_constraints`: per-column type/nullability/CHECK info;
    /// - `unique_columns`: single-column UNIQUE constraints (primary-key
    ///   columns are excluded here — they are covered by `primary_key`);
    /// - `multi_column_uniques`: composite UNIQUE constraints;
    /// - `primary_key`: the (possibly composite) primary key, represented as
    ///   an `InsertMultiColumnUnique`, or `None` if the table has no PK.
    ///
    /// # Errors
    /// Returns `Error::Internal` if a stored constraint references a column
    /// index that is out of bounds for the schema.
    fn build_table_constraint_context(
        &self,
        table: &ExecutorTable<P>,
    ) -> Result<TableConstraintContext> {
        let schema_field_ids: Vec<FieldId> = table
            .schema
            .columns
            .iter()
            .map(|column| column.field_id)
            .collect();

        let column_constraints: Vec<InsertColumnConstraint> = table
            .schema
            .columns
            .iter()
            .enumerate()
            .map(|(idx, column)| InsertColumnConstraint {
                schema_index: idx,
                column: ConstraintColumnInfo {
                    name: column.name.clone(),
                    field_id: column.field_id,
                    data_type: column.data_type.clone(),
                    nullable: column.nullable,
                    check_expr: column.check_expr.clone(),
                },
            })
            .collect();

        // Primary-key columns are filtered out: uniqueness for them is
        // enforced through the `primary_key` spec below.
        let unique_columns: Vec<InsertUniqueColumn> = table
            .schema
            .columns
            .iter()
            .enumerate()
            .filter(|(_, column)| column.unique && !column.primary_key)
            .map(|(idx, column)| InsertUniqueColumn {
                schema_index: idx,
                field_id: column.field_id,
                name: column.name.clone(),
            })
            .collect();

        let mut multi_column_uniques: Vec<InsertMultiColumnUnique> = Vec::new();
        for constraint in table.multi_column_uniques() {
            if constraint.column_indices.is_empty() {
                continue;
            }

            // Resolve each stored column index to its field id and name.
            let mut schema_indices = Vec::with_capacity(constraint.column_indices.len());
            let mut field_ids = Vec::with_capacity(constraint.column_indices.len());
            let mut column_names = Vec::with_capacity(constraint.column_indices.len());
            for &col_idx in &constraint.column_indices {
                let column = table.schema.columns.get(col_idx).ok_or_else(|| {
                    Error::Internal(format!(
                        "multi-column UNIQUE constraint references invalid column index {}",
                        col_idx
                    ))
                })?;
                schema_indices.push(col_idx);
                field_ids.push(column.field_id);
                column_names.push(column.name.clone());
            }

            multi_column_uniques.push(InsertMultiColumnUnique {
                schema_indices,
                field_ids,
                column_names,
            });
        }

        let primary_indices: Vec<usize> = table
            .schema
            .columns
            .iter()
            .enumerate()
            .filter(|(_, column)| column.primary_key)
            .map(|(idx, _)| idx)
            .collect();

        // A composite PK reuses the multi-column-unique representation.
        let primary_key = if primary_indices.is_empty() {
            None
        } else {
            let mut field_ids = Vec::with_capacity(primary_indices.len());
            let mut column_names = Vec::with_capacity(primary_indices.len());
            for &idx in &primary_indices {
                let column = table.schema.columns.get(idx).ok_or_else(|| {
                    Error::Internal(format!(
                        "primary key references invalid column index {}",
                        idx
                    ))
                })?;
                field_ids.push(column.field_id);
                column_names.push(column.name.clone());
            }

            Some(InsertMultiColumnUnique {
                schema_indices: primary_indices.clone(),
                field_ids,
                column_names,
            })
        };

        Ok(TableConstraintContext {
            schema_field_ids,
            column_constraints,
            unique_columns,
            multi_column_uniques,
            primary_key,
        })
    }
3090
3091    fn insert_rows(
3092        &self,
3093        table: &ExecutorTable<P>,
3094        display_name: String,
3095        canonical_name: String,
3096        mut rows: Vec<Vec<PlanValue>>,
3097        columns: Vec<String>,
3098        snapshot: TransactionSnapshot,
3099    ) -> Result<RuntimeStatementResult<P>> {
3100        if rows.is_empty() {
3101            return Err(Error::InvalidArgumentError(
3102                "INSERT requires at least one row".into(),
3103            ));
3104        }
3105
3106        let column_order = resolve_insert_columns(&columns, table.schema.as_ref())?;
3107        let expected_len = column_order.len();
3108        for row in &rows {
3109            if row.len() != expected_len {
3110                return Err(Error::InvalidArgumentError(format!(
3111                    "expected {} values in INSERT row, found {}",
3112                    expected_len,
3113                    row.len()
3114                )));
3115            }
3116        }
3117
3118        for row in rows.iter_mut() {
3119            for (position, value) in row.iter_mut().enumerate() {
3120                let schema_index = column_order
3121                    .get(position)
3122                    .copied()
3123                    .ok_or_else(|| Error::Internal("invalid INSERT column index mapping".into()))?;
3124                let column = table.schema.columns.get(schema_index).ok_or_else(|| {
3125                    Error::Internal(format!(
3126                        "INSERT column index {} out of bounds for table '{}'",
3127                        schema_index, display_name
3128                    ))
3129                })?;
3130                let normalized = normalize_insert_value_for_column(column, value.clone())?;
3131                *value = normalized;
3132            }
3133        }
3134
3135        let constraint_ctx = self.build_table_constraint_context(table)?;
3136        let primary_key_spec = constraint_ctx.primary_key.as_ref();
3137
3138        if display_name == "keys" {
3139            tracing::trace!(
3140                "[KEYS] Validating constraints for {} row(s) before insert",
3141                rows.len()
3142            );
3143            for (i, row) in rows.iter().enumerate() {
3144                tracing::trace!("[KEYS]   row[{}]: {:?}", i, row);
3145            }
3146        }
3147
3148        self.constraint_service.validate_insert_constraints(
3149            &constraint_ctx.schema_field_ids,
3150            &constraint_ctx.column_constraints,
3151            &constraint_ctx.unique_columns,
3152            &constraint_ctx.multi_column_uniques,
3153            primary_key_spec,
3154            &column_order,
3155            &rows,
3156            |field_id| self.scan_column_values(table, field_id, snapshot),
3157            |field_ids| self.scan_multi_column_values(table, field_ids, snapshot),
3158        )?;
3159
3160        self.check_foreign_keys_on_insert(table, &display_name, &rows, &column_order, snapshot)?;
3161
3162        let row_count = rows.len();
3163        let mut column_values: Vec<Vec<PlanValue>> =
3164            vec![Vec::with_capacity(row_count); table.schema.columns.len()];
3165        for row in rows {
3166            for (idx, value) in row.into_iter().enumerate() {
3167                let dest_index = column_order[idx];
3168                column_values[dest_index].push(value);
3169            }
3170        }
3171
3172        let start_row = table.next_row_id.load(Ordering::SeqCst);
3173
3174        // Build MVCC columns using helper
3175        let (row_id_array, created_by_array, deleted_by_array) =
3176            mvcc::build_insert_mvcc_columns(row_count, start_row, snapshot.txn_id, TXN_ID_NONE);
3177
3178        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(column_values.len() + 3);
3179        arrays.push(row_id_array);
3180        arrays.push(created_by_array);
3181        arrays.push(deleted_by_array);
3182
3183        let mut fields: Vec<Field> = Vec::with_capacity(column_values.len() + 3);
3184        fields.extend(mvcc::build_mvcc_fields());
3185
3186        for (column, values) in table.schema.columns.iter().zip(column_values.into_iter()) {
3187            let array = build_array_for_column(&column.data_type, &values)?;
3188            let field = mvcc::build_field_with_metadata(
3189                &column.name,
3190                column.data_type.clone(),
3191                column.nullable,
3192                column.field_id,
3193            );
3194            arrays.push(array);
3195            fields.push(field);
3196        }
3197
3198        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays)?;
3199        tracing::trace!(
3200            table_name = %display_name,
3201            store_ptr = ?std::ptr::addr_of!(*table.table.store()),
3202            "About to call table.append"
3203        );
3204        table.table.append(&batch)?;
3205        table
3206            .next_row_id
3207            .store(start_row + row_count as u64, Ordering::SeqCst);
3208        table
3209            .total_rows
3210            .fetch_add(row_count as u64, Ordering::SeqCst);
3211
3212        self.record_table_with_new_rows(snapshot.txn_id, canonical_name);
3213
3214        Ok(RuntimeStatementResult::Insert {
3215            table_name: display_name,
3216            rows_inserted: row_count,
3217        })
3218    }
3219
3220    fn insert_batches(
3221        &self,
3222        table: &ExecutorTable<P>,
3223        display_name: String,
3224        canonical_name: String,
3225        batches: Vec<RecordBatch>,
3226        columns: Vec<String>,
3227        snapshot: TransactionSnapshot,
3228    ) -> Result<RuntimeStatementResult<P>> {
3229        if batches.is_empty() {
3230            return Ok(RuntimeStatementResult::Insert {
3231                table_name: display_name,
3232                rows_inserted: 0,
3233            });
3234        }
3235
3236        let expected_len = if columns.is_empty() {
3237            table.schema.columns.len()
3238        } else {
3239            columns.len()
3240        };
3241        let mut total_rows_inserted = 0usize;
3242
3243        for batch in batches {
3244            if batch.num_columns() != expected_len {
3245                return Err(Error::InvalidArgumentError(format!(
3246                    "expected {} columns in INSERT batch, found {}",
3247                    expected_len,
3248                    batch.num_columns()
3249                )));
3250            }
3251            let row_count = batch.num_rows();
3252            if row_count == 0 {
3253                continue;
3254            }
3255            let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(row_count);
3256            for row_idx in 0..row_count {
3257                let mut row: Vec<PlanValue> = Vec::with_capacity(expected_len);
3258                for col_idx in 0..expected_len {
3259                    let array = batch.column(col_idx);
3260                    row.push(llkv_plan::plan_value_from_array(array, row_idx)?);
3261                }
3262                rows.push(row);
3263            }
3264
3265            match self.insert_rows(
3266                table,
3267                display_name.clone(),
3268                canonical_name.clone(),
3269                rows,
3270                columns.clone(),
3271                snapshot,
3272            )? {
3273                RuntimeStatementResult::Insert { rows_inserted, .. } => {
3274                    total_rows_inserted += rows_inserted;
3275                }
3276                _ => unreachable!("insert_rows must return Insert result"),
3277            }
3278        }
3279
3280        Ok(RuntimeStatementResult::Insert {
3281            table_name: display_name,
3282            rows_inserted: total_rows_inserted,
3283        })
3284    }
3285
    /// Apply UPDATE assignments to every row matching `filter`.
    ///
    /// Updates are implemented MVCC-style as delete-then-reinsert: the current
    /// values of all matching rows are captured, the assignment values are
    /// spliced in, constraints are re-validated against the new rows, and only
    /// then are the old row ids tombstoned and the new rows appended under
    /// this transaction. The statement order below is load-bearing — do not
    /// rearrange capture/validate/delete/insert.
    fn update_filtered_rows(
        &self,
        table: &ExecutorTable<P>,
        display_name: String,
        canonical_name: String,
        assignments: Vec<ColumnAssignment>,
        filter: LlkvExpr<'static, String>,
        snapshot: TransactionSnapshot,
    ) -> Result<RuntimeStatementResult<P>> {
        if assignments.is_empty() {
            return Err(Error::InvalidArgumentError(
                "UPDATE requires at least one assignment".into(),
            ));
        }

        let schema = table.schema.as_ref();
        // Translate the name-based predicate into field-id form.
        let filter_expr = translate_predicate(filter, schema)?;

        // TODO: Dedupe
        // How an assignment supplies its new value: a constant applied to
        // every row, or an index into `scalar_exprs` evaluated per row.
        enum PreparedValue {
            Literal(PlanValue),
            Expression { expr_index: usize },
        }

        let mut seen_columns: FxHashSet<String> =
            FxHashSet::with_capacity_and_hasher(assignments.len(), Default::default());
        let mut prepared: Vec<(ExecutorColumn, PreparedValue)> =
            Vec::with_capacity(assignments.len());
        let mut scalar_exprs: Vec<ScalarExpr<FieldId>> = Vec::new();

        // Resolve each assignment target and classify its value, rejecting
        // duplicate targets (case-insensitively) and unknown columns.
        for assignment in assignments {
            let normalized = assignment.column.to_ascii_lowercase();
            if !seen_columns.insert(normalized.clone()) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate column '{}' in UPDATE assignments",
                    assignment.column
                )));
            }
            let column = table.schema.resolve(&assignment.column).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "unknown column '{}' in UPDATE",
                    assignment.column
                ))
            })?;

            match assignment.value {
                AssignmentValue::Literal(value) => {
                    prepared.push((column.clone(), PreparedValue::Literal(value)));
                }
                AssignmentValue::Expression(expr) => {
                    let translated = translate_scalar(&expr, schema)?;
                    let expr_index = scalar_exprs.len();
                    scalar_exprs.push(translated);
                    prepared.push((column.clone(), PreparedValue::Expression { expr_index }));
                }
            }
        }

        // Matching row ids (already filtered for snapshot visibility) plus the
        // per-row results of each scalar expression, evaluated in one pass.
        let (row_ids, mut expr_values) =
            self.collect_update_rows(table, &filter_expr, &scalar_exprs, snapshot)?;

        if row_ids.is_empty() {
            return Ok(RuntimeStatementResult::Update {
                table_name: display_name,
                rows_updated: 0,
            });
        }

        let row_count = row_ids.len();
        let table_id = table.table.table_id();
        let logical_fields: Vec<LogicalFieldId> = table
            .schema
            .columns
            .iter()
            .map(|column| LogicalFieldId::for_user(table_id, column.field_id))
            .collect();

        // Stream the current values of every column for the targeted rows so
        // we can rebuild full rows (updates rewrite whole rows, not cells).
        let mut stream = table.table.stream_columns(
            logical_fields.clone(),
            row_ids.clone(),
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut new_rows: Vec<Vec<PlanValue>> =
            vec![Vec::with_capacity(table.schema.columns.len()); row_count];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            // `row_offset` maps this chunk's local indices back into new_rows.
            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            // Columns arrive in schema order, so pushing per column builds
            // each row's values in schema order.
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < new_rows.len(),
                        "column stream produced out-of-range row index"
                    );
                    if let Some(row) = new_rows.get_mut(target_index) {
                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
                        row.push(value);
                    }
                }
            }
        }
        // Every captured row must be complete before we start mutating it.
        debug_assert!(
            new_rows
                .iter()
                .all(|row| row.len() == table.schema.columns.len())
        );

        tracing::trace!(
            table = %display_name,
            row_count,
            rows = ?new_rows,
            "update_filtered_rows captured source rows"
        );

        let constraint_ctx = self.build_table_constraint_context(table)?;
        let primary_key_spec = constraint_ctx.primary_key.as_ref();
        // Snapshot the pre-update primary-key values so PK validation can
        // distinguish "key unchanged" from "key moved onto another row".
        let mut original_primary_key_keys: Vec<Option<UniqueKey>> = Vec::new();
        if let Some(pk) = primary_key_spec {
            original_primary_key_keys.reserve(row_count);
            for row in &new_rows {
                let mut values = Vec::with_capacity(pk.schema_indices.len());
                for &idx in &pk.schema_indices {
                    let value = row.get(idx).cloned().unwrap_or(PlanValue::Null);
                    values.push(value);
                }
                let key = build_composite_unique_key(&values, &pk.column_names)?;
                original_primary_key_keys.push(key);
            }
        }

        // field_id -> position in schema order, for splicing assignments.
        let column_positions: FxHashMap<FieldId, usize> = FxHashMap::from_iter(
            table
                .schema
                .columns
                .iter()
                .enumerate()
                .map(|(idx, column)| (column.field_id, idx)),
        );

        // Overwrite the captured rows with the assignment values, coercing
        // each value to the destination column's type.
        for (column, value) in prepared {
            let column_index =
                column_positions
                    .get(&column.field_id)
                    .copied()
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "column '{}' missing in table schema during UPDATE",
                            column.name
                        ))
                    })?;

            let values = match value {
                PreparedValue::Literal(lit) => vec![lit; row_count],
                PreparedValue::Expression { expr_index } => {
                    let column_values = expr_values.get_mut(expr_index).ok_or_else(|| {
                        Error::InvalidArgumentError(
                            "expression assignment value missing during UPDATE".into(),
                        )
                    })?;
                    if column_values.len() != row_count {
                        return Err(Error::InvalidArgumentError(
                            "expression result count did not match targeted row count".into(),
                        ));
                    }
                    // Move the evaluated values out; each expression is
                    // consumed by exactly one assignment.
                    mem::take(column_values)
                }
            };

            for (row_idx, new_value) in values.into_iter().enumerate() {
                if let Some(row) = new_rows.get_mut(row_idx) {
                    let coerced = self.coerce_plan_value_for_column(new_value, &column)?;
                    row[column_index] = coerced;
                }
            }
        }

        // Validate the fully-rebuilt rows (all columns, in schema order)
        // before touching storage.
        let column_names: Vec<String> = table
            .schema
            .columns
            .iter()
            .map(|column| column.name.clone())
            .collect();
        let column_order = resolve_insert_columns(&column_names, table.schema.as_ref())?;
        self.constraint_service.validate_row_level_constraints(
            &constraint_ctx.schema_field_ids,
            &constraint_ctx.column_constraints,
            &column_order,
            &new_rows,
        )?;

        if let Some(pk) = primary_key_spec {
            self.constraint_service.validate_update_primary_keys(
                &constraint_ctx.schema_field_ids,
                pk,
                &column_order,
                &new_rows,
                &original_primary_key_keys,
                |field_ids| self.scan_multi_column_values(table, field_ids, snapshot),
            )?;
        }

        // Tombstone the old row versions. Foreign keys are NOT enforced here
        // (`false`): the values are immediately re-inserted below, so the
        // delete half of the update must not trip referencing-row checks.
        let _ = self.apply_delete(
            table,
            display_name.clone(),
            canonical_name.clone(),
            row_ids.clone(),
            snapshot,
            false,
        )?;

        // Append the updated rows as fresh versions under this transaction.
        let _ = self.insert_rows(
            table,
            display_name.clone(),
            canonical_name,
            new_rows,
            column_names,
            snapshot,
        )?;

        Ok(RuntimeStatementResult::Update {
            table_name: display_name,
            rows_updated: row_count,
        })
    }
3513
    /// Apply UPDATE assignments to every row in the table (no WHERE clause).
    ///
    /// Same delete-then-reinsert scheme as `update_filtered_rows` (see the
    /// `TODO: Dedupe` below), except the row set comes from a full-table-scan
    /// filter anchored on the first assigned column instead of a user
    /// predicate. Statement order is load-bearing — do not rearrange
    /// capture/validate/delete/insert.
    fn update_all_rows(
        &self,
        table: &ExecutorTable<P>,
        display_name: String,
        canonical_name: String,
        assignments: Vec<ColumnAssignment>,
        snapshot: TransactionSnapshot,
    ) -> Result<RuntimeStatementResult<P>> {
        if assignments.is_empty() {
            return Err(Error::InvalidArgumentError(
                "UPDATE requires at least one assignment".into(),
            ));
        }

        // Fast path: an empty table (by the live-row counter) updates nothing.
        let total_rows = table.total_rows.load(Ordering::SeqCst);
        let total_rows_usize = usize::try_from(total_rows).map_err(|_| {
            Error::InvalidArgumentError("table row count exceeds supported range".into())
        })?;
        if total_rows_usize == 0 {
            return Ok(RuntimeStatementResult::Update {
                table_name: display_name,
                rows_updated: 0,
            });
        }

        let schema = table.schema.as_ref();

        // TODO: Dedupe
        // How an assignment supplies its new value: a constant applied to
        // every row, or an index into `scalar_exprs` evaluated per row.
        enum PreparedValue {
            Literal(PlanValue),
            Expression { expr_index: usize },
        }

        let mut seen_columns: FxHashSet<String> =
            FxHashSet::with_capacity_and_hasher(assignments.len(), Default::default());
        let mut prepared: Vec<(ExecutorColumn, PreparedValue)> =
            Vec::with_capacity(assignments.len());
        let mut scalar_exprs: Vec<ScalarExpr<FieldId>> = Vec::new();
        // First assigned column's field id; used below to anchor the scan.
        let mut first_field_id: Option<FieldId> = None;

        // Resolve each assignment target and classify its value, rejecting
        // duplicate targets (case-insensitively) and unknown columns.
        for assignment in assignments {
            let normalized = assignment.column.to_ascii_lowercase();
            if !seen_columns.insert(normalized.clone()) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate column '{}' in UPDATE assignments",
                    assignment.column
                )));
            }
            let column = table.schema.resolve(&assignment.column).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "unknown column '{}' in UPDATE",
                    assignment.column
                ))
            })?;
            if first_field_id.is_none() {
                first_field_id = Some(column.field_id);
            }

            match assignment.value {
                AssignmentValue::Literal(value) => {
                    prepared.push((column.clone(), PreparedValue::Literal(value)));
                }
                AssignmentValue::Expression(expr) => {
                    let translated = translate_scalar(&expr, schema)?;
                    let expr_index = scalar_exprs.len();
                    scalar_exprs.push(translated);
                    prepared.push((column.clone(), PreparedValue::Expression { expr_index }));
                }
            }
        }

        let anchor_field = first_field_id.ok_or_else(|| {
            Error::InvalidArgumentError("UPDATE requires at least one target column".into())
        })?;

        // Select every row via a match-all filter on the anchor column.
        let filter_expr = full_table_scan_filter(anchor_field);
        // Row ids (already filtered for snapshot visibility) plus the per-row
        // results of each scalar expression, evaluated in one pass.
        let (row_ids, mut expr_values) =
            self.collect_update_rows(table, &filter_expr, &scalar_exprs, snapshot)?;

        if row_ids.is_empty() {
            return Ok(RuntimeStatementResult::Update {
                table_name: display_name,
                rows_updated: 0,
            });
        }

        let row_count = row_ids.len();
        let table_id = table.table.table_id();
        let logical_fields: Vec<LogicalFieldId> = table
            .schema
            .columns
            .iter()
            .map(|column| LogicalFieldId::for_user(table_id, column.field_id))
            .collect();

        // Stream the current values of every column for the targeted rows so
        // we can rebuild full rows (updates rewrite whole rows, not cells).
        let mut stream = table.table.stream_columns(
            logical_fields.clone(),
            row_ids.clone(),
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut new_rows: Vec<Vec<PlanValue>> =
            vec![Vec::with_capacity(table.schema.columns.len()); row_count];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            // `row_offset` maps this chunk's local indices back into new_rows.
            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            // Columns arrive in schema order, so pushing per column builds
            // each row's values in schema order.
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < new_rows.len(),
                        "column stream produced out-of-range row index"
                    );
                    if let Some(row) = new_rows.get_mut(target_index) {
                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
                        row.push(value);
                    }
                }
            }
        }
        // Every captured row must be complete before we start mutating it.
        debug_assert!(
            new_rows
                .iter()
                .all(|row| row.len() == table.schema.columns.len())
        );

        let constraint_ctx = self.build_table_constraint_context(table)?;
        let primary_key_spec = constraint_ctx.primary_key.as_ref();
        // Snapshot the pre-update primary-key values so PK validation can
        // distinguish "key unchanged" from "key moved onto another row".
        let mut original_primary_key_keys: Vec<Option<UniqueKey>> = Vec::new();
        if let Some(pk) = primary_key_spec {
            original_primary_key_keys.reserve(row_count);
            for row in &new_rows {
                let mut values = Vec::with_capacity(pk.schema_indices.len());
                for &idx in &pk.schema_indices {
                    let value = row.get(idx).cloned().unwrap_or(PlanValue::Null);
                    values.push(value);
                }
                let key = build_composite_unique_key(&values, &pk.column_names)?;
                original_primary_key_keys.push(key);
            }
        }

        // field_id -> position in schema order, for splicing assignments.
        let column_positions: FxHashMap<FieldId, usize> = FxHashMap::from_iter(
            table
                .schema
                .columns
                .iter()
                .enumerate()
                .map(|(idx, column)| (column.field_id, idx)),
        );

        // Overwrite the captured rows with the assignment values, coercing
        // each value to the destination column's type.
        for (column, value) in prepared {
            let column_index =
                column_positions
                    .get(&column.field_id)
                    .copied()
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "column '{}' missing in table schema during UPDATE",
                            column.name
                        ))
                    })?;

            let values = match value {
                PreparedValue::Literal(lit) => vec![lit; row_count],
                PreparedValue::Expression { expr_index } => {
                    let column_values = expr_values.get_mut(expr_index).ok_or_else(|| {
                        Error::InvalidArgumentError(
                            "expression assignment value missing during UPDATE".into(),
                        )
                    })?;
                    if column_values.len() != row_count {
                        return Err(Error::InvalidArgumentError(
                            "expression result count did not match targeted row count".into(),
                        ));
                    }
                    // Move the evaluated values out; each expression is
                    // consumed by exactly one assignment.
                    mem::take(column_values)
                }
            };

            for (row_idx, new_value) in values.into_iter().enumerate() {
                if let Some(row) = new_rows.get_mut(row_idx) {
                    let coerced = self.coerce_plan_value_for_column(new_value, &column)?;
                    row[column_index] = coerced;
                }
            }
        }

        // Validate the fully-rebuilt rows (all columns, in schema order)
        // before touching storage.
        let column_names: Vec<String> = table
            .schema
            .columns
            .iter()
            .map(|column| column.name.clone())
            .collect();
        let column_order = resolve_insert_columns(&column_names, table.schema.as_ref())?;
        self.constraint_service.validate_row_level_constraints(
            &constraint_ctx.schema_field_ids,
            &constraint_ctx.column_constraints,
            &column_order,
            &new_rows,
        )?;

        if let Some(pk) = primary_key_spec {
            self.constraint_service.validate_update_primary_keys(
                &constraint_ctx.schema_field_ids,
                pk,
                &column_order,
                &new_rows,
                &original_primary_key_keys,
                |field_ids| self.scan_multi_column_values(table, field_ids, snapshot),
            )?;
        }

        // Tombstone the old row versions. Foreign keys are NOT enforced here
        // (`false`): the values are immediately re-inserted below, so the
        // delete half of the update must not trip referencing-row checks.
        let _ = self.apply_delete(
            table,
            display_name.clone(),
            canonical_name.clone(),
            row_ids.clone(),
            snapshot,
            false,
        )?;

        // Append the updated rows as fresh versions under this transaction.
        let _ = self.insert_rows(
            table,
            display_name.clone(),
            canonical_name,
            new_rows,
            column_names,
            snapshot,
        )?;

        Ok(RuntimeStatementResult::Update {
            table_name: display_name,
            rows_updated: row_count,
        })
    }
3752
3753    fn delete_filtered_rows(
3754        &self,
3755        table: &ExecutorTable<P>,
3756        display_name: String,
3757        canonical_name: String,
3758        filter: LlkvExpr<'static, String>,
3759        snapshot: TransactionSnapshot,
3760    ) -> Result<RuntimeStatementResult<P>> {
3761        let schema = table.schema.as_ref();
3762        let filter_expr = translate_predicate(filter, schema)?;
3763        let row_ids = table.table.filter_row_ids(&filter_expr)?;
3764        let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
3765        tracing::trace!(
3766            table = %display_name,
3767            rows = row_ids.len(),
3768            "delete_filtered_rows collected row ids"
3769        );
3770        self.apply_delete(table, display_name, canonical_name, row_ids, snapshot, true)
3771    }
3772
3773    fn delete_all_rows(
3774        &self,
3775        table: &ExecutorTable<P>,
3776        display_name: String,
3777        canonical_name: String,
3778        snapshot: TransactionSnapshot,
3779    ) -> Result<RuntimeStatementResult<P>> {
3780        let total_rows = table.total_rows.load(Ordering::SeqCst);
3781        if total_rows == 0 {
3782            return Ok(RuntimeStatementResult::Delete {
3783                table_name: display_name,
3784                rows_deleted: 0,
3785            });
3786        }
3787
3788        let anchor_field = table.schema.first_field_id().ok_or_else(|| {
3789            Error::InvalidArgumentError("DELETE requires a table with at least one column".into())
3790        })?;
3791        let filter_expr = full_table_scan_filter(anchor_field);
3792        let row_ids = table.table.filter_row_ids(&filter_expr)?;
3793        let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
3794        self.apply_delete(table, display_name, canonical_name, row_ids, snapshot, true)
3795    }
3796
3797    fn apply_delete(
3798        &self,
3799        table: &ExecutorTable<P>,
3800        display_name: String,
3801        canonical_name: String,
3802        row_ids: Vec<RowId>,
3803        snapshot: TransactionSnapshot,
3804        enforce_foreign_keys: bool,
3805    ) -> Result<RuntimeStatementResult<P>> {
3806        if row_ids.is_empty() {
3807            return Ok(RuntimeStatementResult::Delete {
3808                table_name: display_name,
3809                rows_deleted: 0,
3810            });
3811        }
3812
3813        if enforce_foreign_keys {
3814            self.check_foreign_keys_on_delete(
3815                table,
3816                &display_name,
3817                &canonical_name,
3818                &row_ids,
3819                snapshot,
3820            )?;
3821        }
3822
3823        self.detect_delete_conflicts(table, &display_name, &row_ids, snapshot)?;
3824
3825        let removed = row_ids.len();
3826
3827        // Build DELETE batch using helper
3828        let batch = mvcc::build_delete_batch(row_ids.clone(), snapshot.txn_id)?;
3829        table.table.append(&batch)?;
3830
3831        let removed_u64 = u64::try_from(removed)
3832            .map_err(|_| Error::InvalidArgumentError("row count exceeds supported range".into()))?;
3833        table.total_rows.fetch_sub(removed_u64, Ordering::SeqCst);
3834
3835        Ok(RuntimeStatementResult::Delete {
3836            table_name: display_name,
3837            rows_deleted: removed,
3838        })
3839    }
3840
3841    fn check_foreign_keys_on_delete(
3842        &self,
3843        table: &ExecutorTable<P>,
3844        _display_name: &str,
3845        _canonical_name: &str,
3846        row_ids: &[RowId],
3847        snapshot: TransactionSnapshot,
3848    ) -> Result<()> {
3849        if row_ids.is_empty() {
3850            return Ok(());
3851        }
3852
3853        self.constraint_service.validate_delete_foreign_keys(
3854            table.table.table_id(),
3855            row_ids,
3856            |request| {
3857                self.collect_row_values_for_ids(
3858                    table,
3859                    request.referenced_row_ids,
3860                    request.referenced_field_ids,
3861                )
3862            },
3863            |request| {
3864                let child_table = self.lookup_table(request.referencing_table_canonical)?;
3865                self.collect_visible_child_rows(
3866                    child_table.as_ref(),
3867                    request.referencing_field_ids,
3868                    snapshot,
3869                )
3870            },
3871        )
3872    }
3873
3874    fn check_foreign_keys_on_insert(
3875        &self,
3876        table: &ExecutorTable<P>,
3877        _display_name: &str,
3878        rows: &[Vec<PlanValue>],
3879        column_order: &[usize],
3880        snapshot: TransactionSnapshot,
3881    ) -> Result<()> {
3882        if rows.is_empty() {
3883            return Ok(());
3884        }
3885
3886        let schema_field_ids: Vec<FieldId> = table
3887            .schema
3888            .columns
3889            .iter()
3890            .map(|column| column.field_id)
3891            .collect();
3892
3893        self.constraint_service.validate_insert_foreign_keys(
3894            table.table.table_id(),
3895            &schema_field_ids,
3896            column_order,
3897            rows,
3898            |request| {
3899                let parent_table = self.lookup_table(request.referenced_table_canonical)?;
3900                self.scan_multi_column_values(
3901                    parent_table.as_ref(),
3902                    request.referenced_field_ids,
3903                    snapshot,
3904                )
3905            },
3906        )
3907    }
3908
3909    fn detect_delete_conflicts(
3910        &self,
3911        table: &ExecutorTable<P>,
3912        display_name: &str,
3913        row_ids: &[RowId],
3914        snapshot: TransactionSnapshot,
3915    ) -> Result<()> {
3916        if row_ids.is_empty() {
3917            return Ok(());
3918        }
3919
3920        let table_id = table.table.table_id();
3921        let deleted_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
3922        let logical_fields: Arc<[LogicalFieldId]> = Arc::from([deleted_lfid]);
3923
3924        if let Err(err) = table
3925            .table
3926            .store()
3927            .prepare_gather_context(logical_fields.as_ref())
3928        {
3929            match err {
3930                Error::NotFound => return Ok(()),
3931                other => return Err(other),
3932            }
3933        }
3934
3935        let mut stream = table.table.stream_columns(
3936            Arc::clone(&logical_fields),
3937            row_ids.to_vec(),
3938            GatherNullPolicy::IncludeNulls,
3939        )?;
3940
3941        while let Some(chunk) = stream.next_batch()? {
3942            let batch = chunk.batch();
3943            let window = chunk.row_ids();
3944            let deleted_column = batch
3945                .column(0)
3946                .as_any()
3947                .downcast_ref::<UInt64Array>()
3948                .ok_or_else(|| {
3949                    Error::Internal(
3950                        "failed to read MVCC deleted_by column for conflict detection".into(),
3951                    )
3952                })?;
3953
3954            for (idx, row_id) in window.iter().enumerate() {
3955                let deleted_by = if deleted_column.is_null(idx) {
3956                    TXN_ID_NONE
3957                } else {
3958                    deleted_column.value(idx)
3959                };
3960
3961                if deleted_by == TXN_ID_NONE || deleted_by == snapshot.txn_id {
3962                    continue;
3963                }
3964
3965                let status = self.txn_manager.status(deleted_by);
3966                if !status.is_active() {
3967                    continue;
3968                }
3969
3970                tracing::debug!(
3971                    "[MVCC] delete conflict: table='{}' row_id={} deleted_by={} status={:?} current_txn={}",
3972                    display_name,
3973                    row_id,
3974                    deleted_by,
3975                    status,
3976                    snapshot.txn_id
3977                );
3978
3979                return Err(Error::TransactionContextError(format!(
3980                    "transaction conflict on table '{}' for row {}: row locked by transaction {} ({:?})",
3981                    display_name, row_id, deleted_by, status
3982                )));
3983            }
3984        }
3985
3986        Ok(())
3987    }
3988
3989    fn collect_update_rows(
3990        &self,
3991        table: &ExecutorTable<P>,
3992        filter_expr: &LlkvExpr<'static, FieldId>,
3993        expressions: &[ScalarExpr<FieldId>],
3994        snapshot: TransactionSnapshot,
3995    ) -> Result<(Vec<RowId>, Vec<Vec<PlanValue>>)> {
3996        let row_ids = table.table.filter_row_ids(filter_expr)?;
3997        let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
3998        if row_ids.is_empty() {
3999            return Ok((row_ids, vec![Vec::new(); expressions.len()]));
4000        }
4001
4002        if expressions.is_empty() {
4003            return Ok((row_ids, Vec::new()));
4004        }
4005
4006        let mut projections: Vec<ScanProjection> = Vec::with_capacity(expressions.len());
4007        for (idx, expr) in expressions.iter().enumerate() {
4008            let alias = format!("__expr_{idx}");
4009            projections.push(ScanProjection::computed(expr.clone(), alias));
4010        }
4011
4012        let mut expr_values: Vec<Vec<PlanValue>> =
4013            vec![Vec::with_capacity(row_ids.len()); expressions.len()];
4014        let mut error: Option<Error> = None;
4015        let row_filter: Arc<dyn RowIdFilter<P>> = Arc::new(MvccRowIdFilter::new(
4016            Arc::clone(&self.txn_manager),
4017            snapshot,
4018        ));
4019        let options = ScanStreamOptions {
4020            include_nulls: true,
4021            order: None,
4022            row_id_filter: Some(row_filter),
4023        };
4024
4025        table
4026            .table
4027            .scan_stream_with_exprs(&projections, filter_expr, options, |batch| {
4028                if error.is_some() {
4029                    return;
4030                }
4031                if let Err(err) = Self::collect_expression_values(&mut expr_values, batch) {
4032                    error = Some(err);
4033                }
4034            })?;
4035
4036        if let Some(err) = error {
4037            return Err(err);
4038        }
4039
4040        for values in &expr_values {
4041            if values.len() != row_ids.len() {
4042                return Err(Error::InvalidArgumentError(
4043                    "expression result count did not match targeted row count".into(),
4044                ));
4045            }
4046        }
4047
4048        Ok((row_ids, expr_values))
4049    }
4050
4051    fn collect_expression_values(
4052        expr_values: &mut [Vec<PlanValue>],
4053        batch: RecordBatch,
4054    ) -> Result<()> {
4055        for row_idx in 0..batch.num_rows() {
4056            for (expr_index, values) in expr_values.iter_mut().enumerate() {
4057                let value = llkv_plan::plan_value_from_array(batch.column(expr_index), row_idx)?;
4058                values.push(value);
4059            }
4060        }
4061
4062        Ok(())
4063    }
4064
    /// Resolves `canonical_name` to an [`ExecutorTable`], loading it lazily on
    /// first access and caching it for subsequent lookups.
    ///
    /// Fast path: return the cached entry (unless the table has since been
    /// marked dropped, which maps to [`Error::NotFound`]). Slow path: rebuild
    /// the executor schema from catalog metadata, scan for the maximum
    /// persisted `row_id` to seed `next_row_id`, cache the result, and
    /// re-register the column definitions with the catalog.
    ///
    /// # Errors
    /// Returns `InvalidArgumentError` when the catalog has no entry for
    /// `canonical_name`; storage/catalog failures are propagated.
    pub fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        // Fast path: check if table is already loaded
        {
            let tables = self.tables.read().unwrap();
            if let Some(table) = tables.get(canonical_name) {
                // Check if table has been dropped
                if self.dropped_tables.read().unwrap().contains(canonical_name) {
                    // Table was dropped - treat as not found
                    return Err(Error::NotFound);
                }
                tracing::trace!(
                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
                    canonical_name,
                    table.table.table_id(),
                    table.schema.columns.len(),
                    &*self.pager
                );
                return Ok(Arc::clone(table));
            }
        } // Release read lock

        // Slow path: load table from catalog (happens once per table)
        tracing::debug!(
            "[LAZY_LOAD] Loading table '{}' from catalog",
            canonical_name
        );

        // Check catalog first for table existence
        let catalog_table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;

        let table_id = catalog_table_id;
        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
        let store = table.store();
        // Sort user fields by field id so column order is deterministic.
        let mut logical_fields = store.user_field_ids_for_table(table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());
        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let summary = self
            .catalog_service
            .table_constraint_summary(canonical_name)?;
        let TableConstraintSummaryView {
            table_meta,
            column_metas,
            constraint_records,
            multi_column_uniques,
        } = summary;
        let _table_meta = table_meta.ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;
        let catalog_field_resolver = self.catalog.field_resolver(catalog_table_id);
        // Collect active constraint records; the two booleans below track
        // whether metadata-level PRIMARY KEY / single-column UNIQUE records
        // exist at all, so we know when to fall back to the legacy
        // field-resolver flags instead.
        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
        let mut has_primary_key_records = false;
        let mut has_single_unique_records = false;

        for record in constraint_records
            .iter()
            .filter(|record| record.is_active())
        {
            match &record.kind {
                ConstraintKind::PrimaryKey(pk) => {
                    has_primary_key_records = true;
                    for field_id in &pk.field_ids {
                        metadata_primary_keys.insert(*field_id);
                        metadata_unique_fields.insert(*field_id);
                    }
                }
                ConstraintKind::Unique(unique) => {
                    if unique.field_ids.len() == 1 {
                        has_single_unique_records = true;
                        metadata_unique_fields.insert(unique.field_ids[0]);
                    }
                }
                _ => {}
            }
        }

        // Build ExecutorSchema from metadata manager snapshots
        let mut executor_columns = Vec::new();
        let mut lookup = FxHashMap::with_capacity_and_hasher(field_ids.len(), Default::default());

        for (idx, lfid) in logical_fields.iter().enumerate() {
            let field_id = lfid.field_id();
            let normalized_index = executor_columns.len();

            // Missing column metadata falls back to a synthetic name.
            let column_name = column_metas
                .get(idx)
                .and_then(|meta| meta.as_ref())
                .and_then(|meta| meta.name.clone())
                .unwrap_or_else(|| format!("col_{}", field_id));

            // Lookup keys are lowercase for case-insensitive resolution.
            let normalized = column_name.to_ascii_lowercase();
            lookup.insert(normalized, normalized_index);

            let fallback_constraints: FieldConstraints = catalog_field_resolver
                .as_ref()
                .and_then(|resolver| resolver.field_constraints_by_name(&column_name))
                .unwrap_or_default();

            // Constraint records, when present, take precedence over the
            // per-field resolver fallback.
            let metadata_primary = metadata_primary_keys.contains(&field_id);
            let primary_key = if has_primary_key_records {
                metadata_primary
            } else {
                fallback_constraints.primary_key
            };

            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
            let unique = if has_primary_key_records || has_single_unique_records {
                metadata_unique
            } else {
                fallback_constraints.primary_key || fallback_constraints.unique
            };

            let data_type = store.data_type(*lfid)?;
            // Primary key columns are implicitly NOT NULL.
            let nullable = !primary_key;

            executor_columns.push(ExecutorColumn {
                name: column_name,
                data_type,
                nullable,
                primary_key,
                unique,
                field_id,
                check_expr: fallback_constraints.check_expr.clone(),
            });
        }

        let exec_schema = Arc::new(ExecutorSchema {
            columns: executor_columns,
            lookup,
        });

        // Find the maximum row_id in the table to set next_row_id correctly
        let max_row_id = {
            use arrow::array::UInt64Array;
            use llkv_column_map::store::rowid_fid;
            use llkv_column_map::store::scan::{
                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
            };

            // Visitor that tracks the largest row id seen across all chunks.
            struct MaxRowIdVisitor {
                max: RowId,
            }

            impl PrimitiveVisitor for MaxRowIdVisitor {
                fn u64_chunk(&mut self, values: &UInt64Array) {
                    for i in 0..values.len() {
                        let val = values.value(i);
                        if val > self.max {
                            self.max = val;
                        }
                    }
                }
            }

            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}

            // Scan the row_id column for any user field in this table
            // NOTE(review): assumes user field id 1 exists for this table —
            // confirm for tables whose first field id differs.
            let row_id_field = rowid_fid(LogicalFieldId::for_user(table_id, 1));
            let mut visitor = MaxRowIdVisitor { max: 0 };

            match ScanBuilder::new(table.store(), row_id_field)
                .options(ScanOptions::default())
                .run(&mut visitor)
            {
                Ok(_) => visitor.max,
                Err(llkv_result::Error::NotFound) => 0,
                Err(e) => {
                    // Best-effort: a failed scan degrades to 0 rather than
                    // blocking the table load.
                    tracing::warn!(
                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
                        canonical_name,
                        e
                    );
                    0
                }
            }
        };

        // NOTE(review): a table whose only row has row_id 0 is
        // indistinguishable from an empty table here; confirm row ids never
        // start at 0.
        let next_row_id = if max_row_id > 0 {
            max_row_id.saturating_add(1)
        } else {
            0
        };

        // Get the actual persisted row count from table metadata
        // This is an O(1) catalog lookup that reads ColumnDescriptor.total_row_count
        // Fallback to 0 for truly empty tables
        let total_rows = table.total_rows().unwrap_or(0);

        let executor_table = Arc::new(ExecutorTable {
            table: Arc::new(table),
            schema: exec_schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        // Attach any multi-column UNIQUE constraints recorded in the catalog.
        if !multi_column_uniques.is_empty() {
            let executor_uniques =
                Self::build_executor_multi_column_uniques(&executor_table, &multi_column_uniques);
            executor_table.set_multi_column_uniques(executor_uniques);
        }

        // Cache the loaded table
        {
            let mut tables = self.tables.write().unwrap();
            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
        }

        // Register fields in catalog (may already be registered from RuntimeContext::new())
        if let Some(field_resolver) = self.catalog.field_resolver(catalog_table_id) {
            for col in &executor_table.schema.columns {
                let definition = FieldDefinition::new(&col.name)
                    .with_primary_key(col.primary_key)
                    .with_unique(col.unique)
                    .with_check_expr(col.check_expr.clone());
                let _ = field_resolver.register_field(definition); // Ignore "already exists" errors
            }
            tracing::debug!(
                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
                executor_table.schema.columns.len(),
                canonical_name
            );
        }

        tracing::debug!(
            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
            canonical_name,
            table_id,
            field_ids.len(),
            next_row_id
        );

        Ok(executor_table)
    }
4304
4305    fn remove_table_entry(&self, canonical_name: &str) {
4306        let mut tables = self.tables.write().unwrap();
4307        if tables.remove(canonical_name).is_some() {
4308            tracing::trace!(
4309                "remove_table_entry: removed table '{}' from context cache",
4310                canonical_name
4311            );
4312        }
4313    }
4314
4315    pub fn drop_table_immediate(&self, name: &str, if_exists: bool) -> Result<()> {
4316        let (display_name, canonical_name) = canonical_table_name(name)?;
4317        let (table_id, column_field_ids) = {
4318            let tables = self.tables.read().unwrap();
4319            let Some(entry) = tables.get(&canonical_name) else {
4320                if if_exists {
4321                    return Ok(());
4322                } else {
4323                    return Err(Error::CatalogError(format!(
4324                        "Catalog Error: Table '{}' does not exist",
4325                        display_name
4326                    )));
4327                }
4328            };
4329
4330            let field_ids = entry
4331                .schema
4332                .columns
4333                .iter()
4334                .map(|col| col.field_id)
4335                .collect::<Vec<_>>();
4336            (entry.table.table_id(), field_ids)
4337        };
4338
4339        let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
4340
4341        for detail in referencing {
4342            if detail.referencing_table_canonical == canonical_name {
4343                continue;
4344            }
4345
4346            if self.is_table_marked_dropped(&detail.referencing_table_canonical) {
4347                continue;
4348            }
4349
4350            let constraint_label = detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
4351            return Err(Error::ConstraintError(format!(
4352                "Cannot drop table '{}' because it is referenced by foreign key constraint '{}' on table '{}'",
4353                display_name, constraint_label, detail.referencing_table_display
4354            )));
4355        }
4356
4357        self.catalog_service
4358            .drop_table(&canonical_name, table_id, &column_field_ids)?;
4359        tracing::debug!(
4360            "[CATALOG] Unregistered table '{}' (table_id={}) from catalog",
4361            canonical_name,
4362            table_id
4363        );
4364
4365        self.dropped_tables
4366            .write()
4367            .unwrap()
4368            .insert(canonical_name.clone());
4369        Ok(())
4370    }
4371
4372    pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
4373        self.dropped_tables.read().unwrap().contains(canonical_name)
4374    }
4375}
4376
4377// Implement TransactionContext for ContextWrapper to enable llkv-transaction integration
4378impl<P> TransactionContext for RuntimeContextWrapper<P>
4379where
4380    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
4381{
4382    type Pager = P;
4383
4384    fn set_snapshot(&self, snapshot: TransactionSnapshot) {
4385        self.update_snapshot(snapshot);
4386    }
4387
4388    fn snapshot(&self) -> TransactionSnapshot {
4389        self.current_snapshot()
4390    }
4391
4392    fn table_column_specs(&self, table_name: &str) -> llkv_result::Result<Vec<ColumnSpec>> {
4393        RuntimeContext::table_column_specs(self.context(), table_name)
4394    }
4395
4396    fn export_table_rows(
4397        &self,
4398        table_name: &str,
4399    ) -> llkv_result::Result<llkv_transaction::RowBatch> {
4400        let batch = RuntimeContext::export_table_rows(self.context(), table_name)?;
4401        // Convert from llkv_executor::RowBatch to llkv_transaction::RowBatch
4402        Ok(llkv_transaction::RowBatch {
4403            columns: batch.columns,
4404            rows: batch.rows,
4405        })
4406    }
4407
4408    fn get_batches_with_row_ids(
4409        &self,
4410        table_name: &str,
4411        filter: Option<LlkvExpr<'static, String>>,
4412    ) -> llkv_result::Result<Vec<RecordBatch>> {
4413        RuntimeContext::get_batches_with_row_ids_with_snapshot(
4414            self.context(),
4415            table_name,
4416            filter,
4417            self.snapshot(),
4418        )
4419    }
4420
4421    fn execute_select(
4422        &self,
4423        plan: SelectPlan,
4424    ) -> llkv_result::Result<SelectExecution<Self::Pager>> {
4425        RuntimeContext::execute_select_with_snapshot(self.context(), plan, self.snapshot())
4426    }
4427
4428    fn create_table_plan(
4429        &self,
4430        plan: CreateTablePlan,
4431    ) -> llkv_result::Result<TransactionResult<P>> {
4432        let result = RuntimeContext::create_table_plan(self.context(), plan)?;
4433        Ok(convert_statement_result(result))
4434    }
4435
4436    fn insert(&self, plan: InsertPlan) -> llkv_result::Result<TransactionResult<P>> {
4437        tracing::trace!(
4438            "[WRAPPER] TransactionContext::insert called - plan.table='{}', wrapper_context_pager={:p}",
4439            plan.table,
4440            &*self.ctx.pager
4441        );
4442        let snapshot = self.current_snapshot();
4443        let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
4444            self.ctx().insert(plan)?
4445        } else {
4446            RuntimeContext::insert_with_snapshot(self.context(), plan, snapshot)?
4447        };
4448        Ok(convert_statement_result(result))
4449    }
4450
4451    fn update(&self, plan: UpdatePlan) -> llkv_result::Result<TransactionResult<P>> {
4452        let snapshot = self.current_snapshot();
4453        let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
4454            self.ctx().update(plan)?
4455        } else {
4456            RuntimeContext::update_with_snapshot(self.context(), plan, snapshot)?
4457        };
4458        Ok(convert_statement_result(result))
4459    }
4460
4461    fn delete(&self, plan: DeletePlan) -> llkv_result::Result<TransactionResult<P>> {
4462        let snapshot = self.current_snapshot();
4463        let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
4464            self.ctx().delete(plan)?
4465        } else {
4466            RuntimeContext::delete_with_snapshot(self.context(), plan, snapshot)?
4467        };
4468        Ok(convert_statement_result(result))
4469    }
4470
4471    fn create_index(&self, plan: CreateIndexPlan) -> llkv_result::Result<TransactionResult<P>> {
4472        let result = RuntimeContext::create_index(self.context(), plan)?;
4473        Ok(convert_statement_result(result))
4474    }
4475
4476    fn append_batches_with_row_ids(
4477        &self,
4478        table_name: &str,
4479        batches: Vec<RecordBatch>,
4480    ) -> llkv_result::Result<usize> {
4481        RuntimeContext::append_batches_with_row_ids(self.context(), table_name, batches)
4482    }
4483
4484    fn table_names(&self) -> Vec<String> {
4485        RuntimeContext::table_names(self.context())
4486    }
4487
4488    fn table_id(&self, table_name: &str) -> llkv_result::Result<llkv_table::types::TableId> {
4489        // Check CURRENT state: if table is marked as dropped, return error
4490        // This is used by conflict detection to detect if a table was dropped
4491        let ctx = self.context();
4492        if ctx.is_table_marked_dropped(table_name) {
4493            return Err(Error::InvalidArgumentError(format!(
4494                "table '{}' has been dropped",
4495                table_name
4496            )));
4497        }
4498
4499        let table = ctx.lookup_table(table_name)?;
4500        Ok(table.table.table_id())
4501    }
4502
4503    fn catalog_snapshot(&self) -> llkv_table::catalog::TableCatalogSnapshot {
4504        let ctx = self.context();
4505        ctx.catalog.snapshot()
4506    }
4507
4508    fn validate_commit_constraints(&self, txn_id: TxnId) -> llkv_result::Result<()> {
4509        self.ctx.validate_primary_keys_for_commit(txn_id)
4510    }
4511
4512    fn clear_transaction_state(&self, txn_id: TxnId) {
4513        self.ctx.clear_transaction_state(txn_id);
4514    }
4515}
4516
4517// Helper to convert StatementResult between types
4518fn convert_statement_result<P>(result: RuntimeStatementResult<P>) -> TransactionResult<P>
4519where
4520    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
4521{
4522    use llkv_transaction::TransactionResult as TxResult;
4523    match result {
4524        RuntimeStatementResult::CreateTable { table_name } => TxResult::CreateTable { table_name },
4525        RuntimeStatementResult::CreateIndex {
4526            table_name,
4527            index_name,
4528        } => TxResult::CreateIndex {
4529            table_name,
4530            index_name,
4531        },
4532        RuntimeStatementResult::Insert { rows_inserted, .. } => TxResult::Insert { rows_inserted },
4533        RuntimeStatementResult::Update { rows_updated, .. } => TxResult::Update {
4534            rows_matched: rows_updated,
4535            rows_updated,
4536        },
4537        RuntimeStatementResult::Delete { rows_deleted, .. } => TxResult::Delete { rows_deleted },
4538        RuntimeStatementResult::Transaction { kind } => TxResult::Transaction { kind },
4539        _ => panic!("unsupported StatementResult conversion"),
4540    }
4541}
4542
/// Filters `row_ids` down to those visible to `snapshot` under MVCC rules.
///
/// Reads each row's `created_by` / `deleted_by` MVCC columns from `table` and
/// keeps a row when [`RowVersion::is_visible_for`] accepts it. Tables without
/// MVCC columns (gather context reports `NotFound`) are treated as fully
/// visible; batches whose MVCC columns are missing or of an unexpected type
/// are also passed through unfiltered rather than erroring.
fn filter_row_ids_for_snapshot<P>(
    table: &Table<P>,
    row_ids: Vec<RowId>,
    txn_manager: &TxnIdManager,
    snapshot: TransactionSnapshot,
) -> Result<Vec<RowId>>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    tracing::debug!(
        "[FILTER_ROWS] Filtering {} row IDs for snapshot txn_id={}, snapshot_id={}",
        row_ids.len(),
        snapshot.txn_id,
        snapshot.snapshot_id
    );

    if row_ids.is_empty() {
        return Ok(row_ids);
    }

    let table_id = table.table_id();
    let created_lfid = LogicalFieldId::for_mvcc_created_by(table_id);
    let deleted_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
    let logical_fields: Arc<[LogicalFieldId]> = Arc::from([created_lfid, deleted_lfid]);

    if let Err(err) = table
        .store()
        .prepare_gather_context(logical_fields.as_ref())
    {
        match err {
            // No MVCC columns at all: every row is visible.
            Error::NotFound => {
                tracing::trace!(
                    "[FILTER_ROWS] MVCC columns not found for table_id={}, treating all rows as visible",
                    table_id
                );
                return Ok(row_ids);
            }
            other => {
                tracing::error!(
                    "[FILTER_ROWS] Failed to prepare gather context: {:?}",
                    other
                );
                return Err(other);
            }
        }
    }

    let total_rows = row_ids.len();
    let mut stream = match table.stream_columns(
        Arc::clone(&logical_fields),
        row_ids,
        GatherNullPolicy::IncludeNulls,
    ) {
        Ok(stream) => stream,
        Err(err) => {
            tracing::error!("[FILTER_ROWS] stream_columns error: {:?}", err);
            return Err(err);
        }
    };

    let mut visible = Vec::with_capacity(total_rows);

    while let Some(chunk) = stream.next_batch()? {
        let batch = chunk.batch();
        let window = chunk.row_ids();

        // Defensive fallback: a malformed batch keeps its rows visible
        // instead of failing the whole filter.
        if batch.num_columns() < 2 {
            tracing::debug!(
                "[FILTER_ROWS] version_batch has < 2 columns for table_id={}, returning window rows unfiltered",
                table_id
            );
            visible.extend_from_slice(window);
            continue;
        }

        let created_column = batch.column(0).as_any().downcast_ref::<UInt64Array>();
        let deleted_column = batch.column(1).as_any().downcast_ref::<UInt64Array>();

        // Same defensive fallback when the columns are not UInt64 as expected.
        if created_column.is_none() || deleted_column.is_none() {
            tracing::debug!(
                "[FILTER_ROWS] Failed to downcast MVCC columns for table_id={}, returning window rows unfiltered",
                table_id
            );
            visible.extend_from_slice(window);
            continue;
        }

        let created_column = created_column.unwrap();
        let deleted_column = deleted_column.unwrap();

        for (idx, row_id) in window.iter().enumerate() {
            // A null created_by is treated as an auto-commit write; a null
            // deleted_by means the row was never deleted.
            let created_by = if created_column.is_null(idx) {
                TXN_ID_AUTO_COMMIT
            } else {
                created_column.value(idx)
            };
            let deleted_by = if deleted_column.is_null(idx) {
                TXN_ID_NONE
            } else {
                deleted_column.value(idx)
            };

            let version = RowVersion {
                created_by,
                deleted_by,
            };
            let is_visible = version.is_visible_for(txn_manager, snapshot);
            tracing::trace!(
                "[FILTER_ROWS] row_id={}: created_by={}, deleted_by={}, is_visible={}",
                row_id,
                created_by,
                deleted_by,
                is_visible
            );
            if is_visible {
                visible.push(*row_id);
            }
        }
    }

    tracing::debug!(
        "[FILTER_ROWS] Filtered from {} to {} visible rows",
        total_rows,
        visible.len()
    );
    Ok(visible)
}
4670
/// Row-id filter that keeps only rows visible to a transaction snapshot
/// under MVCC rules (see [`filter_row_ids_for_snapshot`]).
struct MvccRowIdFilter<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Source of transaction statuses used during visibility checks.
    txn_manager: Arc<TxnIdManager>,
    // The snapshot every filtered scan is evaluated against.
    snapshot: TransactionSnapshot,
    // Ties the filter to the pager type without storing one; `fn(P)` keeps
    // the marker Send + Sync regardless of `P`.
    _marker: PhantomData<fn(P)>,
}
4679
impl<P> MvccRowIdFilter<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Creates a filter bound to `snapshot`, consulting `txn_manager` for
    /// transaction statuses during visibility checks.
    fn new(txn_manager: Arc<TxnIdManager>, snapshot: TransactionSnapshot) -> Self {
        Self {
            txn_manager,
            snapshot,
            _marker: PhantomData,
        }
    }
}
4692
4693impl<P> RowIdFilter<P> for MvccRowIdFilter<P>
4694where
4695    P: Pager<Blob = EntryHandle> + Send + Sync,
4696{
4697    fn filter(&self, table: &Table<P>, row_ids: Vec<RowId>) -> Result<Vec<RowId>> {
4698        tracing::trace!(
4699            "[MVCC_FILTER] filter() called with row_ids {:?}, snapshot txn={}, snapshot_id={}",
4700            row_ids,
4701            self.snapshot.txn_id,
4702            self.snapshot.snapshot_id
4703        );
4704        let result = filter_row_ids_for_snapshot(table, row_ids, &self.txn_manager, self.snapshot);
4705        if let Ok(ref visible) = result {
4706            tracing::trace!(
4707                "[MVCC_FILTER] filter() returning visible row_ids: {:?}",
4708                visible
4709            );
4710        }
4711        result
4712    }
4713}
4714
4715// Wrapper to implement TableProvider for Context
/// Adapts a [`RuntimeContext`] to the executor's table-provider interface.
struct ContextProvider<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // The runtime context whose lazy-loading table cache backs lookups.
    context: Arc<RuntimeContext<P>>,
}
4722
impl<P> TableProvider<P> for ContextProvider<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Resolves `canonical_name` through the context's lazy table cache.
    fn get_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        self.context.lookup_table(canonical_name)
    }
}
4731
/// Lazily built logical plan (thin wrapper over SelectPlan).
pub struct RuntimeLazyFrame<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Runtime that will ultimately execute the accumulated plan.
    context: Arc<RuntimeContext<P>>,
    // The SELECT plan under construction; builder methods mutate it in place.
    plan: SelectPlan,
}
4740
4741impl<P> RuntimeLazyFrame<P>
4742where
4743    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
4744{
4745    pub fn scan(context: Arc<RuntimeContext<P>>, table: &str) -> Result<Self> {
4746        let (display, canonical) = canonical_table_name(table)?;
4747        context.lookup_table(&canonical)?;
4748        Ok(Self {
4749            context,
4750            plan: SelectPlan::new(display),
4751        })
4752    }
4753
4754    pub fn filter(mut self, predicate: LlkvExpr<'static, String>) -> Self {
4755        self.plan.filter = Some(predicate);
4756        self
4757    }
4758
4759    pub fn select_all(mut self) -> Self {
4760        self.plan.projections = vec![SelectProjection::AllColumns];
4761        self
4762    }
4763
4764    pub fn select_columns<S>(mut self, columns: impl IntoIterator<Item = S>) -> Self
4765    where
4766        S: AsRef<str>,
4767    {
4768        self.plan.projections = columns
4769            .into_iter()
4770            .map(|name| SelectProjection::Column {
4771                name: name.as_ref().to_string(),
4772                alias: None,
4773            })
4774            .collect();
4775        self
4776    }
4777
4778    pub fn select(mut self, projections: Vec<SelectProjection>) -> Self {
4779        self.plan.projections = projections;
4780        self
4781    }
4782
4783    pub fn aggregate(mut self, aggregates: Vec<AggregateExpr>) -> Self {
4784        self.plan.aggregates = aggregates;
4785        self
4786    }
4787
4788    pub fn collect(self) -> Result<SelectExecution<P>> {
4789        self.context.execute_select(self.plan)
4790    }
4791
4792    pub fn collect_rows(self) -> Result<RowBatch> {
4793        let execution = self.context.execute_select(self.plan)?;
4794        execution.collect_rows()
4795    }
4796
4797    pub fn collect_rows_vec(self) -> Result<Vec<Vec<PlanValue>>> {
4798        Ok(self.collect_rows()?.rows)
4799    }
4800}
4801
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, this yields 0 instead of
/// failing (same fallback as `unwrap_or_default` on the duration).
fn current_time_micros() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_micros() as u64)
        .unwrap_or(0)
}
4808
4809pub fn resolve_insert_columns(columns: &[String], schema: &ExecutorSchema) -> Result<Vec<usize>> {
4810    if columns.is_empty() {
4811        return Ok((0..schema.columns.len()).collect());
4812    }
4813    let mut resolved = Vec::with_capacity(columns.len());
4814    for column in columns {
4815        let normalized = column.to_ascii_lowercase();
4816        let index = schema.lookup.get(&normalized).ok_or_else(|| {
4817            Error::InvalidArgumentError(format!(
4818                "Binder Error: does not have a column named '{}'",
4819                column
4820            ))
4821        })?;
4822        resolved.push(*index);
4823    }
4824    Ok(resolved)
4825}
4826
/// Coerces one INSERT literal into the representation expected by `column`'s
/// Arrow data type, or returns an `InvalidArgumentError` describing the
/// mismatch.
///
/// Rules implemented by the match below:
/// - NULL is accepted for every column type (first arm).
/// - INT columns truncate float literals via `as i64`.
/// - BOOLEAN columns normalize to 0/1 integers; numbers compare against zero,
///   strings accept true/t/1 and false/f/0 (case-insensitive, trimmed).
/// - DOUBLE columns promote integers to floats.
/// - STRING columns stringify numeric literals.
/// - DATE columns hold days-since-epoch: integers are range-checked into
///   `i32`, strings are parsed with `parse_date32_literal`.
/// - Struct columns pass struct values through unchanged.
fn normalize_insert_value_for_column(
    column: &ExecutorColumn,
    value: PlanValue,
) -> Result<PlanValue> {
    match (&column.data_type, value) {
        // NULL inserts are allowed regardless of the target type; this arm
        // must stay first so later type-specific arms never see Null.
        (_, PlanValue::Null) => Ok(PlanValue::Null),
        (DataType::Int64, PlanValue::Integer(v)) => Ok(PlanValue::Integer(v)),
        // Float -> INT drops the fractional part (Rust `as` truncation).
        (DataType::Int64, PlanValue::Float(v)) => Ok(PlanValue::Integer(v as i64)),
        (DataType::Int64, other) => Err(Error::InvalidArgumentError(format!(
            "cannot insert {other:?} into INT column '{}'",
            column.name
        ))),
        // Booleans are stored as 0/1 integers rather than a dedicated variant.
        (DataType::Boolean, PlanValue::Integer(v)) => {
            Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 }))
        }
        (DataType::Boolean, PlanValue::Float(v)) => {
            Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 }))
        }
        (DataType::Boolean, PlanValue::String(s)) => {
            let normalized = s.trim().to_ascii_lowercase();
            let value = match normalized.as_str() {
                "true" | "t" | "1" => 1,
                "false" | "f" | "0" => 0,
                _ => {
                    return Err(Error::InvalidArgumentError(format!(
                        "cannot insert string '{}' into BOOLEAN column '{}'",
                        s, column.name
                    )));
                }
            };
            Ok(PlanValue::Integer(value))
        }
        (DataType::Boolean, PlanValue::Struct(_)) => Err(Error::InvalidArgumentError(format!(
            "cannot insert struct into BOOLEAN column '{}'",
            column.name
        ))),
        (DataType::Float64, PlanValue::Integer(v)) => Ok(PlanValue::Float(v as f64)),
        (DataType::Float64, PlanValue::Float(v)) => Ok(PlanValue::Float(v)),
        (DataType::Float64, other) => Err(Error::InvalidArgumentError(format!(
            "cannot insert {other:?} into DOUBLE column '{}'",
            column.name
        ))),
        // Numeric literals are stringified when targeting a UTF8 column.
        (DataType::Utf8, PlanValue::Integer(v)) => Ok(PlanValue::String(v.to_string())),
        (DataType::Utf8, PlanValue::Float(v)) => Ok(PlanValue::String(v.to_string())),
        (DataType::Utf8, PlanValue::String(s)) => Ok(PlanValue::String(s)),
        (DataType::Utf8, PlanValue::Struct(_)) => Err(Error::InvalidArgumentError(format!(
            "cannot insert struct into STRING column '{}'",
            column.name
        ))),
        // DATE integers are days since the Unix epoch and must fit in i32.
        (DataType::Date32, PlanValue::Integer(days)) => {
            let casted = i32::try_from(days).map_err(|_| {
                Error::InvalidArgumentError(format!(
                    "integer literal out of range for DATE column '{}'",
                    column.name
                ))
            })?;
            Ok(PlanValue::Integer(casted as i64))
        }
        (DataType::Date32, PlanValue::String(text)) => {
            let days = parse_date32_literal(&text)?;
            Ok(PlanValue::Integer(days as i64))
        }
        (DataType::Date32, other) => Err(Error::InvalidArgumentError(format!(
            "cannot insert {other:?} into DATE column '{}'",
            column.name
        ))),
        (DataType::Struct(_), PlanValue::Struct(map)) => Ok(PlanValue::Struct(map)),
        (DataType::Struct(_), other) => Err(Error::InvalidArgumentError(format!(
            "expected struct value for struct column '{}', got {other:?}",
            column.name
        ))),
        // Catch-all for column types this INSERT path does not support.
        (other_type, other_value) => Err(Error::InvalidArgumentError(format!(
            "unsupported Arrow data type {:?} for INSERT value {:?} in column '{}'",
            other_type, other_value, column.name
        ))),
    }
}
4904
/// Builds a single Arrow array of type `dtype` from a column's worth of
/// `PlanValue`s, applying the same coercions as
/// `normalize_insert_value_for_column` (floats truncate into Int64, numbers
/// stringify into Utf8, boolean strings accept true/t/1 and false/f/0, DATE
/// strings parse via `parse_date32_literal`).
///
/// For struct columns this recurses per field: each row's value for a child
/// field is pulled out of the `PlanValue::Struct` map (missing keys become
/// NULL) and a child array is built with a recursive call.
///
/// # Errors
/// Returns `InvalidArgumentError` when a value cannot be coerced into `dtype`
/// or when `dtype` is not one of the supported INSERT types.
pub fn build_array_for_column(dtype: &DataType, values: &[PlanValue]) -> Result<ArrayRef> {
    match dtype {
        DataType::Int64 => {
            let mut builder = Int64Builder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    PlanValue::Integer(v) => builder.append_value(*v),
                    // Float -> INT drops the fractional part (Rust `as` cast).
                    PlanValue::Float(v) => builder.append_value(*v as i64),
                    PlanValue::String(_) | PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert non-integer into INT column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Boolean => {
            let mut builder = BooleanBuilder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    // Numbers are truthy when non-zero.
                    PlanValue::Integer(v) => builder.append_value(*v != 0),
                    PlanValue::Float(v) => builder.append_value(*v != 0.0),
                    PlanValue::String(s) => {
                        let normalized = s.trim().to_ascii_lowercase();
                        match normalized.as_str() {
                            "true" | "t" | "1" => builder.append_value(true),
                            "false" | "f" | "0" => builder.append_value(false),
                            _ => {
                                return Err(Error::InvalidArgumentError(format!(
                                    "cannot insert string '{}' into BOOLEAN column",
                                    s
                                )));
                            }
                        }
                    }
                    PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert struct into BOOLEAN column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Float64 => {
            let mut builder = Float64Builder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    PlanValue::Integer(v) => builder.append_value(*v as f64),
                    PlanValue::Float(v) => builder.append_value(*v),
                    PlanValue::String(_) | PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert non-numeric into DOUBLE column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Utf8 => {
            // Capacity heuristic: assume ~8 bytes of text per value.
            let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 8);
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    PlanValue::Integer(v) => builder.append_value(v.to_string()),
                    PlanValue::Float(v) => builder.append_value(v.to_string()),
                    PlanValue::String(s) => builder.append_value(s),
                    PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert struct into STRING column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Date32 => {
            let mut builder = Date32Builder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    // Integers are days-since-epoch and must fit in i32.
                    PlanValue::Integer(days) => {
                        let casted = i32::try_from(*days).map_err(|_| {
                            Error::InvalidArgumentError(
                                "integer literal out of range for DATE column".into(),
                            )
                        })?;
                        builder.append_value(casted);
                    }
                    PlanValue::Float(_) | PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert non-date value into DATE column".into(),
                        ));
                    }
                    PlanValue::String(text) => {
                        let days = parse_date32_literal(text)?;
                        builder.append_value(days);
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Struct(fields) => {
            use arrow::array::StructArray;
            let mut field_arrays: Vec<(FieldRef, ArrayRef)> = Vec::with_capacity(fields.len());

            // Build one child array per struct field by projecting that
            // field's value out of every row, then recursing.
            for field in fields.iter() {
                let field_name = field.name();
                let field_type = field.data_type();
                let mut field_values = Vec::with_capacity(values.len());

                for value in values {
                    match value {
                        // NOTE(review): a row-level Null becomes NULL in every
                        // child array, but no struct-level null bitmap is set
                        // on the resulting StructArray — confirm downstream
                        // readers treat "all fields null" as a null struct.
                        PlanValue::Null => field_values.push(PlanValue::Null),
                        PlanValue::Struct(map) => {
                            // Missing keys default to NULL for that field.
                            let field_value =
                                map.get(field_name).cloned().unwrap_or(PlanValue::Null);
                            field_values.push(field_value);
                        }
                        _ => {
                            return Err(Error::InvalidArgumentError(format!(
                                "expected struct value for struct column, got {:?}",
                                value
                            )));
                        }
                    }
                }

                let field_array = build_array_for_column(field_type, &field_values)?;
                field_arrays.push((Arc::clone(field), field_array));
            }

            Ok(Arc::new(StructArray::from(field_arrays)))
        }
        other => Err(Error::InvalidArgumentError(format!(
            "unsupported Arrow data type for INSERT: {other:?}"
        ))),
    }
}
5048
5049fn parse_date32_literal(text: &str) -> Result<i32> {
5050    let mut parts = text.split('-');
5051    let year_str = parts
5052        .next()
5053        .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
5054    let month_str = parts
5055        .next()
5056        .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
5057    let day_str = parts
5058        .next()
5059        .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
5060    if parts.next().is_some() {
5061        return Err(Error::InvalidArgumentError(format!(
5062            "invalid DATE literal '{text}'"
5063        )));
5064    }
5065
5066    let year = year_str.parse::<i32>().map_err(|_| {
5067        Error::InvalidArgumentError(format!("invalid year in DATE literal '{text}'"))
5068    })?;
5069    let month_num = month_str.parse::<u8>().map_err(|_| {
5070        Error::InvalidArgumentError(format!("invalid month in DATE literal '{text}'"))
5071    })?;
5072    let day = day_str.parse::<u8>().map_err(|_| {
5073        Error::InvalidArgumentError(format!("invalid day in DATE literal '{text}'"))
5074    })?;
5075
5076    let month = Month::try_from(month_num).map_err(|_| {
5077        Error::InvalidArgumentError(format!("invalid month in DATE literal '{text}'"))
5078    })?;
5079
5080    let date = Date::from_calendar_date(year, month, day).map_err(|err| {
5081        Error::InvalidArgumentError(format!("invalid DATE literal '{text}': {err}"))
5082    })?;
5083    let days = date.to_julian_day() - epoch_julian_day();
5084    Ok(days)
5085}
5086
5087fn epoch_julian_day() -> i32 {
5088    Date::from_calendar_date(1970, Month::January, 1)
5089        .expect("1970-01-01 is a valid date")
5090        .to_julian_day()
5091}
5092
5093// Array -> PlanValue conversion is provided by llkv-plan::plan_value_from_array
5094
5095fn full_table_scan_filter(field_id: FieldId) -> LlkvExpr<'static, FieldId> {
5096    LlkvExpr::Pred(Filter {
5097        field_id,
5098        op: Operator::Range {
5099            lower: Bound::Unbounded,
5100            upper: Bound::Unbounded,
5101        },
5102    })
5103}
5104
5105fn resolve_field_id_from_schema(schema: &ExecutorSchema, name: &str) -> Result<FieldId> {
5106    if name.eq_ignore_ascii_case(ROW_ID_COLUMN_NAME) {
5107        return Ok(ROW_ID_FIELD_ID);
5108    }
5109
5110    schema
5111        .resolve(name)
5112        .map(|column| column.field_id)
5113        .ok_or_else(|| {
5114            Error::InvalidArgumentError(format!(
5115                "Binder Error: does not have a column named '{name}'"
5116            ))
5117        })
5118}
5119
5120fn translate_predicate(
5121    expr: LlkvExpr<'static, String>,
5122    schema: &ExecutorSchema,
5123) -> Result<LlkvExpr<'static, FieldId>> {
5124    match expr {
5125        LlkvExpr::And(list) => {
5126            let mut converted = Vec::with_capacity(list.len());
5127            for item in list {
5128                converted.push(translate_predicate(item, schema)?);
5129            }
5130            Ok(LlkvExpr::And(converted))
5131        }
5132        LlkvExpr::Or(list) => {
5133            let mut converted = Vec::with_capacity(list.len());
5134            for item in list {
5135                converted.push(translate_predicate(item, schema)?);
5136            }
5137            Ok(LlkvExpr::Or(converted))
5138        }
5139        LlkvExpr::Not(inner) => Ok(LlkvExpr::Not(Box::new(translate_predicate(
5140            *inner, schema,
5141        )?))),
5142        LlkvExpr::Pred(Filter { field_id, op }) => {
5143            let resolved = resolve_field_id_from_schema(schema, &field_id)?;
5144            Ok(LlkvExpr::Pred(Filter {
5145                field_id: resolved,
5146                op,
5147            }))
5148        }
5149        LlkvExpr::Compare { left, op, right } => {
5150            let left = translate_scalar(&left, schema)?;
5151            let right = translate_scalar(&right, schema)?;
5152            Ok(LlkvExpr::Compare { left, op, right })
5153        }
5154    }
5155}
5156
5157fn translate_scalar(
5158    expr: &ScalarExpr<String>,
5159    schema: &ExecutorSchema,
5160) -> Result<ScalarExpr<FieldId>> {
5161    match expr {
5162        ScalarExpr::Column(name) => {
5163            let field_id = resolve_field_id_from_schema(schema, name)?;
5164            Ok(ScalarExpr::column(field_id))
5165        }
5166        ScalarExpr::Literal(lit) => Ok(ScalarExpr::Literal(lit.clone())),
5167        ScalarExpr::Binary { left, op, right } => {
5168            let left_expr = translate_scalar(left, schema)?;
5169            let right_expr = translate_scalar(right, schema)?;
5170            Ok(ScalarExpr::Binary {
5171                left: Box::new(left_expr),
5172                op: *op,
5173                right: Box::new(right_expr),
5174            })
5175        }
5176        ScalarExpr::Aggregate(agg) => {
5177            // Translate column names in aggregate calls to field IDs
5178            use llkv_expr::expr::AggregateCall;
5179            let translated_agg = match agg {
5180                AggregateCall::CountStar => AggregateCall::CountStar,
5181                AggregateCall::Count(name) => {
5182                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5183                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5184                    })?;
5185                    AggregateCall::Count(field_id)
5186                }
5187                AggregateCall::Sum(name) => {
5188                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5189                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5190                    })?;
5191                    AggregateCall::Sum(field_id)
5192                }
5193                AggregateCall::Min(name) => {
5194                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5195                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5196                    })?;
5197                    AggregateCall::Min(field_id)
5198                }
5199                AggregateCall::Max(name) => {
5200                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5201                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5202                    })?;
5203                    AggregateCall::Max(field_id)
5204                }
5205                AggregateCall::CountNulls(name) => {
5206                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5207                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5208                    })?;
5209                    AggregateCall::CountNulls(field_id)
5210                }
5211            };
5212            Ok(ScalarExpr::Aggregate(translated_agg))
5213        }
5214        ScalarExpr::GetField { base, field_name } => {
5215            let base_expr = translate_scalar(base, schema)?;
5216            Ok(ScalarExpr::GetField {
5217                base: Box::new(base_expr),
5218                field_name: field_name.clone(),
5219            })
5220        }
5221    }
5222}
5223
5224fn plan_value_from_sql_expr(expr: &SqlExpr) -> Result<PlanValue> {
5225    match expr {
5226        SqlExpr::Value(value) => plan_value_from_sql_value(value),
5227        SqlExpr::UnaryOp {
5228            op: UnaryOperator::Minus,
5229            expr,
5230        } => match plan_value_from_sql_expr(expr)? {
5231            PlanValue::Integer(v) => Ok(PlanValue::Integer(-v)),
5232            PlanValue::Float(v) => Ok(PlanValue::Float(-v)),
5233            PlanValue::Null | PlanValue::String(_) | PlanValue::Struct(_) => Err(
5234                Error::InvalidArgumentError("cannot negate non-numeric literal".into()),
5235            ),
5236        },
5237        SqlExpr::UnaryOp {
5238            op: UnaryOperator::Plus,
5239            expr,
5240        } => plan_value_from_sql_expr(expr),
5241        SqlExpr::Nested(inner) => plan_value_from_sql_expr(inner),
5242        SqlExpr::Dictionary(fields) => {
5243            let mut map = std::collections::HashMap::new();
5244            for field in fields {
5245                let key = field.key.value.clone();
5246                let value = plan_value_from_sql_expr(&field.value)?;
5247                map.insert(key, value);
5248            }
5249            Ok(PlanValue::Struct(map))
5250        }
5251        other => Err(Error::InvalidArgumentError(format!(
5252            "unsupported literal expression: {other:?}"
5253        ))),
5254    }
5255}
5256
5257fn plan_value_from_sql_value(value: &ValueWithSpan) -> Result<PlanValue> {
5258    match &value.value {
5259        Value::Null => Ok(PlanValue::Null),
5260        Value::Number(text, _) => {
5261            if text.contains(['.', 'e', 'E']) {
5262                let parsed = text.parse::<f64>().map_err(|err| {
5263                    Error::InvalidArgumentError(format!("invalid float literal: {err}"))
5264                })?;
5265                Ok(PlanValue::Float(parsed))
5266            } else {
5267                let parsed = text.parse::<i64>().map_err(|err| {
5268                    Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
5269                })?;
5270                Ok(PlanValue::Integer(parsed))
5271            }
5272        }
5273        Value::Boolean(_) => Err(Error::InvalidArgumentError(
5274            "BOOLEAN literals are not supported yet".into(),
5275        )),
5276        other => {
5277            if let Some(text) = other.clone().into_string() {
5278                Ok(PlanValue::String(text))
5279            } else {
5280                Err(Error::InvalidArgumentError(format!(
5281                    "unsupported literal: {other:?}"
5282                )))
5283            }
5284        }
5285    }
5286}
5287
5288fn group_by_is_empty(expr: &GroupByExpr) -> bool {
5289    matches!(
5290        expr,
5291        GroupByExpr::Expressions(exprs, modifiers)
5292            if exprs.is_empty() && modifiers.is_empty()
5293    )
5294}
5295
5296#[derive(Clone)]
5297pub struct RuntimeRangeSelectRows {
5298    rows: Vec<Vec<PlanValue>>,
5299}
5300
5301impl RuntimeRangeSelectRows {
5302    pub fn into_rows(self) -> Vec<Vec<PlanValue>> {
5303        self.rows
5304    }
5305}
5306
/// How one projection item of a range() SELECT is produced for each row.
#[derive(Clone)]
enum RangeProjection {
    /// Emit the generated range value for the current row.
    Column,
    /// Emit this constant literal for every row.
    Literal(PlanValue),
}
5312
/// Parsed specification of a `range(...)` table-function invocation:
/// generates `row_count` consecutive integers starting at `start`.
#[derive(Clone)]
pub struct RuntimeRangeSpec {
    // First generated value (inclusive).
    start: i64,
    #[allow(dead_code)] // Used for validation, computed into row_count
    end: i64,
    // Number of rows to generate: end - start.
    row_count: usize,
    // Lowercased output column name ("range" unless the alias names one).
    column_name_lower: String,
    // Lowercased table alias, if the FROM clause supplied one.
    table_alias_lower: Option<String>,
}
5322
5323impl RuntimeRangeSpec {
5324    fn matches_identifier(&self, ident: &str) -> bool {
5325        let lower = ident.to_ascii_lowercase();
5326        lower == self.column_name_lower || lower == "range"
5327    }
5328
5329    fn matches_table_alias(&self, ident: &str) -> bool {
5330        let lower = ident.to_ascii_lowercase();
5331        match &self.table_alias_lower {
5332            Some(alias) => lower == *alias,
5333            None => lower == "range",
5334        }
5335    }
5336
5337    fn matches_object_name(&self, name: &ObjectName) -> bool {
5338        if name.0.len() != 1 {
5339            return false;
5340        }
5341        match &name.0[0] {
5342            ObjectNamePart::Identifier(ident) => self.matches_table_alias(&ident.value),
5343            _ => false,
5344        }
5345    }
5346}
5347
5348pub fn extract_rows_from_range(select: &Select) -> Result<Option<RuntimeRangeSelectRows>> {
5349    let spec = match parse_range_spec(select)? {
5350        Some(spec) => spec,
5351        None => return Ok(None),
5352    };
5353
5354    if select.selection.is_some() {
5355        return Err(Error::InvalidArgumentError(
5356            "WHERE clauses are not supported for range() SELECT statements".into(),
5357        ));
5358    }
5359    if select.having.is_some()
5360        || !select.named_window.is_empty()
5361        || select.qualify.is_some()
5362        || select.distinct.is_some()
5363        || select.top.is_some()
5364        || select.into.is_some()
5365        || select.prewhere.is_some()
5366        || !select.lateral_views.is_empty()
5367        || select.value_table_mode.is_some()
5368        || !group_by_is_empty(&select.group_by)
5369    {
5370        return Err(Error::InvalidArgumentError(
5371            "advanced SELECT clauses are not supported for range() SELECT statements".into(),
5372        ));
5373    }
5374
5375    let mut projections: Vec<RangeProjection> = Vec::with_capacity(select.projection.len());
5376
5377    // If projection is empty, treat it as SELECT * (implicit wildcard)
5378    if select.projection.is_empty() {
5379        projections.push(RangeProjection::Column);
5380    } else {
5381        for item in &select.projection {
5382            let projection = match item {
5383                SelectItem::Wildcard(_) => RangeProjection::Column,
5384                SelectItem::QualifiedWildcard(kind, _) => match kind {
5385                    SelectItemQualifiedWildcardKind::ObjectName(object_name) => {
5386                        if spec.matches_object_name(object_name) {
5387                            RangeProjection::Column
5388                        } else {
5389                            return Err(Error::InvalidArgumentError(
5390                                "qualified wildcard must reference the range() source".into(),
5391                            ));
5392                        }
5393                    }
5394                    SelectItemQualifiedWildcardKind::Expr(_) => {
5395                        return Err(Error::InvalidArgumentError(
5396                            "expression-qualified wildcards are not supported for range() SELECT statements".into(),
5397                        ));
5398                    }
5399                },
5400                SelectItem::UnnamedExpr(expr) => build_range_projection_expr(expr, &spec)?,
5401                SelectItem::ExprWithAlias { expr, .. } => build_range_projection_expr(expr, &spec)?,
5402            };
5403            projections.push(projection);
5404        }
5405    }
5406
5407    let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(spec.row_count);
5408    for idx in 0..spec.row_count {
5409        let mut row: Vec<PlanValue> = Vec::with_capacity(projections.len());
5410        let value = spec.start + (idx as i64);
5411        for projection in &projections {
5412            match projection {
5413                RangeProjection::Column => row.push(PlanValue::Integer(value)),
5414                RangeProjection::Literal(value) => row.push(value.clone()),
5415            }
5416        }
5417        rows.push(row);
5418    }
5419
5420    Ok(Some(RuntimeRangeSelectRows { rows }))
5421}
5422
5423fn build_range_projection_expr(expr: &SqlExpr, spec: &RuntimeRangeSpec) -> Result<RangeProjection> {
5424    match expr {
5425        SqlExpr::Identifier(ident) => {
5426            if spec.matches_identifier(&ident.value) {
5427                Ok(RangeProjection::Column)
5428            } else {
5429                Err(Error::InvalidArgumentError(format!(
5430                    "unknown column '{}' in range() SELECT",
5431                    ident.value
5432                )))
5433            }
5434        }
5435        SqlExpr::CompoundIdentifier(parts) => {
5436            if parts.len() == 2
5437                && spec.matches_table_alias(&parts[0].value)
5438                && spec.matches_identifier(&parts[1].value)
5439            {
5440                Ok(RangeProjection::Column)
5441            } else {
5442                Err(Error::InvalidArgumentError(
5443                    "compound identifiers must reference the range() source".into(),
5444                ))
5445            }
5446        }
5447        SqlExpr::Wildcard(_) | SqlExpr::QualifiedWildcard(_, _) => unreachable!(),
5448        other => Ok(RangeProjection::Literal(plan_value_from_sql_expr(other)?)),
5449    }
5450}
5451
5452fn parse_range_spec(select: &Select) -> Result<Option<RuntimeRangeSpec>> {
5453    if select.from.len() != 1 {
5454        return Ok(None);
5455    }
5456    let item = &select.from[0];
5457    if !item.joins.is_empty() {
5458        return Err(Error::InvalidArgumentError(
5459            "JOIN clauses are not supported for range() SELECT statements".into(),
5460        ));
5461    }
5462
5463    match &item.relation {
5464        TableFactor::Function {
5465            lateral,
5466            name,
5467            args,
5468            alias,
5469        } => {
5470            if *lateral {
5471                return Err(Error::InvalidArgumentError(
5472                    "LATERAL range() is not supported".into(),
5473                ));
5474            }
5475            parse_range_spec_from_args(name, args, alias)
5476        }
5477        TableFactor::Table {
5478            name,
5479            alias,
5480            args: Some(table_args),
5481            with_ordinality,
5482            ..
5483        } => {
5484            if *with_ordinality {
5485                return Err(Error::InvalidArgumentError(
5486                    "WITH ORDINALITY is not supported for range()".into(),
5487                ));
5488            }
5489            if table_args.settings.is_some() {
5490                return Err(Error::InvalidArgumentError(
5491                    "range() SETTINGS clause is not supported".into(),
5492                ));
5493            }
5494            parse_range_spec_from_args(name, &table_args.args, alias)
5495        }
5496        _ => Ok(None),
5497    }
5498}
5499
5500fn parse_range_spec_from_args(
5501    name: &ObjectName,
5502    args: &[FunctionArg],
5503    alias: &Option<TableAlias>,
5504) -> Result<Option<RuntimeRangeSpec>> {
5505    if name.0.len() != 1 {
5506        return Ok(None);
5507    }
5508    let func_name = match &name.0[0] {
5509        ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
5510        _ => return Ok(None),
5511    };
5512    if func_name != "range" {
5513        return Ok(None);
5514    }
5515
5516    if args.is_empty() || args.len() > 2 {
5517        return Err(Error::InvalidArgumentError(
5518            "range() requires one or two arguments".into(),
5519        ));
5520    }
5521
5522    // Helper to extract integer from argument
5523    let extract_int = |arg: &FunctionArg| -> Result<i64> {
5524        let arg_expr = match arg {
5525            FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
5526            FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_))
5527            | FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
5528                return Err(Error::InvalidArgumentError(
5529                    "range() argument must be an integer literal".into(),
5530                ));
5531            }
5532            FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
5533                return Err(Error::InvalidArgumentError(
5534                    "named arguments are not supported for range()".into(),
5535                ));
5536            }
5537        };
5538
5539        let value = plan_value_from_sql_expr(arg_expr)?;
5540        match value {
5541            PlanValue::Integer(v) => Ok(v),
5542            _ => Err(Error::InvalidArgumentError(
5543                "range() argument must be an integer literal".into(),
5544            )),
5545        }
5546    };
5547
5548    let (start, end, row_count) = if args.len() == 1 {
5549        // range(count) - generate [0, count)
5550        let count = extract_int(&args[0])?;
5551        if count < 0 {
5552            return Err(Error::InvalidArgumentError(
5553                "range() argument must be non-negative".into(),
5554            ));
5555        }
5556        (0, count, count as usize)
5557    } else {
5558        // range(start, end) - generate [start, end)
5559        let start = extract_int(&args[0])?;
5560        let end = extract_int(&args[1])?;
5561        if end < start {
5562            return Err(Error::InvalidArgumentError(
5563                "range() end must be >= start".into(),
5564            ));
5565        }
5566        let row_count = (end - start) as usize;
5567        (start, end, row_count)
5568    };
5569
5570    let column_name_lower = alias
5571        .as_ref()
5572        .and_then(|a| {
5573            a.columns
5574                .first()
5575                .map(|col| col.name.value.to_ascii_lowercase())
5576        })
5577        .unwrap_or_else(|| "range".to_string());
5578    let table_alias_lower = alias.as_ref().map(|a| a.name.value.to_ascii_lowercase());
5579
5580    Ok(Some(RuntimeRangeSpec {
5581        start,
5582        end,
5583        row_count,
5584        column_name_lower,
5585        table_alias_lower,
5586    }))
5587}
5588
/// Fluent builder that accumulates a [`CreateTablePlan`] and executes it
/// against a [`RuntimeContext`] when `finish()` is called.
pub struct RuntimeCreateTableBuilder<'ctx, P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Context the accumulated CREATE TABLE plan is executed against.
    ctx: &'ctx RuntimeContext<P>,
    // Plan being built up by the `with_*` / flag methods.
    plan: CreateTablePlan,
}
5596
5597impl<'ctx, P> RuntimeCreateTableBuilder<'ctx, P>
5598where
5599    P: Pager<Blob = EntryHandle> + Send + Sync,
5600{
5601    pub fn if_not_exists(mut self) -> Self {
5602        self.plan.if_not_exists = true;
5603        self
5604    }
5605
5606    pub fn or_replace(mut self) -> Self {
5607        self.plan.or_replace = true;
5608        self
5609    }
5610
5611    pub fn with_column(mut self, name: impl Into<String>, data_type: DataType) -> Self {
5612        self.plan
5613            .columns
5614            .push(ColumnSpec::new(name.into(), data_type, true));
5615        self
5616    }
5617
5618    pub fn with_not_null_column(mut self, name: impl Into<String>, data_type: DataType) -> Self {
5619        self.plan
5620            .columns
5621            .push(ColumnSpec::new(name.into(), data_type, false));
5622        self
5623    }
5624
5625    pub fn with_column_spec(mut self, spec: ColumnSpec) -> Self {
5626        self.plan.columns.push(spec);
5627        self
5628    }
5629
5630    pub fn finish(self) -> Result<RuntimeStatementResult<P>> {
5631        self.ctx.execute_create_table(self.plan)
5632    }
5633}
5634
/// An ordered collection of `(column name, value)` pairs used to build
/// named insert rows.
///
/// Column order is the order of first insertion; setting an existing column
/// overwrites its value in place without changing its position.
#[derive(Clone, Debug, Default)]
pub struct RuntimeRow {
    // Insertion-ordered (name, value) pairs; names are kept unique by `set`.
    values: Vec<(String, PlanValue)>,
}
5639
5640impl RuntimeRow {
5641    pub fn new() -> Self {
5642        Self { values: Vec::new() }
5643    }
5644
5645    pub fn with(mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> Self {
5646        self.set(name, value);
5647        self
5648    }
5649
5650    pub fn set(&mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> &mut Self {
5651        let name = name.into();
5652        let value = value.into();
5653        if let Some((_, existing)) = self.values.iter_mut().find(|(n, _)| *n == name) {
5654            *existing = value;
5655        } else {
5656            self.values.push((name, value));
5657        }
5658        self
5659    }
5660
5661    fn columns(&self) -> Vec<String> {
5662        self.values.iter().map(|(n, _)| n.clone()).collect()
5663    }
5664
5665    fn values_for_columns(&self, columns: &[String]) -> Result<Vec<PlanValue>> {
5666        let mut out = Vec::with_capacity(columns.len());
5667        for column in columns {
5668            let value = self
5669                .values
5670                .iter()
5671                .find(|(name, _)| name == column)
5672                .ok_or_else(|| {
5673                    Error::InvalidArgumentError(format!(
5674                        "insert row missing value for column '{}'",
5675                        column
5676                    ))
5677                })?;
5678            out.push(value.1.clone());
5679        }
5680        Ok(out)
5681    }
5682}
5683
5684pub fn row() -> RuntimeRow {
5685    RuntimeRow::new()
5686}
5687
/// Normalized form of a single insert row produced by [`IntoInsertRow`].
#[doc(hidden)]
pub enum RuntimeInsertRowKind {
    /// Row with explicit column names; `values` pairs positionally with
    /// `columns`.
    Named {
        columns: Vec<String>,
        values: Vec<PlanValue>,
    },
    /// Values matched to the table schema's column order.
    Positional(Vec<PlanValue>),
}
5696
/// Conversion of a caller-supplied row representation into the normalized
/// [`RuntimeInsertRowKind`] consumed by insert paths.
pub trait IntoInsertRow {
    /// Consumes `self`, producing a named or positional row; implementations
    /// may error on invalid input (e.g. an empty row).
    fn into_insert_row(self) -> Result<RuntimeInsertRowKind>;
}
5700
5701impl IntoInsertRow for RuntimeRow {
5702    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
5703        let row = self;
5704        if row.values.is_empty() {
5705            return Err(Error::InvalidArgumentError(
5706                "insert requires at least one column".into(),
5707            ));
5708        }
5709        let columns = row.columns();
5710        let values = row.values_for_columns(&columns)?;
5711        Ok(RuntimeInsertRowKind::Named { columns, values })
5712    }
5713}
5714
5715// Remove the generic impl for `&T` which caused unconditional-recursion
5716// and noop-clone clippy warnings. Callers can pass owned values or use
5717// the provided tuple/array/Vec implementations.
5718
5719impl<T> IntoInsertRow for Vec<T>
5720where
5721    T: Into<PlanValue>,
5722{
5723    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
5724        if self.is_empty() {
5725            return Err(Error::InvalidArgumentError(
5726                "insert requires at least one column".into(),
5727            ));
5728        }
5729        Ok(RuntimeInsertRowKind::Positional(
5730            self.into_iter().map(Into::into).collect(),
5731        ))
5732    }
5733}
5734
5735impl<T, const N: usize> IntoInsertRow for [T; N]
5736where
5737    T: Into<PlanValue>,
5738{
5739    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
5740        if N == 0 {
5741            return Err(Error::InvalidArgumentError(
5742                "insert requires at least one column".into(),
5743            ));
5744        }
5745        Ok(RuntimeInsertRowKind::Positional(
5746            self.into_iter().map(Into::into).collect(),
5747        ))
5748    }
5749}
5750
/// Implements [`IntoInsertRow`] for a tuple type, producing a positional row.
///
/// `$type` names the tuple's generic element types (each bounded by
/// `Into<PlanValue>`) and `$value` the matching destructured bindings.
macro_rules! impl_into_insert_row_tuple {
    ($($type:ident => $value:ident),+) => {
        impl<$($type,)+> IntoInsertRow for ($($type,)+)
        where
            $($type: Into<PlanValue>,)+
        {
            fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
                let ($($value,)+) = self;
                Ok(RuntimeInsertRowKind::Positional(vec![$($value.into(),)+]))
            }
        }
    };
}
5764
// Positional-insert support for tuples of arity 1 through 8; each element
// type only needs to implement `Into<PlanValue>`.
impl_into_insert_row_tuple!(T1 => v1);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5, T6 => v6);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7
);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7,
    T8 => v8
);
5790
/// Handle to an existing table, resolved and validated against a
/// [`RuntimeContext`] at construction time.
pub struct RuntimeTableHandle<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Shared runtime context that insert/select plans are executed against.
    context: Arc<RuntimeContext<P>>,
    // User-facing table name as originally written; used when building plans.
    display_name: String,
    // Canonicalized name used for catalog lookups.
    _canonical_name: String,
}
5799
impl<P> RuntimeTableHandle<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    /// Resolves `name` and returns a handle to the existing table.
    ///
    /// The catalog lookup is performed only to validate that the table
    /// exists; its result is discarded.
    ///
    /// # Errors
    /// Fails if `name` cannot be canonicalized or the table is not found.
    pub fn new(context: Arc<RuntimeContext<P>>, name: &str) -> Result<Self> {
        let (display_name, canonical_name) = canonical_table_name(name)?;
        context.lookup_table(&canonical_name)?;
        Ok(Self {
            context,
            display_name,
            _canonical_name: canonical_name,
        })
    }

    /// Starts a lazy scan over this table.
    pub fn lazy(&self) -> Result<RuntimeLazyFrame<P>> {
        RuntimeLazyFrame::scan(Arc::clone(&self.context), &self.display_name)
    }

    /// Inserts rows that are either all named ([`RuntimeRow`]) or all
    /// positional (tuples, arrays, `Vec`s); mixing the two kinds is an error.
    ///
    /// Named rows must all list the same columns in the same order as the
    /// first row; positional rows must supply exactly one value per schema
    /// column.
    ///
    /// # Errors
    /// `InvalidArgumentError` when the iterator is empty, row kinds are
    /// mixed, a named row repeats a column or deviates from the first row's
    /// column list, or a row's value count does not match.
    pub fn insert_rows<R>(
        &self,
        rows: impl IntoIterator<Item = R>,
    ) -> Result<RuntimeStatementResult<P>>
    where
        R: IntoInsertRow,
    {
        // The first row establishes the mode; all later rows must match it.
        enum InsertMode {
            Named,
            Positional,
        }

        let table = self.context.lookup_table(&self._canonical_name)?;
        let schema = table.schema.as_ref();
        let schema_column_names: Vec<String> =
            schema.columns.iter().map(|col| col.name.clone()).collect();
        let mut normalized_rows: Vec<Vec<PlanValue>> = Vec::new();
        let mut mode: Option<InsertMode> = None;
        // Column list shared by all rows: the first named row's columns, or
        // the full schema column list for positional rows.
        let mut column_names: Option<Vec<String>> = None;
        let mut row_count = 0usize;

        for row in rows.into_iter() {
            row_count += 1;
            match row.into_insert_row()? {
                RuntimeInsertRowKind::Named { columns, values } => {
                    if let Some(existing) = &mode {
                        if !matches!(existing, InsertMode::Named) {
                            return Err(Error::InvalidArgumentError(
                                "cannot mix positional and named insert rows".into(),
                            ));
                        }
                    } else {
                        // First named row: fix the column list and reject
                        // duplicate column names up front.
                        mode = Some(InsertMode::Named);
                        let mut seen =
                            FxHashSet::with_capacity_and_hasher(columns.len(), Default::default());
                        for column in &columns {
                            if !seen.insert(column.clone()) {
                                return Err(Error::InvalidArgumentError(format!(
                                    "duplicate column '{}' in insert row",
                                    column
                                )));
                            }
                        }
                        column_names = Some(columns.clone());
                    }

                    let expected = column_names
                        .as_ref()
                        .expect("column names must be initialized for named insert");
                    // Subsequent rows must repeat the same columns in the
                    // same order (the first row trivially matches itself).
                    if columns != *expected {
                        return Err(Error::InvalidArgumentError(
                            "insert rows must specify the same columns".into(),
                        ));
                    }
                    if values.len() != expected.len() {
                        return Err(Error::InvalidArgumentError(format!(
                            "insert row expected {} values, found {}",
                            expected.len(),
                            values.len()
                        )));
                    }
                    normalized_rows.push(values);
                }
                RuntimeInsertRowKind::Positional(values) => {
                    if let Some(existing) = &mode {
                        if !matches!(existing, InsertMode::Positional) {
                            return Err(Error::InvalidArgumentError(
                                "cannot mix positional and named insert rows".into(),
                            ));
                        }
                    } else {
                        mode = Some(InsertMode::Positional);
                        column_names = Some(schema_column_names.clone());
                    }

                    // Positional rows must cover every schema column.
                    if values.len() != schema.columns.len() {
                        return Err(Error::InvalidArgumentError(format!(
                            "insert row expected {} values, found {}",
                            schema.columns.len(),
                            values.len()
                        )));
                    }
                    normalized_rows.push(values);
                }
            }
        }

        if row_count == 0 {
            return Err(Error::InvalidArgumentError(
                "insert requires at least one row".into(),
            ));
        }

        let columns = column_names.unwrap_or_else(|| schema_column_names.clone());
        self.insert_row_batch(RowBatch {
            columns,
            rows: normalized_rows,
        })
    }

    /// Inserts a pre-normalized batch of rows through an [`InsertPlan`].
    ///
    /// # Errors
    /// `InvalidArgumentError` when the batch has no rows, no columns, or any
    /// row whose length differs from the column list.
    pub fn insert_row_batch(&self, batch: RowBatch) -> Result<RuntimeStatementResult<P>> {
        if batch.rows.is_empty() {
            return Err(Error::InvalidArgumentError(
                "insert requires at least one row".into(),
            ));
        }
        if batch.columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "insert requires at least one column".into(),
            ));
        }
        for row in &batch.rows {
            if row.len() != batch.columns.len() {
                return Err(Error::InvalidArgumentError(
                    "insert rows must have values for every column".into(),
                ));
            }
        }

        let plan = InsertPlan {
            table: self.display_name.clone(),
            columns: batch.columns,
            source: InsertSource::Rows(batch.rows),
        };
        self.context.insert(plan)
    }

    /// Inserts pre-built Arrow record batches.
    ///
    /// The plan's column list is left empty; column handling is delegated to
    /// the context's insert path.
    pub fn insert_batches(&self, batches: Vec<RecordBatch>) -> Result<RuntimeStatementResult<P>> {
        let plan = InsertPlan {
            table: self.display_name.clone(),
            columns: Vec::new(),
            source: InsertSource::Batches(batches),
        };
        self.context.insert(plan)
    }

    /// Materializes `frame` into rows and inserts them into this table.
    pub fn insert_lazy(&self, frame: RuntimeLazyFrame<P>) -> Result<RuntimeStatementResult<P>> {
        let RowBatch { columns, rows } = frame.collect_rows()?;
        self.insert_row_batch(RowBatch { columns, rows })
    }

    /// The user-facing (display) name of the table.
    pub fn name(&self) -> &str {
        &self.display_name
    }
}
5963
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use llkv_storage::pager::MemPager;
    use std::sync::Arc;

    // End-to-end smoke test: create a two-column table, insert positional
    // tuple rows, and read them back through a lazy scan.
    #[test]
    fn create_insert_select_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let context = Arc::new(RuntimeContext::new(pager));

        let table = context
            .create_table(
                "people",
                [
                    ("id", DataType::Int64, NotNull),
                    ("name", DataType::Utf8, Nullable),
                ],
            )
            .expect("create table");
        table
            .insert_rows([(1_i64, "alice"), (2_i64, "bob")])
            .expect("insert rows");

        let execution = table.lazy().expect("lazy scan");
        let select = execution.collect().expect("build select execution");
        let batches = select.collect().expect("collect batches");
        assert_eq!(batches.len(), 1);
        // Column index 1 is "name"; both inserted rows should be present.
        let column = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(column.len(), 2);
    }

    // COUNT-of-NULLs aggregate: two of the three inserted values are NULL,
    // so the aggregate result must be 2.
    #[test]
    fn aggregate_count_nulls() {
        let pager = Arc::new(MemPager::default());
        let context = Arc::new(RuntimeContext::new(pager));

        let table = context
            .create_table("ints", [("i", DataType::Int64)])
            .expect("create table");
        table
            .insert_rows([
                (PlanValue::Null,),
                (PlanValue::Integer(1),),
                (PlanValue::Null,),
            ])
            .expect("insert rows");

        let plan =
            SelectPlan::new("ints").with_aggregates(vec![AggregateExpr::count_nulls("i", "nulls")]);
        let execution = context.execute_select(plan).expect("select");
        let batches = execution.collect().expect("collect batches");
        let column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int column");
        assert_eq!(column.value(0), 2);
    }
}