llkv_runtime/
lib.rs

1//! Query execution runtime for LLKV.
2//!
3//! This crate provides the runtime API (see [`RuntimeEngine`]) for executing SQL plans with full
4//! transaction support. It coordinates between the transaction layer, storage layer,
5//! and query executor to provide a complete database runtime.
6//!
7//! # Key Components
8//!
9//! - **[`RuntimeEngine`]**: Main execution engine for SQL operations
10//! - **[`RuntimeSession`]**: Session-level interface with transaction management
11//! - **[`TransactionContext`]**: Single-transaction execution context
12//! - **Table Provider**: Integration with the query executor for table access
13//!
14//! # Transaction Support
15//!
16//! The runtime supports both:
17//! - **Auto-commit**: Single-statement transactions (uses `TXN_ID_AUTO_COMMIT`)
18//! - **Multi-statement**: Explicit BEGIN/COMMIT/ROLLBACK transactions
19//!
20//! # MVCC Integration
21//!
22//! All data modifications automatically include MVCC metadata:
23//! - `row_id`: Unique row identifier
24//! - `created_by`: Transaction ID that created the row
25//! - `deleted_by`: Transaction ID that deleted the row (or `TXN_ID_NONE`)
26//!
27//! The runtime ensures these columns are injected and managed consistently.
28#![forbid(unsafe_code)]
29
30use std::fmt;
31use std::marker::PhantomData;
32use std::mem;
33use std::ops::Bound;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::{Arc, RwLock};
36use std::time::{SystemTime, UNIX_EPOCH};
37
38use rustc_hash::{FxHashMap, FxHashSet};
39
40use arrow::array::{
41    Array, ArrayRef, Date32Builder, Float64Builder, Int64Builder, StringBuilder, UInt64Array,
42    UInt64Builder,
43};
44use arrow::datatypes::{DataType, Field, Schema};
45use arrow::record_batch::RecordBatch;
46use llkv_column_map::ColumnStore;
47use llkv_column_map::store::{
48    CREATED_BY_COLUMN_NAME, DELETED_BY_COLUMN_NAME, GatherNullPolicy, ROW_ID_COLUMN_NAME,
49};
50use llkv_column_map::types::LogicalFieldId;
51use llkv_expr::expr::{Expr as LlkvExpr, Filter, Operator, ScalarExpr};
52// Literal is not used at top-level; keep it out to avoid unused import warnings.
53use llkv_result::Error;
54use llkv_storage::pager::{MemPager, Pager};
55use llkv_table::table::{RowIdFilter, ScanProjection, ScanStreamOptions, Table};
56use llkv_table::types::{FieldId, ROW_ID_FIELD_ID, RowId, TableId};
57use llkv_table::{CATALOG_TABLE_ID, ColMeta, SysCatalog, TableMeta};
58use simd_r_drive_entry_handle::EntryHandle;
59use sqlparser::ast::{
60    Expr as SqlExpr, FunctionArg, FunctionArgExpr, GroupByExpr, ObjectName, ObjectNamePart, Select,
61    SelectItem, SelectItemQualifiedWildcardKind, TableAlias, TableFactor, UnaryOperator, Value,
62    ValueWithSpan,
63};
64use time::{Date, Month};
65
66pub type Result<T> = llkv_result::Result<T>;
67
68// Re-export plan structures from llkv-plan
69pub use llkv_plan::{
70    AggregateExpr, AggregateFunction, AssignmentValue, ColumnAssignment, ColumnNullability,
71    ColumnSpec, CreateTablePlan, CreateTableSource, DeletePlan, InsertPlan, InsertSource,
72    IntoColumnSpec, NotNull, Nullable, OrderByPlan, OrderSortType, OrderTarget, PlanOperation,
73    PlanStatement, PlanValue, SelectPlan, SelectProjection, UpdatePlan,
74};
75
76// Execution structures from llkv-executor
77use llkv_executor::{ExecutorColumn, ExecutorSchema, ExecutorTable};
78pub use llkv_executor::{QueryExecutor, RowBatch, SelectExecution, TableProvider};
79
80// Import transaction structures from llkv-transaction for internal use.
81pub use llkv_transaction::TransactionKind;
82use llkv_transaction::{
83    RowVersion, TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionContext, TransactionManager,
84    TransactionResult, TxnId, TxnIdManager, mvcc::TransactionSnapshot,
85};
86
87// Internal low-level transaction session type (from llkv-transaction)
88use llkv_transaction::TransactionSession;
89
90// Note: RuntimeSession is the high-level wrapper that users should use instead of the lower-level TransactionSession API
91
92/// Helper functions for MVCC column injection.
93///
94/// These functions consolidate the repeated logic for adding MVCC metadata columns
95/// (row_id, created_by, deleted_by) to RecordBatches.
96mod mvcc_columns {
97    use super::*;
98    use std::collections::HashMap;
99
100    /// Build MVCC columns (row_id, created_by, deleted_by) for INSERT operations.
101    ///
102    /// Returns (row_id_array, created_by_array, deleted_by_array) and updates next_row_id.
103    pub(crate) fn build_insert_mvcc_columns(
104        row_count: usize,
105        start_row_id: RowId,
106        creator_txn_id: TxnId,
107    ) -> (ArrayRef, ArrayRef, ArrayRef) {
108        let mut row_builder = UInt64Builder::with_capacity(row_count);
109        for offset in 0..row_count {
110            row_builder.append_value(start_row_id + offset as u64);
111        }
112
113        let mut created_builder = UInt64Builder::with_capacity(row_count);
114        let mut deleted_builder = UInt64Builder::with_capacity(row_count);
115        for _ in 0..row_count {
116            created_builder.append_value(creator_txn_id);
117            deleted_builder.append_value(TXN_ID_NONE);
118        }
119
120        (
121            Arc::new(row_builder.finish()) as ArrayRef,
122            Arc::new(created_builder.finish()) as ArrayRef,
123            Arc::new(deleted_builder.finish()) as ArrayRef,
124        )
125    }
126
127    /// Build MVCC field definitions (row_id, created_by, deleted_by).
128    ///
129    /// Returns the three Field definitions that should be prepended to user columns.
130    pub(crate) fn build_mvcc_fields() -> Vec<Field> {
131        vec![
132            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
133            Field::new(CREATED_BY_COLUMN_NAME, DataType::UInt64, false),
134            Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
135        ]
136    }
137
138    /// Build field with field_id metadata for a user column.
139    pub(crate) fn build_field_with_metadata(
140        name: &str,
141        data_type: DataType,
142        nullable: bool,
143        field_id: FieldId,
144    ) -> Field {
145        let mut metadata = FxHashMap::with_capacity_and_hasher(1, Default::default());
146        metadata.insert(
147            llkv_table::constants::FIELD_ID_META_KEY.to_string(),
148            field_id.to_string(),
149        );
150        Field::new(name, data_type, nullable)
151            .with_metadata(metadata.into_iter().collect::<HashMap<String, String>>())
152    }
153
154    /// Build DELETE batch with row_id and deleted_by columns.
155    ///
156    /// This creates a minimal RecordBatch for marking rows as deleted.
157    pub(crate) fn build_delete_batch(
158        row_ids: Vec<u64>,
159        deleted_by_txn_id: TxnId,
160    ) -> llkv_result::Result<RecordBatch> {
161        let row_count = row_ids.len();
162
163        let fields = vec![
164            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
165            Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
166        ];
167
168        let arrays: Vec<ArrayRef> = vec![
169            Arc::new(UInt64Array::from(row_ids)),
170            Arc::new(UInt64Array::from(vec![deleted_by_txn_id; row_count])),
171        ];
172
173        RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(Error::Arrow)
174    }
175}
176
177/// Result of running a plan statement.
178#[allow(clippy::large_enum_variant)]
179#[derive(Clone)]
180pub enum RuntimeStatementResult<P>
181where
182    P: Pager<Blob = EntryHandle> + Send + Sync,
183{
184    CreateTable {
185        table_name: String,
186    },
187    NoOp,
188    Insert {
189        table_name: String,
190        rows_inserted: usize,
191    },
192    Update {
193        table_name: String,
194        rows_updated: usize,
195    },
196    Delete {
197        table_name: String,
198        rows_deleted: usize,
199    },
200    Select {
201        table_name: String,
202        schema: Arc<Schema>,
203        execution: SelectExecution<P>,
204    },
205    Transaction {
206        kind: TransactionKind,
207    },
208}
209
210impl<P> fmt::Debug for RuntimeStatementResult<P>
211where
212    P: Pager<Blob = EntryHandle> + Send + Sync,
213{
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        match self {
216            RuntimeStatementResult::CreateTable { table_name } => f
217                .debug_struct("CreateTable")
218                .field("table_name", table_name)
219                .finish(),
220            RuntimeStatementResult::NoOp => f.debug_struct("NoOp").finish(),
221            RuntimeStatementResult::Insert {
222                table_name,
223                rows_inserted,
224            } => f
225                .debug_struct("Insert")
226                .field("table_name", table_name)
227                .field("rows_inserted", rows_inserted)
228                .finish(),
229            RuntimeStatementResult::Update {
230                table_name,
231                rows_updated,
232            } => f
233                .debug_struct("Update")
234                .field("table_name", table_name)
235                .field("rows_updated", rows_updated)
236                .finish(),
237            RuntimeStatementResult::Delete {
238                table_name,
239                rows_deleted,
240            } => f
241                .debug_struct("Delete")
242                .field("table_name", table_name)
243                .field("rows_deleted", rows_deleted)
244                .finish(),
245            RuntimeStatementResult::Select {
246                table_name, schema, ..
247            } => f
248                .debug_struct("Select")
249                .field("table_name", table_name)
250                .field("schema", schema)
251                .finish(),
252            RuntimeStatementResult::Transaction { kind } => {
253                f.debug_struct("Transaction").field("kind", kind).finish()
254            }
255        }
256    }
257}
258
259impl<P> RuntimeStatementResult<P>
260where
261    P: Pager<Blob = EntryHandle> + Send + Sync,
262{
263    /// Convert a StatementResult from one pager type to another.
264    /// Only works for non-SELECT results (CreateTable, Insert, Update, Delete, NoOp, Transaction).
265    #[allow(dead_code)]
266    pub(crate) fn convert_pager_type<Q>(self) -> Result<RuntimeStatementResult<Q>>
267    where
268        Q: Pager<Blob = EntryHandle> + Send + Sync,
269    {
270        match self {
271            RuntimeStatementResult::CreateTable { table_name } => {
272                Ok(RuntimeStatementResult::CreateTable { table_name })
273            }
274            RuntimeStatementResult::NoOp => Ok(RuntimeStatementResult::NoOp),
275            RuntimeStatementResult::Insert {
276                table_name,
277                rows_inserted,
278            } => Ok(RuntimeStatementResult::Insert {
279                table_name,
280                rows_inserted,
281            }),
282            RuntimeStatementResult::Update {
283                table_name,
284                rows_updated,
285            } => Ok(RuntimeStatementResult::Update {
286                table_name,
287                rows_updated,
288            }),
289            RuntimeStatementResult::Delete {
290                table_name,
291                rows_deleted,
292            } => Ok(RuntimeStatementResult::Delete {
293                table_name,
294                rows_deleted,
295            }),
296            RuntimeStatementResult::Transaction { kind } => {
297                Ok(RuntimeStatementResult::Transaction { kind })
298            }
299            RuntimeStatementResult::Select { .. } => Err(Error::Internal(
300                "Cannot convert SELECT result between pager types in transaction".into(),
301            )),
302        }
303    }
304}
305
306/// Return the table name referenced by a plan statement, if any.
307///
308/// This is a small helper used by higher-level engines (for example the
309/// SQL front-end) to provide better error messages when a statement fails
310/// with a table-related error. It intentionally returns an `Option<&str>` so
311/// callers can decide how to report missing table context.
312pub fn statement_table_name(statement: &PlanStatement) -> Option<&str> {
313    match statement {
314        PlanStatement::CreateTable(plan) => Some(&plan.name),
315        PlanStatement::Insert(plan) => Some(&plan.table),
316        PlanStatement::Update(plan) => Some(&plan.table),
317        PlanStatement::Delete(plan) => Some(&plan.table),
318        PlanStatement::Select(plan) => Some(&plan.table),
319        PlanStatement::BeginTransaction
320        | PlanStatement::CommitTransaction
321        | PlanStatement::RollbackTransaction => None,
322    }
323}
324
325// ============================================================================
326// Plan Structures (now in llkv-plan and re-exported above)
327// ============================================================================
328//
329// The following types are defined in llkv-plan and re-exported:
330// - plan values, CreateTablePlan, ColumnSpec, IntoColumnSpec
331// - InsertPlan, InsertSource, UpdatePlan, DeletePlan
332// - SelectPlan, SelectProjection, AggregateExpr, AggregateFunction
333// - OrderByPlan, OrderSortType, OrderTarget
334// - PlanOperation
335//
336// This separation allows plans to be used independently of execution logic.
337// ============================================================================
338
339// Transaction management is now handled by llkv-transaction crate
340// The SessionTransaction and TableDeltaState types are re-exported from there
341
342/// Wrapper for Context that implements TransactionContext
343pub struct RuntimeContextWrapper<P>
344where
345    P: Pager<Blob = EntryHandle> + Send + Sync,
346{
347    ctx: Arc<RuntimeContext<P>>,
348    snapshot: RwLock<TransactionSnapshot>,
349}
350
351impl<P> RuntimeContextWrapper<P>
352where
353    P: Pager<Blob = EntryHandle> + Send + Sync,
354{
355    fn new(ctx: Arc<RuntimeContext<P>>) -> Self {
356        let snapshot = ctx.default_snapshot();
357        Self {
358            ctx,
359            snapshot: RwLock::new(snapshot),
360        }
361    }
362
363    fn update_snapshot(&self, snapshot: TransactionSnapshot) {
364        let mut guard = self.snapshot.write().expect("snapshot lock poisoned");
365        *guard = snapshot;
366    }
367
368    fn current_snapshot(&self) -> TransactionSnapshot {
369        *self.snapshot.read().expect("snapshot lock poisoned")
370    }
371
372    fn context(&self) -> &Arc<RuntimeContext<P>> {
373        &self.ctx
374    }
375
376    fn ctx(&self) -> &RuntimeContext<P> {
377        &self.ctx
378    }
379}
380
381/// A session for executing operations with optional transaction support.
382///
383/// This is a high-level wrapper around the transaction machinery that provides
384/// a clean API for users. Operations can be executed directly or within a transaction.
385pub struct RuntimeSession<P>
386where
387    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
388{
389    // TODO: Allow generic pager type
390    inner: TransactionSession<RuntimeContextWrapper<P>, RuntimeContextWrapper<MemPager>>,
391}
392
393impl<P> RuntimeSession<P>
394where
395    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
396{
397    /// Clone this session (reuses the same underlying TransactionSession).
398    /// This is necessary to maintain transaction state across Engine clones.
399    pub(crate) fn clone_session(&self) -> Self {
400        Self {
401            inner: self.inner.clone_session(),
402        }
403    }
404
405    /// Begin a transaction in this session.
406    /// Creates an isolated staging context automatically.
407    pub fn begin_transaction(&self) -> Result<RuntimeStatementResult<P>> {
408        let staging_pager = Arc::new(MemPager::default());
409        tracing::trace!(
410            "BEGIN_TRANSACTION: Created staging pager at {:p}",
411            &*staging_pager
412        );
413        let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));
414
415        // Copy table metadata from the main context to the staging context,
416        // but create new Table instances that use the staging pager
417        self.inner
418            .context()
419            .ctx()
420            .copy_tables_to_staging(&staging_ctx)?;
421
422        let staging_wrapper = Arc::new(RuntimeContextWrapper::new(staging_ctx));
423
424        self.inner.begin_transaction(staging_wrapper)?;
425        Ok(RuntimeStatementResult::Transaction {
426            kind: TransactionKind::Begin,
427        })
428    }
429
430    /// Mark the current transaction as aborted due to an error.
431    /// This should be called when any error occurs during a transaction.
432    pub fn abort_transaction(&self) {
433        self.inner.abort_transaction();
434    }
435
436    /// Check if this session has an active transaction.
437    pub fn has_active_transaction(&self) -> bool {
438        let result = self.inner.has_active_transaction();
439        tracing::trace!("SESSION: has_active_transaction() = {}", result);
440        result
441    }
442
443    /// Check if the current transaction has been aborted due to an error.
444    pub fn is_aborted(&self) -> bool {
445        self.inner.is_aborted()
446    }
447
448    /// Commit the current transaction and apply changes to the base context.
449    /// If the transaction was aborted, this acts as a ROLLBACK instead.
450    pub fn commit_transaction(&self) -> Result<RuntimeStatementResult<P>> {
451        tracing::trace!("Session::commit_transaction called");
452        let (tx_result, operations) = self.inner.commit_transaction()?;
453        tracing::trace!(
454            "Session::commit_transaction got {} operations",
455            operations.len()
456        );
457
458        if !operations.is_empty() {
459            let dropped_tables = self
460                .inner
461                .context()
462                .ctx()
463                .dropped_tables
464                .read()
465                .unwrap()
466                .clone();
467            if !dropped_tables.is_empty() {
468                for operation in &operations {
469                    let table_name_opt = match operation {
470                        PlanOperation::Insert(plan) => Some(plan.table.as_str()),
471                        PlanOperation::Update(plan) => Some(plan.table.as_str()),
472                        PlanOperation::Delete(plan) => Some(plan.table.as_str()),
473                        _ => None,
474                    };
475                    if let Some(table_name) = table_name_opt {
476                        let (_, canonical) = canonical_table_name(table_name)?;
477                        if dropped_tables.contains(&canonical) {
478                            self.abort_transaction();
479                            return Err(Error::TransactionContextError(
480                                "another transaction has dropped this table".into(),
481                            ));
482                        }
483                    }
484                }
485            }
486        }
487
488        // Extract the transaction kind from the transaction module's result
489        let kind = match tx_result {
490            TransactionResult::Transaction { kind } => kind,
491            _ => {
492                return Err(Error::Internal(
493                    "commit_transaction returned non-transaction result".into(),
494                ));
495            }
496        };
497        tracing::trace!("Session::commit_transaction kind={:?}", kind);
498
499        // Only replay operations if there are any (empty if transaction was aborted)
500        for operation in operations {
501            match operation {
502                PlanOperation::CreateTable(plan) => {
503                    TransactionContext::create_table_plan(&**self.inner.context(), plan)?;
504                }
505                PlanOperation::Insert(plan) => {
506                    TransactionContext::insert(&**self.inner.context(), plan)?;
507                }
508                PlanOperation::Update(plan) => {
509                    TransactionContext::update(&**self.inner.context(), plan)?;
510                }
511                PlanOperation::Delete(plan) => {
512                    TransactionContext::delete(&**self.inner.context(), plan)?;
513                }
514                _ => {}
515            }
516        }
517
518        // Reset the base context snapshot to the default auto-commit view now that
519        // the transaction has been replayed onto the base tables.
520        let base_ctx = self.inner.context();
521        let default_snapshot = base_ctx.ctx().default_snapshot();
522        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
523
524        // Persist the next_txn_id to the catalog after a successful commit
525        if matches!(kind, TransactionKind::Commit) {
526            let ctx = base_ctx.ctx();
527            let next_txn_id = ctx.txn_manager().current_next_txn_id();
528            if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
529                tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
530            }
531        }
532
533        // Return a StatementResult with the correct kind (Commit or Rollback)
534        Ok(RuntimeStatementResult::Transaction { kind })
535    }
536
537    /// Rollback the current transaction, discarding all changes.
538    pub fn rollback_transaction(&self) -> Result<RuntimeStatementResult<P>> {
539        self.inner.rollback_transaction()?;
540        let base_ctx = self.inner.context();
541        let default_snapshot = base_ctx.ctx().default_snapshot();
542        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
543        Ok(RuntimeStatementResult::Transaction {
544            kind: TransactionKind::Rollback,
545        })
546    }
547
548    fn materialize_create_table_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
549        if let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take() {
550            let select_result = self.select(*select_plan)?;
551            let (schema, batches) = match select_result {
552                RuntimeStatementResult::Select {
553                    schema, execution, ..
554                } => {
555                    let batches = execution.collect()?;
556                    (schema, batches)
557                }
558                _ => {
559                    return Err(Error::Internal(
560                        "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
561                    ));
562                }
563            };
564            plan.source = Some(CreateTableSource::Batches { schema, batches });
565        }
566        Ok(plan)
567    }
568
569    /// Create a table (outside or inside transaction).
570    pub fn create_table_plan(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
571        let plan = self.materialize_create_table_plan(plan)?;
572        if self.has_active_transaction() {
573            let table_name = plan.name.clone();
574            match self
575                .inner
576                .execute_operation(PlanOperation::CreateTable(plan))
577            {
578                Ok(_) => Ok(RuntimeStatementResult::CreateTable { table_name }),
579                Err(e) => {
580                    // If an error occurs during a transaction, abort it
581                    self.abort_transaction();
582                    Err(e)
583                }
584            }
585        } else {
586            // Call via TransactionContext trait
587            let table_name = plan.name.clone();
588            TransactionContext::create_table_plan(&**self.inner.context(), plan)?;
589            Ok(RuntimeStatementResult::CreateTable { table_name })
590        }
591    }
592
593    fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
594        let InsertPlan {
595            table,
596            columns,
597            source,
598        } = plan;
599
600        match source {
601            InsertSource::Rows(rows) => {
602                let count = rows.len();
603                Ok((
604                    InsertPlan {
605                        table,
606                        columns,
607                        source: InsertSource::Rows(rows),
608                    },
609                    count,
610                ))
611            }
612            InsertSource::Batches(batches) => {
613                let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
614                Ok((
615                    InsertPlan {
616                        table,
617                        columns,
618                        source: InsertSource::Batches(batches),
619                    },
620                    count,
621                ))
622            }
623            InsertSource::Select { plan: select_plan } => {
624                let select_result = self.select(*select_plan)?;
625                let rows = match select_result {
626                    RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
627                    _ => {
628                        return Err(Error::Internal(
629                            "expected Select result when executing INSERT ... SELECT".into(),
630                        ));
631                    }
632                };
633                let count = rows.len();
634                Ok((
635                    InsertPlan {
636                        table,
637                        columns,
638                        source: InsertSource::Rows(rows),
639                    },
640                    count,
641                ))
642            }
643        }
644    }
645
646    /// Insert rows (outside or inside transaction).
647    pub fn insert(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
648        tracing::trace!("Session::insert called for table={}", plan.table);
649        let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
650        let table_name = plan.table.clone();
651
652        if self.has_active_transaction() {
653            match self.inner.execute_operation(PlanOperation::Insert(plan)) {
654                Ok(_) => {
655                    tracing::trace!("Session::insert succeeded for table={}", table_name);
656                    Ok(RuntimeStatementResult::Insert {
657                        rows_inserted,
658                        table_name,
659                    })
660                }
661                Err(e) => {
662                    tracing::trace!(
663                        "Session::insert failed for table={}, error={:?}",
664                        table_name,
665                        e
666                    );
667                    // Only abort transaction on constraint violations
668                    if matches!(e, Error::ConstraintError(_)) {
669                        tracing::trace!("Transaction is_aborted=true");
670                        self.abort_transaction();
671                    }
672                    Err(e)
673                }
674            }
675        } else {
676            // Call via TransactionContext trait
677            let context = self.inner.context();
678            let default_snapshot = context.ctx().default_snapshot();
679            TransactionContext::set_snapshot(&**context, default_snapshot);
680            TransactionContext::insert(&**context, plan)?;
681            Ok(RuntimeStatementResult::Insert {
682                rows_inserted,
683                table_name,
684            })
685        }
686    }
687
688    /// Select rows (outside or inside transaction).
689    pub fn select(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
690        if self.has_active_transaction() {
691            let tx_result = match self
692                .inner
693                .execute_operation(PlanOperation::Select(plan.clone()))
694            {
695                Ok(result) => result,
696                Err(e) => {
697                    // Only abort transaction on specific errors (constraint violations, etc.)
698                    // Don't abort on catalog errors (table doesn't exist) or similar
699                    if matches!(e, Error::ConstraintError(_)) {
700                        self.abort_transaction();
701                    }
702                    return Err(e);
703                }
704            };
705            match tx_result {
706                TransactionResult::Select {
707                    table_name,
708                    schema,
709                    execution: staging_execution,
710                } => {
711                    // Convert from staging (MemPager) execution to base pager execution
712                    // by collecting batches and rebuilding
713                    let batches = staging_execution.collect().unwrap_or_default();
714                    let combined = if batches.is_empty() {
715                        RecordBatch::new_empty(Arc::clone(&schema))
716                    } else if batches.len() == 1 {
717                        batches.into_iter().next().unwrap()
718                    } else {
719                        let refs: Vec<&RecordBatch> = batches.iter().collect();
720                        arrow::compute::concat_batches(&schema, refs)?
721                    };
722
723                    let execution = SelectExecution::from_batch(
724                        table_name.clone(),
725                        Arc::clone(&schema),
726                        combined,
727                    );
728
729                    Ok(RuntimeStatementResult::Select {
730                        execution,
731                        table_name,
732                        schema,
733                    })
734                }
735                _ => Err(Error::Internal("expected Select result".into())),
736            }
737        } else {
738            // Call via TransactionContext trait
739            let context = self.inner.context();
740            let default_snapshot = context.ctx().default_snapshot();
741            TransactionContext::set_snapshot(&**context, default_snapshot);
742            let table_name = plan.table.clone();
743            let execution = TransactionContext::execute_select(&**context, plan)?;
744            let schema = execution.schema();
745            Ok(RuntimeStatementResult::Select {
746                execution,
747                table_name,
748                schema,
749            })
750        }
751    }
752
753    /// Convenience helper to fetch all rows from a table within this session.
754    pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
755        let plan =
756            SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
757        match self.select(plan)? {
758            RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
759            other => Err(Error::Internal(format!(
760                "expected Select result when reading table '{table}', got {:?}",
761                other
762            ))),
763        }
764    }
765
766    /// Update rows (outside or inside transaction).
767    pub fn update(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
768        if self.has_active_transaction() {
769            let table_name = plan.table.clone();
770            let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
771                Ok(result) => result,
772                Err(e) => {
773                    // If an error occurs during a transaction, abort it
774                    self.abort_transaction();
775                    return Err(e);
776                }
777            };
778            match result {
779                TransactionResult::Update {
780                    rows_matched: _,
781                    rows_updated,
782                } => Ok(RuntimeStatementResult::Update {
783                    rows_updated,
784                    table_name,
785                }),
786                _ => Err(Error::Internal("expected Update result".into())),
787            }
788        } else {
789            // Call via TransactionContext trait
790            let context = self.inner.context();
791            let default_snapshot = context.ctx().default_snapshot();
792            TransactionContext::set_snapshot(&**context, default_snapshot);
793            let table_name = plan.table.clone();
794            let result = TransactionContext::update(&**context, plan)?;
795            match result {
796                TransactionResult::Update {
797                    rows_matched: _,
798                    rows_updated,
799                } => Ok(RuntimeStatementResult::Update {
800                    rows_updated,
801                    table_name,
802                }),
803                _ => Err(Error::Internal("expected Update result".into())),
804            }
805        }
806    }
807
808    /// Delete rows (outside or inside transaction).
809    pub fn delete(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
810        if self.has_active_transaction() {
811            let table_name = plan.table.clone();
812            let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
813                Ok(result) => result,
814                Err(e) => {
815                    // If an error occurs during a transaction, abort it
816                    self.abort_transaction();
817                    return Err(e);
818                }
819            };
820            match result {
821                TransactionResult::Delete { rows_deleted } => Ok(RuntimeStatementResult::Delete {
822                    rows_deleted,
823                    table_name,
824                }),
825                _ => Err(Error::Internal("expected Delete result".into())),
826            }
827        } else {
828            // Call via TransactionContext trait
829            let context = self.inner.context();
830            let default_snapshot = context.ctx().default_snapshot();
831            TransactionContext::set_snapshot(&**context, default_snapshot);
832            let table_name = plan.table.clone();
833            let result = TransactionContext::delete(&**context, plan)?;
834            match result {
835                TransactionResult::Delete { rows_deleted } => Ok(RuntimeStatementResult::Delete {
836                    rows_deleted,
837                    table_name,
838                }),
839                _ => Err(Error::Internal("expected Delete result".into())),
840            }
841        }
842    }
843}
844
/// Execution engine that pairs a shared [`RuntimeContext`] with one
/// [`RuntimeSession`].
///
/// Statements passed to `execute_statement` are dispatched to the session.
/// Cloning the engine shares the context handle and derives the clone's
/// session from the existing one via `clone_session`.
pub struct RuntimeEngine<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    /// Shared runtime context this engine was built from.
    context: Arc<RuntimeContext<P>>,
    /// Session through which every statement is executed.
    session: RuntimeSession<P>,
}
852
853impl<P> Clone for RuntimeEngine<P>
854where
855    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
856{
857    fn clone(&self) -> Self {
858        // IMPORTANT: Reuse the same session to maintain transaction state!
859        // Creating a new session would break multi-statement transactions.
860        tracing::debug!("[ENGINE] RuntimeEngine::clone() called - reusing same session");
861        Self {
862            context: Arc::clone(&self.context),
863            session: self.session.clone_session(),
864        }
865    }
866}
867
868impl<P> RuntimeEngine<P>
869where
870    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
871{
872    pub fn new(pager: Arc<P>) -> Self {
873        let context = Arc::new(RuntimeContext::new(pager));
874        Self::from_context(context)
875    }
876
877    pub fn from_context(context: Arc<RuntimeContext<P>>) -> Self {
878        tracing::debug!("[ENGINE] RuntimeEngine::from_context - creating new session");
879        let session = context.create_session();
880        tracing::debug!("[ENGINE] RuntimeEngine::from_context - created session");
881        Self { context, session }
882    }
883
884    pub fn context(&self) -> Arc<RuntimeContext<P>> {
885        Arc::clone(&self.context)
886    }
887
888    pub fn session(&self) -> &RuntimeSession<P> {
889        &self.session
890    }
891
892    pub fn execute_statement(&self, statement: PlanStatement) -> Result<RuntimeStatementResult<P>> {
893        match statement {
894            PlanStatement::BeginTransaction => self.session.begin_transaction(),
895            PlanStatement::CommitTransaction => self.session.commit_transaction(),
896            PlanStatement::RollbackTransaction => self.session.rollback_transaction(),
897            PlanStatement::CreateTable(plan) => self.session.create_table_plan(plan),
898            PlanStatement::Insert(plan) => self.session.insert(plan),
899            PlanStatement::Update(plan) => self.session.update(plan),
900            PlanStatement::Delete(plan) => self.session.delete(plan),
901            PlanStatement::Select(plan) => self.session.select(plan),
902        }
903    }
904
905    pub fn execute_all<I>(&self, statements: I) -> Result<Vec<RuntimeStatementResult<P>>>
906    where
907        I: IntoIterator<Item = PlanStatement>,
908    {
909        let mut results = Vec::new();
910        for statement in statements {
911            results.push(self.execute_statement(statement)?);
912        }
913        Ok(results)
914    }
915}
916
/// In-memory execution context shared by plan-based queries.
///
/// Important: "lazy loading" here refers to *table metadata only* (schema,
/// executor-side column descriptors, and a small next-row-id counter). We do
/// NOT eagerly load or materialize the table's row data into memory. All
/// row/column data remains on the ColumnStore and is streamed in chunks during
/// query execution. This keeps the memory footprint low even for very large
/// tables.
///
/// Typical resource usage:
/// - Metadata per table: ~100s of bytes to a few KB (schema + field ids)
/// - ExecutorTable struct: small (handles + counters)
/// - Actual table rows: streamed from disk in chunks (never fully resident)
pub struct RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Pager backing this context; a `ColumnStore` is opened on it whenever
    /// system-catalog state is read or written.
    pager: Arc<P>,
    /// Cache of table handles keyed by canonical table name. It starts empty
    /// when the context is constructed.
    tables: RwLock<FxHashMap<String, Arc<ExecutorTable<P>>>>,
    /// Canonical names of tables tracked as dropped. Creating a table removes
    /// its name from this set.
    /// NOTE(review): presumably consulted by table lookup to hide dropped
    /// tables — confirm against `lookup_table`.
    dropped_tables: RwLock<FxHashSet<String>>,
    /// Centralized catalog for table/field name resolution.
    catalog: Arc<llkv_table::catalog::TableCatalog>,
    /// Transaction manager for session-based transactions.
    /// NOTE(review): the second type parameter uses `MemPager`, presumably the
    /// staging context for open transactions — confirm.
    transaction_manager:
        TransactionManager<RuntimeContextWrapper<P>, RuntimeContextWrapper<MemPager>>,
    /// Transaction id manager obtained from `transaction_manager`; it supplies
    /// the last committed id used when building default snapshots.
    txn_manager: Arc<TxnIdManager>,
}
944
945impl<P> RuntimeContext<P>
946where
947    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
948{
949    pub fn new(pager: Arc<P>) -> Self {
950        tracing::trace!("RuntimeContext::new called, pager={:p}", &*pager);
951
952        // Load transaction state and table registry from catalog if it exists
953        let (next_txn_id, last_committed, loaded_tables) = match ColumnStore::open(Arc::clone(
954            &pager,
955        )) {
956            Ok(store) => {
957                let catalog = SysCatalog::new(&store);
958                let next_txn_id = match catalog.get_next_txn_id() {
959                    Ok(Some(id)) => {
960                        tracing::debug!("[CONTEXT] Loaded next_txn_id={} from catalog", id);
961                        id
962                    }
963                    Ok(None) => {
964                        tracing::debug!(
965                            "[CONTEXT] No persisted next_txn_id found, starting from default"
966                        );
967                        TXN_ID_AUTO_COMMIT + 1
968                    }
969                    Err(e) => {
970                        tracing::warn!(
971                            "[CONTEXT] Failed to load next_txn_id: {}, using default",
972                            e
973                        );
974                        TXN_ID_AUTO_COMMIT + 1
975                    }
976                };
977                let last_committed = match catalog.get_last_committed_txn_id() {
978                    Ok(Some(id)) => {
979                        tracing::debug!("[CONTEXT] Loaded last_committed={} from catalog", id);
980                        id
981                    }
982                    Ok(None) => {
983                        tracing::debug!(
984                            "[CONTEXT] No persisted last_committed found, starting from default"
985                        );
986                        TXN_ID_AUTO_COMMIT
987                    }
988                    Err(e) => {
989                        tracing::warn!(
990                            "[CONTEXT] Failed to load last_committed: {}, using default",
991                            e
992                        );
993                        TXN_ID_AUTO_COMMIT
994                    }
995                };
996
997                // Load table registry from catalog
998                let loaded_tables = match catalog.all_table_metas() {
999                    Ok(metas) => {
1000                        tracing::debug!("[CONTEXT] Loaded {} table(s) from catalog", metas.len());
1001                        metas
1002                    }
1003                    Err(e) => {
1004                        tracing::warn!(
1005                            "[CONTEXT] Failed to load table metas: {}, starting with empty registry",
1006                            e
1007                        );
1008                        Vec::new()
1009                    }
1010                };
1011
1012                (next_txn_id, last_committed, loaded_tables)
1013            }
1014            Err(e) => {
1015                tracing::warn!(
1016                    "[CONTEXT] Failed to open ColumnStore: {}, using default state",
1017                    e
1018                );
1019                (TXN_ID_AUTO_COMMIT + 1, TXN_ID_AUTO_COMMIT, Vec::new())
1020            }
1021        };
1022
1023        let transaction_manager =
1024            TransactionManager::new_with_initial_state(next_txn_id, last_committed);
1025        let txn_manager = transaction_manager.txn_manager();
1026
1027        // LAZY LOADING: Only load table metadata at first access. We intentionally
1028        // avoid loading any row/column data into memory here. The executor
1029        // performs streaming reads from the ColumnStore when a query runs, so
1030        // large tables are never fully materialized.
1031        //
1032        // Benefits of this approach:
1033        // - Instant database open (no upfront I/O for table data)
1034        // - Lower memory footprint (only metadata cached)
1035        // - Natural parallelism: if multiple threads request different tables
1036        //   concurrently, those tables will be loaded concurrently by the
1037        //   caller threads (no global preload required).
1038        //
1039        // Future Optimizations (if profiling shows need):
1040        // 1. Eager parallel preload of a short "hot" list of tables (rayon)
1041        // 2. Background preload of catalog entries after startup
1042        // 3. LRU-based eviction for extremely large deployments
1043        // 4. Cache compact representations of schemas to reduce per-table RAM
1044        //
1045        // Note: `loaded_tables` holds catalog metadata that helped us discover
1046        // which tables exist; we discard it here because metadata will be
1047        // fetched on-demand during lazy loads.
1048        tracing::debug!(
1049            "[CONTEXT] Initialized with lazy loading for {} table(s)",
1050            loaded_tables.len()
1051        );
1052
1053        // Initialize catalog and populate with existing tables
1054        let catalog = Arc::new(llkv_table::catalog::TableCatalog::new());
1055        for (_table_id, table_meta) in &loaded_tables {
1056            if let Some(ref table_name) = table_meta.name
1057                && let Err(e) = catalog.register_table(table_name)
1058            {
1059                tracing::warn!(
1060                    "[CONTEXT] Failed to register table '{}' in catalog: {}",
1061                    table_name,
1062                    e
1063                );
1064            }
1065        }
1066        tracing::debug!(
1067            "[CONTEXT] Catalog initialized with {} table(s)",
1068            catalog.table_count()
1069        );
1070
1071        Self {
1072            pager,
1073            tables: RwLock::new(FxHashMap::default()), // Start with empty table cache
1074            dropped_tables: RwLock::new(FxHashSet::default()),
1075            catalog,
1076            transaction_manager,
1077            txn_manager,
1078        }
1079    }
1080
1081    /// Return the transaction ID manager shared with sessions.
1082    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
1083        Arc::clone(&self.txn_manager)
1084    }
1085
1086    /// Persist the next_txn_id to the catalog.
1087    pub fn persist_next_txn_id(&self, next_txn_id: TxnId) -> Result<()> {
1088        let store = ColumnStore::open(Arc::clone(&self.pager))?;
1089        let catalog = SysCatalog::new(&store);
1090        catalog.put_next_txn_id(next_txn_id)?;
1091        let last_committed = self.txn_manager.last_committed();
1092        catalog.put_last_committed_txn_id(last_committed)?;
1093        tracing::debug!(
1094            "[CONTEXT] Persisted next_txn_id={}, last_committed={}",
1095            next_txn_id,
1096            last_committed
1097        );
1098        Ok(())
1099    }
1100
1101    /// Construct the default snapshot for auto-commit operations.
1102    pub fn default_snapshot(&self) -> TransactionSnapshot {
1103        TransactionSnapshot {
1104            txn_id: TXN_ID_AUTO_COMMIT,
1105            snapshot_id: self.txn_manager.last_committed(),
1106        }
1107    }
1108
1109    /// Create a new session for transaction management.
1110    /// Each session can have its own independent transaction.
1111    pub fn create_session(self: &Arc<Self>) -> RuntimeSession<P> {
1112        tracing::debug!("[SESSION] RuntimeContext::create_session called");
1113        let wrapper = RuntimeContextWrapper::new(Arc::clone(self));
1114        let inner = self.transaction_manager.create_session(Arc::new(wrapper));
1115        tracing::debug!(
1116            "[SESSION] Created TransactionSession with session_id (will be logged by transaction manager)"
1117        );
1118        RuntimeSession { inner }
1119    }
1120
1121    /// Get a handle to an existing table by name.
1122    pub fn table(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
1123        RuntimeTableHandle::new(Arc::clone(self), name)
1124    }
1125
1126    /// Check if there's an active transaction (legacy - checks if ANY session has a transaction).
1127    #[deprecated(note = "Use session-based transactions instead")]
1128    pub fn has_active_transaction(&self) -> bool {
1129        self.transaction_manager.has_active_transaction()
1130    }
1131
1132    pub fn create_table<C, I>(
1133        self: &Arc<Self>,
1134        name: &str,
1135        columns: I,
1136    ) -> Result<RuntimeTableHandle<P>>
1137    where
1138        C: IntoColumnSpec,
1139        I: IntoIterator<Item = C>,
1140    {
1141        self.create_table_with_options(name, columns, false)
1142    }
1143
1144    pub fn create_table_if_not_exists<C, I>(
1145        self: &Arc<Self>,
1146        name: &str,
1147        columns: I,
1148    ) -> Result<RuntimeTableHandle<P>>
1149    where
1150        C: IntoColumnSpec,
1151        I: IntoIterator<Item = C>,
1152    {
1153        self.create_table_with_options(name, columns, true)
1154    }
1155
1156    pub fn create_table_plan(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
1157        if plan.columns.is_empty() && plan.source.is_none() {
1158            return Err(Error::InvalidArgumentError(
1159                "CREATE TABLE requires explicit columns or a source".into(),
1160            ));
1161        }
1162
1163        let (display_name, canonical_name) = canonical_table_name(&plan.name)?;
1164        tracing::trace!(
1165            "DEBUG create_table_plan: table='{}' if_not_exists={} columns={}",
1166            display_name,
1167            plan.if_not_exists,
1168            plan.columns.len()
1169        );
1170        for (idx, col) in plan.columns.iter().enumerate() {
1171            tracing::trace!(
1172                "  plan column[{}]: name='{}' primary_key={}",
1173                idx,
1174                col.name,
1175                col.primary_key
1176            );
1177        }
1178        let exists = {
1179            let tables = self.tables.read().unwrap();
1180            tables.contains_key(&canonical_name)
1181        };
1182        tracing::trace!("DEBUG create_table_plan: exists={}", exists);
1183        if exists {
1184            if plan.or_replace {
1185                tracing::trace!(
1186                    "DEBUG create_table_plan: table '{}' exists and or_replace=true, removing existing table before recreation",
1187                    display_name
1188                );
1189                self.remove_table_entry(&canonical_name);
1190            } else if plan.if_not_exists {
1191                tracing::trace!(
1192                    "DEBUG create_table_plan: table '{}' exists and if_not_exists=true, returning early WITHOUT creating",
1193                    display_name
1194                );
1195                return Ok(RuntimeStatementResult::CreateTable {
1196                    table_name: display_name,
1197                });
1198            } else {
1199                return Err(Error::CatalogError(format!(
1200                    "Catalog Error: Table '{}' already exists",
1201                    display_name
1202                )));
1203            }
1204        }
1205
1206        self.dropped_tables.write().unwrap().remove(&canonical_name);
1207
1208        match plan.source {
1209            Some(CreateTableSource::Batches { schema, batches }) => self.create_table_from_batches(
1210                display_name,
1211                canonical_name,
1212                schema,
1213                batches,
1214                plan.if_not_exists,
1215            ),
1216            Some(CreateTableSource::Select { .. }) => Err(Error::Internal(
1217                "CreateTableSource::Select should be materialized before reaching RuntimeContext::create_table_plan"
1218                    .into(),
1219            )),
1220            None => self.create_table_from_columns(
1221                display_name,
1222                canonical_name,
1223                plan.columns,
1224                plan.if_not_exists,
1225            ),
1226        }
1227    }
1228
1229    pub fn table_names(self: &Arc<Self>) -> Vec<String> {
1230        // Use catalog for table names (single source of truth)
1231        self.catalog.table_names()
1232    }
1233
1234    fn filter_visible_row_ids(
1235        &self,
1236        table: &ExecutorTable<P>,
1237        row_ids: Vec<u64>,
1238        snapshot: TransactionSnapshot,
1239    ) -> Result<Vec<u64>> {
1240        filter_row_ids_for_snapshot(
1241            table.table.store(),
1242            table.table.table_id(),
1243            row_ids,
1244            &self.txn_manager,
1245            snapshot,
1246        )
1247    }
1248
1249    pub fn create_table_builder(&self, name: &str) -> RuntimeCreateTableBuilder<'_, P> {
1250        RuntimeCreateTableBuilder {
1251            ctx: self,
1252            plan: CreateTablePlan::new(name),
1253        }
1254    }
1255
1256    pub fn table_column_specs(self: &Arc<Self>, name: &str) -> Result<Vec<ColumnSpec>> {
1257        let (_, canonical_name) = canonical_table_name(name)?;
1258        let table = self.lookup_table(&canonical_name)?;
1259        Ok(table
1260            .schema
1261            .columns
1262            .iter()
1263            .map(|column| {
1264                ColumnSpec::new(
1265                    column.name.clone(),
1266                    column.data_type.clone(),
1267                    column.nullable,
1268                )
1269                .with_primary_key(column.primary_key)
1270            })
1271            .collect())
1272    }
1273
1274    /// Copy table metadata from this context to a staging context.
1275    /// This creates new Table instances that use the staging context's pager.
1276    fn copy_tables_to_staging<Q>(&self, staging: &RuntimeContext<Q>) -> Result<()>
1277    where
1278        Q: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1279    {
1280        let base_store = ColumnStore::open(Arc::clone(&self.pager))?;
1281        let base_catalog = SysCatalog::new(&base_store);
1282
1283        let staging_store = ColumnStore::open(Arc::clone(&staging.pager))?;
1284        let staging_catalog = SysCatalog::new(&staging_store);
1285
1286        let mut next_table_id = match base_catalog.get_next_table_id()? {
1287            Some(value) => value,
1288            None => {
1289                let seed = base_catalog.max_table_id()?.unwrap_or(CATALOG_TABLE_ID);
1290                seed.checked_add(1).ok_or_else(|| {
1291                    Error::InvalidArgumentError("exhausted available table ids".into())
1292                })?
1293            }
1294        };
1295        if next_table_id == CATALOG_TABLE_ID {
1296            next_table_id = next_table_id.checked_add(1).ok_or_else(|| {
1297                Error::InvalidArgumentError("exhausted available table ids".into())
1298            })?;
1299        }
1300
1301        staging_catalog.put_next_table_id(next_table_id)?;
1302
1303        let source_tables: Vec<(String, Arc<ExecutorTable<P>>)> = {
1304            let guard = self.tables.read().unwrap();
1305            guard
1306                .iter()
1307                .map(|(name, table)| (name.clone(), Arc::clone(table)))
1308                .collect()
1309        };
1310
1311        for (table_name, source_table) in source_tables.iter() {
1312            tracing::trace!(
1313                "!!! COPY_TABLES_TO_STAGING: Copying table '{}' with {} columns:",
1314                table_name,
1315                source_table.schema.columns.len()
1316            );
1317            for (idx, col) in source_table.schema.columns.iter().enumerate() {
1318                tracing::trace!(
1319                    "    source column[{}]: name='{}' primary_key={}",
1320                    idx,
1321                    col.name,
1322                    col.primary_key
1323                );
1324            }
1325
1326            // Create a new Table instance with the same table_id but using the staging pager
1327            let new_table = Table::new(source_table.table.table_id(), Arc::clone(&staging.pager))?;
1328
1329            // Create a new ExecutorTable with the new table but same schema
1330            // Start with fresh row counters (0) for transaction isolation
1331            let new_executor_table = Arc::new(ExecutorTable {
1332                table: Arc::new(new_table),
1333                schema: source_table.schema.clone(),
1334                next_row_id: AtomicU64::new(0),
1335                total_rows: AtomicU64::new(0),
1336            });
1337
1338            tracing::trace!(
1339                "!!! COPY_TABLES_TO_STAGING: After copy, {} columns in new executor table:",
1340                new_executor_table.schema.columns.len()
1341            );
1342            for (idx, col) in new_executor_table.schema.columns.iter().enumerate() {
1343                tracing::trace!(
1344                    "    new column[{}]: name='{}' primary_key={}",
1345                    idx,
1346                    col.name,
1347                    col.primary_key
1348                );
1349            }
1350
1351            {
1352                let mut staging_tables = staging.tables.write().unwrap();
1353                staging_tables.insert(table_name.clone(), Arc::clone(&new_executor_table));
1354            }
1355
1356            // Snapshot existing rows (with row_ids) into staging for transaction isolation.
1357            // This provides full REPEATABLE READ isolation by copying data at BEGIN.
1358            let batches = match self.get_batches_with_row_ids(table_name, None) {
1359                Ok(batches) => batches,
1360                Err(Error::NotFound) => {
1361                    // Table exists but has no data yet (empty table)
1362                    Vec::new()
1363                }
1364                Err(e) => return Err(e),
1365            };
1366            if !batches.is_empty() {
1367                for batch in batches {
1368                    new_executor_table.table.append(&batch)?;
1369                }
1370            }
1371
1372            let next_row_id = source_table.next_row_id.load(Ordering::SeqCst);
1373            new_executor_table
1374                .next_row_id
1375                .store(next_row_id, Ordering::SeqCst);
1376            let total_rows = source_table.total_rows.load(Ordering::SeqCst);
1377            new_executor_table
1378                .total_rows
1379                .store(total_rows, Ordering::SeqCst);
1380        }
1381
1382        let staging_count = staging.tables.read().unwrap().len();
1383        tracing::trace!(
1384            "!!! COPY_TABLES_TO_STAGING: Copied {} tables to staging context",
1385            staging_count
1386        );
1387        Ok(())
1388    }
1389
1390    pub fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<RowBatch> {
1391        let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
1392        handle.lazy()?.collect_rows()
1393    }
1394
1395    fn execute_create_table(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
1396        self.create_table_plan(plan)
1397    }
1398
1399    fn create_table_with_options<C, I>(
1400        self: &Arc<Self>,
1401        name: &str,
1402        columns: I,
1403        if_not_exists: bool,
1404    ) -> Result<RuntimeTableHandle<P>>
1405    where
1406        C: IntoColumnSpec,
1407        I: IntoIterator<Item = C>,
1408    {
1409        let mut plan = CreateTablePlan::new(name);
1410        plan.if_not_exists = if_not_exists;
1411        plan.columns = columns
1412            .into_iter()
1413            .map(|column| column.into_column_spec())
1414            .collect();
1415        let result = self.create_table_plan(plan)?;
1416        match result {
1417            RuntimeStatementResult::CreateTable { .. } => {
1418                RuntimeTableHandle::new(Arc::clone(self), name)
1419            }
1420            other => Err(Error::InvalidArgumentError(format!(
1421                "unexpected statement result {other:?} when creating table"
1422            ))),
1423        }
1424    }
1425
1426    pub fn insert(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
1427        // For non-transactional inserts, use TXN_ID_AUTO_COMMIT directly
1428        // instead of creating a temporary transaction
1429        let snapshot = TransactionSnapshot {
1430            txn_id: TXN_ID_AUTO_COMMIT,
1431            snapshot_id: self.txn_manager.last_committed(),
1432        };
1433        self.insert_with_snapshot(plan, snapshot)
1434    }
1435
1436    pub fn insert_with_snapshot(
1437        &self,
1438        plan: InsertPlan,
1439        snapshot: TransactionSnapshot,
1440    ) -> Result<RuntimeStatementResult<P>> {
1441        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
1442        let table = self.lookup_table(&canonical_name)?;
1443
1444        // Targeted debug for 'keys' table only
1445        if display_name == "keys" {
1446            tracing::trace!(
1447                "\n[KEYS] INSERT starting - table_id={}, context_pager={:p}",
1448                table.table.table_id(),
1449                &*self.pager
1450            );
1451            tracing::trace!(
1452                "[KEYS] Table has {} columns, primary_key columns: {:?}",
1453                table.schema.columns.len(),
1454                table
1455                    .schema
1456                    .columns
1457                    .iter()
1458                    .filter(|c| c.primary_key)
1459                    .map(|c| &c.name)
1460                    .collect::<Vec<_>>()
1461            );
1462        }
1463
1464        let result = match plan.source {
1465            InsertSource::Rows(rows) => self.insert_rows(
1466                table.as_ref(),
1467                display_name.clone(),
1468                rows,
1469                plan.columns,
1470                snapshot,
1471            ),
1472            InsertSource::Batches(batches) => self.insert_batches(
1473                table.as_ref(),
1474                display_name.clone(),
1475                batches,
1476                plan.columns,
1477                snapshot,
1478            ),
1479            InsertSource::Select { .. } => Err(Error::Internal(
1480                "InsertSource::Select should be materialized before reaching RuntimeContext::insert"
1481                    .into(),
1482            )),
1483        };
1484
1485        if display_name == "keys" {
1486            tracing::trace!(
1487                "[KEYS] INSERT completed: {:?}",
1488                result
1489                    .as_ref()
1490                    .map(|_| "OK")
1491                    .map_err(|e| format!("{:?}", e))
1492            );
1493        }
1494
1495        result
1496    }
1497
1498    /// Get raw batches from a table including row_ids, optionally filtered.
1499    /// This is used for transaction seeding where we need to preserve existing row_ids.
1500    pub fn get_batches_with_row_ids(
1501        &self,
1502        table_name: &str,
1503        filter: Option<LlkvExpr<'static, String>>,
1504    ) -> Result<Vec<RecordBatch>> {
1505        self.get_batches_with_row_ids_with_snapshot(table_name, filter, self.default_snapshot())
1506    }
1507
1508    pub fn get_batches_with_row_ids_with_snapshot(
1509        &self,
1510        table_name: &str,
1511        filter: Option<LlkvExpr<'static, String>>,
1512        snapshot: TransactionSnapshot,
1513    ) -> Result<Vec<RecordBatch>> {
1514        let (_, canonical_name) = canonical_table_name(table_name)?;
1515        let table = self.lookup_table(&canonical_name)?;
1516
1517        let filter_expr = match filter {
1518            Some(expr) => translate_predicate(expr, table.schema.as_ref())?,
1519            None => {
1520                let field_id = table.schema.first_field_id().ok_or_else(|| {
1521                    Error::InvalidArgumentError(
1522                        "table has no columns; cannot perform wildcard scan".into(),
1523                    )
1524                })?;
1525                full_table_scan_filter(field_id)
1526            }
1527        };
1528
1529        // First, get the row_ids that match the filter
1530        let row_ids = table.table.filter_row_ids(&filter_expr)?;
1531        if row_ids.is_empty() {
1532            return Ok(Vec::new());
1533        }
1534
1535        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
1536        if visible_row_ids.is_empty() {
1537            return Ok(Vec::new());
1538        }
1539
1540        // Scan to get the column data
1541        let table_id = table.table.table_id();
1542
1543        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
1544        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(table.schema.columns.len() + 1);
1545
1546        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));
1547        arrays.push(Arc::new(UInt64Array::from(visible_row_ids.clone())));
1548
1549        for column in &table.schema.columns {
1550            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
1551            let gathered = table.table.store().gather_rows(
1552                &[logical_field_id],
1553                &visible_row_ids,
1554                GatherNullPolicy::IncludeNulls,
1555            )?;
1556            let field = mvcc_columns::build_field_with_metadata(
1557                &column.name,
1558                column.data_type.clone(),
1559                column.nullable,
1560                column.field_id,
1561            );
1562            fields.push(field);
1563            arrays.push(gathered.column(0).clone());
1564        }
1565
1566        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays)?;
1567        Ok(vec![batch])
1568    }
1569
1570    /// Append batches directly to a table, preserving row_ids from the batches.
1571    /// This is used for transaction seeding where we need to preserve existing row_ids.
1572    pub fn append_batches_with_row_ids(
1573        &self,
1574        table_name: &str,
1575        batches: Vec<RecordBatch>,
1576    ) -> Result<usize> {
1577        let (_, canonical_name) = canonical_table_name(table_name)?;
1578        let table = self.lookup_table(&canonical_name)?;
1579
1580        let mut total_rows = 0;
1581        for batch in batches {
1582            if batch.num_rows() == 0 {
1583                continue;
1584            }
1585
1586            // Verify the batch has a row_id column
1587            let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
1588                Error::InvalidArgumentError(
1589                    "batch must contain row_id column for direct append".into(),
1590                )
1591            })?;
1592
1593            // Append the batch directly to the underlying table
1594            table.table.append(&batch)?;
1595            total_rows += batch.num_rows();
1596        }
1597
1598        Ok(total_rows)
1599    }
1600
1601    pub fn update(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
1602        let snapshot = self.txn_manager.begin_transaction();
1603        let result = self.update_with_snapshot(plan, snapshot)?;
1604        self.txn_manager.mark_committed(snapshot.txn_id);
1605        Ok(result)
1606    }
1607
1608    pub fn update_with_snapshot(
1609        &self,
1610        plan: UpdatePlan,
1611        snapshot: TransactionSnapshot,
1612    ) -> Result<RuntimeStatementResult<P>> {
1613        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
1614        let table = self.lookup_table(&canonical_name)?;
1615        match plan.filter {
1616            Some(filter) => self.update_filtered_rows(
1617                table.as_ref(),
1618                display_name,
1619                plan.assignments,
1620                filter,
1621                snapshot,
1622            ),
1623            None => self.update_all_rows(table.as_ref(), display_name, plan.assignments, snapshot),
1624        }
1625    }
1626
1627    pub fn delete(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
1628        let snapshot = self.txn_manager.begin_transaction();
1629        let result = self.delete_with_snapshot(plan, snapshot)?;
1630        self.txn_manager.mark_committed(snapshot.txn_id);
1631        Ok(result)
1632    }
1633
1634    pub fn delete_with_snapshot(
1635        &self,
1636        plan: DeletePlan,
1637        snapshot: TransactionSnapshot,
1638    ) -> Result<RuntimeStatementResult<P>> {
1639        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
1640        let table = self.lookup_table(&canonical_name)?;
1641        match plan.filter {
1642            Some(filter) => self.delete_filtered_rows(
1643                table.as_ref(),
1644                display_name,
1645                filter,
1646                snapshot,
1647                snapshot.txn_id,
1648            ),
1649            None => self.delete_all_rows(table.as_ref(), display_name, snapshot, snapshot.txn_id),
1650        }
1651    }
1652
1653    pub fn table_handle(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
1654        RuntimeTableHandle::new(Arc::clone(self), name)
1655    }
1656
1657    pub fn execute_select(self: &Arc<Self>, plan: SelectPlan) -> Result<SelectExecution<P>> {
1658        let (_display_name, canonical_name) = canonical_table_name(&plan.table)?;
1659        // Verify table exists
1660        let _table = self.lookup_table(&canonical_name)?;
1661
1662        // Create a plan with canonical table name
1663        let mut canonical_plan = plan.clone();
1664        canonical_plan.table = canonical_name;
1665
1666        // Use the QueryExecutor from llkv-executor
1667        let provider: Arc<dyn TableProvider<P>> = Arc::new(ContextProvider {
1668            context: Arc::clone(self),
1669        });
1670        let executor = QueryExecutor::new(provider);
1671        executor.execute_select(canonical_plan)
1672    }
1673
1674    pub fn execute_select_with_snapshot(
1675        self: &Arc<Self>,
1676        plan: SelectPlan,
1677        snapshot: TransactionSnapshot,
1678    ) -> Result<SelectExecution<P>> {
1679        let (_display_name, canonical_name) = canonical_table_name(&plan.table)?;
1680        self.lookup_table(&canonical_name)?;
1681
1682        let mut canonical_plan = plan.clone();
1683        canonical_plan.table = canonical_name;
1684
1685        let provider: Arc<dyn TableProvider<P>> = Arc::new(ContextProvider {
1686            context: Arc::clone(self),
1687        });
1688        let executor = QueryExecutor::new(provider);
1689        let row_filter: Arc<dyn RowIdFilter<P>> = Arc::new(MvccRowIdFilter::new(
1690            Arc::clone(&self.txn_manager),
1691            snapshot,
1692        ));
1693        executor.execute_select_with_filter(canonical_plan, Some(row_filter))
1694    }
1695
1696    fn create_table_from_columns(
1697        &self,
1698        display_name: String,
1699        canonical_name: String,
1700        columns: Vec<ColumnSpec>,
1701        if_not_exists: bool,
1702    ) -> Result<RuntimeStatementResult<P>> {
1703        tracing::trace!(
1704            "\n=== CREATE_TABLE_FROM_COLUMNS: table='{}' columns={} ===",
1705            display_name,
1706            columns.len()
1707        );
1708        for (idx, col) in columns.iter().enumerate() {
1709            tracing::trace!(
1710                "  input column[{}]: name='{}' primary_key={}",
1711                idx,
1712                col.name,
1713                col.primary_key
1714            );
1715        }
1716        if columns.is_empty() {
1717            return Err(Error::InvalidArgumentError(
1718                "CREATE TABLE requires at least one column".into(),
1719            ));
1720        }
1721
1722        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(columns.len());
1723        let mut lookup = FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
1724        for (idx, column) in columns.iter().enumerate() {
1725            let normalized = column.name.to_ascii_lowercase();
1726            if lookup.insert(normalized.clone(), idx).is_some() {
1727                return Err(Error::InvalidArgumentError(format!(
1728                    "duplicate column name '{}' in table '{}'",
1729                    column.name, display_name
1730                )));
1731            }
1732            tracing::trace!(
1733                "DEBUG create_table_from_columns[{}]: name='{}' data_type={:?} nullable={} primary_key={}",
1734                idx,
1735                column.name,
1736                column.data_type,
1737                column.nullable,
1738                column.primary_key
1739            );
1740            column_defs.push(ExecutorColumn {
1741                name: column.name.clone(),
1742                data_type: column.data_type.clone(),
1743                nullable: column.nullable,
1744                primary_key: column.primary_key,
1745                field_id: (idx + 1) as FieldId,
1746            });
1747            let pushed = column_defs.last().unwrap();
1748            tracing::trace!(
1749                "DEBUG create_table_from_columns[{}]: pushed ExecutorColumn name='{}' primary_key={}",
1750                idx,
1751                pushed.name,
1752                pushed.primary_key
1753            );
1754        }
1755
1756        let table_id = self.reserve_table_id()?;
1757        tracing::trace!(
1758            "=== TABLE '{}' CREATED WITH table_id={} pager={:p} ===",
1759            display_name,
1760            table_id,
1761            &*self.pager
1762        );
1763        let table = Table::new(table_id, Arc::clone(&self.pager))?;
1764        table.put_table_meta(&TableMeta {
1765            table_id,
1766            name: Some(display_name.clone()),
1767            created_at_micros: current_time_micros(),
1768            flags: 0,
1769            epoch: 0,
1770        });
1771
1772        for column in &column_defs {
1773            table.put_col_meta(&ColMeta {
1774                col_id: column.field_id,
1775                name: Some(column.name.clone()),
1776                flags: 0,
1777                default: None,
1778            });
1779        }
1780
1781        let schema = Arc::new(ExecutorSchema {
1782            columns: column_defs.clone(), // Clone for catalog registration below
1783            lookup,
1784        });
1785        let table_entry = Arc::new(ExecutorTable {
1786            table: Arc::new(table),
1787            schema,
1788            next_row_id: AtomicU64::new(0),
1789            total_rows: AtomicU64::new(0),
1790        });
1791
1792        let mut tables = self.tables.write().unwrap();
1793        if tables.contains_key(&canonical_name) {
1794            if if_not_exists {
1795                return Ok(RuntimeStatementResult::CreateTable {
1796                    table_name: display_name,
1797                });
1798            }
1799            return Err(Error::CatalogError(format!(
1800                "Catalog Error: Table '{}' already exists",
1801                display_name
1802            )));
1803        }
1804        tables.insert(canonical_name.clone(), table_entry);
1805        drop(tables); // Release write lock before catalog operations
1806
1807        // Register table in catalog
1808        let registered_table_id = self.catalog.register_table(&display_name)?;
1809        tracing::debug!(
1810            "[CATALOG] Registered table '{}' with catalog_id={}",
1811            display_name,
1812            registered_table_id
1813        );
1814
1815        // Register fields in catalog
1816        if let Some(field_resolver) = self.catalog.field_resolver(registered_table_id) {
1817            for column in &column_defs {
1818                if let Err(e) = field_resolver.register_field(&column.name) {
1819                    tracing::warn!(
1820                        "[CATALOG] Failed to register field '{}' in table '{}': {}",
1821                        column.name,
1822                        display_name,
1823                        e
1824                    );
1825                }
1826            }
1827            tracing::debug!(
1828                "[CATALOG] Registered {} field(s) for table '{}'",
1829                column_defs.len(),
1830                display_name
1831            );
1832        }
1833
1834        Ok(RuntimeStatementResult::CreateTable {
1835            table_name: display_name,
1836        })
1837    }
1838
1839    fn create_table_from_batches(
1840        &self,
1841        display_name: String,
1842        canonical_name: String,
1843        schema: Arc<Schema>,
1844        batches: Vec<RecordBatch>,
1845        if_not_exists: bool,
1846    ) -> Result<RuntimeStatementResult<P>> {
1847        if schema.fields().is_empty() {
1848            return Err(Error::InvalidArgumentError(
1849                "CREATE TABLE AS SELECT requires at least one column".into(),
1850            ));
1851        }
1852        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(schema.fields().len());
1853        let mut lookup =
1854            FxHashMap::with_capacity_and_hasher(schema.fields().len(), Default::default());
1855        for (idx, field) in schema.fields().iter().enumerate() {
1856            let data_type = match field.data_type() {
1857                DataType::Int64 | DataType::Float64 | DataType::Utf8 | DataType::Date32 => {
1858                    field.data_type().clone()
1859                }
1860                other => {
1861                    return Err(Error::InvalidArgumentError(format!(
1862                        "unsupported column type in CTAS result: {other:?}"
1863                    )));
1864                }
1865            };
1866            let normalized = field.name().to_ascii_lowercase();
1867            if lookup.insert(normalized.clone(), idx).is_some() {
1868                return Err(Error::InvalidArgumentError(format!(
1869                    "duplicate column name '{}' in CTAS result",
1870                    field.name()
1871                )));
1872            }
1873            column_defs.push(ExecutorColumn {
1874                name: field.name().to_string(),
1875                data_type,
1876                nullable: field.is_nullable(),
1877                primary_key: false, // CTAS does not preserve PRIMARY KEY constraints
1878                field_id: (idx + 1) as FieldId,
1879            });
1880        }
1881
1882        let table_id = self.reserve_table_id()?;
1883        let table = Table::new(table_id, Arc::clone(&self.pager))?;
1884        table.put_table_meta(&TableMeta {
1885            table_id,
1886            name: Some(display_name.clone()),
1887            created_at_micros: current_time_micros(),
1888            flags: 0,
1889            epoch: 0,
1890        });
1891
1892        for column in &column_defs {
1893            table.put_col_meta(&ColMeta {
1894                col_id: column.field_id,
1895                name: Some(column.name.clone()),
1896                flags: 0,
1897                default: None,
1898            });
1899        }
1900
1901        let schema_arc = Arc::new(ExecutorSchema {
1902            columns: column_defs.clone(),
1903            lookup,
1904        });
1905        let table_entry = Arc::new(ExecutorTable {
1906            table: Arc::new(table),
1907            schema: schema_arc,
1908            next_row_id: AtomicU64::new(0),
1909            total_rows: AtomicU64::new(0),
1910        });
1911
1912        let mut next_row_id: RowId = 0;
1913        let mut total_rows: u64 = 0;
1914        let creator_snapshot = self.txn_manager.begin_transaction();
1915        let creator_txn_id = creator_snapshot.txn_id;
1916        for batch in batches {
1917            let row_count = batch.num_rows();
1918            if row_count == 0 {
1919                continue;
1920            }
1921            if batch.num_columns() != column_defs.len() {
1922                return Err(Error::InvalidArgumentError(
1923                    "CTAS query returned unexpected column count".into(),
1924                ));
1925            }
1926            let start_row = next_row_id;
1927            next_row_id += row_count as u64;
1928            total_rows += row_count as u64;
1929
1930            // Build MVCC columns using helper
1931            let (row_id_array, created_by_array, deleted_by_array) =
1932                mvcc_columns::build_insert_mvcc_columns(row_count, start_row, creator_txn_id);
1933
1934            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(column_defs.len() + 3);
1935            arrays.push(row_id_array);
1936            arrays.push(created_by_array);
1937            arrays.push(deleted_by_array);
1938
1939            // Build schema fields
1940            let mut fields: Vec<Field> = Vec::with_capacity(column_defs.len() + 3);
1941            fields.extend(mvcc_columns::build_mvcc_fields());
1942
1943            for (idx, column) in column_defs.iter().enumerate() {
1944                let field = mvcc_columns::build_field_with_metadata(
1945                    &column.name,
1946                    column.data_type.clone(),
1947                    column.nullable,
1948                    column.field_id,
1949                );
1950                fields.push(field);
1951                arrays.push(batch.column(idx).clone());
1952            }
1953
1954            let append_schema = Arc::new(Schema::new(fields));
1955            let append_batch = RecordBatch::try_new(append_schema, arrays)?;
1956            table_entry.table.append(&append_batch)?;
1957        }
1958
1959        self.txn_manager.mark_committed(creator_txn_id);
1960
1961        table_entry.next_row_id.store(next_row_id, Ordering::SeqCst);
1962        table_entry.total_rows.store(total_rows, Ordering::SeqCst);
1963
1964        let mut tables = self.tables.write().unwrap();
1965        if tables.contains_key(&canonical_name) {
1966            if if_not_exists {
1967                return Ok(RuntimeStatementResult::CreateTable {
1968                    table_name: display_name,
1969                });
1970            }
1971            return Err(Error::CatalogError(format!(
1972                "Catalog Error: Table '{}' already exists",
1973                display_name
1974            )));
1975        }
1976        tracing::trace!(
1977            "=== INSERTING TABLE '{}' INTO TABLES MAP (pager={:p}) ===",
1978            canonical_name,
1979            &*self.pager
1980        );
1981        for (idx, col) in table_entry.schema.columns.iter().enumerate() {
1982            tracing::trace!(
1983                "  inserting column[{}]: name='{}' primary_key={}",
1984                idx,
1985                col.name,
1986                col.primary_key
1987            );
1988        }
1989        tables.insert(canonical_name.clone(), table_entry);
1990        drop(tables); // Release write lock before catalog operations
1991
1992        // Register table in catalog
1993        let registered_table_id = self.catalog.register_table(&display_name)?;
1994        tracing::debug!(
1995            "[CATALOG] Registered table '{}' (CTAS) with catalog_id={}",
1996            display_name,
1997            registered_table_id
1998        );
1999
2000        // Register fields in catalog
2001        if let Some(field_resolver) = self.catalog.field_resolver(registered_table_id) {
2002            for column in &column_defs {
2003                if let Err(e) = field_resolver.register_field(&column.name) {
2004                    tracing::warn!(
2005                        "[CATALOG] Failed to register field '{}' in table '{}': {}",
2006                        column.name,
2007                        display_name,
2008                        e
2009                    );
2010                }
2011            }
2012            tracing::debug!(
2013                "[CATALOG] Registered {} field(s) for table '{}' (CTAS)",
2014                column_defs.len(),
2015                display_name
2016            );
2017        }
2018
2019        Ok(RuntimeStatementResult::CreateTable {
2020            table_name: display_name,
2021        })
2022    }
2023
2024    fn check_primary_key_constraints(
2025        &self,
2026        table: &ExecutorTable<P>,
2027        rows: &[Vec<PlanValue>],
2028        column_order: &[usize],
2029        snapshot: TransactionSnapshot,
2030    ) -> Result<()> {
2031        let _table_id = table.table.table_id();
2032        // Find columns with PRIMARY KEY constraint
2033        let primary_key_columns: Vec<(usize, &ExecutorColumn)> = table
2034            .schema
2035            .columns
2036            .iter()
2037            .enumerate()
2038            .filter(|(_, col)| col.primary_key)
2039            .collect();
2040
2041        if primary_key_columns.is_empty() {
2042            return Ok(());
2043        }
2044
2045        // For each PRIMARY KEY column, check for duplicates
2046        for (col_idx, column) in primary_key_columns {
2047            // Get existing values for this column from the table
2048            let field_id = column.field_id;
2049            let existing_values = self.scan_column_values(table, field_id, snapshot)?;
2050
2051            tracing::trace!(
2052                "[PK_CHECK] snapshot(txn={}, snap_id={}) column '{}': found {} existing VISIBLE values: {:?}",
2053                snapshot.txn_id,
2054                snapshot.snapshot_id,
2055                column.name,
2056                existing_values.len(),
2057                existing_values
2058            );
2059
2060            // Check each new row value against existing values
2061            for row in rows {
2062                // Find which position in the INSERT statement corresponds to this column
2063                let insert_position = column_order
2064                    .iter()
2065                    .position(|&dest_idx| dest_idx == col_idx);
2066
2067                if let Some(pos) = insert_position {
2068                    let new_value = &row[pos];
2069
2070                    // Skip NULL values (PRIMARY KEY typically requires NOT NULL, but we check anyway)
2071                    if matches!(new_value, PlanValue::Null) {
2072                        continue;
2073                    }
2074
2075                    // Check if this value already exists
2076                    if existing_values.contains(new_value) {
2077                        return Err(Error::ConstraintError(format!(
2078                            "constraint violation on column '{}'",
2079                            column.name
2080                        )));
2081                    }
2082                }
2083            }
2084        }
2085
2086        Ok(())
2087    }
2088
2089    fn scan_column_values(
2090        &self,
2091        table: &ExecutorTable<P>,
2092        field_id: FieldId,
2093        snapshot: TransactionSnapshot,
2094    ) -> Result<Vec<PlanValue>> {
2095        let table_id = table.table.table_id();
2096        use llkv_expr::{Expr, Filter, Operator};
2097        use std::ops::Bound;
2098
2099        // Create a filter that matches all rows (unbounded range)
2100        let match_all_filter = Filter {
2101            field_id,
2102            op: Operator::Range {
2103                lower: Bound::Unbounded,
2104                upper: Bound::Unbounded,
2105            },
2106        };
2107        let filter_expr = Expr::Pred(match_all_filter);
2108
2109        // Get all matching row_ids first
2110        let row_ids = match table.table.filter_row_ids(&filter_expr) {
2111            Ok(ids) => ids,
2112            Err(Error::NotFound) => return Ok(Vec::new()),
2113            Err(e) => return Err(e),
2114        };
2115
2116        // Apply MVCC filtering manually using filter_row_ids_for_snapshot
2117        let row_ids = filter_row_ids_for_snapshot(
2118            table.table.store(),
2119            table_id,
2120            row_ids,
2121            &self.txn_manager,
2122            snapshot,
2123        )?;
2124
2125        if row_ids.is_empty() {
2126            return Ok(Vec::new());
2127        }
2128
2129        // Gather the column values for visible rows
2130        let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
2131        let batch = match table.table.store().gather_rows(
2132            &[logical_field_id],
2133            &row_ids,
2134            GatherNullPolicy::IncludeNulls,
2135        ) {
2136            Ok(b) => b,
2137            Err(Error::NotFound) => return Ok(Vec::new()),
2138            Err(e) => return Err(e),
2139        };
2140
2141        let mut values = Vec::with_capacity(row_ids.len());
2142        if batch.num_columns() > 0 {
2143            let array = batch.column(0);
2144            for row_idx in 0..batch.num_rows() {
2145                if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
2146                    values.push(value);
2147                }
2148            }
2149        }
2150
2151        Ok(values)
2152    }
2153
2154    fn insert_rows(
2155        &self,
2156        table: &ExecutorTable<P>,
2157        display_name: String,
2158        rows: Vec<Vec<PlanValue>>,
2159        columns: Vec<String>,
2160        snapshot: TransactionSnapshot,
2161    ) -> Result<RuntimeStatementResult<P>> {
2162        if rows.is_empty() {
2163            return Err(Error::InvalidArgumentError(
2164                "INSERT requires at least one row".into(),
2165            ));
2166        }
2167
2168        let column_order = resolve_insert_columns(&columns, table.schema.as_ref())?;
2169        let expected_len = column_order.len();
2170        for row in &rows {
2171            if row.len() != expected_len {
2172                return Err(Error::InvalidArgumentError(format!(
2173                    "expected {} values in INSERT row, found {}",
2174                    expected_len,
2175                    row.len()
2176                )));
2177            }
2178        }
2179
2180        // Check PRIMARY KEY constraints
2181        self.check_primary_key_constraints(table, &rows, &column_order, snapshot)?;
2182
2183        if display_name == "keys" {
2184            tracing::trace!(
2185                "[KEYS] Checking PRIMARY KEY constraints - {} rows to insert",
2186                rows.len()
2187            );
2188            for (i, row) in rows.iter().enumerate() {
2189                tracing::trace!("[KEYS]   row[{}]: {:?}", i, row);
2190            }
2191        }
2192
2193        let constraint_result =
2194            self.check_primary_key_constraints(table, &rows, &column_order, snapshot);
2195
2196        if display_name == "keys" {
2197            match &constraint_result {
2198                Ok(_) => tracing::trace!("[KEYS] PRIMARY KEY check PASSED"),
2199                Err(e) => tracing::trace!("[KEYS] PRIMARY KEY check FAILED: {:?}", e),
2200            }
2201        }
2202
2203        constraint_result?;
2204
2205        let row_count = rows.len();
2206        let mut column_values: Vec<Vec<PlanValue>> =
2207            vec![Vec::with_capacity(row_count); table.schema.columns.len()];
2208        for row in rows {
2209            for (idx, value) in row.into_iter().enumerate() {
2210                let dest_index = column_order[idx];
2211                column_values[dest_index].push(value);
2212            }
2213        }
2214
2215        let start_row = table.next_row_id.load(Ordering::SeqCst);
2216
2217        // Build MVCC columns using helper
2218        let (row_id_array, created_by_array, deleted_by_array) =
2219            mvcc_columns::build_insert_mvcc_columns(row_count, start_row, snapshot.txn_id);
2220
2221        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(column_values.len() + 3);
2222        arrays.push(row_id_array);
2223        arrays.push(created_by_array);
2224        arrays.push(deleted_by_array);
2225
2226        let mut fields: Vec<Field> = Vec::with_capacity(column_values.len() + 3);
2227        fields.extend(mvcc_columns::build_mvcc_fields());
2228
2229        for (column, values) in table.schema.columns.iter().zip(column_values.into_iter()) {
2230            let array = build_array_for_column(&column.data_type, &values)?;
2231            let field = mvcc_columns::build_field_with_metadata(
2232                &column.name,
2233                column.data_type.clone(),
2234                column.nullable,
2235                column.field_id,
2236            );
2237            arrays.push(array);
2238            fields.push(field);
2239        }
2240
2241        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays)?;
2242        table.table.append(&batch)?;
2243        table
2244            .next_row_id
2245            .store(start_row + row_count as u64, Ordering::SeqCst);
2246        table
2247            .total_rows
2248            .fetch_add(row_count as u64, Ordering::SeqCst);
2249
2250        Ok(RuntimeStatementResult::Insert {
2251            table_name: display_name,
2252            rows_inserted: row_count,
2253        })
2254    }
2255
2256    fn insert_batches(
2257        &self,
2258        table: &ExecutorTable<P>,
2259        display_name: String,
2260        batches: Vec<RecordBatch>,
2261        columns: Vec<String>,
2262        snapshot: TransactionSnapshot,
2263    ) -> Result<RuntimeStatementResult<P>> {
2264        if batches.is_empty() {
2265            return Ok(RuntimeStatementResult::Insert {
2266                table_name: display_name,
2267                rows_inserted: 0,
2268            });
2269        }
2270
2271        let expected_len = if columns.is_empty() {
2272            table.schema.columns.len()
2273        } else {
2274            columns.len()
2275        };
2276        let mut total_rows_inserted = 0usize;
2277
2278        for batch in batches {
2279            if batch.num_columns() != expected_len {
2280                return Err(Error::InvalidArgumentError(format!(
2281                    "expected {} columns in INSERT batch, found {}",
2282                    expected_len,
2283                    batch.num_columns()
2284                )));
2285            }
2286            let row_count = batch.num_rows();
2287            if row_count == 0 {
2288                continue;
2289            }
2290            let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(row_count);
2291            for row_idx in 0..row_count {
2292                let mut row: Vec<PlanValue> = Vec::with_capacity(expected_len);
2293                for col_idx in 0..expected_len {
2294                    let array = batch.column(col_idx);
2295                    row.push(llkv_plan::plan_value_from_array(array, row_idx)?);
2296                }
2297                rows.push(row);
2298            }
2299
2300            match self.insert_rows(table, display_name.clone(), rows, columns.clone(), snapshot)? {
2301                RuntimeStatementResult::Insert { rows_inserted, .. } => {
2302                    total_rows_inserted += rows_inserted;
2303                }
2304                _ => unreachable!("insert_rows must return Insert result"),
2305            }
2306        }
2307
2308        Ok(RuntimeStatementResult::Insert {
2309            table_name: display_name,
2310            rows_inserted: total_rows_inserted,
2311        })
2312    }
2313
2314    fn update_filtered_rows(
2315        &self,
2316        table: &ExecutorTable<P>,
2317        display_name: String,
2318        assignments: Vec<ColumnAssignment>,
2319        filter: LlkvExpr<'static, String>,
2320        snapshot: TransactionSnapshot,
2321    ) -> Result<RuntimeStatementResult<P>> {
2322        if assignments.is_empty() {
2323            return Err(Error::InvalidArgumentError(
2324                "UPDATE requires at least one assignment".into(),
2325            ));
2326        }
2327
2328        let schema = table.schema.as_ref();
2329        let filter_expr = translate_predicate(filter, schema)?;
2330
2331        // TODO: Dedupe
2332        enum PreparedValue {
2333            Literal(PlanValue),
2334            Expression { expr_index: usize },
2335        }
2336
2337        let mut seen_columns: FxHashSet<String> =
2338            FxHashSet::with_capacity_and_hasher(assignments.len(), Default::default());
2339        let mut prepared: Vec<(ExecutorColumn, PreparedValue)> =
2340            Vec::with_capacity(assignments.len());
2341        let mut scalar_exprs: Vec<ScalarExpr<FieldId>> = Vec::new();
2342
2343        for assignment in assignments {
2344            let normalized = assignment.column.to_ascii_lowercase();
2345            if !seen_columns.insert(normalized.clone()) {
2346                return Err(Error::InvalidArgumentError(format!(
2347                    "duplicate column '{}' in UPDATE assignments",
2348                    assignment.column
2349                )));
2350            }
2351            let column = table.schema.resolve(&assignment.column).ok_or_else(|| {
2352                Error::InvalidArgumentError(format!(
2353                    "unknown column '{}' in UPDATE",
2354                    assignment.column
2355                ))
2356            })?;
2357
2358            match assignment.value {
2359                AssignmentValue::Literal(value) => {
2360                    prepared.push((column.clone(), PreparedValue::Literal(value)));
2361                }
2362                AssignmentValue::Expression(expr) => {
2363                    let translated = translate_scalar(&expr, schema)?;
2364                    let expr_index = scalar_exprs.len();
2365                    scalar_exprs.push(translated);
2366                    prepared.push((column.clone(), PreparedValue::Expression { expr_index }));
2367                }
2368            }
2369        }
2370
2371        let (row_ids, mut expr_values) =
2372            self.collect_update_rows(table, &filter_expr, &scalar_exprs, snapshot)?;
2373
2374        if row_ids.is_empty() {
2375            return Ok(RuntimeStatementResult::Update {
2376                table_name: display_name,
2377                rows_updated: 0,
2378            });
2379        }
2380
2381        let row_count = row_ids.len();
2382        let table_id = table.table.table_id();
2383        let logical_fields: Vec<LogicalFieldId> = table
2384            .schema
2385            .columns
2386            .iter()
2387            .map(|column| LogicalFieldId::for_user(table_id, column.field_id))
2388            .collect();
2389
2390        let gathered = table.table.store().gather_rows(
2391            &logical_fields,
2392            &row_ids,
2393            GatherNullPolicy::IncludeNulls,
2394        )?;
2395
2396        let mut new_rows: Vec<Vec<PlanValue>> =
2397            vec![Vec::with_capacity(table.schema.columns.len()); row_count];
2398        for (col_idx, _column) in table.schema.columns.iter().enumerate() {
2399            let array = gathered.column(col_idx);
2400            for (row_idx, row) in new_rows.iter_mut().enumerate().take(row_count) {
2401                let value = llkv_plan::plan_value_from_array(array, row_idx)?;
2402                row.push(value);
2403            }
2404        }
2405
2406        let column_positions: FxHashMap<FieldId, usize> = FxHashMap::from_iter(
2407            table
2408                .schema
2409                .columns
2410                .iter()
2411                .enumerate()
2412                .map(|(idx, column)| (column.field_id, idx)),
2413        );
2414
2415        for (column, value) in prepared {
2416            let column_index =
2417                column_positions
2418                    .get(&column.field_id)
2419                    .copied()
2420                    .ok_or_else(|| {
2421                        Error::InvalidArgumentError(format!(
2422                            "column '{}' missing in table schema during UPDATE",
2423                            column.name
2424                        ))
2425                    })?;
2426
2427            let values = match value {
2428                PreparedValue::Literal(lit) => vec![lit; row_count],
2429                PreparedValue::Expression { expr_index } => {
2430                    let column_values = expr_values.get_mut(expr_index).ok_or_else(|| {
2431                        Error::InvalidArgumentError(
2432                            "expression assignment value missing during UPDATE".into(),
2433                        )
2434                    })?;
2435                    if column_values.len() != row_count {
2436                        return Err(Error::InvalidArgumentError(
2437                            "expression result count did not match targeted row count".into(),
2438                        ));
2439                    }
2440                    mem::take(column_values)
2441                }
2442            };
2443
2444            for (row_idx, new_value) in values.into_iter().enumerate() {
2445                if let Some(row) = new_rows.get_mut(row_idx) {
2446                    row[column_index] = new_value;
2447                }
2448            }
2449        }
2450
2451        let _ = self.apply_delete(
2452            table,
2453            display_name.clone(),
2454            row_ids.clone(),
2455            snapshot.txn_id,
2456        )?;
2457
2458        let column_names: Vec<String> = table
2459            .schema
2460            .columns
2461            .iter()
2462            .map(|column| column.name.clone())
2463            .collect();
2464
2465        let _ = self.insert_rows(
2466            table,
2467            display_name.clone(),
2468            new_rows,
2469            column_names,
2470            snapshot,
2471        )?;
2472
2473        Ok(RuntimeStatementResult::Update {
2474            table_name: display_name,
2475            rows_updated: row_count,
2476        })
2477    }
2478
2479    fn update_all_rows(
2480        &self,
2481        table: &ExecutorTable<P>,
2482        display_name: String,
2483        assignments: Vec<ColumnAssignment>,
2484        snapshot: TransactionSnapshot,
2485    ) -> Result<RuntimeStatementResult<P>> {
2486        if assignments.is_empty() {
2487            return Err(Error::InvalidArgumentError(
2488                "UPDATE requires at least one assignment".into(),
2489            ));
2490        }
2491
2492        let total_rows = table.total_rows.load(Ordering::SeqCst);
2493        let total_rows_usize = usize::try_from(total_rows).map_err(|_| {
2494            Error::InvalidArgumentError("table row count exceeds supported range".into())
2495        })?;
2496        if total_rows_usize == 0 {
2497            return Ok(RuntimeStatementResult::Update {
2498                table_name: display_name,
2499                rows_updated: 0,
2500            });
2501        }
2502
2503        let schema = table.schema.as_ref();
2504
2505        // TODO: Dedupe
2506        enum PreparedValue {
2507            Literal(PlanValue),
2508            Expression { expr_index: usize },
2509        }
2510
2511        let mut seen_columns: FxHashSet<String> =
2512            FxHashSet::with_capacity_and_hasher(assignments.len(), Default::default());
2513        let mut prepared: Vec<(ExecutorColumn, PreparedValue)> =
2514            Vec::with_capacity(assignments.len());
2515        let mut scalar_exprs: Vec<ScalarExpr<FieldId>> = Vec::new();
2516        let mut first_field_id: Option<FieldId> = None;
2517
2518        for assignment in assignments {
2519            let normalized = assignment.column.to_ascii_lowercase();
2520            if !seen_columns.insert(normalized.clone()) {
2521                return Err(Error::InvalidArgumentError(format!(
2522                    "duplicate column '{}' in UPDATE assignments",
2523                    assignment.column
2524                )));
2525            }
2526            let column = table.schema.resolve(&assignment.column).ok_or_else(|| {
2527                Error::InvalidArgumentError(format!(
2528                    "unknown column '{}' in UPDATE",
2529                    assignment.column
2530                ))
2531            })?;
2532            if first_field_id.is_none() {
2533                first_field_id = Some(column.field_id);
2534            }
2535
2536            match assignment.value {
2537                AssignmentValue::Literal(value) => {
2538                    prepared.push((column.clone(), PreparedValue::Literal(value)));
2539                }
2540                AssignmentValue::Expression(expr) => {
2541                    let translated = translate_scalar(&expr, schema)?;
2542                    let expr_index = scalar_exprs.len();
2543                    scalar_exprs.push(translated);
2544                    prepared.push((column.clone(), PreparedValue::Expression { expr_index }));
2545                }
2546            }
2547        }
2548
2549        let anchor_field = first_field_id.ok_or_else(|| {
2550            Error::InvalidArgumentError("UPDATE requires at least one target column".into())
2551        })?;
2552
2553        let filter_expr = full_table_scan_filter(anchor_field);
2554        let (row_ids, mut expr_values) =
2555            self.collect_update_rows(table, &filter_expr, &scalar_exprs, snapshot)?;
2556
2557        if row_ids.is_empty() {
2558            return Ok(RuntimeStatementResult::Update {
2559                table_name: display_name,
2560                rows_updated: 0,
2561            });
2562        }
2563
2564        let row_count = row_ids.len();
2565        let table_id = table.table.table_id();
2566        let logical_fields: Vec<LogicalFieldId> = table
2567            .schema
2568            .columns
2569            .iter()
2570            .map(|column| LogicalFieldId::for_user(table_id, column.field_id))
2571            .collect();
2572
2573        let gathered = table.table.store().gather_rows(
2574            &logical_fields,
2575            &row_ids,
2576            GatherNullPolicy::IncludeNulls,
2577        )?;
2578
2579        let mut new_rows: Vec<Vec<PlanValue>> =
2580            vec![Vec::with_capacity(table.schema.columns.len()); row_count];
2581        for (col_idx, _column) in table.schema.columns.iter().enumerate() {
2582            let array = gathered.column(col_idx);
2583            for (row_idx, row) in new_rows.iter_mut().enumerate().take(row_count) {
2584                let value = llkv_plan::plan_value_from_array(array, row_idx)?;
2585                row.push(value);
2586            }
2587        }
2588
2589        let column_positions: FxHashMap<FieldId, usize> = FxHashMap::from_iter(
2590            table
2591                .schema
2592                .columns
2593                .iter()
2594                .enumerate()
2595                .map(|(idx, column)| (column.field_id, idx)),
2596        );
2597
2598        for (column, value) in prepared {
2599            let column_index =
2600                column_positions
2601                    .get(&column.field_id)
2602                    .copied()
2603                    .ok_or_else(|| {
2604                        Error::InvalidArgumentError(format!(
2605                            "column '{}' missing in table schema during UPDATE",
2606                            column.name
2607                        ))
2608                    })?;
2609
2610            let values = match value {
2611                PreparedValue::Literal(lit) => vec![lit; row_count],
2612                PreparedValue::Expression { expr_index } => {
2613                    let column_values = expr_values.get_mut(expr_index).ok_or_else(|| {
2614                        Error::InvalidArgumentError(
2615                            "expression assignment value missing during UPDATE".into(),
2616                        )
2617                    })?;
2618                    if column_values.len() != row_count {
2619                        return Err(Error::InvalidArgumentError(
2620                            "expression result count did not match targeted row count".into(),
2621                        ));
2622                    }
2623                    mem::take(column_values)
2624                }
2625            };
2626
2627            for (row_idx, new_value) in values.into_iter().enumerate() {
2628                if let Some(row) = new_rows.get_mut(row_idx) {
2629                    row[column_index] = new_value;
2630                }
2631            }
2632        }
2633
2634        let _ = self.apply_delete(
2635            table,
2636            display_name.clone(),
2637            row_ids.clone(),
2638            snapshot.txn_id,
2639        )?;
2640
2641        let column_names: Vec<String> = table
2642            .schema
2643            .columns
2644            .iter()
2645            .map(|column| column.name.clone())
2646            .collect();
2647
2648        let _ = self.insert_rows(
2649            table,
2650            display_name.clone(),
2651            new_rows,
2652            column_names,
2653            snapshot,
2654        )?;
2655
2656        Ok(RuntimeStatementResult::Update {
2657            table_name: display_name,
2658            rows_updated: row_count,
2659        })
2660    }
2661
2662    fn delete_filtered_rows(
2663        &self,
2664        table: &ExecutorTable<P>,
2665        display_name: String,
2666        filter: LlkvExpr<'static, String>,
2667        snapshot: TransactionSnapshot,
2668        txn_id: TxnId,
2669    ) -> Result<RuntimeStatementResult<P>> {
2670        let schema = table.schema.as_ref();
2671        let filter_expr = translate_predicate(filter, schema)?;
2672        let row_ids = table.table.filter_row_ids(&filter_expr)?;
2673        let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
2674        tracing::trace!(
2675            table = %display_name,
2676            rows = row_ids.len(),
2677            "delete_filtered_rows collected row ids"
2678        );
2679        self.apply_delete(table, display_name, row_ids, txn_id)
2680    }
2681
2682    fn delete_all_rows(
2683        &self,
2684        table: &ExecutorTable<P>,
2685        display_name: String,
2686        snapshot: TransactionSnapshot,
2687        txn_id: TxnId,
2688    ) -> Result<RuntimeStatementResult<P>> {
2689        let total_rows = table.total_rows.load(Ordering::SeqCst);
2690        if total_rows == 0 {
2691            return Ok(RuntimeStatementResult::Delete {
2692                table_name: display_name,
2693                rows_deleted: 0,
2694            });
2695        }
2696
2697        let anchor_field = table.schema.first_field_id().ok_or_else(|| {
2698            Error::InvalidArgumentError("DELETE requires a table with at least one column".into())
2699        })?;
2700        let filter_expr = full_table_scan_filter(anchor_field);
2701        let row_ids = table.table.filter_row_ids(&filter_expr)?;
2702        let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
2703        self.apply_delete(table, display_name, row_ids, txn_id)
2704    }
2705
2706    fn apply_delete(
2707        &self,
2708        table: &ExecutorTable<P>,
2709        display_name: String,
2710        row_ids: Vec<u64>,
2711        txn_id: TxnId,
2712    ) -> Result<RuntimeStatementResult<P>> {
2713        if row_ids.is_empty() {
2714            return Ok(RuntimeStatementResult::Delete {
2715                table_name: display_name,
2716                rows_deleted: 0,
2717            });
2718        }
2719
2720        let removed = row_ids.len();
2721
2722        // Build DELETE batch using helper
2723        let batch = mvcc_columns::build_delete_batch(row_ids.clone(), txn_id)?;
2724        table.table.append(&batch)?;
2725
2726        let removed_u64 = u64::try_from(removed)
2727            .map_err(|_| Error::InvalidArgumentError("row count exceeds supported range".into()))?;
2728        table.total_rows.fetch_sub(removed_u64, Ordering::SeqCst);
2729
2730        Ok(RuntimeStatementResult::Delete {
2731            table_name: display_name,
2732            rows_deleted: removed,
2733        })
2734    }
2735
2736    fn collect_update_rows(
2737        &self,
2738        table: &ExecutorTable<P>,
2739        filter_expr: &LlkvExpr<'static, FieldId>,
2740        expressions: &[ScalarExpr<FieldId>],
2741        snapshot: TransactionSnapshot,
2742    ) -> Result<(Vec<u64>, Vec<Vec<PlanValue>>)> {
2743        let row_ids = table.table.filter_row_ids(filter_expr)?;
2744        let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
2745        if row_ids.is_empty() {
2746            return Ok((row_ids, vec![Vec::new(); expressions.len()]));
2747        }
2748
2749        if expressions.is_empty() {
2750            return Ok((row_ids, Vec::new()));
2751        }
2752
2753        let mut projections: Vec<ScanProjection> = Vec::with_capacity(expressions.len());
2754        for (idx, expr) in expressions.iter().enumerate() {
2755            let alias = format!("__expr_{idx}");
2756            projections.push(ScanProjection::computed(expr.clone(), alias));
2757        }
2758
2759        let mut expr_values: Vec<Vec<PlanValue>> =
2760            vec![Vec::with_capacity(row_ids.len()); expressions.len()];
2761        let mut error: Option<Error> = None;
2762        let row_filter: Arc<dyn RowIdFilter<P>> = Arc::new(MvccRowIdFilter::new(
2763            Arc::clone(&self.txn_manager),
2764            snapshot,
2765        ));
2766        let options = ScanStreamOptions {
2767            include_nulls: true,
2768            order: None,
2769            row_id_filter: Some(row_filter),
2770        };
2771
2772        table
2773            .table
2774            .scan_stream_with_exprs(&projections, filter_expr, options, |batch| {
2775                if error.is_some() {
2776                    return;
2777                }
2778                if let Err(err) = Self::collect_expression_values(&mut expr_values, batch) {
2779                    error = Some(err);
2780                }
2781            })?;
2782
2783        if let Some(err) = error {
2784            return Err(err);
2785        }
2786
2787        for values in &expr_values {
2788            if values.len() != row_ids.len() {
2789                return Err(Error::InvalidArgumentError(
2790                    "expression result count did not match targeted row count".into(),
2791                ));
2792            }
2793        }
2794
2795        Ok((row_ids, expr_values))
2796    }
2797
2798    fn collect_expression_values(
2799        expr_values: &mut [Vec<PlanValue>],
2800        batch: RecordBatch,
2801    ) -> Result<()> {
2802        for row_idx in 0..batch.num_rows() {
2803            for (expr_index, values) in expr_values.iter_mut().enumerate() {
2804                let value = llkv_plan::plan_value_from_array(batch.column(expr_index), row_idx)?;
2805                values.push(value);
2806            }
2807        }
2808
2809        Ok(())
2810    }
2811
2812    fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
2813        // Fast path: check if table is already loaded
2814        {
2815            let tables = self.tables.read().unwrap();
2816            if let Some(table) = tables.get(canonical_name) {
2817                tracing::trace!(
2818                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
2819                    canonical_name,
2820                    table.table.table_id(),
2821                    table.schema.columns.len(),
2822                    &*self.pager
2823                );
2824                return Ok(Arc::clone(table));
2825            }
2826        } // Release read lock
2827
2828        // Slow path: load table from catalog (happens once per table)
2829        tracing::debug!(
2830            "[LAZY_LOAD] Loading table '{}' from catalog",
2831            canonical_name
2832        );
2833
2834        // Check catalog first for table existence
2835        let _catalog_table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
2836            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
2837        })?;
2838
2839        let store = ColumnStore::open(Arc::clone(&self.pager))?;
2840        let catalog = SysCatalog::new(&store);
2841
2842        // Find the table metadata in the catalog
2843        let all_metas = catalog.all_table_metas()?;
2844        let (table_id, _meta) = all_metas
2845            .iter()
2846            .find(|(_, meta)| {
2847                meta.name
2848                    .as_ref()
2849                    .map(|n| n.to_ascii_lowercase() == canonical_name)
2850                    .unwrap_or(false)
2851            })
2852            .ok_or_else(|| {
2853                Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
2854            })?;
2855
2856        // Open the table and build ExecutorTable
2857        let table = Table::new(*table_id, Arc::clone(&self.pager))?;
2858        let schema = table.schema()?;
2859
2860        // Build ExecutorSchema from Arrow schema (skip row_id field at index 0)
2861        let mut executor_columns = Vec::new();
2862        let mut lookup =
2863            FxHashMap::with_capacity_and_hasher(schema.fields().len(), Default::default());
2864
2865        for (idx, field) in schema.fields().iter().enumerate().skip(1) {
2866            // Get field_id from metadata
2867            let field_id = field
2868                .metadata()
2869                .get(llkv_table::constants::FIELD_ID_META_KEY)
2870                .and_then(|s| s.parse::<FieldId>().ok())
2871                .unwrap_or(idx as FieldId);
2872
2873            let normalized = field.name().to_ascii_lowercase();
2874            let col_idx = executor_columns.len();
2875            lookup.insert(normalized, col_idx);
2876
2877            executor_columns.push(ExecutorColumn {
2878                name: field.name().to_string(),
2879                data_type: field.data_type().clone(),
2880                nullable: field.is_nullable(),
2881                primary_key: false, // Not stored in schema metadata currently
2882                field_id,
2883            });
2884        }
2885
2886        let exec_schema = Arc::new(ExecutorSchema {
2887            columns: executor_columns,
2888            lookup,
2889        });
2890
2891        // Find the maximum row_id in the table to set next_row_id correctly
2892        let max_row_id = {
2893            use arrow::array::UInt64Array;
2894            use llkv_column_map::store::rowid_fid;
2895            use llkv_column_map::store::scan::{
2896                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
2897                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
2898            };
2899
2900            struct MaxRowIdVisitor {
2901                max: RowId,
2902            }
2903
2904            impl PrimitiveVisitor for MaxRowIdVisitor {
2905                fn u64_chunk(&mut self, values: &UInt64Array) {
2906                    for i in 0..values.len() {
2907                        let val = values.value(i);
2908                        if val > self.max {
2909                            self.max = val;
2910                        }
2911                    }
2912                }
2913            }
2914
2915            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
2916            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
2917            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}
2918
2919            // Scan the row_id column for any user field in this table
2920            let row_id_field = rowid_fid(LogicalFieldId::for_user(*table_id, 1));
2921            let mut visitor = MaxRowIdVisitor { max: 0 };
2922
2923            match ScanBuilder::new(table.store(), row_id_field)
2924                .options(ScanOptions::default())
2925                .run(&mut visitor)
2926            {
2927                Ok(_) => visitor.max,
2928                Err(llkv_result::Error::NotFound) => 0,
2929                Err(e) => {
2930                    tracing::warn!(
2931                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
2932                        canonical_name,
2933                        e
2934                    );
2935                    0
2936                }
2937            }
2938        };
2939
2940        let next_row_id = if max_row_id > 0 {
2941            max_row_id.saturating_add(1)
2942        } else {
2943            0
2944        };
2945
2946        // Get the actual persisted row count from table metadata
2947        // This is an O(1) catalog lookup that reads ColumnDescriptor.total_row_count
2948        // Fallback to 0 for truly empty tables
2949        let total_rows = table.total_rows().unwrap_or(0);
2950
2951        let executor_table = Arc::new(ExecutorTable {
2952            table: Arc::new(table),
2953            schema: exec_schema,
2954            next_row_id: AtomicU64::new(next_row_id),
2955            total_rows: AtomicU64::new(total_rows),
2956        });
2957
2958        // Cache the loaded table
2959        {
2960            let mut tables = self.tables.write().unwrap();
2961            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
2962        }
2963
2964        // Register fields in catalog (may already be registered from RuntimeContext::new())
2965        if let Some(field_resolver) = self.catalog.field_resolver(_catalog_table_id) {
2966            for col in &executor_table.schema.columns {
2967                let _ = field_resolver.register_field(&col.name); // Ignore "already exists" errors
2968            }
2969            tracing::debug!(
2970                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
2971                executor_table.schema.columns.len(),
2972                canonical_name
2973            );
2974        }
2975
2976        tracing::debug!(
2977            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
2978            canonical_name,
2979            table_id,
2980            schema.fields().len() - 1,
2981            next_row_id
2982        );
2983
2984        Ok(executor_table)
2985    }
2986
2987    fn remove_table_entry(&self, canonical_name: &str) {
2988        let mut tables = self.tables.write().unwrap();
2989        if tables.remove(canonical_name).is_some() {
2990            tracing::trace!(
2991                "remove_table_entry: removed table '{}' from context cache",
2992                canonical_name
2993            );
2994        }
2995    }
2996
2997    pub fn drop_table_immediate(&self, name: &str, if_exists: bool) -> Result<()> {
2998        let (display_name, canonical_name) = canonical_table_name(name)?;
2999        let tables = self.tables.read().unwrap();
3000        if !tables.contains_key(&canonical_name) {
3001            if if_exists {
3002                return Ok(());
3003            } else {
3004                return Err(Error::CatalogError(format!(
3005                    "Catalog Error: Table '{}' does not exist",
3006                    display_name
3007                )));
3008            }
3009        }
3010        drop(tables);
3011
3012        // Unregister from catalog
3013        self.catalog.unregister_table(&canonical_name);
3014        tracing::debug!(
3015            "[CATALOG] Unregistered table '{}' from catalog",
3016            canonical_name
3017        );
3018
3019        self.dropped_tables.write().unwrap().insert(canonical_name);
3020        Ok(())
3021    }
3022
3023    pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
3024        self.dropped_tables.read().unwrap().contains(canonical_name)
3025    }
3026
3027    fn reserve_table_id(&self) -> Result<TableId> {
3028        let store = ColumnStore::open(Arc::clone(&self.pager))?;
3029        let catalog = SysCatalog::new(&store);
3030
3031        let mut next = match catalog.get_next_table_id()? {
3032            Some(value) => value,
3033            None => {
3034                let seed = catalog.max_table_id()?.unwrap_or(CATALOG_TABLE_ID);
3035                let initial = seed.checked_add(1).ok_or_else(|| {
3036                    Error::InvalidArgumentError("exhausted available table ids".into())
3037                })?;
3038                catalog.put_next_table_id(initial)?;
3039                initial
3040            }
3041        };
3042
3043        // Skip any reserved table IDs
3044        while llkv_table::reserved::is_reserved_table_id(next) {
3045            next = next.checked_add(1).ok_or_else(|| {
3046                Error::InvalidArgumentError("exhausted available table ids".into())
3047            })?;
3048        }
3049
3050        let mut following = next
3051            .checked_add(1)
3052            .ok_or_else(|| Error::InvalidArgumentError("exhausted available table ids".into()))?;
3053
3054        // Skip any reserved table IDs for the next allocation
3055        while llkv_table::reserved::is_reserved_table_id(following) {
3056            following = following.checked_add(1).ok_or_else(|| {
3057                Error::InvalidArgumentError("exhausted available table ids".into())
3058            })?;
3059        }
3060
3061        catalog.put_next_table_id(following)?;
3062        Ok(next)
3063    }
3064}
3065
// Implement `TransactionContext` for `RuntimeContextWrapper` to enable llkv-transaction integration
3067impl<P> TransactionContext for RuntimeContextWrapper<P>
3068where
3069    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
3070{
3071    type Pager = P;
3072
3073    fn set_snapshot(&self, snapshot: TransactionSnapshot) {
3074        self.update_snapshot(snapshot);
3075    }
3076
3077    fn snapshot(&self) -> TransactionSnapshot {
3078        self.current_snapshot()
3079    }
3080
3081    fn table_column_specs(&self, table_name: &str) -> llkv_result::Result<Vec<ColumnSpec>> {
3082        RuntimeContext::table_column_specs(self.context(), table_name)
3083    }
3084
3085    fn export_table_rows(
3086        &self,
3087        table_name: &str,
3088    ) -> llkv_result::Result<llkv_transaction::RowBatch> {
3089        let batch = RuntimeContext::export_table_rows(self.context(), table_name)?;
3090        // Convert from llkv_executor::RowBatch to llkv_transaction::RowBatch
3091        Ok(llkv_transaction::RowBatch {
3092            columns: batch.columns,
3093            rows: batch.rows,
3094        })
3095    }
3096
3097    fn get_batches_with_row_ids(
3098        &self,
3099        table_name: &str,
3100        filter: Option<LlkvExpr<'static, String>>,
3101    ) -> llkv_result::Result<Vec<RecordBatch>> {
3102        RuntimeContext::get_batches_with_row_ids_with_snapshot(
3103            self.context(),
3104            table_name,
3105            filter,
3106            self.snapshot(),
3107        )
3108    }
3109
3110    fn execute_select(
3111        &self,
3112        plan: SelectPlan,
3113    ) -> llkv_result::Result<SelectExecution<Self::Pager>> {
3114        RuntimeContext::execute_select_with_snapshot(self.context(), plan, self.snapshot())
3115    }
3116
3117    fn create_table_plan(
3118        &self,
3119        plan: CreateTablePlan,
3120    ) -> llkv_result::Result<TransactionResult<P>> {
3121        let result = RuntimeContext::create_table_plan(self.context(), plan)?;
3122        Ok(convert_statement_result(result))
3123    }
3124
3125    fn insert(&self, plan: InsertPlan) -> llkv_result::Result<TransactionResult<P>> {
3126        tracing::trace!(
3127            "[WRAPPER] TransactionContext::insert called - plan.table='{}', wrapper_context_pager={:p}",
3128            plan.table,
3129            &*self.ctx.pager
3130        );
3131        let snapshot = self.current_snapshot();
3132        let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
3133            self.ctx().insert(plan)?
3134        } else {
3135            RuntimeContext::insert_with_snapshot(self.context(), plan, snapshot)?
3136        };
3137        Ok(convert_statement_result(result))
3138    }
3139
3140    fn update(&self, plan: UpdatePlan) -> llkv_result::Result<TransactionResult<P>> {
3141        let snapshot = self.current_snapshot();
3142        let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
3143            self.ctx().update(plan)?
3144        } else {
3145            RuntimeContext::update_with_snapshot(self.context(), plan, snapshot)?
3146        };
3147        Ok(convert_statement_result(result))
3148    }
3149
3150    fn delete(&self, plan: DeletePlan) -> llkv_result::Result<TransactionResult<P>> {
3151        let snapshot = self.current_snapshot();
3152        let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
3153            self.ctx().delete(plan)?
3154        } else {
3155            RuntimeContext::delete_with_snapshot(self.context(), plan, snapshot)?
3156        };
3157        Ok(convert_statement_result(result))
3158    }
3159
3160    fn append_batches_with_row_ids(
3161        &self,
3162        table_name: &str,
3163        batches: Vec<RecordBatch>,
3164    ) -> llkv_result::Result<usize> {
3165        RuntimeContext::append_batches_with_row_ids(self.context(), table_name, batches)
3166    }
3167
3168    fn table_names(&self) -> Vec<String> {
3169        RuntimeContext::table_names(self.context())
3170    }
3171
3172    fn table_id(&self, table_name: &str) -> llkv_result::Result<llkv_table::types::TableId> {
3173        // Check CURRENT state: if table is marked as dropped, return error
3174        // This is used by conflict detection to detect if a table was dropped
3175        let ctx = self.context();
3176        if ctx.is_table_marked_dropped(table_name) {
3177            return Err(Error::InvalidArgumentError(format!(
3178                "table '{}' has been dropped",
3179                table_name
3180            )));
3181        }
3182
3183        let table = ctx.lookup_table(table_name)?;
3184        Ok(table.table.table_id())
3185    }
3186
3187    fn catalog_snapshot(&self) -> llkv_table::catalog::TableCatalogSnapshot {
3188        let ctx = self.context();
3189        ctx.catalog.snapshot()
3190    }
3191}
3192
3193// Helper to convert StatementResult between types (legacy)
3194fn convert_statement_result<P>(result: RuntimeStatementResult<P>) -> TransactionResult<P>
3195where
3196    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
3197{
3198    use llkv_transaction::TransactionResult as TxResult;
3199    match result {
3200        RuntimeStatementResult::CreateTable { table_name } => TxResult::CreateTable { table_name },
3201        RuntimeStatementResult::Insert { rows_inserted, .. } => TxResult::Insert { rows_inserted },
3202        RuntimeStatementResult::Update { rows_updated, .. } => TxResult::Update {
3203            rows_matched: rows_updated,
3204            rows_updated,
3205        },
3206        RuntimeStatementResult::Delete { rows_deleted, .. } => TxResult::Delete { rows_deleted },
3207        RuntimeStatementResult::Transaction { kind } => TxResult::Transaction { kind },
3208        _ => panic!("unsupported StatementResult conversion"),
3209    }
3210}
3211
3212fn filter_row_ids_for_snapshot<P>(
3213    store: &ColumnStore<P>,
3214    table_id: TableId,
3215    row_ids: Vec<u64>,
3216    txn_manager: &TxnIdManager,
3217    snapshot: TransactionSnapshot,
3218) -> Result<Vec<u64>>
3219where
3220    P: Pager<Blob = EntryHandle> + Send + Sync,
3221{
3222    tracing::debug!(
3223        "[FILTER_ROWS] Filtering {} row IDs for snapshot txn_id={}, snapshot_id={}",
3224        row_ids.len(),
3225        snapshot.txn_id,
3226        snapshot.snapshot_id
3227    );
3228
3229    if row_ids.is_empty() {
3230        return Ok(row_ids);
3231    }
3232
3233    let created_lfid = LogicalFieldId::for_mvcc_created_by(table_id);
3234    let deleted_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
3235
3236    let version_batch = match store.gather_rows(
3237        &[created_lfid, deleted_lfid],
3238        &row_ids,
3239        GatherNullPolicy::IncludeNulls,
3240    ) {
3241        Ok(batch) => batch,
3242        Err(Error::NotFound) => {
3243            tracing::trace!(
3244                "[FILTER_ROWS] gather_rows returned NotFound for MVCC columns, treating all {} rows as visible (committed)",
3245                row_ids.len()
3246            );
3247            return Ok(row_ids);
3248        }
3249        Err(err) => {
3250            tracing::error!("[FILTER_ROWS] gather_rows error: {:?}", err);
3251            return Err(err);
3252        }
3253    };
3254
3255    if version_batch.num_columns() < 2 {
3256        tracing::debug!(
3257            "[FILTER_ROWS] version_batch has < 2 columns, returning all {} rows",
3258            row_ids.len()
3259        );
3260        return Ok(row_ids);
3261    }
3262
3263    let created_column = version_batch
3264        .column(0)
3265        .as_any()
3266        .downcast_ref::<UInt64Array>();
3267    let deleted_column = version_batch
3268        .column(1)
3269        .as_any()
3270        .downcast_ref::<UInt64Array>();
3271
3272    if created_column.is_none() || deleted_column.is_none() {
3273        tracing::debug!(
3274            "[FILTER_ROWS] Failed to downcast columns, returning all {} rows",
3275            row_ids.len()
3276        );
3277        return Ok(row_ids);
3278    }
3279
3280    let created_column = created_column.unwrap();
3281    let deleted_column = deleted_column.unwrap();
3282
3283    let mut visible = Vec::with_capacity(row_ids.len());
3284    for (idx, row_id) in row_ids.iter().enumerate() {
3285        let created_by = if created_column.is_null(idx) {
3286            TXN_ID_AUTO_COMMIT
3287        } else {
3288            created_column.value(idx)
3289        };
3290        let deleted_by = if deleted_column.is_null(idx) {
3291            TXN_ID_NONE
3292        } else {
3293            deleted_column.value(idx)
3294        };
3295
3296        let version = RowVersion {
3297            created_by,
3298            deleted_by,
3299        };
3300        let is_visible = version.is_visible_for(txn_manager, snapshot);
3301        tracing::trace!(
3302            "[FILTER_ROWS] row_id={}: created_by={}, deleted_by={}, is_visible={}",
3303            row_id,
3304            created_by,
3305            deleted_by,
3306            is_visible
3307        );
3308        if is_visible {
3309            visible.push(*row_id);
3310        }
3311    }
3312
3313    tracing::debug!(
3314        "[FILTER_ROWS] Filtered from {} to {} visible rows",
3315        row_ids.len(),
3316        visible.len()
3317    );
3318    Ok(visible)
3319}
3320
3321struct MvccRowIdFilter<P>
3322where
3323    P: Pager<Blob = EntryHandle> + Send + Sync,
3324{
3325    txn_manager: Arc<TxnIdManager>,
3326    snapshot: TransactionSnapshot,
3327    _marker: PhantomData<fn(P)>,
3328}
3329
3330impl<P> MvccRowIdFilter<P>
3331where
3332    P: Pager<Blob = EntryHandle> + Send + Sync,
3333{
3334    fn new(txn_manager: Arc<TxnIdManager>, snapshot: TransactionSnapshot) -> Self {
3335        Self {
3336            txn_manager,
3337            snapshot,
3338            _marker: PhantomData,
3339        }
3340    }
3341}
3342
3343impl<P> RowIdFilter<P> for MvccRowIdFilter<P>
3344where
3345    P: Pager<Blob = EntryHandle> + Send + Sync,
3346{
3347    fn filter(&self, table: &Table<P>, row_ids: Vec<u64>) -> Result<Vec<u64>> {
3348        tracing::trace!(
3349            "[MVCC_FILTER] filter() called with row_ids {:?}, snapshot txn={}, snapshot_id={}",
3350            row_ids,
3351            self.snapshot.txn_id,
3352            self.snapshot.snapshot_id
3353        );
3354        let result = filter_row_ids_for_snapshot(
3355            table.store(),
3356            table.table_id(),
3357            row_ids,
3358            &self.txn_manager,
3359            self.snapshot,
3360        );
3361        if let Ok(ref visible) = result {
3362            tracing::trace!(
3363                "[MVCC_FILTER] filter() returning visible row_ids: {:?}",
3364                visible
3365            );
3366        }
3367        result
3368    }
3369}
3370
3371// Wrapper to implement TableProvider for Context
3372struct ContextProvider<P>
3373where
3374    P: Pager<Blob = EntryHandle> + Send + Sync,
3375{
3376    context: Arc<RuntimeContext<P>>,
3377}
3378
3379impl<P> TableProvider<P> for ContextProvider<P>
3380where
3381    P: Pager<Blob = EntryHandle> + Send + Sync,
3382{
3383    fn get_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
3384        self.context.lookup_table(canonical_name)
3385    }
3386}
3387
3388/// Lazily built logical plan (thin wrapper over SelectPlan).
3389pub struct RuntimeLazyFrame<P>
3390where
3391    P: Pager<Blob = EntryHandle> + Send + Sync,
3392{
3393    context: Arc<RuntimeContext<P>>,
3394    plan: SelectPlan,
3395}
3396
3397impl<P> RuntimeLazyFrame<P>
3398where
3399    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
3400{
3401    pub fn scan(context: Arc<RuntimeContext<P>>, table: &str) -> Result<Self> {
3402        let (display, canonical) = canonical_table_name(table)?;
3403        context.lookup_table(&canonical)?;
3404        Ok(Self {
3405            context,
3406            plan: SelectPlan::new(display),
3407        })
3408    }
3409
3410    pub fn filter(mut self, predicate: LlkvExpr<'static, String>) -> Self {
3411        self.plan.filter = Some(predicate);
3412        self
3413    }
3414
3415    pub fn select_all(mut self) -> Self {
3416        self.plan.projections = vec![SelectProjection::AllColumns];
3417        self
3418    }
3419
3420    pub fn select_columns<S>(mut self, columns: impl IntoIterator<Item = S>) -> Self
3421    where
3422        S: AsRef<str>,
3423    {
3424        self.plan.projections = columns
3425            .into_iter()
3426            .map(|name| SelectProjection::Column {
3427                name: name.as_ref().to_string(),
3428                alias: None,
3429            })
3430            .collect();
3431        self
3432    }
3433
3434    pub fn select(mut self, projections: Vec<SelectProjection>) -> Self {
3435        self.plan.projections = projections;
3436        self
3437    }
3438
3439    pub fn aggregate(mut self, aggregates: Vec<AggregateExpr>) -> Self {
3440        self.plan.aggregates = aggregates;
3441        self
3442    }
3443
3444    pub fn collect(self) -> Result<SelectExecution<P>> {
3445        self.context.execute_select(self.plan)
3446    }
3447
3448    pub fn collect_rows(self) -> Result<RowBatch> {
3449        let execution = self.context.execute_select(self.plan)?;
3450        execution.collect_rows()
3451    }
3452
3453    pub fn collect_rows_vec(self) -> Result<Vec<Vec<PlanValue>>> {
3454        Ok(self.collect_rows()?.rows)
3455    }
3456}
3457
3458pub fn canonical_table_name(name: &str) -> Result<(String, String)> {
3459    if name.is_empty() {
3460        return Err(Error::InvalidArgumentError(
3461            "table name must not be empty".into(),
3462        ));
3463    }
3464    let display = name.to_string();
3465    let canonical = display.to_ascii_lowercase();
3466    Ok((display, canonical))
3467}
3468
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`. The `u128` microsecond
/// count is converted with a checked conversion that saturates at `u64::MAX`
/// instead of silently truncating.
fn current_time_micros() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_micros()).unwrap_or(u64::MAX)
}
3475
3476pub fn resolve_insert_columns(columns: &[String], schema: &ExecutorSchema) -> Result<Vec<usize>> {
3477    if columns.is_empty() {
3478        return Ok((0..schema.columns.len()).collect());
3479    }
3480    let mut resolved = Vec::with_capacity(columns.len());
3481    for column in columns {
3482        let normalized = column.to_ascii_lowercase();
3483        let index = schema
3484            .lookup
3485            .get(&normalized)
3486            .ok_or_else(|| Error::InvalidArgumentError(format!("unknown column '{}'", column)))?;
3487        resolved.push(*index);
3488    }
3489    Ok(resolved)
3490}
3491
3492pub fn build_array_for_column(dtype: &DataType, values: &[PlanValue]) -> Result<ArrayRef> {
3493    match dtype {
3494        DataType::Int64 => {
3495            let mut builder = Int64Builder::with_capacity(values.len());
3496            for value in values {
3497                match value {
3498                    PlanValue::Null => builder.append_null(),
3499                    PlanValue::Integer(v) => builder.append_value(*v),
3500                    PlanValue::Float(v) => builder.append_value(*v as i64),
3501                    PlanValue::String(_) => {
3502                        return Err(Error::InvalidArgumentError(
3503                            "cannot insert string into INT column".into(),
3504                        ));
3505                    }
3506                }
3507            }
3508            Ok(Arc::new(builder.finish()))
3509        }
3510        DataType::Float64 => {
3511            let mut builder = Float64Builder::with_capacity(values.len());
3512            for value in values {
3513                match value {
3514                    PlanValue::Null => builder.append_null(),
3515                    PlanValue::Integer(v) => builder.append_value(*v as f64),
3516                    PlanValue::Float(v) => builder.append_value(*v),
3517                    PlanValue::String(_) => {
3518                        return Err(Error::InvalidArgumentError(
3519                            "cannot insert string into DOUBLE column".into(),
3520                        ));
3521                    }
3522                }
3523            }
3524            Ok(Arc::new(builder.finish()))
3525        }
3526        DataType::Utf8 => {
3527            let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 8);
3528            for value in values {
3529                match value {
3530                    PlanValue::Null => builder.append_null(),
3531                    PlanValue::Integer(v) => builder.append_value(v.to_string()),
3532                    PlanValue::Float(v) => builder.append_value(v.to_string()),
3533                    PlanValue::String(s) => builder.append_value(s),
3534                }
3535            }
3536            Ok(Arc::new(builder.finish()))
3537        }
3538        DataType::Date32 => {
3539            let mut builder = Date32Builder::with_capacity(values.len());
3540            for value in values {
3541                match value {
3542                    PlanValue::Null => builder.append_null(),
3543                    PlanValue::Integer(days) => {
3544                        let casted = i32::try_from(*days).map_err(|_| {
3545                            Error::InvalidArgumentError(
3546                                "integer literal out of range for DATE column".into(),
3547                            )
3548                        })?;
3549                        builder.append_value(casted);
3550                    }
3551                    PlanValue::Float(_) => {
3552                        return Err(Error::InvalidArgumentError(
3553                            "cannot insert float into DATE column".into(),
3554                        ));
3555                    }
3556                    PlanValue::String(text) => {
3557                        let days = parse_date32_literal(text)?;
3558                        builder.append_value(days);
3559                    }
3560                }
3561            }
3562            Ok(Arc::new(builder.finish()))
3563        }
3564        other => Err(Error::InvalidArgumentError(format!(
3565            "unsupported Arrow data type for INSERT: {other:?}"
3566        ))),
3567    }
3568}
3569
3570fn parse_date32_literal(text: &str) -> Result<i32> {
3571    let mut parts = text.split('-');
3572    let year_str = parts
3573        .next()
3574        .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
3575    let month_str = parts
3576        .next()
3577        .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
3578    let day_str = parts
3579        .next()
3580        .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
3581    if parts.next().is_some() {
3582        return Err(Error::InvalidArgumentError(format!(
3583            "invalid DATE literal '{text}'"
3584        )));
3585    }
3586
3587    let year = year_str.parse::<i32>().map_err(|_| {
3588        Error::InvalidArgumentError(format!("invalid year in DATE literal '{text}'"))
3589    })?;
3590    let month_num = month_str.parse::<u8>().map_err(|_| {
3591        Error::InvalidArgumentError(format!("invalid month in DATE literal '{text}'"))
3592    })?;
3593    let day = day_str.parse::<u8>().map_err(|_| {
3594        Error::InvalidArgumentError(format!("invalid day in DATE literal '{text}'"))
3595    })?;
3596
3597    let month = Month::try_from(month_num).map_err(|_| {
3598        Error::InvalidArgumentError(format!("invalid month in DATE literal '{text}'"))
3599    })?;
3600
3601    let date = Date::from_calendar_date(year, month, day).map_err(|err| {
3602        Error::InvalidArgumentError(format!("invalid DATE literal '{text}': {err}"))
3603    })?;
3604    let days = date.to_julian_day() - epoch_julian_day();
3605    Ok(days)
3606}
3607
3608fn epoch_julian_day() -> i32 {
3609    Date::from_calendar_date(1970, Month::January, 1)
3610        .expect("1970-01-01 is a valid date")
3611        .to_julian_day()
3612}
3613
3614// Array -> PlanValue conversion is provided by llkv-plan::plan_value_from_array
3615
3616fn full_table_scan_filter(field_id: FieldId) -> LlkvExpr<'static, FieldId> {
3617    LlkvExpr::Pred(Filter {
3618        field_id,
3619        op: Operator::Range {
3620            lower: Bound::Unbounded,
3621            upper: Bound::Unbounded,
3622        },
3623    })
3624}
3625
3626fn resolve_field_id_from_schema(schema: &ExecutorSchema, name: &str) -> Result<FieldId> {
3627    if name.eq_ignore_ascii_case(ROW_ID_COLUMN_NAME) {
3628        return Ok(ROW_ID_FIELD_ID);
3629    }
3630
3631    schema
3632        .resolve(name)
3633        .map(|column| column.field_id)
3634        .ok_or_else(|| {
3635            Error::InvalidArgumentError(format!("unknown column '{name}' in expression"))
3636        })
3637}
3638
3639fn translate_predicate(
3640    expr: LlkvExpr<'static, String>,
3641    schema: &ExecutorSchema,
3642) -> Result<LlkvExpr<'static, FieldId>> {
3643    match expr {
3644        LlkvExpr::And(list) => {
3645            let mut converted = Vec::with_capacity(list.len());
3646            for item in list {
3647                converted.push(translate_predicate(item, schema)?);
3648            }
3649            Ok(LlkvExpr::And(converted))
3650        }
3651        LlkvExpr::Or(list) => {
3652            let mut converted = Vec::with_capacity(list.len());
3653            for item in list {
3654                converted.push(translate_predicate(item, schema)?);
3655            }
3656            Ok(LlkvExpr::Or(converted))
3657        }
3658        LlkvExpr::Not(inner) => Ok(LlkvExpr::Not(Box::new(translate_predicate(
3659            *inner, schema,
3660        )?))),
3661        LlkvExpr::Pred(Filter { field_id, op }) => {
3662            let resolved = resolve_field_id_from_schema(schema, &field_id)?;
3663            Ok(LlkvExpr::Pred(Filter {
3664                field_id: resolved,
3665                op,
3666            }))
3667        }
3668        LlkvExpr::Compare { left, op, right } => {
3669            let left = translate_scalar(&left, schema)?;
3670            let right = translate_scalar(&right, schema)?;
3671            Ok(LlkvExpr::Compare { left, op, right })
3672        }
3673    }
3674}
3675
3676fn translate_scalar(
3677    expr: &ScalarExpr<String>,
3678    schema: &ExecutorSchema,
3679) -> Result<ScalarExpr<FieldId>> {
3680    match expr {
3681        ScalarExpr::Column(name) => {
3682            let field_id = resolve_field_id_from_schema(schema, name)?;
3683            Ok(ScalarExpr::column(field_id))
3684        }
3685        ScalarExpr::Literal(lit) => Ok(ScalarExpr::Literal(lit.clone())),
3686        ScalarExpr::Binary { left, op, right } => {
3687            let left_expr = translate_scalar(left, schema)?;
3688            let right_expr = translate_scalar(right, schema)?;
3689            Ok(ScalarExpr::Binary {
3690                left: Box::new(left_expr),
3691                op: *op,
3692                right: Box::new(right_expr),
3693            })
3694        }
3695        ScalarExpr::Aggregate(agg) => {
3696            // Translate column names in aggregate calls to field IDs
3697            use llkv_expr::expr::AggregateCall;
3698            let translated_agg = match agg {
3699                AggregateCall::CountStar => AggregateCall::CountStar,
3700                AggregateCall::Count(name) => {
3701                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
3702                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
3703                    })?;
3704                    AggregateCall::Count(field_id)
3705                }
3706                AggregateCall::Sum(name) => {
3707                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
3708                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
3709                    })?;
3710                    AggregateCall::Sum(field_id)
3711                }
3712                AggregateCall::Min(name) => {
3713                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
3714                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
3715                    })?;
3716                    AggregateCall::Min(field_id)
3717                }
3718                AggregateCall::Max(name) => {
3719                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
3720                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
3721                    })?;
3722                    AggregateCall::Max(field_id)
3723                }
3724                AggregateCall::CountNulls(name) => {
3725                    let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
3726                        Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
3727                    })?;
3728                    AggregateCall::CountNulls(field_id)
3729                }
3730            };
3731            Ok(ScalarExpr::Aggregate(translated_agg))
3732        }
3733    }
3734}
3735
3736fn plan_value_from_sql_expr(expr: &SqlExpr) -> Result<PlanValue> {
3737    match expr {
3738        SqlExpr::Value(value) => plan_value_from_sql_value(value),
3739        SqlExpr::UnaryOp {
3740            op: UnaryOperator::Minus,
3741            expr,
3742        } => match plan_value_from_sql_expr(expr)? {
3743            PlanValue::Integer(v) => Ok(PlanValue::Integer(-v)),
3744            PlanValue::Float(v) => Ok(PlanValue::Float(-v)),
3745            PlanValue::Null | PlanValue::String(_) => Err(Error::InvalidArgumentError(
3746                "cannot negate non-numeric literal".into(),
3747            )),
3748        },
3749        SqlExpr::UnaryOp {
3750            op: UnaryOperator::Plus,
3751            expr,
3752        } => plan_value_from_sql_expr(expr),
3753        SqlExpr::Nested(inner) => plan_value_from_sql_expr(inner),
3754        other => Err(Error::InvalidArgumentError(format!(
3755            "unsupported literal expression: {other:?}"
3756        ))),
3757    }
3758}
3759
3760fn plan_value_from_sql_value(value: &ValueWithSpan) -> Result<PlanValue> {
3761    match &value.value {
3762        Value::Null => Ok(PlanValue::Null),
3763        Value::Number(text, _) => {
3764            if text.contains(['.', 'e', 'E']) {
3765                let parsed = text.parse::<f64>().map_err(|err| {
3766                    Error::InvalidArgumentError(format!("invalid float literal: {err}"))
3767                })?;
3768                Ok(PlanValue::Float(parsed))
3769            } else {
3770                let parsed = text.parse::<i64>().map_err(|err| {
3771                    Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
3772                })?;
3773                Ok(PlanValue::Integer(parsed))
3774            }
3775        }
3776        Value::Boolean(_) => Err(Error::InvalidArgumentError(
3777            "BOOLEAN literals are not supported yet".into(),
3778        )),
3779        other => {
3780            if let Some(text) = other.clone().into_string() {
3781                Ok(PlanValue::String(text))
3782            } else {
3783                Err(Error::InvalidArgumentError(format!(
3784                    "unsupported literal: {other:?}"
3785                )))
3786            }
3787        }
3788    }
3789}
3790
3791fn group_by_is_empty(expr: &GroupByExpr) -> bool {
3792    matches!(
3793        expr,
3794        GroupByExpr::Expressions(exprs, modifiers)
3795            if exprs.is_empty() && modifiers.is_empty()
3796    )
3797}
3798
3799#[derive(Clone)]
3800pub struct RuntimeRangeSelectRows {
3801    rows: Vec<Vec<PlanValue>>,
3802}
3803
3804impl RuntimeRangeSelectRows {
3805    pub fn into_rows(self) -> Vec<Vec<PlanValue>> {
3806        self.rows
3807    }
3808}
3809
/// How a single output column of a `range()` SELECT is produced.
#[derive(Clone)]
enum RangeProjection {
    /// Emit the generated range value for the current row.
    Column,
    /// Emit the same constant value on every row.
    Literal(PlanValue),
}
3815
3816#[derive(Clone)]
3817pub struct RuntimeRangeSpec {
3818    start: i64,
3819    #[allow(dead_code)] // Used for validation, computed into row_count
3820    end: i64,
3821    row_count: usize,
3822    column_name_lower: String,
3823    table_alias_lower: Option<String>,
3824}
3825
3826impl RuntimeRangeSpec {
3827    fn matches_identifier(&self, ident: &str) -> bool {
3828        let lower = ident.to_ascii_lowercase();
3829        lower == self.column_name_lower || lower == "range"
3830    }
3831
3832    fn matches_table_alias(&self, ident: &str) -> bool {
3833        let lower = ident.to_ascii_lowercase();
3834        match &self.table_alias_lower {
3835            Some(alias) => lower == *alias,
3836            None => lower == "range",
3837        }
3838    }
3839
3840    fn matches_object_name(&self, name: &ObjectName) -> bool {
3841        if name.0.len() != 1 {
3842            return false;
3843        }
3844        match &name.0[0] {
3845            ObjectNamePart::Identifier(ident) => self.matches_table_alias(&ident.value),
3846            _ => false,
3847        }
3848    }
3849}
3850
3851pub fn extract_rows_from_range(select: &Select) -> Result<Option<RuntimeRangeSelectRows>> {
3852    let spec = match parse_range_spec(select)? {
3853        Some(spec) => spec,
3854        None => return Ok(None),
3855    };
3856
3857    if select.selection.is_some() {
3858        return Err(Error::InvalidArgumentError(
3859            "WHERE clauses are not supported for range() SELECT statements".into(),
3860        ));
3861    }
3862    if select.having.is_some()
3863        || !select.named_window.is_empty()
3864        || select.qualify.is_some()
3865        || select.distinct.is_some()
3866        || select.top.is_some()
3867        || select.into.is_some()
3868        || select.prewhere.is_some()
3869        || !select.lateral_views.is_empty()
3870        || select.value_table_mode.is_some()
3871        || !group_by_is_empty(&select.group_by)
3872    {
3873        return Err(Error::InvalidArgumentError(
3874            "advanced SELECT clauses are not supported for range() SELECT statements".into(),
3875        ));
3876    }
3877
3878    let mut projections: Vec<RangeProjection> = Vec::with_capacity(select.projection.len());
3879
3880    // If projection is empty, treat it as SELECT * (implicit wildcard)
3881    if select.projection.is_empty() {
3882        projections.push(RangeProjection::Column);
3883    } else {
3884        for item in &select.projection {
3885            let projection = match item {
3886                SelectItem::Wildcard(_) => RangeProjection::Column,
3887                SelectItem::QualifiedWildcard(kind, _) => match kind {
3888                    SelectItemQualifiedWildcardKind::ObjectName(object_name) => {
3889                        if spec.matches_object_name(object_name) {
3890                            RangeProjection::Column
3891                        } else {
3892                            return Err(Error::InvalidArgumentError(
3893                                "qualified wildcard must reference the range() source".into(),
3894                            ));
3895                        }
3896                    }
3897                    SelectItemQualifiedWildcardKind::Expr(_) => {
3898                        return Err(Error::InvalidArgumentError(
3899                            "expression-qualified wildcards are not supported for range() SELECT statements".into(),
3900                        ));
3901                    }
3902                },
3903                SelectItem::UnnamedExpr(expr) => build_range_projection_expr(expr, &spec)?,
3904                SelectItem::ExprWithAlias { expr, .. } => build_range_projection_expr(expr, &spec)?,
3905            };
3906            projections.push(projection);
3907        }
3908    }
3909
3910    let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(spec.row_count);
3911    for idx in 0..spec.row_count {
3912        let mut row: Vec<PlanValue> = Vec::with_capacity(projections.len());
3913        let value = spec.start + (idx as i64);
3914        for projection in &projections {
3915            match projection {
3916                RangeProjection::Column => row.push(PlanValue::Integer(value)),
3917                RangeProjection::Literal(value) => row.push(value.clone()),
3918            }
3919        }
3920        rows.push(row);
3921    }
3922
3923    Ok(Some(RuntimeRangeSelectRows { rows }))
3924}
3925
3926fn build_range_projection_expr(expr: &SqlExpr, spec: &RuntimeRangeSpec) -> Result<RangeProjection> {
3927    match expr {
3928        SqlExpr::Identifier(ident) => {
3929            if spec.matches_identifier(&ident.value) {
3930                Ok(RangeProjection::Column)
3931            } else {
3932                Err(Error::InvalidArgumentError(format!(
3933                    "unknown column '{}' in range() SELECT",
3934                    ident.value
3935                )))
3936            }
3937        }
3938        SqlExpr::CompoundIdentifier(parts) => {
3939            if parts.len() == 2
3940                && spec.matches_table_alias(&parts[0].value)
3941                && spec.matches_identifier(&parts[1].value)
3942            {
3943                Ok(RangeProjection::Column)
3944            } else {
3945                Err(Error::InvalidArgumentError(
3946                    "compound identifiers must reference the range() source".into(),
3947                ))
3948            }
3949        }
3950        SqlExpr::Wildcard(_) | SqlExpr::QualifiedWildcard(_, _) => unreachable!(),
3951        other => Ok(RangeProjection::Literal(plan_value_from_sql_expr(other)?)),
3952    }
3953}
3954
3955fn parse_range_spec(select: &Select) -> Result<Option<RuntimeRangeSpec>> {
3956    if select.from.len() != 1 {
3957        return Ok(None);
3958    }
3959    let item = &select.from[0];
3960    if !item.joins.is_empty() {
3961        return Err(Error::InvalidArgumentError(
3962            "JOIN clauses are not supported for range() SELECT statements".into(),
3963        ));
3964    }
3965
3966    match &item.relation {
3967        TableFactor::Function {
3968            lateral,
3969            name,
3970            args,
3971            alias,
3972        } => {
3973            if *lateral {
3974                return Err(Error::InvalidArgumentError(
3975                    "LATERAL range() is not supported".into(),
3976                ));
3977            }
3978            parse_range_spec_from_args(name, args, alias)
3979        }
3980        TableFactor::Table {
3981            name,
3982            alias,
3983            args: Some(table_args),
3984            with_ordinality,
3985            ..
3986        } => {
3987            if *with_ordinality {
3988                return Err(Error::InvalidArgumentError(
3989                    "WITH ORDINALITY is not supported for range()".into(),
3990                ));
3991            }
3992            if table_args.settings.is_some() {
3993                return Err(Error::InvalidArgumentError(
3994                    "range() SETTINGS clause is not supported".into(),
3995                ));
3996            }
3997            parse_range_spec_from_args(name, &table_args.args, alias)
3998        }
3999        _ => Ok(None),
4000    }
4001}
4002
4003fn parse_range_spec_from_args(
4004    name: &ObjectName,
4005    args: &[FunctionArg],
4006    alias: &Option<TableAlias>,
4007) -> Result<Option<RuntimeRangeSpec>> {
4008    if name.0.len() != 1 {
4009        return Ok(None);
4010    }
4011    let func_name = match &name.0[0] {
4012        ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
4013        _ => return Ok(None),
4014    };
4015    if func_name != "range" {
4016        return Ok(None);
4017    }
4018
4019    if args.is_empty() || args.len() > 2 {
4020        return Err(Error::InvalidArgumentError(
4021            "range() requires one or two arguments".into(),
4022        ));
4023    }
4024
4025    // Helper to extract integer from argument
4026    let extract_int = |arg: &FunctionArg| -> Result<i64> {
4027        let arg_expr = match arg {
4028            FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4029            FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_))
4030            | FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
4031                return Err(Error::InvalidArgumentError(
4032                    "range() argument must be an integer literal".into(),
4033                ));
4034            }
4035            FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
4036                return Err(Error::InvalidArgumentError(
4037                    "named arguments are not supported for range()".into(),
4038                ));
4039            }
4040        };
4041
4042        let value = plan_value_from_sql_expr(arg_expr)?;
4043        match value {
4044            PlanValue::Integer(v) => Ok(v),
4045            _ => Err(Error::InvalidArgumentError(
4046                "range() argument must be an integer literal".into(),
4047            )),
4048        }
4049    };
4050
4051    let (start, end, row_count) = if args.len() == 1 {
4052        // range(count) - generate [0, count)
4053        let count = extract_int(&args[0])?;
4054        if count < 0 {
4055            return Err(Error::InvalidArgumentError(
4056                "range() argument must be non-negative".into(),
4057            ));
4058        }
4059        (0, count, count as usize)
4060    } else {
4061        // range(start, end) - generate [start, end)
4062        let start = extract_int(&args[0])?;
4063        let end = extract_int(&args[1])?;
4064        if end < start {
4065            return Err(Error::InvalidArgumentError(
4066                "range() end must be >= start".into(),
4067            ));
4068        }
4069        let row_count = (end - start) as usize;
4070        (start, end, row_count)
4071    };
4072
4073    let column_name_lower = alias
4074        .as_ref()
4075        .and_then(|a| {
4076            a.columns
4077                .first()
4078                .map(|col| col.name.value.to_ascii_lowercase())
4079        })
4080        .unwrap_or_else(|| "range".to_string());
4081    let table_alias_lower = alias.as_ref().map(|a| a.name.value.to_ascii_lowercase());
4082
4083    Ok(Some(RuntimeRangeSpec {
4084        start,
4085        end,
4086        row_count,
4087        column_name_lower,
4088        table_alias_lower,
4089    }))
4090}
4091
4092pub struct RuntimeCreateTableBuilder<'ctx, P>
4093where
4094    P: Pager<Blob = EntryHandle> + Send + Sync,
4095{
4096    ctx: &'ctx RuntimeContext<P>,
4097    plan: CreateTablePlan,
4098}
4099
4100impl<'ctx, P> RuntimeCreateTableBuilder<'ctx, P>
4101where
4102    P: Pager<Blob = EntryHandle> + Send + Sync,
4103{
4104    pub fn if_not_exists(mut self) -> Self {
4105        self.plan.if_not_exists = true;
4106        self
4107    }
4108
4109    pub fn or_replace(mut self) -> Self {
4110        self.plan.or_replace = true;
4111        self
4112    }
4113
4114    pub fn with_column(mut self, name: impl Into<String>, data_type: DataType) -> Self {
4115        self.plan
4116            .columns
4117            .push(ColumnSpec::new(name.into(), data_type, true));
4118        self
4119    }
4120
4121    pub fn with_not_null_column(mut self, name: impl Into<String>, data_type: DataType) -> Self {
4122        self.plan
4123            .columns
4124            .push(ColumnSpec::new(name.into(), data_type, false));
4125        self
4126    }
4127
4128    pub fn with_column_spec(mut self, spec: ColumnSpec) -> Self {
4129        self.plan.columns.push(spec);
4130        self
4131    }
4132
4133    pub fn finish(self) -> Result<RuntimeStatementResult<P>> {
4134        self.ctx.execute_create_table(self.plan)
4135    }
4136}
4137
4138#[derive(Clone, Debug, Default)]
4139pub struct RuntimeRow {
4140    values: Vec<(String, PlanValue)>,
4141}
4142
4143impl RuntimeRow {
4144    pub fn new() -> Self {
4145        Self { values: Vec::new() }
4146    }
4147
4148    pub fn with(mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> Self {
4149        self.set(name, value);
4150        self
4151    }
4152
4153    pub fn set(&mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> &mut Self {
4154        let name = name.into();
4155        let value = value.into();
4156        if let Some((_, existing)) = self.values.iter_mut().find(|(n, _)| *n == name) {
4157            *existing = value;
4158        } else {
4159            self.values.push((name, value));
4160        }
4161        self
4162    }
4163
4164    fn columns(&self) -> Vec<String> {
4165        self.values.iter().map(|(n, _)| n.clone()).collect()
4166    }
4167
4168    fn values_for_columns(&self, columns: &[String]) -> Result<Vec<PlanValue>> {
4169        let mut out = Vec::with_capacity(columns.len());
4170        for column in columns {
4171            let value = self
4172                .values
4173                .iter()
4174                .find(|(name, _)| name == column)
4175                .ok_or_else(|| {
4176                    Error::InvalidArgumentError(format!(
4177                        "insert row missing value for column '{}'",
4178                        column
4179                    ))
4180                })?;
4181            out.push(value.1.clone());
4182        }
4183        Ok(out)
4184    }
4185}
4186
4187pub fn row() -> RuntimeRow {
4188    RuntimeRow::new()
4189}
4190
4191#[doc(hidden)]
4192pub enum RuntimeInsertRowKind {
4193    Named {
4194        columns: Vec<String>,
4195        values: Vec<PlanValue>,
4196    },
4197    Positional(Vec<PlanValue>),
4198}
4199
4200pub trait IntoInsertRow {
4201    fn into_insert_row(self) -> Result<RuntimeInsertRowKind>;
4202}
4203
4204impl IntoInsertRow for RuntimeRow {
4205    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
4206        let row = self;
4207        if row.values.is_empty() {
4208            return Err(Error::InvalidArgumentError(
4209                "insert requires at least one column".into(),
4210            ));
4211        }
4212        let columns = row.columns();
4213        let values = row.values_for_columns(&columns)?;
4214        Ok(RuntimeInsertRowKind::Named { columns, values })
4215    }
4216}
4217
// NOTE: there is intentionally no blanket `impl IntoInsertRow for &T`. The
// earlier generic impl caused clippy's unconditional-recursion and noop-clone
// warnings. Callers can pass owned values or use the provided
// tuple/array/`Vec` implementations.
4221
4222impl<T> IntoInsertRow for Vec<T>
4223where
4224    T: Into<PlanValue>,
4225{
4226    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
4227        if self.is_empty() {
4228            return Err(Error::InvalidArgumentError(
4229                "insert requires at least one column".into(),
4230            ));
4231        }
4232        Ok(RuntimeInsertRowKind::Positional(
4233            self.into_iter().map(Into::into).collect(),
4234        ))
4235    }
4236}
4237
4238impl<T, const N: usize> IntoInsertRow for [T; N]
4239where
4240    T: Into<PlanValue>,
4241{
4242    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
4243        if N == 0 {
4244            return Err(Error::InvalidArgumentError(
4245                "insert requires at least one column".into(),
4246            ));
4247        }
4248        Ok(RuntimeInsertRowKind::Positional(
4249            self.into_iter().map(Into::into).collect(),
4250        ))
4251    }
4252}
4253
4254macro_rules! impl_into_insert_row_tuple {
4255    ($($type:ident => $value:ident),+) => {
4256        impl<$($type,)+> IntoInsertRow for ($($type,)+)
4257        where
4258            $($type: Into<PlanValue>,)+
4259        {
4260            fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
4261                let ($($value,)+) = self;
4262                Ok(RuntimeInsertRowKind::Positional(vec![$($value.into(),)+]))
4263            }
4264        }
4265    };
4266}
4267
4268impl_into_insert_row_tuple!(T1 => v1);
4269impl_into_insert_row_tuple!(T1 => v1, T2 => v2);
4270impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3);
4271impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4);
4272impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5);
4273impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5, T6 => v6);
4274impl_into_insert_row_tuple!(
4275    T1 => v1,
4276    T2 => v2,
4277    T3 => v3,
4278    T4 => v4,
4279    T5 => v5,
4280    T6 => v6,
4281    T7 => v7
4282);
4283impl_into_insert_row_tuple!(
4284    T1 => v1,
4285    T2 => v2,
4286    T3 => v3,
4287    T4 => v4,
4288    T5 => v5,
4289    T6 => v6,
4290    T7 => v7,
4291    T8 => v8
4292);
4293
4294pub struct RuntimeTableHandle<P>
4295where
4296    P: Pager<Blob = EntryHandle> + Send + Sync,
4297{
4298    context: Arc<RuntimeContext<P>>,
4299    display_name: String,
4300    _canonical_name: String,
4301}
4302
4303impl<P> RuntimeTableHandle<P>
4304where
4305    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
4306{
4307    pub fn new(context: Arc<RuntimeContext<P>>, name: &str) -> Result<Self> {
4308        let (display_name, canonical_name) = canonical_table_name(name)?;
4309        context.lookup_table(&canonical_name)?;
4310        Ok(Self {
4311            context,
4312            display_name,
4313            _canonical_name: canonical_name,
4314        })
4315    }
4316
4317    pub fn lazy(&self) -> Result<RuntimeLazyFrame<P>> {
4318        RuntimeLazyFrame::scan(Arc::clone(&self.context), &self.display_name)
4319    }
4320
4321    pub fn insert_rows<R>(
4322        &self,
4323        rows: impl IntoIterator<Item = R>,
4324    ) -> Result<RuntimeStatementResult<P>>
4325    where
4326        R: IntoInsertRow,
4327    {
4328        enum InsertMode {
4329            Named,
4330            Positional,
4331        }
4332
4333        let table = self.context.lookup_table(&self._canonical_name)?;
4334        let schema = table.schema.as_ref();
4335        let schema_column_names: Vec<String> =
4336            schema.columns.iter().map(|col| col.name.clone()).collect();
4337        let mut normalized_rows: Vec<Vec<PlanValue>> = Vec::new();
4338        let mut mode: Option<InsertMode> = None;
4339        let mut column_names: Option<Vec<String>> = None;
4340        let mut row_count = 0usize;
4341
4342        for row in rows.into_iter() {
4343            row_count += 1;
4344            match row.into_insert_row()? {
4345                RuntimeInsertRowKind::Named { columns, values } => {
4346                    if let Some(existing) = &mode {
4347                        if !matches!(existing, InsertMode::Named) {
4348                            return Err(Error::InvalidArgumentError(
4349                                "cannot mix positional and named insert rows".into(),
4350                            ));
4351                        }
4352                    } else {
4353                        mode = Some(InsertMode::Named);
4354                        let mut seen =
4355                            FxHashSet::with_capacity_and_hasher(columns.len(), Default::default());
4356                        for column in &columns {
4357                            if !seen.insert(column.clone()) {
4358                                return Err(Error::InvalidArgumentError(format!(
4359                                    "duplicate column '{}' in insert row",
4360                                    column
4361                                )));
4362                            }
4363                        }
4364                        column_names = Some(columns.clone());
4365                    }
4366
4367                    let expected = column_names
4368                        .as_ref()
4369                        .expect("column names must be initialized for named insert");
4370                    if columns != *expected {
4371                        return Err(Error::InvalidArgumentError(
4372                            "insert rows must specify the same columns".into(),
4373                        ));
4374                    }
4375                    if values.len() != expected.len() {
4376                        return Err(Error::InvalidArgumentError(format!(
4377                            "insert row expected {} values, found {}",
4378                            expected.len(),
4379                            values.len()
4380                        )));
4381                    }
4382                    normalized_rows.push(values);
4383                }
4384                RuntimeInsertRowKind::Positional(values) => {
4385                    if let Some(existing) = &mode {
4386                        if !matches!(existing, InsertMode::Positional) {
4387                            return Err(Error::InvalidArgumentError(
4388                                "cannot mix positional and named insert rows".into(),
4389                            ));
4390                        }
4391                    } else {
4392                        mode = Some(InsertMode::Positional);
4393                        column_names = Some(schema_column_names.clone());
4394                    }
4395
4396                    if values.len() != schema.columns.len() {
4397                        return Err(Error::InvalidArgumentError(format!(
4398                            "insert row expected {} values, found {}",
4399                            schema.columns.len(),
4400                            values.len()
4401                        )));
4402                    }
4403                    normalized_rows.push(values);
4404                }
4405            }
4406        }
4407
4408        if row_count == 0 {
4409            return Err(Error::InvalidArgumentError(
4410                "insert requires at least one row".into(),
4411            ));
4412        }
4413
4414        let columns = column_names.unwrap_or_else(|| schema_column_names.clone());
4415        self.insert_row_batch(RowBatch {
4416            columns,
4417            rows: normalized_rows,
4418        })
4419    }
4420
4421    pub fn insert_row_batch(&self, batch: RowBatch) -> Result<RuntimeStatementResult<P>> {
4422        if batch.rows.is_empty() {
4423            return Err(Error::InvalidArgumentError(
4424                "insert requires at least one row".into(),
4425            ));
4426        }
4427        if batch.columns.is_empty() {
4428            return Err(Error::InvalidArgumentError(
4429                "insert requires at least one column".into(),
4430            ));
4431        }
4432        for row in &batch.rows {
4433            if row.len() != batch.columns.len() {
4434                return Err(Error::InvalidArgumentError(
4435                    "insert rows must have values for every column".into(),
4436                ));
4437            }
4438        }
4439
4440        let plan = InsertPlan {
4441            table: self.display_name.clone(),
4442            columns: batch.columns,
4443            source: InsertSource::Rows(batch.rows),
4444        };
4445        self.context.insert(plan)
4446    }
4447
4448    pub fn insert_batches(&self, batches: Vec<RecordBatch>) -> Result<RuntimeStatementResult<P>> {
4449        let plan = InsertPlan {
4450            table: self.display_name.clone(),
4451            columns: Vec::new(),
4452            source: InsertSource::Batches(batches),
4453        };
4454        self.context.insert(plan)
4455    }
4456
4457    pub fn insert_lazy(&self, frame: RuntimeLazyFrame<P>) -> Result<RuntimeStatementResult<P>> {
4458        let RowBatch { columns, rows } = frame.collect_rows()?;
4459        self.insert_row_batch(RowBatch { columns, rows })
4460    }
4461
4462    pub fn name(&self) -> &str {
4463        &self.display_name
4464    }
4465}
4466
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use llkv_storage::pager::MemPager;
    use std::sync::Arc;

    /// Creates a table, inserts two positional rows, and reads them back
    /// through the lazy scan path.
    #[test]
    fn create_insert_select_roundtrip() {
        let context = Arc::new(RuntimeContext::new(Arc::new(MemPager::default())));

        let people = context
            .create_table(
                "people",
                [
                    ("id", DataType::Int64, NotNull),
                    ("name", DataType::Utf8, Nullable),
                ],
            )
            .expect("create table");
        people
            .insert_rows([(1_i64, "alice"), (2_i64, "bob")])
            .expect("insert rows");

        let batches = people
            .lazy()
            .expect("lazy scan")
            .collect()
            .expect("build select execution")
            .collect()
            .expect("collect batches");
        assert_eq!(batches.len(), 1);

        let names = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(names.len(), 2);
    }

    /// Inserts two NULLs and one integer, then checks that the `count_nulls`
    /// aggregate reports 2.
    #[test]
    fn aggregate_count_nulls() {
        let context = Arc::new(RuntimeContext::new(Arc::new(MemPager::default())));

        let ints = context
            .create_table("ints", [("i", DataType::Int64)])
            .expect("create table");
        ints.insert_rows([
            (PlanValue::Null,),
            (PlanValue::Integer(1),),
            (PlanValue::Null,),
        ])
        .expect("insert rows");

        let aggregates = vec![AggregateExpr::count_nulls("i", "nulls")];
        let plan = SelectPlan::new("ints").with_aggregates(aggregates);
        let batches = context
            .execute_select(plan)
            .expect("select")
            .collect()
            .expect("collect batches");

        let null_counts = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int column");
        assert_eq!(null_counts.value(0), 2);
    }
}