1#![forbid(unsafe_code)]
29
30use std::fmt;
31use std::marker::PhantomData;
32use std::mem;
33use std::ops::Bound;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::{Arc, RwLock};
36use std::time::{SystemTime, UNIX_EPOCH};
37
38use rustc_hash::{FxHashMap, FxHashSet};
39
40use arrow::array::{
41 Array, ArrayRef, Date32Builder, Float64Builder, Int64Builder, StringBuilder, UInt64Array,
42 UInt64Builder,
43};
44use arrow::datatypes::{DataType, Field, Schema};
45use arrow::record_batch::RecordBatch;
46use llkv_column_map::ColumnStore;
47use llkv_column_map::store::{
48 CREATED_BY_COLUMN_NAME, DELETED_BY_COLUMN_NAME, GatherNullPolicy, ROW_ID_COLUMN_NAME,
49};
50use llkv_column_map::types::LogicalFieldId;
51use llkv_expr::expr::{Expr as LlkvExpr, Filter, Operator, ScalarExpr};
52use llkv_result::Error;
54use llkv_storage::pager::{MemPager, Pager};
55use llkv_table::table::{RowIdFilter, ScanProjection, ScanStreamOptions, Table};
56use llkv_table::types::{FieldId, ROW_ID_FIELD_ID, RowId, TableId};
57use llkv_table::{CATALOG_TABLE_ID, ColMeta, SysCatalog, TableMeta};
58use simd_r_drive_entry_handle::EntryHandle;
59use sqlparser::ast::{
60 Expr as SqlExpr, FunctionArg, FunctionArgExpr, GroupByExpr, ObjectName, ObjectNamePart, Select,
61 SelectItem, SelectItemQualifiedWildcardKind, TableAlias, TableFactor, UnaryOperator, Value,
62 ValueWithSpan,
63};
64use time::{Date, Month};
65
/// Result alias used throughout this file; forwards to [`llkv_result::Result`].
pub type Result<T> = llkv_result::Result<T>;
67
68pub use llkv_plan::{
70 AggregateExpr, AggregateFunction, AssignmentValue, ColumnAssignment, ColumnNullability,
71 ColumnSpec, CreateTablePlan, CreateTableSource, DeletePlan, InsertPlan, InsertSource,
72 IntoColumnSpec, NotNull, Nullable, OrderByPlan, OrderSortType, OrderTarget, PlanOperation,
73 PlanStatement, PlanValue, SelectPlan, SelectProjection, UpdatePlan,
74};
75
76use llkv_executor::{ExecutorColumn, ExecutorSchema, ExecutorTable};
78pub use llkv_executor::{QueryExecutor, RowBatch, SelectExecution, TableProvider};
79
80pub use llkv_transaction::TransactionKind;
82use llkv_transaction::{
83 RowVersion, TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionContext, TransactionManager,
84 TransactionResult, TxnId, TxnIdManager, mvcc::TransactionSnapshot,
85};
86
87use llkv_transaction::TransactionSession;
89
90mod mvcc_columns {
97 use super::*;
98 use std::collections::HashMap;
99
100 pub(crate) fn build_insert_mvcc_columns(
104 row_count: usize,
105 start_row_id: RowId,
106 creator_txn_id: TxnId,
107 ) -> (ArrayRef, ArrayRef, ArrayRef) {
108 let mut row_builder = UInt64Builder::with_capacity(row_count);
109 for offset in 0..row_count {
110 row_builder.append_value(start_row_id + offset as u64);
111 }
112
113 let mut created_builder = UInt64Builder::with_capacity(row_count);
114 let mut deleted_builder = UInt64Builder::with_capacity(row_count);
115 for _ in 0..row_count {
116 created_builder.append_value(creator_txn_id);
117 deleted_builder.append_value(TXN_ID_NONE);
118 }
119
120 (
121 Arc::new(row_builder.finish()) as ArrayRef,
122 Arc::new(created_builder.finish()) as ArrayRef,
123 Arc::new(deleted_builder.finish()) as ArrayRef,
124 )
125 }
126
127 pub(crate) fn build_mvcc_fields() -> Vec<Field> {
131 vec![
132 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
133 Field::new(CREATED_BY_COLUMN_NAME, DataType::UInt64, false),
134 Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
135 ]
136 }
137
138 pub(crate) fn build_field_with_metadata(
140 name: &str,
141 data_type: DataType,
142 nullable: bool,
143 field_id: FieldId,
144 ) -> Field {
145 let mut metadata = FxHashMap::with_capacity_and_hasher(1, Default::default());
146 metadata.insert(
147 llkv_table::constants::FIELD_ID_META_KEY.to_string(),
148 field_id.to_string(),
149 );
150 Field::new(name, data_type, nullable)
151 .with_metadata(metadata.into_iter().collect::<HashMap<String, String>>())
152 }
153
154 pub(crate) fn build_delete_batch(
158 row_ids: Vec<u64>,
159 deleted_by_txn_id: TxnId,
160 ) -> llkv_result::Result<RecordBatch> {
161 let row_count = row_ids.len();
162
163 let fields = vec![
164 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
165 Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
166 ];
167
168 let arrays: Vec<ArrayRef> = vec![
169 Arc::new(UInt64Array::from(row_ids)),
170 Arc::new(UInt64Array::from(vec![deleted_by_txn_id; row_count])),
171 ];
172
173 RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(Error::Arrow)
174 }
175}
176
/// Result of running one statement through the runtime.
///
/// Table-scoped variants carry the `table_name` they were issued against; the
/// `Insert` / `Update` / `Delete` variants additionally carry a row counter.
// NOTE(review): the lint is presumably silenced because `Select` embeds a
// `SelectExecution<P>` by value — confirm before deciding whether to box it.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum RuntimeStatementResult<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Result of a `CREATE TABLE` statement for `table_name`.
    CreateTable {
        table_name: String,
    },
    /// Result variant that carries no payload.
    NoOp,
    /// Result of an `INSERT`; `rows_inserted` is the reported row count.
    Insert {
        table_name: String,
        rows_inserted: usize,
    },
    /// Result of an `UPDATE`; `rows_updated` is the reported row count.
    Update {
        table_name: String,
        rows_updated: usize,
    },
    /// Result of a `DELETE`; `rows_deleted` is the reported row count.
    Delete {
        table_name: String,
        rows_deleted: usize,
    },
    /// Result of a `SELECT`: the output schema plus the execution handle that
    /// yields the rows.
    Select {
        table_name: String,
        schema: Arc<Schema>,
        execution: SelectExecution<P>,
    },
    /// Result of a transaction-control statement; `kind` says which one.
    Transaction {
        kind: TransactionKind,
    },
}
209
210impl<P> fmt::Debug for RuntimeStatementResult<P>
211where
212 P: Pager<Blob = EntryHandle> + Send + Sync,
213{
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 match self {
216 RuntimeStatementResult::CreateTable { table_name } => f
217 .debug_struct("CreateTable")
218 .field("table_name", table_name)
219 .finish(),
220 RuntimeStatementResult::NoOp => f.debug_struct("NoOp").finish(),
221 RuntimeStatementResult::Insert {
222 table_name,
223 rows_inserted,
224 } => f
225 .debug_struct("Insert")
226 .field("table_name", table_name)
227 .field("rows_inserted", rows_inserted)
228 .finish(),
229 RuntimeStatementResult::Update {
230 table_name,
231 rows_updated,
232 } => f
233 .debug_struct("Update")
234 .field("table_name", table_name)
235 .field("rows_updated", rows_updated)
236 .finish(),
237 RuntimeStatementResult::Delete {
238 table_name,
239 rows_deleted,
240 } => f
241 .debug_struct("Delete")
242 .field("table_name", table_name)
243 .field("rows_deleted", rows_deleted)
244 .finish(),
245 RuntimeStatementResult::Select {
246 table_name, schema, ..
247 } => f
248 .debug_struct("Select")
249 .field("table_name", table_name)
250 .field("schema", schema)
251 .finish(),
252 RuntimeStatementResult::Transaction { kind } => {
253 f.debug_struct("Transaction").field("kind", kind).finish()
254 }
255 }
256 }
257}
258
259impl<P> RuntimeStatementResult<P>
260where
261 P: Pager<Blob = EntryHandle> + Send + Sync,
262{
263 #[allow(dead_code)]
266 pub(crate) fn convert_pager_type<Q>(self) -> Result<RuntimeStatementResult<Q>>
267 where
268 Q: Pager<Blob = EntryHandle> + Send + Sync,
269 {
270 match self {
271 RuntimeStatementResult::CreateTable { table_name } => {
272 Ok(RuntimeStatementResult::CreateTable { table_name })
273 }
274 RuntimeStatementResult::NoOp => Ok(RuntimeStatementResult::NoOp),
275 RuntimeStatementResult::Insert {
276 table_name,
277 rows_inserted,
278 } => Ok(RuntimeStatementResult::Insert {
279 table_name,
280 rows_inserted,
281 }),
282 RuntimeStatementResult::Update {
283 table_name,
284 rows_updated,
285 } => Ok(RuntimeStatementResult::Update {
286 table_name,
287 rows_updated,
288 }),
289 RuntimeStatementResult::Delete {
290 table_name,
291 rows_deleted,
292 } => Ok(RuntimeStatementResult::Delete {
293 table_name,
294 rows_deleted,
295 }),
296 RuntimeStatementResult::Transaction { kind } => {
297 Ok(RuntimeStatementResult::Transaction { kind })
298 }
299 RuntimeStatementResult::Select { .. } => Err(Error::Internal(
300 "Cannot convert SELECT result between pager types in transaction".into(),
301 )),
302 }
303 }
304}
305
306pub fn statement_table_name(statement: &PlanStatement) -> Option<&str> {
313 match statement {
314 PlanStatement::CreateTable(plan) => Some(&plan.name),
315 PlanStatement::Insert(plan) => Some(&plan.table),
316 PlanStatement::Update(plan) => Some(&plan.table),
317 PlanStatement::Delete(plan) => Some(&plan.table),
318 PlanStatement::Select(plan) => Some(&plan.table),
319 PlanStatement::BeginTransaction
320 | PlanStatement::CommitTransaction
321 | PlanStatement::RollbackTransaction => None,
322 }
323}
324
/// Pairs a shared [`RuntimeContext`] with the [`TransactionSnapshot`] currently
/// in force for it.
///
/// The snapshot sits behind an `RwLock` so it can be swapped through `&self`.
pub struct RuntimeContextWrapper<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Shared handle to the wrapped context.
    ctx: Arc<RuntimeContext<P>>,
    /// Snapshot last installed via `update_snapshot`; seeded from
    /// `ctx.default_snapshot()` in `new`.
    snapshot: RwLock<TransactionSnapshot>,
}
350
351impl<P> RuntimeContextWrapper<P>
352where
353 P: Pager<Blob = EntryHandle> + Send + Sync,
354{
355 fn new(ctx: Arc<RuntimeContext<P>>) -> Self {
356 let snapshot = ctx.default_snapshot();
357 Self {
358 ctx,
359 snapshot: RwLock::new(snapshot),
360 }
361 }
362
363 fn update_snapshot(&self, snapshot: TransactionSnapshot) {
364 let mut guard = self.snapshot.write().expect("snapshot lock poisoned");
365 *guard = snapshot;
366 }
367
368 fn current_snapshot(&self) -> TransactionSnapshot {
369 *self.snapshot.read().expect("snapshot lock poisoned")
370 }
371
372 fn context(&self) -> &Arc<RuntimeContext<P>> {
373 &self.ctx
374 }
375
376 fn ctx(&self) -> &RuntimeContext<P> {
377 &self.ctx
378 }
379}
380
/// Session handle through which statements and transaction control are issued.
///
/// Wraps a [`TransactionSession`] parameterised with a context wrapper over
/// pager `P` and a context wrapper over [`MemPager`].
pub struct RuntimeSession<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    /// Underlying transaction session that this type delegates to.
    inner: TransactionSession<RuntimeContextWrapper<P>, RuntimeContextWrapper<MemPager>>,
}
392
393impl<P> RuntimeSession<P>
394where
395 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
396{
    /// Builds another `RuntimeSession` from `self.inner.clone_session()`.
    // NOTE(review): whether the clone shares transaction state with `self` is
    // decided by `TransactionSession::clone_session` — confirm there.
    pub(crate) fn clone_session(&self) -> Self {
        Self {
            inner: self.inner.clone_session(),
        }
    }
404
405 pub fn begin_transaction(&self) -> Result<RuntimeStatementResult<P>> {
408 let staging_pager = Arc::new(MemPager::default());
409 tracing::trace!(
410 "BEGIN_TRANSACTION: Created staging pager at {:p}",
411 &*staging_pager
412 );
413 let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));
414
415 self.inner
418 .context()
419 .ctx()
420 .copy_tables_to_staging(&staging_ctx)?;
421
422 let staging_wrapper = Arc::new(RuntimeContextWrapper::new(staging_ctx));
423
424 self.inner.begin_transaction(staging_wrapper)?;
425 Ok(RuntimeStatementResult::Transaction {
426 kind: TransactionKind::Begin,
427 })
428 }
429
    /// Forwards to the inner session's `abort_transaction`.
    pub fn abort_transaction(&self) {
        self.inner.abort_transaction();
    }
435
436 pub fn has_active_transaction(&self) -> bool {
438 let result = self.inner.has_active_transaction();
439 tracing::trace!("SESSION: has_active_transaction() = {}", result);
440 result
441 }
442
    /// Forwards to the inner session's `is_aborted`.
    pub fn is_aborted(&self) -> bool {
        self.inner.is_aborted()
    }
447
448 pub fn commit_transaction(&self) -> Result<RuntimeStatementResult<P>> {
451 tracing::trace!("Session::commit_transaction called");
452 let (tx_result, operations) = self.inner.commit_transaction()?;
453 tracing::trace!(
454 "Session::commit_transaction got {} operations",
455 operations.len()
456 );
457
458 if !operations.is_empty() {
459 let dropped_tables = self
460 .inner
461 .context()
462 .ctx()
463 .dropped_tables
464 .read()
465 .unwrap()
466 .clone();
467 if !dropped_tables.is_empty() {
468 for operation in &operations {
469 let table_name_opt = match operation {
470 PlanOperation::Insert(plan) => Some(plan.table.as_str()),
471 PlanOperation::Update(plan) => Some(plan.table.as_str()),
472 PlanOperation::Delete(plan) => Some(plan.table.as_str()),
473 _ => None,
474 };
475 if let Some(table_name) = table_name_opt {
476 let (_, canonical) = canonical_table_name(table_name)?;
477 if dropped_tables.contains(&canonical) {
478 self.abort_transaction();
479 return Err(Error::TransactionContextError(
480 "another transaction has dropped this table".into(),
481 ));
482 }
483 }
484 }
485 }
486 }
487
488 let kind = match tx_result {
490 TransactionResult::Transaction { kind } => kind,
491 _ => {
492 return Err(Error::Internal(
493 "commit_transaction returned non-transaction result".into(),
494 ));
495 }
496 };
497 tracing::trace!("Session::commit_transaction kind={:?}", kind);
498
499 for operation in operations {
501 match operation {
502 PlanOperation::CreateTable(plan) => {
503 TransactionContext::create_table_plan(&**self.inner.context(), plan)?;
504 }
505 PlanOperation::Insert(plan) => {
506 TransactionContext::insert(&**self.inner.context(), plan)?;
507 }
508 PlanOperation::Update(plan) => {
509 TransactionContext::update(&**self.inner.context(), plan)?;
510 }
511 PlanOperation::Delete(plan) => {
512 TransactionContext::delete(&**self.inner.context(), plan)?;
513 }
514 _ => {}
515 }
516 }
517
518 let base_ctx = self.inner.context();
521 let default_snapshot = base_ctx.ctx().default_snapshot();
522 TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
523
524 if matches!(kind, TransactionKind::Commit) {
526 let ctx = base_ctx.ctx();
527 let next_txn_id = ctx.txn_manager().current_next_txn_id();
528 if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
529 tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
530 }
531 }
532
533 Ok(RuntimeStatementResult::Transaction { kind })
535 }
536
537 pub fn rollback_transaction(&self) -> Result<RuntimeStatementResult<P>> {
539 self.inner.rollback_transaction()?;
540 let base_ctx = self.inner.context();
541 let default_snapshot = base_ctx.ctx().default_snapshot();
542 TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
543 Ok(RuntimeStatementResult::Transaction {
544 kind: TransactionKind::Rollback,
545 })
546 }
547
548 fn materialize_create_table_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
549 if let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take() {
550 let select_result = self.select(*select_plan)?;
551 let (schema, batches) = match select_result {
552 RuntimeStatementResult::Select {
553 schema, execution, ..
554 } => {
555 let batches = execution.collect()?;
556 (schema, batches)
557 }
558 _ => {
559 return Err(Error::Internal(
560 "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
561 ));
562 }
563 };
564 plan.source = Some(CreateTableSource::Batches { schema, batches });
565 }
566 Ok(plan)
567 }
568
569 pub fn create_table_plan(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
571 let plan = self.materialize_create_table_plan(plan)?;
572 if self.has_active_transaction() {
573 let table_name = plan.name.clone();
574 match self
575 .inner
576 .execute_operation(PlanOperation::CreateTable(plan))
577 {
578 Ok(_) => Ok(RuntimeStatementResult::CreateTable { table_name }),
579 Err(e) => {
580 self.abort_transaction();
582 Err(e)
583 }
584 }
585 } else {
586 let table_name = plan.name.clone();
588 TransactionContext::create_table_plan(&**self.inner.context(), plan)?;
589 Ok(RuntimeStatementResult::CreateTable { table_name })
590 }
591 }
592
593 fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
594 let InsertPlan {
595 table,
596 columns,
597 source,
598 } = plan;
599
600 match source {
601 InsertSource::Rows(rows) => {
602 let count = rows.len();
603 Ok((
604 InsertPlan {
605 table,
606 columns,
607 source: InsertSource::Rows(rows),
608 },
609 count,
610 ))
611 }
612 InsertSource::Batches(batches) => {
613 let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
614 Ok((
615 InsertPlan {
616 table,
617 columns,
618 source: InsertSource::Batches(batches),
619 },
620 count,
621 ))
622 }
623 InsertSource::Select { plan: select_plan } => {
624 let select_result = self.select(*select_plan)?;
625 let rows = match select_result {
626 RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
627 _ => {
628 return Err(Error::Internal(
629 "expected Select result when executing INSERT ... SELECT".into(),
630 ));
631 }
632 };
633 let count = rows.len();
634 Ok((
635 InsertPlan {
636 table,
637 columns,
638 source: InsertSource::Rows(rows),
639 },
640 count,
641 ))
642 }
643 }
644 }
645
646 pub fn insert(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
648 tracing::trace!("Session::insert called for table={}", plan.table);
649 let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
650 let table_name = plan.table.clone();
651
652 if self.has_active_transaction() {
653 match self.inner.execute_operation(PlanOperation::Insert(plan)) {
654 Ok(_) => {
655 tracing::trace!("Session::insert succeeded for table={}", table_name);
656 Ok(RuntimeStatementResult::Insert {
657 rows_inserted,
658 table_name,
659 })
660 }
661 Err(e) => {
662 tracing::trace!(
663 "Session::insert failed for table={}, error={:?}",
664 table_name,
665 e
666 );
667 if matches!(e, Error::ConstraintError(_)) {
669 tracing::trace!("Transaction is_aborted=true");
670 self.abort_transaction();
671 }
672 Err(e)
673 }
674 }
675 } else {
676 let context = self.inner.context();
678 let default_snapshot = context.ctx().default_snapshot();
679 TransactionContext::set_snapshot(&**context, default_snapshot);
680 TransactionContext::insert(&**context, plan)?;
681 Ok(RuntimeStatementResult::Insert {
682 rows_inserted,
683 table_name,
684 })
685 }
686 }
687
688 pub fn select(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
690 if self.has_active_transaction() {
691 let tx_result = match self
692 .inner
693 .execute_operation(PlanOperation::Select(plan.clone()))
694 {
695 Ok(result) => result,
696 Err(e) => {
697 if matches!(e, Error::ConstraintError(_)) {
700 self.abort_transaction();
701 }
702 return Err(e);
703 }
704 };
705 match tx_result {
706 TransactionResult::Select {
707 table_name,
708 schema,
709 execution: staging_execution,
710 } => {
711 let batches = staging_execution.collect().unwrap_or_default();
714 let combined = if batches.is_empty() {
715 RecordBatch::new_empty(Arc::clone(&schema))
716 } else if batches.len() == 1 {
717 batches.into_iter().next().unwrap()
718 } else {
719 let refs: Vec<&RecordBatch> = batches.iter().collect();
720 arrow::compute::concat_batches(&schema, refs)?
721 };
722
723 let execution = SelectExecution::from_batch(
724 table_name.clone(),
725 Arc::clone(&schema),
726 combined,
727 );
728
729 Ok(RuntimeStatementResult::Select {
730 execution,
731 table_name,
732 schema,
733 })
734 }
735 _ => Err(Error::Internal("expected Select result".into())),
736 }
737 } else {
738 let context = self.inner.context();
740 let default_snapshot = context.ctx().default_snapshot();
741 TransactionContext::set_snapshot(&**context, default_snapshot);
742 let table_name = plan.table.clone();
743 let execution = TransactionContext::execute_select(&**context, plan)?;
744 let schema = execution.schema();
745 Ok(RuntimeStatementResult::Select {
746 execution,
747 table_name,
748 schema,
749 })
750 }
751 }
752
753 pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
755 let plan =
756 SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
757 match self.select(plan)? {
758 RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
759 other => Err(Error::Internal(format!(
760 "expected Select result when reading table '{table}', got {:?}",
761 other
762 ))),
763 }
764 }
765
766 pub fn update(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
768 if self.has_active_transaction() {
769 let table_name = plan.table.clone();
770 let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
771 Ok(result) => result,
772 Err(e) => {
773 self.abort_transaction();
775 return Err(e);
776 }
777 };
778 match result {
779 TransactionResult::Update {
780 rows_matched: _,
781 rows_updated,
782 } => Ok(RuntimeStatementResult::Update {
783 rows_updated,
784 table_name,
785 }),
786 _ => Err(Error::Internal("expected Update result".into())),
787 }
788 } else {
789 let context = self.inner.context();
791 let default_snapshot = context.ctx().default_snapshot();
792 TransactionContext::set_snapshot(&**context, default_snapshot);
793 let table_name = plan.table.clone();
794 let result = TransactionContext::update(&**context, plan)?;
795 match result {
796 TransactionResult::Update {
797 rows_matched: _,
798 rows_updated,
799 } => Ok(RuntimeStatementResult::Update {
800 rows_updated,
801 table_name,
802 }),
803 _ => Err(Error::Internal("expected Update result".into())),
804 }
805 }
806 }
807
808 pub fn delete(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
810 if self.has_active_transaction() {
811 let table_name = plan.table.clone();
812 let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
813 Ok(result) => result,
814 Err(e) => {
815 self.abort_transaction();
817 return Err(e);
818 }
819 };
820 match result {
821 TransactionResult::Delete { rows_deleted } => Ok(RuntimeStatementResult::Delete {
822 rows_deleted,
823 table_name,
824 }),
825 _ => Err(Error::Internal("expected Delete result".into())),
826 }
827 } else {
828 let context = self.inner.context();
830 let default_snapshot = context.ctx().default_snapshot();
831 TransactionContext::set_snapshot(&**context, default_snapshot);
832 let table_name = plan.table.clone();
833 let result = TransactionContext::delete(&**context, plan)?;
834 match result {
835 TransactionResult::Delete { rows_deleted } => Ok(RuntimeStatementResult::Delete {
836 rows_deleted,
837 table_name,
838 }),
839 _ => Err(Error::Internal("expected Delete result".into())),
840 }
841 }
842 }
843}
844
/// Entry point that bundles a shared [`RuntimeContext`] with one
/// [`RuntimeSession`] created from it.
pub struct RuntimeEngine<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    /// Shared context; handed out by `context()`.
    context: Arc<RuntimeContext<P>>,
    /// Session used by `execute_statement` to run every statement.
    session: RuntimeSession<P>,
}
852
853impl<P> Clone for RuntimeEngine<P>
854where
855 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
856{
857 fn clone(&self) -> Self {
858 tracing::debug!("[ENGINE] RuntimeEngine::clone() called - reusing same session");
861 Self {
862 context: Arc::clone(&self.context),
863 session: self.session.clone_session(),
864 }
865 }
866}
867
868impl<P> RuntimeEngine<P>
869where
870 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
871{
872 pub fn new(pager: Arc<P>) -> Self {
873 let context = Arc::new(RuntimeContext::new(pager));
874 Self::from_context(context)
875 }
876
877 pub fn from_context(context: Arc<RuntimeContext<P>>) -> Self {
878 tracing::debug!("[ENGINE] RuntimeEngine::from_context - creating new session");
879 let session = context.create_session();
880 tracing::debug!("[ENGINE] RuntimeEngine::from_context - created session");
881 Self { context, session }
882 }
883
884 pub fn context(&self) -> Arc<RuntimeContext<P>> {
885 Arc::clone(&self.context)
886 }
887
888 pub fn session(&self) -> &RuntimeSession<P> {
889 &self.session
890 }
891
892 pub fn execute_statement(&self, statement: PlanStatement) -> Result<RuntimeStatementResult<P>> {
893 match statement {
894 PlanStatement::BeginTransaction => self.session.begin_transaction(),
895 PlanStatement::CommitTransaction => self.session.commit_transaction(),
896 PlanStatement::RollbackTransaction => self.session.rollback_transaction(),
897 PlanStatement::CreateTable(plan) => self.session.create_table_plan(plan),
898 PlanStatement::Insert(plan) => self.session.insert(plan),
899 PlanStatement::Update(plan) => self.session.update(plan),
900 PlanStatement::Delete(plan) => self.session.delete(plan),
901 PlanStatement::Select(plan) => self.session.select(plan),
902 }
903 }
904
905 pub fn execute_all<I>(&self, statements: I) -> Result<Vec<RuntimeStatementResult<P>>>
906 where
907 I: IntoIterator<Item = PlanStatement>,
908 {
909 let mut results = Vec::new();
910 for statement in statements {
911 results.push(self.execute_statement(statement)?);
912 }
913 Ok(results)
914 }
915}
916
/// Shared runtime state for one pager: the table registry, the table-name
/// catalog, and the transaction machinery.
pub struct RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Pager handle; passed to `ColumnStore::open` whenever a store is needed.
    pager: Arc<P>,
    /// Executor tables keyed by canonical table name. Starts empty in `new`.
    tables: RwLock<FxHashMap<String, Arc<ExecutorTable<P>>>>,
    /// Canonical names of dropped tables. Consulted at commit time and cleared
    /// for a name when that table is created again.
    dropped_tables: RwLock<FxHashSet<String>>,
    /// Table-name catalog; populated in `new` from the persisted table metas.
    catalog: Arc<llkv_table::catalog::TableCatalog>,
    /// Transaction manager from which sessions are created.
    transaction_manager:
        TransactionManager<RuntimeContextWrapper<P>, RuntimeContextWrapper<MemPager>>,
    /// Transaction-id manager obtained from `transaction_manager.txn_manager()`.
    txn_manager: Arc<TxnIdManager>,
}
944
945impl<P> RuntimeContext<P>
946where
947 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
948{
949 pub fn new(pager: Arc<P>) -> Self {
950 tracing::trace!("RuntimeContext::new called, pager={:p}", &*pager);
951
952 let (next_txn_id, last_committed, loaded_tables) = match ColumnStore::open(Arc::clone(
954 &pager,
955 )) {
956 Ok(store) => {
957 let catalog = SysCatalog::new(&store);
958 let next_txn_id = match catalog.get_next_txn_id() {
959 Ok(Some(id)) => {
960 tracing::debug!("[CONTEXT] Loaded next_txn_id={} from catalog", id);
961 id
962 }
963 Ok(None) => {
964 tracing::debug!(
965 "[CONTEXT] No persisted next_txn_id found, starting from default"
966 );
967 TXN_ID_AUTO_COMMIT + 1
968 }
969 Err(e) => {
970 tracing::warn!(
971 "[CONTEXT] Failed to load next_txn_id: {}, using default",
972 e
973 );
974 TXN_ID_AUTO_COMMIT + 1
975 }
976 };
977 let last_committed = match catalog.get_last_committed_txn_id() {
978 Ok(Some(id)) => {
979 tracing::debug!("[CONTEXT] Loaded last_committed={} from catalog", id);
980 id
981 }
982 Ok(None) => {
983 tracing::debug!(
984 "[CONTEXT] No persisted last_committed found, starting from default"
985 );
986 TXN_ID_AUTO_COMMIT
987 }
988 Err(e) => {
989 tracing::warn!(
990 "[CONTEXT] Failed to load last_committed: {}, using default",
991 e
992 );
993 TXN_ID_AUTO_COMMIT
994 }
995 };
996
997 let loaded_tables = match catalog.all_table_metas() {
999 Ok(metas) => {
1000 tracing::debug!("[CONTEXT] Loaded {} table(s) from catalog", metas.len());
1001 metas
1002 }
1003 Err(e) => {
1004 tracing::warn!(
1005 "[CONTEXT] Failed to load table metas: {}, starting with empty registry",
1006 e
1007 );
1008 Vec::new()
1009 }
1010 };
1011
1012 (next_txn_id, last_committed, loaded_tables)
1013 }
1014 Err(e) => {
1015 tracing::warn!(
1016 "[CONTEXT] Failed to open ColumnStore: {}, using default state",
1017 e
1018 );
1019 (TXN_ID_AUTO_COMMIT + 1, TXN_ID_AUTO_COMMIT, Vec::new())
1020 }
1021 };
1022
1023 let transaction_manager =
1024 TransactionManager::new_with_initial_state(next_txn_id, last_committed);
1025 let txn_manager = transaction_manager.txn_manager();
1026
1027 tracing::debug!(
1049 "[CONTEXT] Initialized with lazy loading for {} table(s)",
1050 loaded_tables.len()
1051 );
1052
1053 let catalog = Arc::new(llkv_table::catalog::TableCatalog::new());
1055 for (_table_id, table_meta) in &loaded_tables {
1056 if let Some(ref table_name) = table_meta.name
1057 && let Err(e) = catalog.register_table(table_name)
1058 {
1059 tracing::warn!(
1060 "[CONTEXT] Failed to register table '{}' in catalog: {}",
1061 table_name,
1062 e
1063 );
1064 }
1065 }
1066 tracing::debug!(
1067 "[CONTEXT] Catalog initialized with {} table(s)",
1068 catalog.table_count()
1069 );
1070
1071 Self {
1072 pager,
1073 tables: RwLock::new(FxHashMap::default()), dropped_tables: RwLock::new(FxHashSet::default()),
1075 catalog,
1076 transaction_manager,
1077 txn_manager,
1078 }
1079 }
1080
    /// Returns another handle to the shared transaction-id manager.
    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
        Arc::clone(&self.txn_manager)
    }
1085
1086 pub fn persist_next_txn_id(&self, next_txn_id: TxnId) -> Result<()> {
1088 let store = ColumnStore::open(Arc::clone(&self.pager))?;
1089 let catalog = SysCatalog::new(&store);
1090 catalog.put_next_txn_id(next_txn_id)?;
1091 let last_committed = self.txn_manager.last_committed();
1092 catalog.put_last_committed_txn_id(last_committed)?;
1093 tracing::debug!(
1094 "[CONTEXT] Persisted next_txn_id={}, last_committed={}",
1095 next_txn_id,
1096 last_committed
1097 );
1098 Ok(())
1099 }
1100
1101 pub fn default_snapshot(&self) -> TransactionSnapshot {
1103 TransactionSnapshot {
1104 txn_id: TXN_ID_AUTO_COMMIT,
1105 snapshot_id: self.txn_manager.last_committed(),
1106 }
1107 }
1108
1109 pub fn create_session(self: &Arc<Self>) -> RuntimeSession<P> {
1112 tracing::debug!("[SESSION] RuntimeContext::create_session called");
1113 let wrapper = RuntimeContextWrapper::new(Arc::clone(self));
1114 let inner = self.transaction_manager.create_session(Arc::new(wrapper));
1115 tracing::debug!(
1116 "[SESSION] Created TransactionSession with session_id (will be logged by transaction manager)"
1117 );
1118 RuntimeSession { inner }
1119 }
1120
    /// Creates a [`RuntimeTableHandle`] for `name` via `RuntimeTableHandle::new`,
    /// handing it another reference to this context.
    ///
    /// # Errors
    /// Returns whatever `RuntimeTableHandle::new` reports.
    pub fn table(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
        RuntimeTableHandle::new(Arc::clone(self), name)
    }
1125
    /// Forwards to the transaction manager's `has_active_transaction`.
    ///
    /// Deprecated: the session API (`RuntimeSession::has_active_transaction`)
    /// is the replacement.
    #[deprecated(note = "Use session-based transactions instead")]
    pub fn has_active_transaction(&self) -> bool {
        self.transaction_manager.has_active_transaction()
    }
1131
    /// Creates table `name` with the given columns.
    ///
    /// Shorthand for `create_table_with_options(name, columns, false)`.
    // NOTE(review): the `false` argument is presumably `if_not_exists` — the
    // callee is outside this view; confirm.
    pub fn create_table<C, I>(
        self: &Arc<Self>,
        name: &str,
        columns: I,
    ) -> Result<RuntimeTableHandle<P>>
    where
        C: IntoColumnSpec,
        I: IntoIterator<Item = C>,
    {
        self.create_table_with_options(name, columns, false)
    }
1143
    /// Creates table `name` with the given columns.
    ///
    /// Shorthand for `create_table_with_options(name, columns, true)`.
    // NOTE(review): the `true` argument is presumably `if_not_exists` — the
    // callee is outside this view; confirm.
    pub fn create_table_if_not_exists<C, I>(
        self: &Arc<Self>,
        name: &str,
        columns: I,
    ) -> Result<RuntimeTableHandle<P>>
    where
        C: IntoColumnSpec,
        I: IntoIterator<Item = C>,
    {
        self.create_table_with_options(name, columns, true)
    }
1155
1156 pub fn create_table_plan(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
1157 if plan.columns.is_empty() && plan.source.is_none() {
1158 return Err(Error::InvalidArgumentError(
1159 "CREATE TABLE requires explicit columns or a source".into(),
1160 ));
1161 }
1162
1163 let (display_name, canonical_name) = canonical_table_name(&plan.name)?;
1164 tracing::trace!(
1165 "DEBUG create_table_plan: table='{}' if_not_exists={} columns={}",
1166 display_name,
1167 plan.if_not_exists,
1168 plan.columns.len()
1169 );
1170 for (idx, col) in plan.columns.iter().enumerate() {
1171 tracing::trace!(
1172 " plan column[{}]: name='{}' primary_key={}",
1173 idx,
1174 col.name,
1175 col.primary_key
1176 );
1177 }
1178 let exists = {
1179 let tables = self.tables.read().unwrap();
1180 tables.contains_key(&canonical_name)
1181 };
1182 tracing::trace!("DEBUG create_table_plan: exists={}", exists);
1183 if exists {
1184 if plan.or_replace {
1185 tracing::trace!(
1186 "DEBUG create_table_plan: table '{}' exists and or_replace=true, removing existing table before recreation",
1187 display_name
1188 );
1189 self.remove_table_entry(&canonical_name);
1190 } else if plan.if_not_exists {
1191 tracing::trace!(
1192 "DEBUG create_table_plan: table '{}' exists and if_not_exists=true, returning early WITHOUT creating",
1193 display_name
1194 );
1195 return Ok(RuntimeStatementResult::CreateTable {
1196 table_name: display_name,
1197 });
1198 } else {
1199 return Err(Error::CatalogError(format!(
1200 "Catalog Error: Table '{}' already exists",
1201 display_name
1202 )));
1203 }
1204 }
1205
1206 self.dropped_tables.write().unwrap().remove(&canonical_name);
1207
1208 match plan.source {
1209 Some(CreateTableSource::Batches { schema, batches }) => self.create_table_from_batches(
1210 display_name,
1211 canonical_name,
1212 schema,
1213 batches,
1214 plan.if_not_exists,
1215 ),
1216 Some(CreateTableSource::Select { .. }) => Err(Error::Internal(
1217 "CreateTableSource::Select should be materialized before reaching RuntimeContext::create_table_plan"
1218 .into(),
1219 )),
1220 None => self.create_table_from_columns(
1221 display_name,
1222 canonical_name,
1223 plan.columns,
1224 plan.if_not_exists,
1225 ),
1226 }
1227 }
1228
    /// Returns the table names reported by the table catalog.
    pub fn table_names(self: &Arc<Self>) -> Vec<String> {
        self.catalog.table_names()
    }
1233
    /// Filters `row_ids` for `snapshot` by delegating to
    /// `filter_row_ids_for_snapshot`, supplying the table's store and id plus
    /// this context's transaction-id manager.
    ///
    /// # Errors
    /// Returns whatever `filter_row_ids_for_snapshot` reports.
    fn filter_visible_row_ids(
        &self,
        table: &ExecutorTable<P>,
        row_ids: Vec<u64>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<u64>> {
        filter_row_ids_for_snapshot(
            table.table.store(),
            table.table.table_id(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )
    }
1248
    /// Starts a [`RuntimeCreateTableBuilder`] bound to this context, seeded with
    /// a fresh `CreateTablePlan::new(name)`.
    pub fn create_table_builder(&self, name: &str) -> RuntimeCreateTableBuilder<'_, P> {
        RuntimeCreateTableBuilder {
            ctx: self,
            plan: CreateTablePlan::new(name),
        }
    }
1255
1256 pub fn table_column_specs(self: &Arc<Self>, name: &str) -> Result<Vec<ColumnSpec>> {
1257 let (_, canonical_name) = canonical_table_name(name)?;
1258 let table = self.lookup_table(&canonical_name)?;
1259 Ok(table
1260 .schema
1261 .columns
1262 .iter()
1263 .map(|column| {
1264 ColumnSpec::new(
1265 column.name.clone(),
1266 column.data_type.clone(),
1267 column.nullable,
1268 )
1269 .with_primary_key(column.primary_key)
1270 })
1271 .collect())
1272 }
1273
/// Copies every table registered in this context into `staging`.
///
/// Steps, in order:
/// 1. Determine the next table id from the base catalog and store it in the
///    staging catalog.
/// 2. Take a snapshot of this context's table map.
/// 3. For each table: create a `Table` with the same table id on the staging
///    pager, register it in `staging.tables`, append the batches returned by
///    `get_batches_with_row_ids`, then copy the `next_row_id` / `total_rows`
///    counters from the source table.
///
/// # Errors
/// Returns `Error::InvalidArgumentError` when incrementing the table id
/// overflows, and propagates store, catalog, scan and append errors.
///
/// NOTE(review): a table is inserted into `staging.tables` before its rows are
/// appended, so an error during the copy leaves that entry in the staging map.
/// Confirm callers discard the staging context on failure.
fn copy_tables_to_staging<Q>(&self, staging: &RuntimeContext<Q>) -> Result<()>
where
    Q: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    let base_store = ColumnStore::open(Arc::clone(&self.pager))?;
    let base_catalog = SysCatalog::new(&base_store);

    let staging_store = ColumnStore::open(Arc::clone(&staging.pager))?;
    let staging_catalog = SysCatalog::new(&staging_store);

    // Prefer the id recorded in the base catalog; otherwise derive it as
    // (largest known table id, or CATALOG_TABLE_ID when there is none) + 1.
    let mut next_table_id = match base_catalog.get_next_table_id()? {
        Some(value) => value,
        None => {
            let seed = base_catalog.max_table_id()?.unwrap_or(CATALOG_TABLE_ID);
            seed.checked_add(1).ok_or_else(|| {
                Error::InvalidArgumentError("exhausted available table ids".into())
            })?
        }
    };
    // Never hand out the id used by the catalog table itself.
    if next_table_id == CATALOG_TABLE_ID {
        next_table_id = next_table_id.checked_add(1).ok_or_else(|| {
            Error::InvalidArgumentError("exhausted available table ids".into())
        })?;
    }

    staging_catalog.put_next_table_id(next_table_id)?;

    // Clone the (name, table) pairs inside a short scope so the read lock is
    // released before any copying starts.
    let source_tables: Vec<(String, Arc<ExecutorTable<P>>)> = {
        let guard = self.tables.read().unwrap();
        guard
            .iter()
            .map(|(name, table)| (name.clone(), Arc::clone(table)))
            .collect()
    };

    for (table_name, source_table) in source_tables.iter() {
        tracing::trace!(
            "!!! COPY_TABLES_TO_STAGING: Copying table '{}' with {} columns:",
            table_name,
            source_table.schema.columns.len()
        );
        for (idx, col) in source_table.schema.columns.iter().enumerate() {
            tracing::trace!(
                "  source column[{}]: name='{}' primary_key={}",
                idx,
                col.name,
                col.primary_key
            );
        }

        // Same table id as the source, but backed by the staging pager.
        let new_table = Table::new(source_table.table.table_id(), Arc::clone(&staging.pager))?;

        // The schema Arc is shared with the source; the counters start at zero
        // and are overwritten once the rows have been copied.
        let new_executor_table = Arc::new(ExecutorTable {
            table: Arc::new(new_table),
            schema: source_table.schema.clone(),
            next_row_id: AtomicU64::new(0),
            total_rows: AtomicU64::new(0),
        });

        tracing::trace!(
            "!!! COPY_TABLES_TO_STAGING: After copy, {} columns in new executor table:",
            new_executor_table.schema.columns.len()
        );
        for (idx, col) in new_executor_table.schema.columns.iter().enumerate() {
            tracing::trace!(
                "  new column[{}]: name='{}' primary_key={}",
                idx,
                col.name,
                col.primary_key
            );
        }

        // Register in the staging map; the write lock is scoped to this block.
        {
            let mut staging_tables = staging.tables.write().unwrap();
            staging_tables.insert(table_name.clone(), Arc::clone(&new_executor_table));
        }

        // A `NotFound` from the scan is treated as "no rows to copy".
        let batches = match self.get_batches_with_row_ids(table_name, None) {
            Ok(batches) => batches,
            Err(Error::NotFound) => {
                Vec::new()
            }
            Err(e) => return Err(e),
        };
        if !batches.is_empty() {
            for batch in batches {
                new_executor_table.table.append(&batch)?;
            }
        }

        // Carry the row-id and row-count counters over from the source table.
        let next_row_id = source_table.next_row_id.load(Ordering::SeqCst);
        new_executor_table
            .next_row_id
            .store(next_row_id, Ordering::SeqCst);
        let total_rows = source_table.total_rows.load(Ordering::SeqCst);
        new_executor_table
            .total_rows
            .store(total_rows, Ordering::SeqCst);
    }

    let staging_count = staging.tables.read().unwrap().len();
    tracing::trace!(
        "!!! COPY_TABLES_TO_STAGING: Copied {} tables to staging context",
        staging_count
    );
    Ok(())
}
1389
/// Collects the rows of table `name` into a [`RowBatch`].
///
/// Builds a `RuntimeTableHandle` for the table, then calls `lazy()` followed
/// by `collect_rows()` on it.
///
/// # Errors
/// Propagates errors from handle creation, `lazy()` and `collect_rows()`.
pub fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<RowBatch> {
    let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
    handle.lazy()?.collect_rows()
}
1394
/// Executes a CREATE TABLE plan by forwarding it unchanged to `create_table_plan`.
fn execute_create_table(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
    self.create_table_plan(plan)
}
1398
1399 fn create_table_with_options<C, I>(
1400 self: &Arc<Self>,
1401 name: &str,
1402 columns: I,
1403 if_not_exists: bool,
1404 ) -> Result<RuntimeTableHandle<P>>
1405 where
1406 C: IntoColumnSpec,
1407 I: IntoIterator<Item = C>,
1408 {
1409 let mut plan = CreateTablePlan::new(name);
1410 plan.if_not_exists = if_not_exists;
1411 plan.columns = columns
1412 .into_iter()
1413 .map(|column| column.into_column_spec())
1414 .collect();
1415 let result = self.create_table_plan(plan)?;
1416 match result {
1417 RuntimeStatementResult::CreateTable { .. } => {
1418 RuntimeTableHandle::new(Arc::clone(self), name)
1419 }
1420 other => Err(Error::InvalidArgumentError(format!(
1421 "unexpected statement result {other:?} when creating table"
1422 ))),
1423 }
1424 }
1425
1426 pub fn insert(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
1427 let snapshot = TransactionSnapshot {
1430 txn_id: TXN_ID_AUTO_COMMIT,
1431 snapshot_id: self.txn_manager.last_committed(),
1432 };
1433 self.insert_with_snapshot(plan, snapshot)
1434 }
1435
1436 pub fn insert_with_snapshot(
1437 &self,
1438 plan: InsertPlan,
1439 snapshot: TransactionSnapshot,
1440 ) -> Result<RuntimeStatementResult<P>> {
1441 let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
1442 let table = self.lookup_table(&canonical_name)?;
1443
1444 if display_name == "keys" {
1446 tracing::trace!(
1447 "\n[KEYS] INSERT starting - table_id={}, context_pager={:p}",
1448 table.table.table_id(),
1449 &*self.pager
1450 );
1451 tracing::trace!(
1452 "[KEYS] Table has {} columns, primary_key columns: {:?}",
1453 table.schema.columns.len(),
1454 table
1455 .schema
1456 .columns
1457 .iter()
1458 .filter(|c| c.primary_key)
1459 .map(|c| &c.name)
1460 .collect::<Vec<_>>()
1461 );
1462 }
1463
1464 let result = match plan.source {
1465 InsertSource::Rows(rows) => self.insert_rows(
1466 table.as_ref(),
1467 display_name.clone(),
1468 rows,
1469 plan.columns,
1470 snapshot,
1471 ),
1472 InsertSource::Batches(batches) => self.insert_batches(
1473 table.as_ref(),
1474 display_name.clone(),
1475 batches,
1476 plan.columns,
1477 snapshot,
1478 ),
1479 InsertSource::Select { .. } => Err(Error::Internal(
1480 "InsertSource::Select should be materialized before reaching RuntimeContext::insert"
1481 .into(),
1482 )),
1483 };
1484
1485 if display_name == "keys" {
1486 tracing::trace!(
1487 "[KEYS] INSERT completed: {:?}",
1488 result
1489 .as_ref()
1490 .map(|_| "OK")
1491 .map_err(|e| format!("{:?}", e))
1492 );
1493 }
1494
1495 result
1496 }
1497
/// Scans `table_name` and returns its rows, including the row-id column.
///
/// Convenience wrapper around `get_batches_with_row_ids_with_snapshot` that
/// uses `self.default_snapshot()` as the snapshot.
pub fn get_batches_with_row_ids(
    &self,
    table_name: &str,
    filter: Option<LlkvExpr<'static, String>>,
) -> Result<Vec<RecordBatch>> {
    self.get_batches_with_row_ids_with_snapshot(table_name, filter, self.default_snapshot())
}
1507
1508 pub fn get_batches_with_row_ids_with_snapshot(
1509 &self,
1510 table_name: &str,
1511 filter: Option<LlkvExpr<'static, String>>,
1512 snapshot: TransactionSnapshot,
1513 ) -> Result<Vec<RecordBatch>> {
1514 let (_, canonical_name) = canonical_table_name(table_name)?;
1515 let table = self.lookup_table(&canonical_name)?;
1516
1517 let filter_expr = match filter {
1518 Some(expr) => translate_predicate(expr, table.schema.as_ref())?,
1519 None => {
1520 let field_id = table.schema.first_field_id().ok_or_else(|| {
1521 Error::InvalidArgumentError(
1522 "table has no columns; cannot perform wildcard scan".into(),
1523 )
1524 })?;
1525 full_table_scan_filter(field_id)
1526 }
1527 };
1528
1529 let row_ids = table.table.filter_row_ids(&filter_expr)?;
1531 if row_ids.is_empty() {
1532 return Ok(Vec::new());
1533 }
1534
1535 let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
1536 if visible_row_ids.is_empty() {
1537 return Ok(Vec::new());
1538 }
1539
1540 let table_id = table.table.table_id();
1542
1543 let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
1544 let mut arrays: Vec<ArrayRef> = Vec::with_capacity(table.schema.columns.len() + 1);
1545
1546 fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));
1547 arrays.push(Arc::new(UInt64Array::from(visible_row_ids.clone())));
1548
1549 for column in &table.schema.columns {
1550 let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
1551 let gathered = table.table.store().gather_rows(
1552 &[logical_field_id],
1553 &visible_row_ids,
1554 GatherNullPolicy::IncludeNulls,
1555 )?;
1556 let field = mvcc_columns::build_field_with_metadata(
1557 &column.name,
1558 column.data_type.clone(),
1559 column.nullable,
1560 column.field_id,
1561 );
1562 fields.push(field);
1563 arrays.push(gathered.column(0).clone());
1564 }
1565
1566 let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays)?;
1567 Ok(vec![batch])
1568 }
1569
1570 pub fn append_batches_with_row_ids(
1573 &self,
1574 table_name: &str,
1575 batches: Vec<RecordBatch>,
1576 ) -> Result<usize> {
1577 let (_, canonical_name) = canonical_table_name(table_name)?;
1578 let table = self.lookup_table(&canonical_name)?;
1579
1580 let mut total_rows = 0;
1581 for batch in batches {
1582 if batch.num_rows() == 0 {
1583 continue;
1584 }
1585
1586 let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
1588 Error::InvalidArgumentError(
1589 "batch must contain row_id column for direct append".into(),
1590 )
1591 })?;
1592
1593 table.table.append(&batch)?;
1595 total_rows += batch.num_rows();
1596 }
1597
1598 Ok(total_rows)
1599 }
1600
1601 pub fn update(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
1602 let snapshot = self.txn_manager.begin_transaction();
1603 let result = self.update_with_snapshot(plan, snapshot)?;
1604 self.txn_manager.mark_committed(snapshot.txn_id);
1605 Ok(result)
1606 }
1607
1608 pub fn update_with_snapshot(
1609 &self,
1610 plan: UpdatePlan,
1611 snapshot: TransactionSnapshot,
1612 ) -> Result<RuntimeStatementResult<P>> {
1613 let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
1614 let table = self.lookup_table(&canonical_name)?;
1615 match plan.filter {
1616 Some(filter) => self.update_filtered_rows(
1617 table.as_ref(),
1618 display_name,
1619 plan.assignments,
1620 filter,
1621 snapshot,
1622 ),
1623 None => self.update_all_rows(table.as_ref(), display_name, plan.assignments, snapshot),
1624 }
1625 }
1626
1627 pub fn delete(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
1628 let snapshot = self.txn_manager.begin_transaction();
1629 let result = self.delete_with_snapshot(plan, snapshot)?;
1630 self.txn_manager.mark_committed(snapshot.txn_id);
1631 Ok(result)
1632 }
1633
1634 pub fn delete_with_snapshot(
1635 &self,
1636 plan: DeletePlan,
1637 snapshot: TransactionSnapshot,
1638 ) -> Result<RuntimeStatementResult<P>> {
1639 let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
1640 let table = self.lookup_table(&canonical_name)?;
1641 match plan.filter {
1642 Some(filter) => self.delete_filtered_rows(
1643 table.as_ref(),
1644 display_name,
1645 filter,
1646 snapshot,
1647 snapshot.txn_id,
1648 ),
1649 None => self.delete_all_rows(table.as_ref(), display_name, snapshot, snapshot.txn_id),
1650 }
1651 }
1652
/// Opens a `RuntimeTableHandle` for table `name`, sharing ownership of this context.
///
/// # Errors
/// Propagates whatever `RuntimeTableHandle::new` returns.
pub fn table_handle(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
    RuntimeTableHandle::new(Arc::clone(self), name)
}
1656
1657 pub fn execute_select(self: &Arc<Self>, plan: SelectPlan) -> Result<SelectExecution<P>> {
1658 let (_display_name, canonical_name) = canonical_table_name(&plan.table)?;
1659 let _table = self.lookup_table(&canonical_name)?;
1661
1662 let mut canonical_plan = plan.clone();
1664 canonical_plan.table = canonical_name;
1665
1666 let provider: Arc<dyn TableProvider<P>> = Arc::new(ContextProvider {
1668 context: Arc::clone(self),
1669 });
1670 let executor = QueryExecutor::new(provider);
1671 executor.execute_select(canonical_plan)
1672 }
1673
1674 pub fn execute_select_with_snapshot(
1675 self: &Arc<Self>,
1676 plan: SelectPlan,
1677 snapshot: TransactionSnapshot,
1678 ) -> Result<SelectExecution<P>> {
1679 let (_display_name, canonical_name) = canonical_table_name(&plan.table)?;
1680 self.lookup_table(&canonical_name)?;
1681
1682 let mut canonical_plan = plan.clone();
1683 canonical_plan.table = canonical_name;
1684
1685 let provider: Arc<dyn TableProvider<P>> = Arc::new(ContextProvider {
1686 context: Arc::clone(self),
1687 });
1688 let executor = QueryExecutor::new(provider);
1689 let row_filter: Arc<dyn RowIdFilter<P>> = Arc::new(MvccRowIdFilter::new(
1690 Arc::clone(&self.txn_manager),
1691 snapshot,
1692 ));
1693 executor.execute_select_with_filter(canonical_plan, Some(row_filter))
1694 }
1695
/// Creates an empty table from explicit column specifications.
///
/// Steps, in order:
/// 1. Reject an empty column list and duplicate column names (compared after
///    ASCII lower-casing).
/// 2. Assign field ids `1..=n` in declaration order.
/// 3. Reserve a table id, create the `Table` on this context's pager and write
///    table and column metadata.
/// 4. Insert the table into `self.tables` under `canonical_name`.
/// 5. Register the table and its fields with `self.catalog`.
///
/// # Errors
/// Returns `Error::InvalidArgumentError` for an empty column list or a
/// duplicate name, and `Error::CatalogError` when `canonical_name` is already
/// in the table map and `if_not_exists` is false. Also propagates errors from
/// `reserve_table_id`, `Table::new` and `catalog.register_table`.
///
/// NOTE(review): the "already exists" check runs after the table id has been
/// reserved and metadata written. Separately, if `register_table` fails, the
/// entry inserted into `self.tables` is left in place. Confirm both are acceptable.
fn create_table_from_columns(
    &self,
    display_name: String,
    canonical_name: String,
    columns: Vec<ColumnSpec>,
    if_not_exists: bool,
) -> Result<RuntimeStatementResult<P>> {
    tracing::trace!(
        "\n=== CREATE_TABLE_FROM_COLUMNS: table='{}' columns={} ===",
        display_name,
        columns.len()
    );
    for (idx, col) in columns.iter().enumerate() {
        tracing::trace!(
            "  input column[{}]: name='{}' primary_key={}",
            idx,
            col.name,
            col.primary_key
        );
    }
    if columns.is_empty() {
        return Err(Error::InvalidArgumentError(
            "CREATE TABLE requires at least one column".into(),
        ));
    }

    let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(columns.len());
    // Maps the lower-cased column name to its position in `column_defs`.
    let mut lookup = FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
    for (idx, column) in columns.iter().enumerate() {
        let normalized = column.name.to_ascii_lowercase();
        if lookup.insert(normalized.clone(), idx).is_some() {
            return Err(Error::InvalidArgumentError(format!(
                "duplicate column name '{}' in table '{}'",
                column.name, display_name
            )));
        }
        tracing::trace!(
            "DEBUG create_table_from_columns[{}]: name='{}' data_type={:?} nullable={} primary_key={}",
            idx,
            column.name,
            column.data_type,
            column.nullable,
            column.primary_key
        );
        column_defs.push(ExecutorColumn {
            name: column.name.clone(),
            data_type: column.data_type.clone(),
            nullable: column.nullable,
            primary_key: column.primary_key,
            // Field ids are 1-based positions in the declared column order.
            field_id: (idx + 1) as FieldId,
        });
        let pushed = column_defs.last().unwrap();
        tracing::trace!(
            "DEBUG create_table_from_columns[{}]: pushed ExecutorColumn name='{}' primary_key={}",
            idx,
            pushed.name,
            pushed.primary_key
        );
    }

    let table_id = self.reserve_table_id()?;
    tracing::trace!(
        "=== TABLE '{}' CREATED WITH table_id={} pager={:p} ===",
        display_name,
        table_id,
        &*self.pager
    );
    let table = Table::new(table_id, Arc::clone(&self.pager))?;
    table.put_table_meta(&TableMeta {
        table_id,
        name: Some(display_name.clone()),
        created_at_micros: current_time_micros(),
        flags: 0,
        epoch: 0,
    });

    for column in &column_defs {
        table.put_col_meta(&ColMeta {
            col_id: column.field_id,
            name: Some(column.name.clone()),
            flags: 0,
            default: None,
        });
    }

    let schema = Arc::new(ExecutorSchema {
        columns: column_defs.clone(),
        lookup,
    });
    let table_entry = Arc::new(ExecutorTable {
        table: Arc::new(table),
        schema,
        next_row_id: AtomicU64::new(0),
        total_rows: AtomicU64::new(0),
    });

    // Re-check for an existing entry under the write lock before inserting.
    let mut tables = self.tables.write().unwrap();
    if tables.contains_key(&canonical_name) {
        if if_not_exists {
            return Ok(RuntimeStatementResult::CreateTable {
                table_name: display_name,
            });
        }
        return Err(Error::CatalogError(format!(
            "Catalog Error: Table '{}' already exists",
            display_name
        )));
    }
    tables.insert(canonical_name.clone(), table_entry);
    // Release the table-map lock before calling into the catalog.
    drop(tables);
    let registered_table_id = self.catalog.register_table(&display_name)?;
    tracing::debug!(
        "[CATALOG] Registered table '{}' with catalog_id={}",
        display_name,
        registered_table_id
    );

    // Field registration is best-effort: failures are logged, not returned.
    if let Some(field_resolver) = self.catalog.field_resolver(registered_table_id) {
        for column in &column_defs {
            if let Err(e) = field_resolver.register_field(&column.name) {
                tracing::warn!(
                    "[CATALOG] Failed to register field '{}' in table '{}': {}",
                    column.name,
                    display_name,
                    e
                );
            }
        }
        tracing::debug!(
            "[CATALOG] Registered {} field(s) for table '{}'",
            column_defs.len(),
            display_name
        );
    }

    Ok(RuntimeStatementResult::CreateTable {
        table_name: display_name,
    })
}
1838
/// Creates a table from an Arrow schema and fills it with `batches`
/// (the CREATE TABLE AS SELECT path, per the error messages below).
///
/// Steps, in order:
/// 1. Validate the schema: non-empty, only `Int64` / `Float64` / `Utf8` /
///    `Date32` columns, no duplicate names (compared after ASCII lower-casing).
/// 2. Reserve a table id, create the `Table` and write table/column metadata.
/// 3. Begin a transaction and append every non-empty batch, prefixed with the
///    row-id / created-by / deleted-by columns built by
///    `mvcc_columns::build_insert_mvcc_columns`.
/// 4. Mark the transaction committed and store the row counters.
/// 5. Insert the table into `self.tables` and register it with the catalog.
///
/// Columns created here are never marked as primary keys.
///
/// # Errors
/// Returns `Error::InvalidArgumentError` for an empty schema, an unsupported
/// column type, a duplicate column name, or a batch whose column count differs
/// from the schema. Returns `Error::CatalogError` when `canonical_name` is
/// already present and `if_not_exists` is false. Also propagates errors from
/// id reservation, table creation, batch construction, append and catalog
/// registration.
///
/// NOTE(review): early returns inside the batch loop leave the transaction
/// started here without a `mark_committed` call. Also, the "already exists"
/// check only runs after all rows have been appended. Confirm both are acceptable.
fn create_table_from_batches(
    &self,
    display_name: String,
    canonical_name: String,
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
    if_not_exists: bool,
) -> Result<RuntimeStatementResult<P>> {
    if schema.fields().is_empty() {
        return Err(Error::InvalidArgumentError(
            "CREATE TABLE AS SELECT requires at least one column".into(),
        ));
    }
    let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(schema.fields().len());
    // Maps the lower-cased column name to its position in `column_defs`.
    let mut lookup =
        FxHashMap::with_capacity_and_hasher(schema.fields().len(), Default::default());
    for (idx, field) in schema.fields().iter().enumerate() {
        let data_type = match field.data_type() {
            DataType::Int64 | DataType::Float64 | DataType::Utf8 | DataType::Date32 => {
                field.data_type().clone()
            }
            other => {
                return Err(Error::InvalidArgumentError(format!(
                    "unsupported column type in CTAS result: {other:?}"
                )));
            }
        };
        let normalized = field.name().to_ascii_lowercase();
        if lookup.insert(normalized.clone(), idx).is_some() {
            return Err(Error::InvalidArgumentError(format!(
                "duplicate column name '{}' in CTAS result",
                field.name()
            )));
        }
        column_defs.push(ExecutorColumn {
            name: field.name().to_string(),
            data_type,
            nullable: field.is_nullable(),
            primary_key: false,
            // Field ids are 1-based positions in the schema's field order.
            field_id: (idx + 1) as FieldId,
        });
    }

    let table_id = self.reserve_table_id()?;
    let table = Table::new(table_id, Arc::clone(&self.pager))?;
    table.put_table_meta(&TableMeta {
        table_id,
        name: Some(display_name.clone()),
        created_at_micros: current_time_micros(),
        flags: 0,
        epoch: 0,
    });

    for column in &column_defs {
        table.put_col_meta(&ColMeta {
            col_id: column.field_id,
            name: Some(column.name.clone()),
            flags: 0,
            default: None,
        });
    }

    let schema_arc = Arc::new(ExecutorSchema {
        columns: column_defs.clone(),
        lookup,
    });
    let table_entry = Arc::new(ExecutorTable {
        table: Arc::new(table),
        schema: schema_arc,
        next_row_id: AtomicU64::new(0),
        total_rows: AtomicU64::new(0),
    });

    // Row ids are assigned sequentially from 0 across all batches.
    let mut next_row_id: RowId = 0;
    let mut total_rows: u64 = 0;
    // All copied rows are attributed to this one transaction.
    let creator_snapshot = self.txn_manager.begin_transaction();
    let creator_txn_id = creator_snapshot.txn_id;
    for batch in batches {
        let row_count = batch.num_rows();
        if row_count == 0 {
            continue;
        }
        if batch.num_columns() != column_defs.len() {
            return Err(Error::InvalidArgumentError(
                "CTAS query returned unexpected column count".into(),
            ));
        }
        let start_row = next_row_id;
        next_row_id += row_count as u64;
        total_rows += row_count as u64;

        let (row_id_array, created_by_array, deleted_by_array) =
            mvcc_columns::build_insert_mvcc_columns(row_count, start_row, creator_txn_id);

        // The three MVCC arrays go first; the fields from `build_mvcc_fields`
        // are pushed first as well.
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(column_defs.len() + 3);
        arrays.push(row_id_array);
        arrays.push(created_by_array);
        arrays.push(deleted_by_array);

        let mut fields: Vec<Field> = Vec::with_capacity(column_defs.len() + 3);
        fields.extend(mvcc_columns::build_mvcc_fields());

        // User columns are taken from the incoming batch by position.
        for (idx, column) in column_defs.iter().enumerate() {
            let field = mvcc_columns::build_field_with_metadata(
                &column.name,
                column.data_type.clone(),
                column.nullable,
                column.field_id,
            );
            fields.push(field);
            arrays.push(batch.column(idx).clone());
        }

        let append_schema = Arc::new(Schema::new(fields));
        let append_batch = RecordBatch::try_new(append_schema, arrays)?;
        table_entry.table.append(&append_batch)?;
    }

    self.txn_manager.mark_committed(creator_txn_id);

    table_entry.next_row_id.store(next_row_id, Ordering::SeqCst);
    table_entry.total_rows.store(total_rows, Ordering::SeqCst);

    // Re-check for an existing entry under the write lock before inserting.
    let mut tables = self.tables.write().unwrap();
    if tables.contains_key(&canonical_name) {
        if if_not_exists {
            return Ok(RuntimeStatementResult::CreateTable {
                table_name: display_name,
            });
        }
        return Err(Error::CatalogError(format!(
            "Catalog Error: Table '{}' already exists",
            display_name
        )));
    }
    tracing::trace!(
        "=== INSERTING TABLE '{}' INTO TABLES MAP (pager={:p}) ===",
        canonical_name,
        &*self.pager
    );
    for (idx, col) in table_entry.schema.columns.iter().enumerate() {
        tracing::trace!(
            "  inserting column[{}]: name='{}' primary_key={}",
            idx,
            col.name,
            col.primary_key
        );
    }
    tables.insert(canonical_name.clone(), table_entry);
    // Release the table-map lock before calling into the catalog.
    drop(tables);
    let registered_table_id = self.catalog.register_table(&display_name)?;
    tracing::debug!(
        "[CATALOG] Registered table '{}' (CTAS) with catalog_id={}",
        display_name,
        registered_table_id
    );

    // Field registration is best-effort: failures are logged, not returned.
    if let Some(field_resolver) = self.catalog.field_resolver(registered_table_id) {
        for column in &column_defs {
            if let Err(e) = field_resolver.register_field(&column.name) {
                tracing::warn!(
                    "[CATALOG] Failed to register field '{}' in table '{}': {}",
                    column.name,
                    display_name,
                    e
                );
            }
        }
        tracing::debug!(
            "[CATALOG] Registered {} field(s) for table '{}' (CTAS)",
            column_defs.len(),
            display_name
        );
    }

    Ok(RuntimeStatementResult::CreateTable {
        table_name: display_name,
    })
}
2023
2024 fn check_primary_key_constraints(
2025 &self,
2026 table: &ExecutorTable<P>,
2027 rows: &[Vec<PlanValue>],
2028 column_order: &[usize],
2029 snapshot: TransactionSnapshot,
2030 ) -> Result<()> {
2031 let _table_id = table.table.table_id();
2032 let primary_key_columns: Vec<(usize, &ExecutorColumn)> = table
2034 .schema
2035 .columns
2036 .iter()
2037 .enumerate()
2038 .filter(|(_, col)| col.primary_key)
2039 .collect();
2040
2041 if primary_key_columns.is_empty() {
2042 return Ok(());
2043 }
2044
2045 for (col_idx, column) in primary_key_columns {
2047 let field_id = column.field_id;
2049 let existing_values = self.scan_column_values(table, field_id, snapshot)?;
2050
2051 tracing::trace!(
2052 "[PK_CHECK] snapshot(txn={}, snap_id={}) column '{}': found {} existing VISIBLE values: {:?}",
2053 snapshot.txn_id,
2054 snapshot.snapshot_id,
2055 column.name,
2056 existing_values.len(),
2057 existing_values
2058 );
2059
2060 for row in rows {
2062 let insert_position = column_order
2064 .iter()
2065 .position(|&dest_idx| dest_idx == col_idx);
2066
2067 if let Some(pos) = insert_position {
2068 let new_value = &row[pos];
2069
2070 if matches!(new_value, PlanValue::Null) {
2072 continue;
2073 }
2074
2075 if existing_values.contains(new_value) {
2077 return Err(Error::ConstraintError(format!(
2078 "constraint violation on column '{}'",
2079 column.name
2080 )));
2081 }
2082 }
2083 }
2084 }
2085
2086 Ok(())
2087 }
2088
2089 fn scan_column_values(
2090 &self,
2091 table: &ExecutorTable<P>,
2092 field_id: FieldId,
2093 snapshot: TransactionSnapshot,
2094 ) -> Result<Vec<PlanValue>> {
2095 let table_id = table.table.table_id();
2096 use llkv_expr::{Expr, Filter, Operator};
2097 use std::ops::Bound;
2098
2099 let match_all_filter = Filter {
2101 field_id,
2102 op: Operator::Range {
2103 lower: Bound::Unbounded,
2104 upper: Bound::Unbounded,
2105 },
2106 };
2107 let filter_expr = Expr::Pred(match_all_filter);
2108
2109 let row_ids = match table.table.filter_row_ids(&filter_expr) {
2111 Ok(ids) => ids,
2112 Err(Error::NotFound) => return Ok(Vec::new()),
2113 Err(e) => return Err(e),
2114 };
2115
2116 let row_ids = filter_row_ids_for_snapshot(
2118 table.table.store(),
2119 table_id,
2120 row_ids,
2121 &self.txn_manager,
2122 snapshot,
2123 )?;
2124
2125 if row_ids.is_empty() {
2126 return Ok(Vec::new());
2127 }
2128
2129 let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
2131 let batch = match table.table.store().gather_rows(
2132 &[logical_field_id],
2133 &row_ids,
2134 GatherNullPolicy::IncludeNulls,
2135 ) {
2136 Ok(b) => b,
2137 Err(Error::NotFound) => return Ok(Vec::new()),
2138 Err(e) => return Err(e),
2139 };
2140
2141 let mut values = Vec::with_capacity(row_ids.len());
2142 if batch.num_columns() > 0 {
2143 let array = batch.column(0);
2144 for row_idx in 0..batch.num_rows() {
2145 if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
2146 values.push(value);
2147 }
2148 }
2149 }
2150
2151 Ok(values)
2152 }
2153
2154 fn insert_rows(
2155 &self,
2156 table: &ExecutorTable<P>,
2157 display_name: String,
2158 rows: Vec<Vec<PlanValue>>,
2159 columns: Vec<String>,
2160 snapshot: TransactionSnapshot,
2161 ) -> Result<RuntimeStatementResult<P>> {
2162 if rows.is_empty() {
2163 return Err(Error::InvalidArgumentError(
2164 "INSERT requires at least one row".into(),
2165 ));
2166 }
2167
2168 let column_order = resolve_insert_columns(&columns, table.schema.as_ref())?;
2169 let expected_len = column_order.len();
2170 for row in &rows {
2171 if row.len() != expected_len {
2172 return Err(Error::InvalidArgumentError(format!(
2173 "expected {} values in INSERT row, found {}",
2174 expected_len,
2175 row.len()
2176 )));
2177 }
2178 }
2179
2180 self.check_primary_key_constraints(table, &rows, &column_order, snapshot)?;
2182
2183 if display_name == "keys" {
2184 tracing::trace!(
2185 "[KEYS] Checking PRIMARY KEY constraints - {} rows to insert",
2186 rows.len()
2187 );
2188 for (i, row) in rows.iter().enumerate() {
2189 tracing::trace!("[KEYS] row[{}]: {:?}", i, row);
2190 }
2191 }
2192
2193 let constraint_result =
2194 self.check_primary_key_constraints(table, &rows, &column_order, snapshot);
2195
2196 if display_name == "keys" {
2197 match &constraint_result {
2198 Ok(_) => tracing::trace!("[KEYS] PRIMARY KEY check PASSED"),
2199 Err(e) => tracing::trace!("[KEYS] PRIMARY KEY check FAILED: {:?}", e),
2200 }
2201 }
2202
2203 constraint_result?;
2204
2205 let row_count = rows.len();
2206 let mut column_values: Vec<Vec<PlanValue>> =
2207 vec![Vec::with_capacity(row_count); table.schema.columns.len()];
2208 for row in rows {
2209 for (idx, value) in row.into_iter().enumerate() {
2210 let dest_index = column_order[idx];
2211 column_values[dest_index].push(value);
2212 }
2213 }
2214
2215 let start_row = table.next_row_id.load(Ordering::SeqCst);
2216
2217 let (row_id_array, created_by_array, deleted_by_array) =
2219 mvcc_columns::build_insert_mvcc_columns(row_count, start_row, snapshot.txn_id);
2220
2221 let mut arrays: Vec<ArrayRef> = Vec::with_capacity(column_values.len() + 3);
2222 arrays.push(row_id_array);
2223 arrays.push(created_by_array);
2224 arrays.push(deleted_by_array);
2225
2226 let mut fields: Vec<Field> = Vec::with_capacity(column_values.len() + 3);
2227 fields.extend(mvcc_columns::build_mvcc_fields());
2228
2229 for (column, values) in table.schema.columns.iter().zip(column_values.into_iter()) {
2230 let array = build_array_for_column(&column.data_type, &values)?;
2231 let field = mvcc_columns::build_field_with_metadata(
2232 &column.name,
2233 column.data_type.clone(),
2234 column.nullable,
2235 column.field_id,
2236 );
2237 arrays.push(array);
2238 fields.push(field);
2239 }
2240
2241 let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays)?;
2242 table.table.append(&batch)?;
2243 table
2244 .next_row_id
2245 .store(start_row + row_count as u64, Ordering::SeqCst);
2246 table
2247 .total_rows
2248 .fetch_add(row_count as u64, Ordering::SeqCst);
2249
2250 Ok(RuntimeStatementResult::Insert {
2251 table_name: display_name,
2252 rows_inserted: row_count,
2253 })
2254 }
2255
2256 fn insert_batches(
2257 &self,
2258 table: &ExecutorTable<P>,
2259 display_name: String,
2260 batches: Vec<RecordBatch>,
2261 columns: Vec<String>,
2262 snapshot: TransactionSnapshot,
2263 ) -> Result<RuntimeStatementResult<P>> {
2264 if batches.is_empty() {
2265 return Ok(RuntimeStatementResult::Insert {
2266 table_name: display_name,
2267 rows_inserted: 0,
2268 });
2269 }
2270
2271 let expected_len = if columns.is_empty() {
2272 table.schema.columns.len()
2273 } else {
2274 columns.len()
2275 };
2276 let mut total_rows_inserted = 0usize;
2277
2278 for batch in batches {
2279 if batch.num_columns() != expected_len {
2280 return Err(Error::InvalidArgumentError(format!(
2281 "expected {} columns in INSERT batch, found {}",
2282 expected_len,
2283 batch.num_columns()
2284 )));
2285 }
2286 let row_count = batch.num_rows();
2287 if row_count == 0 {
2288 continue;
2289 }
2290 let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(row_count);
2291 for row_idx in 0..row_count {
2292 let mut row: Vec<PlanValue> = Vec::with_capacity(expected_len);
2293 for col_idx in 0..expected_len {
2294 let array = batch.column(col_idx);
2295 row.push(llkv_plan::plan_value_from_array(array, row_idx)?);
2296 }
2297 rows.push(row);
2298 }
2299
2300 match self.insert_rows(table, display_name.clone(), rows, columns.clone(), snapshot)? {
2301 RuntimeStatementResult::Insert { rows_inserted, .. } => {
2302 total_rows_inserted += rows_inserted;
2303 }
2304 _ => unreachable!("insert_rows must return Insert result"),
2305 }
2306 }
2307
2308 Ok(RuntimeStatementResult::Insert {
2309 table_name: display_name,
2310 rows_inserted: total_rows_inserted,
2311 })
2312 }
2313
2314 fn update_filtered_rows(
2315 &self,
2316 table: &ExecutorTable<P>,
2317 display_name: String,
2318 assignments: Vec<ColumnAssignment>,
2319 filter: LlkvExpr<'static, String>,
2320 snapshot: TransactionSnapshot,
2321 ) -> Result<RuntimeStatementResult<P>> {
2322 if assignments.is_empty() {
2323 return Err(Error::InvalidArgumentError(
2324 "UPDATE requires at least one assignment".into(),
2325 ));
2326 }
2327
2328 let schema = table.schema.as_ref();
2329 let filter_expr = translate_predicate(filter, schema)?;
2330
2331 enum PreparedValue {
2333 Literal(PlanValue),
2334 Expression { expr_index: usize },
2335 }
2336
2337 let mut seen_columns: FxHashSet<String> =
2338 FxHashSet::with_capacity_and_hasher(assignments.len(), Default::default());
2339 let mut prepared: Vec<(ExecutorColumn, PreparedValue)> =
2340 Vec::with_capacity(assignments.len());
2341 let mut scalar_exprs: Vec<ScalarExpr<FieldId>> = Vec::new();
2342
2343 for assignment in assignments {
2344 let normalized = assignment.column.to_ascii_lowercase();
2345 if !seen_columns.insert(normalized.clone()) {
2346 return Err(Error::InvalidArgumentError(format!(
2347 "duplicate column '{}' in UPDATE assignments",
2348 assignment.column
2349 )));
2350 }
2351 let column = table.schema.resolve(&assignment.column).ok_or_else(|| {
2352 Error::InvalidArgumentError(format!(
2353 "unknown column '{}' in UPDATE",
2354 assignment.column
2355 ))
2356 })?;
2357
2358 match assignment.value {
2359 AssignmentValue::Literal(value) => {
2360 prepared.push((column.clone(), PreparedValue::Literal(value)));
2361 }
2362 AssignmentValue::Expression(expr) => {
2363 let translated = translate_scalar(&expr, schema)?;
2364 let expr_index = scalar_exprs.len();
2365 scalar_exprs.push(translated);
2366 prepared.push((column.clone(), PreparedValue::Expression { expr_index }));
2367 }
2368 }
2369 }
2370
2371 let (row_ids, mut expr_values) =
2372 self.collect_update_rows(table, &filter_expr, &scalar_exprs, snapshot)?;
2373
2374 if row_ids.is_empty() {
2375 return Ok(RuntimeStatementResult::Update {
2376 table_name: display_name,
2377 rows_updated: 0,
2378 });
2379 }
2380
2381 let row_count = row_ids.len();
2382 let table_id = table.table.table_id();
2383 let logical_fields: Vec<LogicalFieldId> = table
2384 .schema
2385 .columns
2386 .iter()
2387 .map(|column| LogicalFieldId::for_user(table_id, column.field_id))
2388 .collect();
2389
2390 let gathered = table.table.store().gather_rows(
2391 &logical_fields,
2392 &row_ids,
2393 GatherNullPolicy::IncludeNulls,
2394 )?;
2395
2396 let mut new_rows: Vec<Vec<PlanValue>> =
2397 vec![Vec::with_capacity(table.schema.columns.len()); row_count];
2398 for (col_idx, _column) in table.schema.columns.iter().enumerate() {
2399 let array = gathered.column(col_idx);
2400 for (row_idx, row) in new_rows.iter_mut().enumerate().take(row_count) {
2401 let value = llkv_plan::plan_value_from_array(array, row_idx)?;
2402 row.push(value);
2403 }
2404 }
2405
2406 let column_positions: FxHashMap<FieldId, usize> = FxHashMap::from_iter(
2407 table
2408 .schema
2409 .columns
2410 .iter()
2411 .enumerate()
2412 .map(|(idx, column)| (column.field_id, idx)),
2413 );
2414
2415 for (column, value) in prepared {
2416 let column_index =
2417 column_positions
2418 .get(&column.field_id)
2419 .copied()
2420 .ok_or_else(|| {
2421 Error::InvalidArgumentError(format!(
2422 "column '{}' missing in table schema during UPDATE",
2423 column.name
2424 ))
2425 })?;
2426
2427 let values = match value {
2428 PreparedValue::Literal(lit) => vec![lit; row_count],
2429 PreparedValue::Expression { expr_index } => {
2430 let column_values = expr_values.get_mut(expr_index).ok_or_else(|| {
2431 Error::InvalidArgumentError(
2432 "expression assignment value missing during UPDATE".into(),
2433 )
2434 })?;
2435 if column_values.len() != row_count {
2436 return Err(Error::InvalidArgumentError(
2437 "expression result count did not match targeted row count".into(),
2438 ));
2439 }
2440 mem::take(column_values)
2441 }
2442 };
2443
2444 for (row_idx, new_value) in values.into_iter().enumerate() {
2445 if let Some(row) = new_rows.get_mut(row_idx) {
2446 row[column_index] = new_value;
2447 }
2448 }
2449 }
2450
2451 let _ = self.apply_delete(
2452 table,
2453 display_name.clone(),
2454 row_ids.clone(),
2455 snapshot.txn_id,
2456 )?;
2457
2458 let column_names: Vec<String> = table
2459 .schema
2460 .columns
2461 .iter()
2462 .map(|column| column.name.clone())
2463 .collect();
2464
2465 let _ = self.insert_rows(
2466 table,
2467 display_name.clone(),
2468 new_rows,
2469 column_names,
2470 snapshot,
2471 )?;
2472
2473 Ok(RuntimeStatementResult::Update {
2474 table_name: display_name,
2475 rows_updated: row_count,
2476 })
2477 }
2478
2479 fn update_all_rows(
2480 &self,
2481 table: &ExecutorTable<P>,
2482 display_name: String,
2483 assignments: Vec<ColumnAssignment>,
2484 snapshot: TransactionSnapshot,
2485 ) -> Result<RuntimeStatementResult<P>> {
2486 if assignments.is_empty() {
2487 return Err(Error::InvalidArgumentError(
2488 "UPDATE requires at least one assignment".into(),
2489 ));
2490 }
2491
2492 let total_rows = table.total_rows.load(Ordering::SeqCst);
2493 let total_rows_usize = usize::try_from(total_rows).map_err(|_| {
2494 Error::InvalidArgumentError("table row count exceeds supported range".into())
2495 })?;
2496 if total_rows_usize == 0 {
2497 return Ok(RuntimeStatementResult::Update {
2498 table_name: display_name,
2499 rows_updated: 0,
2500 });
2501 }
2502
2503 let schema = table.schema.as_ref();
2504
2505 enum PreparedValue {
2507 Literal(PlanValue),
2508 Expression { expr_index: usize },
2509 }
2510
2511 let mut seen_columns: FxHashSet<String> =
2512 FxHashSet::with_capacity_and_hasher(assignments.len(), Default::default());
2513 let mut prepared: Vec<(ExecutorColumn, PreparedValue)> =
2514 Vec::with_capacity(assignments.len());
2515 let mut scalar_exprs: Vec<ScalarExpr<FieldId>> = Vec::new();
2516 let mut first_field_id: Option<FieldId> = None;
2517
2518 for assignment in assignments {
2519 let normalized = assignment.column.to_ascii_lowercase();
2520 if !seen_columns.insert(normalized.clone()) {
2521 return Err(Error::InvalidArgumentError(format!(
2522 "duplicate column '{}' in UPDATE assignments",
2523 assignment.column
2524 )));
2525 }
2526 let column = table.schema.resolve(&assignment.column).ok_or_else(|| {
2527 Error::InvalidArgumentError(format!(
2528 "unknown column '{}' in UPDATE",
2529 assignment.column
2530 ))
2531 })?;
2532 if first_field_id.is_none() {
2533 first_field_id = Some(column.field_id);
2534 }
2535
2536 match assignment.value {
2537 AssignmentValue::Literal(value) => {
2538 prepared.push((column.clone(), PreparedValue::Literal(value)));
2539 }
2540 AssignmentValue::Expression(expr) => {
2541 let translated = translate_scalar(&expr, schema)?;
2542 let expr_index = scalar_exprs.len();
2543 scalar_exprs.push(translated);
2544 prepared.push((column.clone(), PreparedValue::Expression { expr_index }));
2545 }
2546 }
2547 }
2548
2549 let anchor_field = first_field_id.ok_or_else(|| {
2550 Error::InvalidArgumentError("UPDATE requires at least one target column".into())
2551 })?;
2552
2553 let filter_expr = full_table_scan_filter(anchor_field);
2554 let (row_ids, mut expr_values) =
2555 self.collect_update_rows(table, &filter_expr, &scalar_exprs, snapshot)?;
2556
2557 if row_ids.is_empty() {
2558 return Ok(RuntimeStatementResult::Update {
2559 table_name: display_name,
2560 rows_updated: 0,
2561 });
2562 }
2563
2564 let row_count = row_ids.len();
2565 let table_id = table.table.table_id();
2566 let logical_fields: Vec<LogicalFieldId> = table
2567 .schema
2568 .columns
2569 .iter()
2570 .map(|column| LogicalFieldId::for_user(table_id, column.field_id))
2571 .collect();
2572
2573 let gathered = table.table.store().gather_rows(
2574 &logical_fields,
2575 &row_ids,
2576 GatherNullPolicy::IncludeNulls,
2577 )?;
2578
2579 let mut new_rows: Vec<Vec<PlanValue>> =
2580 vec![Vec::with_capacity(table.schema.columns.len()); row_count];
2581 for (col_idx, _column) in table.schema.columns.iter().enumerate() {
2582 let array = gathered.column(col_idx);
2583 for (row_idx, row) in new_rows.iter_mut().enumerate().take(row_count) {
2584 let value = llkv_plan::plan_value_from_array(array, row_idx)?;
2585 row.push(value);
2586 }
2587 }
2588
2589 let column_positions: FxHashMap<FieldId, usize> = FxHashMap::from_iter(
2590 table
2591 .schema
2592 .columns
2593 .iter()
2594 .enumerate()
2595 .map(|(idx, column)| (column.field_id, idx)),
2596 );
2597
2598 for (column, value) in prepared {
2599 let column_index =
2600 column_positions
2601 .get(&column.field_id)
2602 .copied()
2603 .ok_or_else(|| {
2604 Error::InvalidArgumentError(format!(
2605 "column '{}' missing in table schema during UPDATE",
2606 column.name
2607 ))
2608 })?;
2609
2610 let values = match value {
2611 PreparedValue::Literal(lit) => vec![lit; row_count],
2612 PreparedValue::Expression { expr_index } => {
2613 let column_values = expr_values.get_mut(expr_index).ok_or_else(|| {
2614 Error::InvalidArgumentError(
2615 "expression assignment value missing during UPDATE".into(),
2616 )
2617 })?;
2618 if column_values.len() != row_count {
2619 return Err(Error::InvalidArgumentError(
2620 "expression result count did not match targeted row count".into(),
2621 ));
2622 }
2623 mem::take(column_values)
2624 }
2625 };
2626
2627 for (row_idx, new_value) in values.into_iter().enumerate() {
2628 if let Some(row) = new_rows.get_mut(row_idx) {
2629 row[column_index] = new_value;
2630 }
2631 }
2632 }
2633
2634 let _ = self.apply_delete(
2635 table,
2636 display_name.clone(),
2637 row_ids.clone(),
2638 snapshot.txn_id,
2639 )?;
2640
2641 let column_names: Vec<String> = table
2642 .schema
2643 .columns
2644 .iter()
2645 .map(|column| column.name.clone())
2646 .collect();
2647
2648 let _ = self.insert_rows(
2649 table,
2650 display_name.clone(),
2651 new_rows,
2652 column_names,
2653 snapshot,
2654 )?;
2655
2656 Ok(RuntimeStatementResult::Update {
2657 table_name: display_name,
2658 rows_updated: row_count,
2659 })
2660 }
2661
2662 fn delete_filtered_rows(
2663 &self,
2664 table: &ExecutorTable<P>,
2665 display_name: String,
2666 filter: LlkvExpr<'static, String>,
2667 snapshot: TransactionSnapshot,
2668 txn_id: TxnId,
2669 ) -> Result<RuntimeStatementResult<P>> {
2670 let schema = table.schema.as_ref();
2671 let filter_expr = translate_predicate(filter, schema)?;
2672 let row_ids = table.table.filter_row_ids(&filter_expr)?;
2673 let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
2674 tracing::trace!(
2675 table = %display_name,
2676 rows = row_ids.len(),
2677 "delete_filtered_rows collected row ids"
2678 );
2679 self.apply_delete(table, display_name, row_ids, txn_id)
2680 }
2681
2682 fn delete_all_rows(
2683 &self,
2684 table: &ExecutorTable<P>,
2685 display_name: String,
2686 snapshot: TransactionSnapshot,
2687 txn_id: TxnId,
2688 ) -> Result<RuntimeStatementResult<P>> {
2689 let total_rows = table.total_rows.load(Ordering::SeqCst);
2690 if total_rows == 0 {
2691 return Ok(RuntimeStatementResult::Delete {
2692 table_name: display_name,
2693 rows_deleted: 0,
2694 });
2695 }
2696
2697 let anchor_field = table.schema.first_field_id().ok_or_else(|| {
2698 Error::InvalidArgumentError("DELETE requires a table with at least one column".into())
2699 })?;
2700 let filter_expr = full_table_scan_filter(anchor_field);
2701 let row_ids = table.table.filter_row_ids(&filter_expr)?;
2702 let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
2703 self.apply_delete(table, display_name, row_ids, txn_id)
2704 }
2705
2706 fn apply_delete(
2707 &self,
2708 table: &ExecutorTable<P>,
2709 display_name: String,
2710 row_ids: Vec<u64>,
2711 txn_id: TxnId,
2712 ) -> Result<RuntimeStatementResult<P>> {
2713 if row_ids.is_empty() {
2714 return Ok(RuntimeStatementResult::Delete {
2715 table_name: display_name,
2716 rows_deleted: 0,
2717 });
2718 }
2719
2720 let removed = row_ids.len();
2721
2722 let batch = mvcc_columns::build_delete_batch(row_ids.clone(), txn_id)?;
2724 table.table.append(&batch)?;
2725
2726 let removed_u64 = u64::try_from(removed)
2727 .map_err(|_| Error::InvalidArgumentError("row count exceeds supported range".into()))?;
2728 table.total_rows.fetch_sub(removed_u64, Ordering::SeqCst);
2729
2730 Ok(RuntimeStatementResult::Delete {
2731 table_name: display_name,
2732 rows_deleted: removed,
2733 })
2734 }
2735
2736 fn collect_update_rows(
2737 &self,
2738 table: &ExecutorTable<P>,
2739 filter_expr: &LlkvExpr<'static, FieldId>,
2740 expressions: &[ScalarExpr<FieldId>],
2741 snapshot: TransactionSnapshot,
2742 ) -> Result<(Vec<u64>, Vec<Vec<PlanValue>>)> {
2743 let row_ids = table.table.filter_row_ids(filter_expr)?;
2744 let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
2745 if row_ids.is_empty() {
2746 return Ok((row_ids, vec![Vec::new(); expressions.len()]));
2747 }
2748
2749 if expressions.is_empty() {
2750 return Ok((row_ids, Vec::new()));
2751 }
2752
2753 let mut projections: Vec<ScanProjection> = Vec::with_capacity(expressions.len());
2754 for (idx, expr) in expressions.iter().enumerate() {
2755 let alias = format!("__expr_{idx}");
2756 projections.push(ScanProjection::computed(expr.clone(), alias));
2757 }
2758
2759 let mut expr_values: Vec<Vec<PlanValue>> =
2760 vec![Vec::with_capacity(row_ids.len()); expressions.len()];
2761 let mut error: Option<Error> = None;
2762 let row_filter: Arc<dyn RowIdFilter<P>> = Arc::new(MvccRowIdFilter::new(
2763 Arc::clone(&self.txn_manager),
2764 snapshot,
2765 ));
2766 let options = ScanStreamOptions {
2767 include_nulls: true,
2768 order: None,
2769 row_id_filter: Some(row_filter),
2770 };
2771
2772 table
2773 .table
2774 .scan_stream_with_exprs(&projections, filter_expr, options, |batch| {
2775 if error.is_some() {
2776 return;
2777 }
2778 if let Err(err) = Self::collect_expression_values(&mut expr_values, batch) {
2779 error = Some(err);
2780 }
2781 })?;
2782
2783 if let Some(err) = error {
2784 return Err(err);
2785 }
2786
2787 for values in &expr_values {
2788 if values.len() != row_ids.len() {
2789 return Err(Error::InvalidArgumentError(
2790 "expression result count did not match targeted row count".into(),
2791 ));
2792 }
2793 }
2794
2795 Ok((row_ids, expr_values))
2796 }
2797
2798 fn collect_expression_values(
2799 expr_values: &mut [Vec<PlanValue>],
2800 batch: RecordBatch,
2801 ) -> Result<()> {
2802 for row_idx in 0..batch.num_rows() {
2803 for (expr_index, values) in expr_values.iter_mut().enumerate() {
2804 let value = llkv_plan::plan_value_from_array(batch.column(expr_index), row_idx)?;
2805 values.push(value);
2806 }
2807 }
2808
2809 Ok(())
2810 }
2811
2812 fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
2813 {
2815 let tables = self.tables.read().unwrap();
2816 if let Some(table) = tables.get(canonical_name) {
2817 tracing::trace!(
2818 "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
2819 canonical_name,
2820 table.table.table_id(),
2821 table.schema.columns.len(),
2822 &*self.pager
2823 );
2824 return Ok(Arc::clone(table));
2825 }
2826 } tracing::debug!(
2830 "[LAZY_LOAD] Loading table '{}' from catalog",
2831 canonical_name
2832 );
2833
2834 let _catalog_table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
2836 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
2837 })?;
2838
2839 let store = ColumnStore::open(Arc::clone(&self.pager))?;
2840 let catalog = SysCatalog::new(&store);
2841
2842 let all_metas = catalog.all_table_metas()?;
2844 let (table_id, _meta) = all_metas
2845 .iter()
2846 .find(|(_, meta)| {
2847 meta.name
2848 .as_ref()
2849 .map(|n| n.to_ascii_lowercase() == canonical_name)
2850 .unwrap_or(false)
2851 })
2852 .ok_or_else(|| {
2853 Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
2854 })?;
2855
2856 let table = Table::new(*table_id, Arc::clone(&self.pager))?;
2858 let schema = table.schema()?;
2859
2860 let mut executor_columns = Vec::new();
2862 let mut lookup =
2863 FxHashMap::with_capacity_and_hasher(schema.fields().len(), Default::default());
2864
2865 for (idx, field) in schema.fields().iter().enumerate().skip(1) {
2866 let field_id = field
2868 .metadata()
2869 .get(llkv_table::constants::FIELD_ID_META_KEY)
2870 .and_then(|s| s.parse::<FieldId>().ok())
2871 .unwrap_or(idx as FieldId);
2872
2873 let normalized = field.name().to_ascii_lowercase();
2874 let col_idx = executor_columns.len();
2875 lookup.insert(normalized, col_idx);
2876
2877 executor_columns.push(ExecutorColumn {
2878 name: field.name().to_string(),
2879 data_type: field.data_type().clone(),
2880 nullable: field.is_nullable(),
2881 primary_key: false, field_id,
2883 });
2884 }
2885
2886 let exec_schema = Arc::new(ExecutorSchema {
2887 columns: executor_columns,
2888 lookup,
2889 });
2890
2891 let max_row_id = {
2893 use arrow::array::UInt64Array;
2894 use llkv_column_map::store::rowid_fid;
2895 use llkv_column_map::store::scan::{
2896 PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
2897 PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
2898 };
2899
2900 struct MaxRowIdVisitor {
2901 max: RowId,
2902 }
2903
2904 impl PrimitiveVisitor for MaxRowIdVisitor {
2905 fn u64_chunk(&mut self, values: &UInt64Array) {
2906 for i in 0..values.len() {
2907 let val = values.value(i);
2908 if val > self.max {
2909 self.max = val;
2910 }
2911 }
2912 }
2913 }
2914
2915 impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
2916 impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
2917 impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}
2918
2919 let row_id_field = rowid_fid(LogicalFieldId::for_user(*table_id, 1));
2921 let mut visitor = MaxRowIdVisitor { max: 0 };
2922
2923 match ScanBuilder::new(table.store(), row_id_field)
2924 .options(ScanOptions::default())
2925 .run(&mut visitor)
2926 {
2927 Ok(_) => visitor.max,
2928 Err(llkv_result::Error::NotFound) => 0,
2929 Err(e) => {
2930 tracing::warn!(
2931 "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
2932 canonical_name,
2933 e
2934 );
2935 0
2936 }
2937 }
2938 };
2939
2940 let next_row_id = if max_row_id > 0 {
2941 max_row_id.saturating_add(1)
2942 } else {
2943 0
2944 };
2945
2946 let total_rows = table.total_rows().unwrap_or(0);
2950
2951 let executor_table = Arc::new(ExecutorTable {
2952 table: Arc::new(table),
2953 schema: exec_schema,
2954 next_row_id: AtomicU64::new(next_row_id),
2955 total_rows: AtomicU64::new(total_rows),
2956 });
2957
2958 {
2960 let mut tables = self.tables.write().unwrap();
2961 tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
2962 }
2963
2964 if let Some(field_resolver) = self.catalog.field_resolver(_catalog_table_id) {
2966 for col in &executor_table.schema.columns {
2967 let _ = field_resolver.register_field(&col.name); }
2969 tracing::debug!(
2970 "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
2971 executor_table.schema.columns.len(),
2972 canonical_name
2973 );
2974 }
2975
2976 tracing::debug!(
2977 "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
2978 canonical_name,
2979 table_id,
2980 schema.fields().len() - 1,
2981 next_row_id
2982 );
2983
2984 Ok(executor_table)
2985 }
2986
2987 fn remove_table_entry(&self, canonical_name: &str) {
2988 let mut tables = self.tables.write().unwrap();
2989 if tables.remove(canonical_name).is_some() {
2990 tracing::trace!(
2991 "remove_table_entry: removed table '{}' from context cache",
2992 canonical_name
2993 );
2994 }
2995 }
2996
2997 pub fn drop_table_immediate(&self, name: &str, if_exists: bool) -> Result<()> {
2998 let (display_name, canonical_name) = canonical_table_name(name)?;
2999 let tables = self.tables.read().unwrap();
3000 if !tables.contains_key(&canonical_name) {
3001 if if_exists {
3002 return Ok(());
3003 } else {
3004 return Err(Error::CatalogError(format!(
3005 "Catalog Error: Table '{}' does not exist",
3006 display_name
3007 )));
3008 }
3009 }
3010 drop(tables);
3011
3012 self.catalog.unregister_table(&canonical_name);
3014 tracing::debug!(
3015 "[CATALOG] Unregistered table '{}' from catalog",
3016 canonical_name
3017 );
3018
3019 self.dropped_tables.write().unwrap().insert(canonical_name);
3020 Ok(())
3021 }
3022
3023 pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
3024 self.dropped_tables.read().unwrap().contains(canonical_name)
3025 }
3026
3027 fn reserve_table_id(&self) -> Result<TableId> {
3028 let store = ColumnStore::open(Arc::clone(&self.pager))?;
3029 let catalog = SysCatalog::new(&store);
3030
3031 let mut next = match catalog.get_next_table_id()? {
3032 Some(value) => value,
3033 None => {
3034 let seed = catalog.max_table_id()?.unwrap_or(CATALOG_TABLE_ID);
3035 let initial = seed.checked_add(1).ok_or_else(|| {
3036 Error::InvalidArgumentError("exhausted available table ids".into())
3037 })?;
3038 catalog.put_next_table_id(initial)?;
3039 initial
3040 }
3041 };
3042
3043 while llkv_table::reserved::is_reserved_table_id(next) {
3045 next = next.checked_add(1).ok_or_else(|| {
3046 Error::InvalidArgumentError("exhausted available table ids".into())
3047 })?;
3048 }
3049
3050 let mut following = next
3051 .checked_add(1)
3052 .ok_or_else(|| Error::InvalidArgumentError("exhausted available table ids".into()))?;
3053
3054 while llkv_table::reserved::is_reserved_table_id(following) {
3056 following = following.checked_add(1).ok_or_else(|| {
3057 Error::InvalidArgumentError("exhausted available table ids".into())
3058 })?;
3059 }
3060
3061 catalog.put_next_table_id(following)?;
3062 Ok(next)
3063 }
3064}
3065
3066impl<P> TransactionContext for RuntimeContextWrapper<P>
3068where
3069 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
3070{
3071 type Pager = P;
3072
3073 fn set_snapshot(&self, snapshot: TransactionSnapshot) {
3074 self.update_snapshot(snapshot);
3075 }
3076
3077 fn snapshot(&self) -> TransactionSnapshot {
3078 self.current_snapshot()
3079 }
3080
3081 fn table_column_specs(&self, table_name: &str) -> llkv_result::Result<Vec<ColumnSpec>> {
3082 RuntimeContext::table_column_specs(self.context(), table_name)
3083 }
3084
3085 fn export_table_rows(
3086 &self,
3087 table_name: &str,
3088 ) -> llkv_result::Result<llkv_transaction::RowBatch> {
3089 let batch = RuntimeContext::export_table_rows(self.context(), table_name)?;
3090 Ok(llkv_transaction::RowBatch {
3092 columns: batch.columns,
3093 rows: batch.rows,
3094 })
3095 }
3096
3097 fn get_batches_with_row_ids(
3098 &self,
3099 table_name: &str,
3100 filter: Option<LlkvExpr<'static, String>>,
3101 ) -> llkv_result::Result<Vec<RecordBatch>> {
3102 RuntimeContext::get_batches_with_row_ids_with_snapshot(
3103 self.context(),
3104 table_name,
3105 filter,
3106 self.snapshot(),
3107 )
3108 }
3109
3110 fn execute_select(
3111 &self,
3112 plan: SelectPlan,
3113 ) -> llkv_result::Result<SelectExecution<Self::Pager>> {
3114 RuntimeContext::execute_select_with_snapshot(self.context(), plan, self.snapshot())
3115 }
3116
3117 fn create_table_plan(
3118 &self,
3119 plan: CreateTablePlan,
3120 ) -> llkv_result::Result<TransactionResult<P>> {
3121 let result = RuntimeContext::create_table_plan(self.context(), plan)?;
3122 Ok(convert_statement_result(result))
3123 }
3124
3125 fn insert(&self, plan: InsertPlan) -> llkv_result::Result<TransactionResult<P>> {
3126 tracing::trace!(
3127 "[WRAPPER] TransactionContext::insert called - plan.table='{}', wrapper_context_pager={:p}",
3128 plan.table,
3129 &*self.ctx.pager
3130 );
3131 let snapshot = self.current_snapshot();
3132 let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
3133 self.ctx().insert(plan)?
3134 } else {
3135 RuntimeContext::insert_with_snapshot(self.context(), plan, snapshot)?
3136 };
3137 Ok(convert_statement_result(result))
3138 }
3139
3140 fn update(&self, plan: UpdatePlan) -> llkv_result::Result<TransactionResult<P>> {
3141 let snapshot = self.current_snapshot();
3142 let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
3143 self.ctx().update(plan)?
3144 } else {
3145 RuntimeContext::update_with_snapshot(self.context(), plan, snapshot)?
3146 };
3147 Ok(convert_statement_result(result))
3148 }
3149
3150 fn delete(&self, plan: DeletePlan) -> llkv_result::Result<TransactionResult<P>> {
3151 let snapshot = self.current_snapshot();
3152 let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
3153 self.ctx().delete(plan)?
3154 } else {
3155 RuntimeContext::delete_with_snapshot(self.context(), plan, snapshot)?
3156 };
3157 Ok(convert_statement_result(result))
3158 }
3159
3160 fn append_batches_with_row_ids(
3161 &self,
3162 table_name: &str,
3163 batches: Vec<RecordBatch>,
3164 ) -> llkv_result::Result<usize> {
3165 RuntimeContext::append_batches_with_row_ids(self.context(), table_name, batches)
3166 }
3167
3168 fn table_names(&self) -> Vec<String> {
3169 RuntimeContext::table_names(self.context())
3170 }
3171
3172 fn table_id(&self, table_name: &str) -> llkv_result::Result<llkv_table::types::TableId> {
3173 let ctx = self.context();
3176 if ctx.is_table_marked_dropped(table_name) {
3177 return Err(Error::InvalidArgumentError(format!(
3178 "table '{}' has been dropped",
3179 table_name
3180 )));
3181 }
3182
3183 let table = ctx.lookup_table(table_name)?;
3184 Ok(table.table.table_id())
3185 }
3186
3187 fn catalog_snapshot(&self) -> llkv_table::catalog::TableCatalogSnapshot {
3188 let ctx = self.context();
3189 ctx.catalog.snapshot()
3190 }
3191}
3192
3193fn convert_statement_result<P>(result: RuntimeStatementResult<P>) -> TransactionResult<P>
3195where
3196 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
3197{
3198 use llkv_transaction::TransactionResult as TxResult;
3199 match result {
3200 RuntimeStatementResult::CreateTable { table_name } => TxResult::CreateTable { table_name },
3201 RuntimeStatementResult::Insert { rows_inserted, .. } => TxResult::Insert { rows_inserted },
3202 RuntimeStatementResult::Update { rows_updated, .. } => TxResult::Update {
3203 rows_matched: rows_updated,
3204 rows_updated,
3205 },
3206 RuntimeStatementResult::Delete { rows_deleted, .. } => TxResult::Delete { rows_deleted },
3207 RuntimeStatementResult::Transaction { kind } => TxResult::Transaction { kind },
3208 _ => panic!("unsupported StatementResult conversion"),
3209 }
3210}
3211
3212fn filter_row_ids_for_snapshot<P>(
3213 store: &ColumnStore<P>,
3214 table_id: TableId,
3215 row_ids: Vec<u64>,
3216 txn_manager: &TxnIdManager,
3217 snapshot: TransactionSnapshot,
3218) -> Result<Vec<u64>>
3219where
3220 P: Pager<Blob = EntryHandle> + Send + Sync,
3221{
3222 tracing::debug!(
3223 "[FILTER_ROWS] Filtering {} row IDs for snapshot txn_id={}, snapshot_id={}",
3224 row_ids.len(),
3225 snapshot.txn_id,
3226 snapshot.snapshot_id
3227 );
3228
3229 if row_ids.is_empty() {
3230 return Ok(row_ids);
3231 }
3232
3233 let created_lfid = LogicalFieldId::for_mvcc_created_by(table_id);
3234 let deleted_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
3235
3236 let version_batch = match store.gather_rows(
3237 &[created_lfid, deleted_lfid],
3238 &row_ids,
3239 GatherNullPolicy::IncludeNulls,
3240 ) {
3241 Ok(batch) => batch,
3242 Err(Error::NotFound) => {
3243 tracing::trace!(
3244 "[FILTER_ROWS] gather_rows returned NotFound for MVCC columns, treating all {} rows as visible (committed)",
3245 row_ids.len()
3246 );
3247 return Ok(row_ids);
3248 }
3249 Err(err) => {
3250 tracing::error!("[FILTER_ROWS] gather_rows error: {:?}", err);
3251 return Err(err);
3252 }
3253 };
3254
3255 if version_batch.num_columns() < 2 {
3256 tracing::debug!(
3257 "[FILTER_ROWS] version_batch has < 2 columns, returning all {} rows",
3258 row_ids.len()
3259 );
3260 return Ok(row_ids);
3261 }
3262
3263 let created_column = version_batch
3264 .column(0)
3265 .as_any()
3266 .downcast_ref::<UInt64Array>();
3267 let deleted_column = version_batch
3268 .column(1)
3269 .as_any()
3270 .downcast_ref::<UInt64Array>();
3271
3272 if created_column.is_none() || deleted_column.is_none() {
3273 tracing::debug!(
3274 "[FILTER_ROWS] Failed to downcast columns, returning all {} rows",
3275 row_ids.len()
3276 );
3277 return Ok(row_ids);
3278 }
3279
3280 let created_column = created_column.unwrap();
3281 let deleted_column = deleted_column.unwrap();
3282
3283 let mut visible = Vec::with_capacity(row_ids.len());
3284 for (idx, row_id) in row_ids.iter().enumerate() {
3285 let created_by = if created_column.is_null(idx) {
3286 TXN_ID_AUTO_COMMIT
3287 } else {
3288 created_column.value(idx)
3289 };
3290 let deleted_by = if deleted_column.is_null(idx) {
3291 TXN_ID_NONE
3292 } else {
3293 deleted_column.value(idx)
3294 };
3295
3296 let version = RowVersion {
3297 created_by,
3298 deleted_by,
3299 };
3300 let is_visible = version.is_visible_for(txn_manager, snapshot);
3301 tracing::trace!(
3302 "[FILTER_ROWS] row_id={}: created_by={}, deleted_by={}, is_visible={}",
3303 row_id,
3304 created_by,
3305 deleted_by,
3306 is_visible
3307 );
3308 if is_visible {
3309 visible.push(*row_id);
3310 }
3311 }
3312
3313 tracing::debug!(
3314 "[FILTER_ROWS] Filtered from {} to {} visible rows",
3315 row_ids.len(),
3316 visible.len()
3317 );
3318 Ok(visible)
3319}
3320
3321struct MvccRowIdFilter<P>
3322where
3323 P: Pager<Blob = EntryHandle> + Send + Sync,
3324{
3325 txn_manager: Arc<TxnIdManager>,
3326 snapshot: TransactionSnapshot,
3327 _marker: PhantomData<fn(P)>,
3328}
3329
3330impl<P> MvccRowIdFilter<P>
3331where
3332 P: Pager<Blob = EntryHandle> + Send + Sync,
3333{
3334 fn new(txn_manager: Arc<TxnIdManager>, snapshot: TransactionSnapshot) -> Self {
3335 Self {
3336 txn_manager,
3337 snapshot,
3338 _marker: PhantomData,
3339 }
3340 }
3341}
3342
3343impl<P> RowIdFilter<P> for MvccRowIdFilter<P>
3344where
3345 P: Pager<Blob = EntryHandle> + Send + Sync,
3346{
3347 fn filter(&self, table: &Table<P>, row_ids: Vec<u64>) -> Result<Vec<u64>> {
3348 tracing::trace!(
3349 "[MVCC_FILTER] filter() called with row_ids {:?}, snapshot txn={}, snapshot_id={}",
3350 row_ids,
3351 self.snapshot.txn_id,
3352 self.snapshot.snapshot_id
3353 );
3354 let result = filter_row_ids_for_snapshot(
3355 table.store(),
3356 table.table_id(),
3357 row_ids,
3358 &self.txn_manager,
3359 self.snapshot,
3360 );
3361 if let Ok(ref visible) = result {
3362 tracing::trace!(
3363 "[MVCC_FILTER] filter() returning visible row_ids: {:?}",
3364 visible
3365 );
3366 }
3367 result
3368 }
3369}
3370
3371struct ContextProvider<P>
3373where
3374 P: Pager<Blob = EntryHandle> + Send + Sync,
3375{
3376 context: Arc<RuntimeContext<P>>,
3377}
3378
3379impl<P> TableProvider<P> for ContextProvider<P>
3380where
3381 P: Pager<Blob = EntryHandle> + Send + Sync,
3382{
3383 fn get_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
3384 self.context.lookup_table(canonical_name)
3385 }
3386}
3387
3388pub struct RuntimeLazyFrame<P>
3390where
3391 P: Pager<Blob = EntryHandle> + Send + Sync,
3392{
3393 context: Arc<RuntimeContext<P>>,
3394 plan: SelectPlan,
3395}
3396
3397impl<P> RuntimeLazyFrame<P>
3398where
3399 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
3400{
3401 pub fn scan(context: Arc<RuntimeContext<P>>, table: &str) -> Result<Self> {
3402 let (display, canonical) = canonical_table_name(table)?;
3403 context.lookup_table(&canonical)?;
3404 Ok(Self {
3405 context,
3406 plan: SelectPlan::new(display),
3407 })
3408 }
3409
3410 pub fn filter(mut self, predicate: LlkvExpr<'static, String>) -> Self {
3411 self.plan.filter = Some(predicate);
3412 self
3413 }
3414
3415 pub fn select_all(mut self) -> Self {
3416 self.plan.projections = vec![SelectProjection::AllColumns];
3417 self
3418 }
3419
3420 pub fn select_columns<S>(mut self, columns: impl IntoIterator<Item = S>) -> Self
3421 where
3422 S: AsRef<str>,
3423 {
3424 self.plan.projections = columns
3425 .into_iter()
3426 .map(|name| SelectProjection::Column {
3427 name: name.as_ref().to_string(),
3428 alias: None,
3429 })
3430 .collect();
3431 self
3432 }
3433
3434 pub fn select(mut self, projections: Vec<SelectProjection>) -> Self {
3435 self.plan.projections = projections;
3436 self
3437 }
3438
3439 pub fn aggregate(mut self, aggregates: Vec<AggregateExpr>) -> Self {
3440 self.plan.aggregates = aggregates;
3441 self
3442 }
3443
3444 pub fn collect(self) -> Result<SelectExecution<P>> {
3445 self.context.execute_select(self.plan)
3446 }
3447
3448 pub fn collect_rows(self) -> Result<RowBatch> {
3449 let execution = self.context.execute_select(self.plan)?;
3450 execution.collect_rows()
3451 }
3452
3453 pub fn collect_rows_vec(self) -> Result<Vec<Vec<PlanValue>>> {
3454 Ok(self.collect_rows()?.rows)
3455 }
3456}
3457
3458pub fn canonical_table_name(name: &str) -> Result<(String, String)> {
3459 if name.is_empty() {
3460 return Err(Error::InvalidArgumentError(
3461 "table name must not be empty".into(),
3462 ));
3463 }
3464 let display = name.to_string();
3465 let canonical = display.to_ascii_lowercase();
3466 Ok((display, canonical))
3467}
3468
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`.
fn current_time_micros() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as u64,
        Err(_) => 0,
    }
}
3475
3476pub fn resolve_insert_columns(columns: &[String], schema: &ExecutorSchema) -> Result<Vec<usize>> {
3477 if columns.is_empty() {
3478 return Ok((0..schema.columns.len()).collect());
3479 }
3480 let mut resolved = Vec::with_capacity(columns.len());
3481 for column in columns {
3482 let normalized = column.to_ascii_lowercase();
3483 let index = schema
3484 .lookup
3485 .get(&normalized)
3486 .ok_or_else(|| Error::InvalidArgumentError(format!("unknown column '{}'", column)))?;
3487 resolved.push(*index);
3488 }
3489 Ok(resolved)
3490}
3491
3492pub fn build_array_for_column(dtype: &DataType, values: &[PlanValue]) -> Result<ArrayRef> {
3493 match dtype {
3494 DataType::Int64 => {
3495 let mut builder = Int64Builder::with_capacity(values.len());
3496 for value in values {
3497 match value {
3498 PlanValue::Null => builder.append_null(),
3499 PlanValue::Integer(v) => builder.append_value(*v),
3500 PlanValue::Float(v) => builder.append_value(*v as i64),
3501 PlanValue::String(_) => {
3502 return Err(Error::InvalidArgumentError(
3503 "cannot insert string into INT column".into(),
3504 ));
3505 }
3506 }
3507 }
3508 Ok(Arc::new(builder.finish()))
3509 }
3510 DataType::Float64 => {
3511 let mut builder = Float64Builder::with_capacity(values.len());
3512 for value in values {
3513 match value {
3514 PlanValue::Null => builder.append_null(),
3515 PlanValue::Integer(v) => builder.append_value(*v as f64),
3516 PlanValue::Float(v) => builder.append_value(*v),
3517 PlanValue::String(_) => {
3518 return Err(Error::InvalidArgumentError(
3519 "cannot insert string into DOUBLE column".into(),
3520 ));
3521 }
3522 }
3523 }
3524 Ok(Arc::new(builder.finish()))
3525 }
3526 DataType::Utf8 => {
3527 let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 8);
3528 for value in values {
3529 match value {
3530 PlanValue::Null => builder.append_null(),
3531 PlanValue::Integer(v) => builder.append_value(v.to_string()),
3532 PlanValue::Float(v) => builder.append_value(v.to_string()),
3533 PlanValue::String(s) => builder.append_value(s),
3534 }
3535 }
3536 Ok(Arc::new(builder.finish()))
3537 }
3538 DataType::Date32 => {
3539 let mut builder = Date32Builder::with_capacity(values.len());
3540 for value in values {
3541 match value {
3542 PlanValue::Null => builder.append_null(),
3543 PlanValue::Integer(days) => {
3544 let casted = i32::try_from(*days).map_err(|_| {
3545 Error::InvalidArgumentError(
3546 "integer literal out of range for DATE column".into(),
3547 )
3548 })?;
3549 builder.append_value(casted);
3550 }
3551 PlanValue::Float(_) => {
3552 return Err(Error::InvalidArgumentError(
3553 "cannot insert float into DATE column".into(),
3554 ));
3555 }
3556 PlanValue::String(text) => {
3557 let days = parse_date32_literal(text)?;
3558 builder.append_value(days);
3559 }
3560 }
3561 }
3562 Ok(Arc::new(builder.finish()))
3563 }
3564 other => Err(Error::InvalidArgumentError(format!(
3565 "unsupported Arrow data type for INSERT: {other:?}"
3566 ))),
3567 }
3568}
3569
3570fn parse_date32_literal(text: &str) -> Result<i32> {
3571 let mut parts = text.split('-');
3572 let year_str = parts
3573 .next()
3574 .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
3575 let month_str = parts
3576 .next()
3577 .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
3578 let day_str = parts
3579 .next()
3580 .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
3581 if parts.next().is_some() {
3582 return Err(Error::InvalidArgumentError(format!(
3583 "invalid DATE literal '{text}'"
3584 )));
3585 }
3586
3587 let year = year_str.parse::<i32>().map_err(|_| {
3588 Error::InvalidArgumentError(format!("invalid year in DATE literal '{text}'"))
3589 })?;
3590 let month_num = month_str.parse::<u8>().map_err(|_| {
3591 Error::InvalidArgumentError(format!("invalid month in DATE literal '{text}'"))
3592 })?;
3593 let day = day_str.parse::<u8>().map_err(|_| {
3594 Error::InvalidArgumentError(format!("invalid day in DATE literal '{text}'"))
3595 })?;
3596
3597 let month = Month::try_from(month_num).map_err(|_| {
3598 Error::InvalidArgumentError(format!("invalid month in DATE literal '{text}'"))
3599 })?;
3600
3601 let date = Date::from_calendar_date(year, month, day).map_err(|err| {
3602 Error::InvalidArgumentError(format!("invalid DATE literal '{text}': {err}"))
3603 })?;
3604 let days = date.to_julian_day() - epoch_julian_day();
3605 Ok(days)
3606}
3607
3608fn epoch_julian_day() -> i32 {
3609 Date::from_calendar_date(1970, Month::January, 1)
3610 .expect("1970-01-01 is a valid date")
3611 .to_julian_day()
3612}
3613
3614fn full_table_scan_filter(field_id: FieldId) -> LlkvExpr<'static, FieldId> {
3617 LlkvExpr::Pred(Filter {
3618 field_id,
3619 op: Operator::Range {
3620 lower: Bound::Unbounded,
3621 upper: Bound::Unbounded,
3622 },
3623 })
3624}
3625
3626fn resolve_field_id_from_schema(schema: &ExecutorSchema, name: &str) -> Result<FieldId> {
3627 if name.eq_ignore_ascii_case(ROW_ID_COLUMN_NAME) {
3628 return Ok(ROW_ID_FIELD_ID);
3629 }
3630
3631 schema
3632 .resolve(name)
3633 .map(|column| column.field_id)
3634 .ok_or_else(|| {
3635 Error::InvalidArgumentError(format!("unknown column '{name}' in expression"))
3636 })
3637}
3638
3639fn translate_predicate(
3640 expr: LlkvExpr<'static, String>,
3641 schema: &ExecutorSchema,
3642) -> Result<LlkvExpr<'static, FieldId>> {
3643 match expr {
3644 LlkvExpr::And(list) => {
3645 let mut converted = Vec::with_capacity(list.len());
3646 for item in list {
3647 converted.push(translate_predicate(item, schema)?);
3648 }
3649 Ok(LlkvExpr::And(converted))
3650 }
3651 LlkvExpr::Or(list) => {
3652 let mut converted = Vec::with_capacity(list.len());
3653 for item in list {
3654 converted.push(translate_predicate(item, schema)?);
3655 }
3656 Ok(LlkvExpr::Or(converted))
3657 }
3658 LlkvExpr::Not(inner) => Ok(LlkvExpr::Not(Box::new(translate_predicate(
3659 *inner, schema,
3660 )?))),
3661 LlkvExpr::Pred(Filter { field_id, op }) => {
3662 let resolved = resolve_field_id_from_schema(schema, &field_id)?;
3663 Ok(LlkvExpr::Pred(Filter {
3664 field_id: resolved,
3665 op,
3666 }))
3667 }
3668 LlkvExpr::Compare { left, op, right } => {
3669 let left = translate_scalar(&left, schema)?;
3670 let right = translate_scalar(&right, schema)?;
3671 Ok(LlkvExpr::Compare { left, op, right })
3672 }
3673 }
3674}
3675
3676fn translate_scalar(
3677 expr: &ScalarExpr<String>,
3678 schema: &ExecutorSchema,
3679) -> Result<ScalarExpr<FieldId>> {
3680 match expr {
3681 ScalarExpr::Column(name) => {
3682 let field_id = resolve_field_id_from_schema(schema, name)?;
3683 Ok(ScalarExpr::column(field_id))
3684 }
3685 ScalarExpr::Literal(lit) => Ok(ScalarExpr::Literal(lit.clone())),
3686 ScalarExpr::Binary { left, op, right } => {
3687 let left_expr = translate_scalar(left, schema)?;
3688 let right_expr = translate_scalar(right, schema)?;
3689 Ok(ScalarExpr::Binary {
3690 left: Box::new(left_expr),
3691 op: *op,
3692 right: Box::new(right_expr),
3693 })
3694 }
3695 ScalarExpr::Aggregate(agg) => {
3696 use llkv_expr::expr::AggregateCall;
3698 let translated_agg = match agg {
3699 AggregateCall::CountStar => AggregateCall::CountStar,
3700 AggregateCall::Count(name) => {
3701 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
3702 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
3703 })?;
3704 AggregateCall::Count(field_id)
3705 }
3706 AggregateCall::Sum(name) => {
3707 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
3708 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
3709 })?;
3710 AggregateCall::Sum(field_id)
3711 }
3712 AggregateCall::Min(name) => {
3713 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
3714 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
3715 })?;
3716 AggregateCall::Min(field_id)
3717 }
3718 AggregateCall::Max(name) => {
3719 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
3720 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
3721 })?;
3722 AggregateCall::Max(field_id)
3723 }
3724 AggregateCall::CountNulls(name) => {
3725 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
3726 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
3727 })?;
3728 AggregateCall::CountNulls(field_id)
3729 }
3730 };
3731 Ok(ScalarExpr::Aggregate(translated_agg))
3732 }
3733 }
3734}
3735
3736fn plan_value_from_sql_expr(expr: &SqlExpr) -> Result<PlanValue> {
3737 match expr {
3738 SqlExpr::Value(value) => plan_value_from_sql_value(value),
3739 SqlExpr::UnaryOp {
3740 op: UnaryOperator::Minus,
3741 expr,
3742 } => match plan_value_from_sql_expr(expr)? {
3743 PlanValue::Integer(v) => Ok(PlanValue::Integer(-v)),
3744 PlanValue::Float(v) => Ok(PlanValue::Float(-v)),
3745 PlanValue::Null | PlanValue::String(_) => Err(Error::InvalidArgumentError(
3746 "cannot negate non-numeric literal".into(),
3747 )),
3748 },
3749 SqlExpr::UnaryOp {
3750 op: UnaryOperator::Plus,
3751 expr,
3752 } => plan_value_from_sql_expr(expr),
3753 SqlExpr::Nested(inner) => plan_value_from_sql_expr(inner),
3754 other => Err(Error::InvalidArgumentError(format!(
3755 "unsupported literal expression: {other:?}"
3756 ))),
3757 }
3758}
3759
3760fn plan_value_from_sql_value(value: &ValueWithSpan) -> Result<PlanValue> {
3761 match &value.value {
3762 Value::Null => Ok(PlanValue::Null),
3763 Value::Number(text, _) => {
3764 if text.contains(['.', 'e', 'E']) {
3765 let parsed = text.parse::<f64>().map_err(|err| {
3766 Error::InvalidArgumentError(format!("invalid float literal: {err}"))
3767 })?;
3768 Ok(PlanValue::Float(parsed))
3769 } else {
3770 let parsed = text.parse::<i64>().map_err(|err| {
3771 Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
3772 })?;
3773 Ok(PlanValue::Integer(parsed))
3774 }
3775 }
3776 Value::Boolean(_) => Err(Error::InvalidArgumentError(
3777 "BOOLEAN literals are not supported yet".into(),
3778 )),
3779 other => {
3780 if let Some(text) = other.clone().into_string() {
3781 Ok(PlanValue::String(text))
3782 } else {
3783 Err(Error::InvalidArgumentError(format!(
3784 "unsupported literal: {other:?}"
3785 )))
3786 }
3787 }
3788 }
3789}
3790
3791fn group_by_is_empty(expr: &GroupByExpr) -> bool {
3792 matches!(
3793 expr,
3794 GroupByExpr::Expressions(exprs, modifiers)
3795 if exprs.is_empty() && modifiers.is_empty()
3796 )
3797}
3798
/// Rows produced by evaluating a SELECT over a `range()` source.
#[derive(Clone)]
pub struct RuntimeRangeSelectRows {
    /// One inner vector per generated row, with one value per projection.
    rows: Vec<Vec<PlanValue>>,
}
3803
3804impl RuntimeRangeSelectRows {
3805 pub fn into_rows(self) -> Vec<Vec<PlanValue>> {
3806 self.rows
3807 }
3808}
3809
/// A single output column of a SELECT over a `range()` source.
#[derive(Clone)]
enum RangeProjection {
    /// Emits the generated range value of the current row.
    Column,
    /// Emits the same constant value for every row.
    Literal(PlanValue),
}
3815
/// Parsed description of a `range(...)` source found in a FROM clause.
#[derive(Clone)]
pub struct RuntimeRangeSpec {
    /// First value produced by the range.
    start: i64,
    // Upper bound supplied to range(); not read by the code visible in this
    // section, which is why the dead-code lint is silenced.
    #[allow(dead_code)] end: i64,
    /// Number of rows to generate, starting at `start`.
    row_count: usize,
    /// ASCII-lowercased name of the generated column.
    column_name_lower: String,
    /// ASCII-lowercased table alias, when the source was aliased.
    table_alias_lower: Option<String>,
}
3825
3826impl RuntimeRangeSpec {
3827 fn matches_identifier(&self, ident: &str) -> bool {
3828 let lower = ident.to_ascii_lowercase();
3829 lower == self.column_name_lower || lower == "range"
3830 }
3831
3832 fn matches_table_alias(&self, ident: &str) -> bool {
3833 let lower = ident.to_ascii_lowercase();
3834 match &self.table_alias_lower {
3835 Some(alias) => lower == *alias,
3836 None => lower == "range",
3837 }
3838 }
3839
3840 fn matches_object_name(&self, name: &ObjectName) -> bool {
3841 if name.0.len() != 1 {
3842 return false;
3843 }
3844 match &name.0[0] {
3845 ObjectNamePart::Identifier(ident) => self.matches_table_alias(&ident.value),
3846 _ => false,
3847 }
3848 }
3849}
3850
3851pub fn extract_rows_from_range(select: &Select) -> Result<Option<RuntimeRangeSelectRows>> {
3852 let spec = match parse_range_spec(select)? {
3853 Some(spec) => spec,
3854 None => return Ok(None),
3855 };
3856
3857 if select.selection.is_some() {
3858 return Err(Error::InvalidArgumentError(
3859 "WHERE clauses are not supported for range() SELECT statements".into(),
3860 ));
3861 }
3862 if select.having.is_some()
3863 || !select.named_window.is_empty()
3864 || select.qualify.is_some()
3865 || select.distinct.is_some()
3866 || select.top.is_some()
3867 || select.into.is_some()
3868 || select.prewhere.is_some()
3869 || !select.lateral_views.is_empty()
3870 || select.value_table_mode.is_some()
3871 || !group_by_is_empty(&select.group_by)
3872 {
3873 return Err(Error::InvalidArgumentError(
3874 "advanced SELECT clauses are not supported for range() SELECT statements".into(),
3875 ));
3876 }
3877
3878 let mut projections: Vec<RangeProjection> = Vec::with_capacity(select.projection.len());
3879
3880 if select.projection.is_empty() {
3882 projections.push(RangeProjection::Column);
3883 } else {
3884 for item in &select.projection {
3885 let projection = match item {
3886 SelectItem::Wildcard(_) => RangeProjection::Column,
3887 SelectItem::QualifiedWildcard(kind, _) => match kind {
3888 SelectItemQualifiedWildcardKind::ObjectName(object_name) => {
3889 if spec.matches_object_name(object_name) {
3890 RangeProjection::Column
3891 } else {
3892 return Err(Error::InvalidArgumentError(
3893 "qualified wildcard must reference the range() source".into(),
3894 ));
3895 }
3896 }
3897 SelectItemQualifiedWildcardKind::Expr(_) => {
3898 return Err(Error::InvalidArgumentError(
3899 "expression-qualified wildcards are not supported for range() SELECT statements".into(),
3900 ));
3901 }
3902 },
3903 SelectItem::UnnamedExpr(expr) => build_range_projection_expr(expr, &spec)?,
3904 SelectItem::ExprWithAlias { expr, .. } => build_range_projection_expr(expr, &spec)?,
3905 };
3906 projections.push(projection);
3907 }
3908 }
3909
3910 let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(spec.row_count);
3911 for idx in 0..spec.row_count {
3912 let mut row: Vec<PlanValue> = Vec::with_capacity(projections.len());
3913 let value = spec.start + (idx as i64);
3914 for projection in &projections {
3915 match projection {
3916 RangeProjection::Column => row.push(PlanValue::Integer(value)),
3917 RangeProjection::Literal(value) => row.push(value.clone()),
3918 }
3919 }
3920 rows.push(row);
3921 }
3922
3923 Ok(Some(RuntimeRangeSelectRows { rows }))
3924}
3925
3926fn build_range_projection_expr(expr: &SqlExpr, spec: &RuntimeRangeSpec) -> Result<RangeProjection> {
3927 match expr {
3928 SqlExpr::Identifier(ident) => {
3929 if spec.matches_identifier(&ident.value) {
3930 Ok(RangeProjection::Column)
3931 } else {
3932 Err(Error::InvalidArgumentError(format!(
3933 "unknown column '{}' in range() SELECT",
3934 ident.value
3935 )))
3936 }
3937 }
3938 SqlExpr::CompoundIdentifier(parts) => {
3939 if parts.len() == 2
3940 && spec.matches_table_alias(&parts[0].value)
3941 && spec.matches_identifier(&parts[1].value)
3942 {
3943 Ok(RangeProjection::Column)
3944 } else {
3945 Err(Error::InvalidArgumentError(
3946 "compound identifiers must reference the range() source".into(),
3947 ))
3948 }
3949 }
3950 SqlExpr::Wildcard(_) | SqlExpr::QualifiedWildcard(_, _) => unreachable!(),
3951 other => Ok(RangeProjection::Literal(plan_value_from_sql_expr(other)?)),
3952 }
3953}
3954
3955fn parse_range_spec(select: &Select) -> Result<Option<RuntimeRangeSpec>> {
3956 if select.from.len() != 1 {
3957 return Ok(None);
3958 }
3959 let item = &select.from[0];
3960 if !item.joins.is_empty() {
3961 return Err(Error::InvalidArgumentError(
3962 "JOIN clauses are not supported for range() SELECT statements".into(),
3963 ));
3964 }
3965
3966 match &item.relation {
3967 TableFactor::Function {
3968 lateral,
3969 name,
3970 args,
3971 alias,
3972 } => {
3973 if *lateral {
3974 return Err(Error::InvalidArgumentError(
3975 "LATERAL range() is not supported".into(),
3976 ));
3977 }
3978 parse_range_spec_from_args(name, args, alias)
3979 }
3980 TableFactor::Table {
3981 name,
3982 alias,
3983 args: Some(table_args),
3984 with_ordinality,
3985 ..
3986 } => {
3987 if *with_ordinality {
3988 return Err(Error::InvalidArgumentError(
3989 "WITH ORDINALITY is not supported for range()".into(),
3990 ));
3991 }
3992 if table_args.settings.is_some() {
3993 return Err(Error::InvalidArgumentError(
3994 "range() SETTINGS clause is not supported".into(),
3995 ));
3996 }
3997 parse_range_spec_from_args(name, &table_args.args, alias)
3998 }
3999 _ => Ok(None),
4000 }
4001}
4002
4003fn parse_range_spec_from_args(
4004 name: &ObjectName,
4005 args: &[FunctionArg],
4006 alias: &Option<TableAlias>,
4007) -> Result<Option<RuntimeRangeSpec>> {
4008 if name.0.len() != 1 {
4009 return Ok(None);
4010 }
4011 let func_name = match &name.0[0] {
4012 ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
4013 _ => return Ok(None),
4014 };
4015 if func_name != "range" {
4016 return Ok(None);
4017 }
4018
4019 if args.is_empty() || args.len() > 2 {
4020 return Err(Error::InvalidArgumentError(
4021 "range() requires one or two arguments".into(),
4022 ));
4023 }
4024
4025 let extract_int = |arg: &FunctionArg| -> Result<i64> {
4027 let arg_expr = match arg {
4028 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4029 FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_))
4030 | FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
4031 return Err(Error::InvalidArgumentError(
4032 "range() argument must be an integer literal".into(),
4033 ));
4034 }
4035 FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
4036 return Err(Error::InvalidArgumentError(
4037 "named arguments are not supported for range()".into(),
4038 ));
4039 }
4040 };
4041
4042 let value = plan_value_from_sql_expr(arg_expr)?;
4043 match value {
4044 PlanValue::Integer(v) => Ok(v),
4045 _ => Err(Error::InvalidArgumentError(
4046 "range() argument must be an integer literal".into(),
4047 )),
4048 }
4049 };
4050
4051 let (start, end, row_count) = if args.len() == 1 {
4052 let count = extract_int(&args[0])?;
4054 if count < 0 {
4055 return Err(Error::InvalidArgumentError(
4056 "range() argument must be non-negative".into(),
4057 ));
4058 }
4059 (0, count, count as usize)
4060 } else {
4061 let start = extract_int(&args[0])?;
4063 let end = extract_int(&args[1])?;
4064 if end < start {
4065 return Err(Error::InvalidArgumentError(
4066 "range() end must be >= start".into(),
4067 ));
4068 }
4069 let row_count = (end - start) as usize;
4070 (start, end, row_count)
4071 };
4072
4073 let column_name_lower = alias
4074 .as_ref()
4075 .and_then(|a| {
4076 a.columns
4077 .first()
4078 .map(|col| col.name.value.to_ascii_lowercase())
4079 })
4080 .unwrap_or_else(|| "range".to_string());
4081 let table_alias_lower = alias.as_ref().map(|a| a.name.value.to_ascii_lowercase());
4082
4083 Ok(Some(RuntimeRangeSpec {
4084 start,
4085 end,
4086 row_count,
4087 column_name_lower,
4088 table_alias_lower,
4089 }))
4090}
4091
/// Builder that assembles a [`CreateTablePlan`] and executes it against a
/// borrowed runtime context.
pub struct RuntimeCreateTableBuilder<'ctx, P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Context that `finish` executes the plan against.
    ctx: &'ctx RuntimeContext<P>,
    /// Plan being filled in by the builder methods.
    plan: CreateTablePlan,
}
4099
4100impl<'ctx, P> RuntimeCreateTableBuilder<'ctx, P>
4101where
4102 P: Pager<Blob = EntryHandle> + Send + Sync,
4103{
4104 pub fn if_not_exists(mut self) -> Self {
4105 self.plan.if_not_exists = true;
4106 self
4107 }
4108
4109 pub fn or_replace(mut self) -> Self {
4110 self.plan.or_replace = true;
4111 self
4112 }
4113
4114 pub fn with_column(mut self, name: impl Into<String>, data_type: DataType) -> Self {
4115 self.plan
4116 .columns
4117 .push(ColumnSpec::new(name.into(), data_type, true));
4118 self
4119 }
4120
4121 pub fn with_not_null_column(mut self, name: impl Into<String>, data_type: DataType) -> Self {
4122 self.plan
4123 .columns
4124 .push(ColumnSpec::new(name.into(), data_type, false));
4125 self
4126 }
4127
4128 pub fn with_column_spec(mut self, spec: ColumnSpec) -> Self {
4129 self.plan.columns.push(spec);
4130 self
4131 }
4132
4133 pub fn finish(self) -> Result<RuntimeStatementResult<P>> {
4134 self.ctx.execute_create_table(self.plan)
4135 }
4136}
4137
/// A row described as named values, used for inserts that address columns by name.
#[derive(Clone, Debug, Default)]
pub struct RuntimeRow {
    /// `(column name, value)` pairs in the order the columns were first set.
    values: Vec<(String, PlanValue)>,
}
4142
4143impl RuntimeRow {
4144 pub fn new() -> Self {
4145 Self { values: Vec::new() }
4146 }
4147
4148 pub fn with(mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> Self {
4149 self.set(name, value);
4150 self
4151 }
4152
4153 pub fn set(&mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> &mut Self {
4154 let name = name.into();
4155 let value = value.into();
4156 if let Some((_, existing)) = self.values.iter_mut().find(|(n, _)| *n == name) {
4157 *existing = value;
4158 } else {
4159 self.values.push((name, value));
4160 }
4161 self
4162 }
4163
4164 fn columns(&self) -> Vec<String> {
4165 self.values.iter().map(|(n, _)| n.clone()).collect()
4166 }
4167
4168 fn values_for_columns(&self, columns: &[String]) -> Result<Vec<PlanValue>> {
4169 let mut out = Vec::with_capacity(columns.len());
4170 for column in columns {
4171 let value = self
4172 .values
4173 .iter()
4174 .find(|(name, _)| name == column)
4175 .ok_or_else(|| {
4176 Error::InvalidArgumentError(format!(
4177 "insert row missing value for column '{}'",
4178 column
4179 ))
4180 })?;
4181 out.push(value.1.clone());
4182 }
4183 Ok(out)
4184 }
4185}
4186
4187pub fn row() -> RuntimeRow {
4188 RuntimeRow::new()
4189}
4190
/// Normalized shape of a single insert row, as produced by [`IntoInsertRow`].
#[doc(hidden)]
pub enum RuntimeInsertRowKind {
    /// Values paired with explicit column names; `values[i]` belongs to `columns[i]`.
    Named {
        columns: Vec<String>,
        values: Vec<PlanValue>,
    },
    /// Values given without column names.
    Positional(Vec<PlanValue>),
}
4199
/// Conversion of a user-supplied row value into a [`RuntimeInsertRowKind`].
pub trait IntoInsertRow {
    /// Consumes the value and returns its normalized row form, or an error
    /// when the value cannot be used as an insert row.
    fn into_insert_row(self) -> Result<RuntimeInsertRowKind>;
}
4203
4204impl IntoInsertRow for RuntimeRow {
4205 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
4206 let row = self;
4207 if row.values.is_empty() {
4208 return Err(Error::InvalidArgumentError(
4209 "insert requires at least one column".into(),
4210 ));
4211 }
4212 let columns = row.columns();
4213 let values = row.values_for_columns(&columns)?;
4214 Ok(RuntimeInsertRowKind::Named { columns, values })
4215 }
4216}
4217
4218impl<T> IntoInsertRow for Vec<T>
4223where
4224 T: Into<PlanValue>,
4225{
4226 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
4227 if self.is_empty() {
4228 return Err(Error::InvalidArgumentError(
4229 "insert requires at least one column".into(),
4230 ));
4231 }
4232 Ok(RuntimeInsertRowKind::Positional(
4233 self.into_iter().map(Into::into).collect(),
4234 ))
4235 }
4236}
4237
4238impl<T, const N: usize> IntoInsertRow for [T; N]
4239where
4240 T: Into<PlanValue>,
4241{
4242 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
4243 if N == 0 {
4244 return Err(Error::InvalidArgumentError(
4245 "insert requires at least one column".into(),
4246 ));
4247 }
4248 Ok(RuntimeInsertRowKind::Positional(
4249 self.into_iter().map(Into::into).collect(),
4250 ))
4251 }
4252}
4253
// Implements `IntoInsertRow` for a tuple type. Each `Type => binding` pair
// names one tuple element: the type parameter must be `Into<PlanValue>`, and
// the binding is the local used to destructure that element. The resulting
// row is always `Positional`, with values in tuple order.
macro_rules! impl_into_insert_row_tuple {
    ($($type:ident => $value:ident),+) => {
        impl<$($type,)+> IntoInsertRow for ($($type,)+)
        where
            $($type: Into<PlanValue>,)+
        {
            fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
                // Destructure the tuple so each element can be converted by value.
                let ($($value,)+) = self;
                Ok(RuntimeInsertRowKind::Positional(vec![$($value.into(),)+]))
            }
        }
    };
}

// Tuple arities 1 through 8 are supported.
impl_into_insert_row_tuple!(T1 => v1);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5, T6 => v6);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7
);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7,
    T8 => v8
);
4293
/// Handle to a single table of a shared runtime context.
pub struct RuntimeTableHandle<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Context used for table lookups, scans and inserts.
    context: Arc<RuntimeContext<P>>,
    /// Table name exactly as supplied when the handle was created.
    display_name: String,
    /// ASCII-lowercased table name used for `lookup_table`. Despite the
    /// leading underscore, `insert_rows` reads this field.
    _canonical_name: String,
}
4302
4303impl<P> RuntimeTableHandle<P>
4304where
4305 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
4306{
4307 pub fn new(context: Arc<RuntimeContext<P>>, name: &str) -> Result<Self> {
4308 let (display_name, canonical_name) = canonical_table_name(name)?;
4309 context.lookup_table(&canonical_name)?;
4310 Ok(Self {
4311 context,
4312 display_name,
4313 _canonical_name: canonical_name,
4314 })
4315 }
4316
4317 pub fn lazy(&self) -> Result<RuntimeLazyFrame<P>> {
4318 RuntimeLazyFrame::scan(Arc::clone(&self.context), &self.display_name)
4319 }
4320
4321 pub fn insert_rows<R>(
4322 &self,
4323 rows: impl IntoIterator<Item = R>,
4324 ) -> Result<RuntimeStatementResult<P>>
4325 where
4326 R: IntoInsertRow,
4327 {
4328 enum InsertMode {
4329 Named,
4330 Positional,
4331 }
4332
4333 let table = self.context.lookup_table(&self._canonical_name)?;
4334 let schema = table.schema.as_ref();
4335 let schema_column_names: Vec<String> =
4336 schema.columns.iter().map(|col| col.name.clone()).collect();
4337 let mut normalized_rows: Vec<Vec<PlanValue>> = Vec::new();
4338 let mut mode: Option<InsertMode> = None;
4339 let mut column_names: Option<Vec<String>> = None;
4340 let mut row_count = 0usize;
4341
4342 for row in rows.into_iter() {
4343 row_count += 1;
4344 match row.into_insert_row()? {
4345 RuntimeInsertRowKind::Named { columns, values } => {
4346 if let Some(existing) = &mode {
4347 if !matches!(existing, InsertMode::Named) {
4348 return Err(Error::InvalidArgumentError(
4349 "cannot mix positional and named insert rows".into(),
4350 ));
4351 }
4352 } else {
4353 mode = Some(InsertMode::Named);
4354 let mut seen =
4355 FxHashSet::with_capacity_and_hasher(columns.len(), Default::default());
4356 for column in &columns {
4357 if !seen.insert(column.clone()) {
4358 return Err(Error::InvalidArgumentError(format!(
4359 "duplicate column '{}' in insert row",
4360 column
4361 )));
4362 }
4363 }
4364 column_names = Some(columns.clone());
4365 }
4366
4367 let expected = column_names
4368 .as_ref()
4369 .expect("column names must be initialized for named insert");
4370 if columns != *expected {
4371 return Err(Error::InvalidArgumentError(
4372 "insert rows must specify the same columns".into(),
4373 ));
4374 }
4375 if values.len() != expected.len() {
4376 return Err(Error::InvalidArgumentError(format!(
4377 "insert row expected {} values, found {}",
4378 expected.len(),
4379 values.len()
4380 )));
4381 }
4382 normalized_rows.push(values);
4383 }
4384 RuntimeInsertRowKind::Positional(values) => {
4385 if let Some(existing) = &mode {
4386 if !matches!(existing, InsertMode::Positional) {
4387 return Err(Error::InvalidArgumentError(
4388 "cannot mix positional and named insert rows".into(),
4389 ));
4390 }
4391 } else {
4392 mode = Some(InsertMode::Positional);
4393 column_names = Some(schema_column_names.clone());
4394 }
4395
4396 if values.len() != schema.columns.len() {
4397 return Err(Error::InvalidArgumentError(format!(
4398 "insert row expected {} values, found {}",
4399 schema.columns.len(),
4400 values.len()
4401 )));
4402 }
4403 normalized_rows.push(values);
4404 }
4405 }
4406 }
4407
4408 if row_count == 0 {
4409 return Err(Error::InvalidArgumentError(
4410 "insert requires at least one row".into(),
4411 ));
4412 }
4413
4414 let columns = column_names.unwrap_or_else(|| schema_column_names.clone());
4415 self.insert_row_batch(RowBatch {
4416 columns,
4417 rows: normalized_rows,
4418 })
4419 }
4420
4421 pub fn insert_row_batch(&self, batch: RowBatch) -> Result<RuntimeStatementResult<P>> {
4422 if batch.rows.is_empty() {
4423 return Err(Error::InvalidArgumentError(
4424 "insert requires at least one row".into(),
4425 ));
4426 }
4427 if batch.columns.is_empty() {
4428 return Err(Error::InvalidArgumentError(
4429 "insert requires at least one column".into(),
4430 ));
4431 }
4432 for row in &batch.rows {
4433 if row.len() != batch.columns.len() {
4434 return Err(Error::InvalidArgumentError(
4435 "insert rows must have values for every column".into(),
4436 ));
4437 }
4438 }
4439
4440 let plan = InsertPlan {
4441 table: self.display_name.clone(),
4442 columns: batch.columns,
4443 source: InsertSource::Rows(batch.rows),
4444 };
4445 self.context.insert(plan)
4446 }
4447
4448 pub fn insert_batches(&self, batches: Vec<RecordBatch>) -> Result<RuntimeStatementResult<P>> {
4449 let plan = InsertPlan {
4450 table: self.display_name.clone(),
4451 columns: Vec::new(),
4452 source: InsertSource::Batches(batches),
4453 };
4454 self.context.insert(plan)
4455 }
4456
4457 pub fn insert_lazy(&self, frame: RuntimeLazyFrame<P>) -> Result<RuntimeStatementResult<P>> {
4458 let RowBatch { columns, rows } = frame.collect_rows()?;
4459 self.insert_row_batch(RowBatch { columns, rows })
4460 }
4461
4462 pub fn name(&self) -> &str {
4463 &self.display_name
4464 }
4465}
4466
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use llkv_storage::pager::MemPager;
    use std::sync::Arc;

    // Creates a two-column table, inserts two positional tuple rows, and
    // checks that a lazy scan returns one batch whose string column has two rows.
    #[test]
    fn create_insert_select_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let context = Arc::new(RuntimeContext::new(pager));

        let table = context
            .create_table(
                "people",
                [
                    ("id", DataType::Int64, NotNull),
                    ("name", DataType::Utf8, Nullable),
                ],
            )
            .expect("create table");
        table
            .insert_rows([(1_i64, "alice"), (2_i64, "bob")])
            .expect("insert rows");

        let execution = table.lazy().expect("lazy scan");
        let select = execution.collect().expect("build select execution");
        let batches = select.collect().expect("collect batches");
        assert_eq!(batches.len(), 1);
        // Column 1 is `name`, the second column declared above.
        let column = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(column.len(), 2);
    }

    // Inserts two NULLs and one integer into a single-column table and checks
    // that the count-nulls aggregate reports 2.
    #[test]
    fn aggregate_count_nulls() {
        let pager = Arc::new(MemPager::default());
        let context = Arc::new(RuntimeContext::new(pager));

        let table = context
            .create_table("ints", [("i", DataType::Int64)])
            .expect("create table");
        table
            .insert_rows([
                (PlanValue::Null,),
                (PlanValue::Integer(1),),
                (PlanValue::Null,),
            ])
            .expect("insert rows");

        let plan =
            SelectPlan::new("ints").with_aggregates(vec![AggregateExpr::count_nulls("i", "nulls")]);
        let execution = context.execute_select(plan).expect("select");
        let batches = execution.collect().expect("collect batches");
        // The aggregate result is read from the first column of the first batch.
        let column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int column");
        assert_eq!(column.value(0), 2);
    }
}