1#![forbid(unsafe_code)]
30
31pub mod storage_namespace;
32
33use std::fmt;
34use std::marker::PhantomData;
35use std::mem;
36use std::ops::Bound;
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::sync::{Arc, RwLock};
39use std::time::{SystemTime, UNIX_EPOCH};
40
41use rustc_hash::{FxHashMap, FxHashSet};
42
43use arrow::array::{
44 Array, ArrayRef, BooleanBuilder, Date32Builder, Float64Builder, Int64Builder, StringBuilder,
45 UInt64Array, UInt64Builder,
46};
47use arrow::datatypes::{DataType, Field, FieldRef, Schema};
48use arrow::record_batch::RecordBatch;
49use llkv_column_map::ColumnStore;
50use llkv_column_map::store::{GatherNullPolicy, ROW_ID_COLUMN_NAME};
51use llkv_column_map::types::LogicalFieldId;
52use llkv_expr::expr::{Expr as LlkvExpr, Filter, Operator, ScalarExpr};
53use llkv_result::Error;
54use llkv_storage::pager::{BoxedPager, MemPager, Pager};
55use llkv_table::catalog::{FieldConstraints, FieldDefinition, TableCatalog};
56use llkv_table::table::{RowIdFilter, ScanProjection, ScanStreamOptions, Table};
57use llkv_table::types::{FieldId, ROW_ID_FIELD_ID, RowId};
58use llkv_table::{
59 CatalogManager, ConstraintColumnInfo, ConstraintService, CreateTableResult, ForeignKeyColumn,
60 ForeignKeyTableInfo, ForeignKeyView, InsertColumnConstraint, InsertMultiColumnUnique,
61 InsertUniqueColumn, MetadataManager, MultiColumnUniqueEntryMeta, MultiColumnUniqueRegistration,
62 SysCatalog, TableConstraintSummaryView, TableView, UniqueKey, build_composite_unique_key,
63 canonical_table_name, constraints::ConstraintKind, ensure_multi_column_unique,
64 ensure_single_column_unique,
65};
66use simd_r_drive_entry_handle::EntryHandle;
67use sqlparser::ast::{
68 Expr as SqlExpr, FunctionArg, FunctionArgExpr, GroupByExpr, ObjectName, ObjectNamePart, Select,
69 SelectItem, SelectItemQualifiedWildcardKind, TableAlias, TableFactor, UnaryOperator, Value,
70 ValueWithSpan,
71};
72use time::{Date, Month};
73
74pub type Result<T> = llkv_result::Result<T>;
75
76pub use llkv_plan::{
78 AggregateExpr, AggregateFunction, AssignmentValue, ColumnAssignment, ColumnNullability,
79 ColumnSpec, CreateIndexPlan, CreateTablePlan, CreateTableSource, DeletePlan, ForeignKeyAction,
80 ForeignKeySpec, IndexColumnPlan, InsertPlan, InsertSource, IntoColumnSpec, NotNull, Nullable,
81 OrderByPlan, OrderSortType, OrderTarget, PlanOperation, PlanStatement, PlanValue, SelectPlan,
82 SelectProjection, UpdatePlan,
83};
84
85use llkv_executor::{ExecutorColumn, ExecutorMultiColumnUnique, ExecutorSchema, ExecutorTable};
87pub use llkv_executor::{QueryExecutor, RowBatch, SelectExecution, TableProvider};
88
89use crate::storage_namespace::{
90 PersistentNamespace, StorageNamespace, StorageNamespaceRegistry, TemporaryNamespace,
91};
92
93pub use llkv_transaction::TransactionKind;
95use llkv_transaction::{
96 RowVersion, TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionContext, TransactionManager,
97 TransactionResult, TxnId, TxnIdManager, mvcc::TransactionSnapshot,
98};
99
100use llkv_transaction::TransactionSession;
102
103use llkv_table::mvcc;
106
/// Precomputed constraint metadata for one table, consumed when validating
/// inserted rows.
struct TableConstraintContext {
    /// Field ids of the table schema, in schema order.
    schema_field_ids: Vec<FieldId>,
    /// Per-column constraints to enforce on insert.
    column_constraints: Vec<InsertColumnConstraint>,
    /// Columns carrying single-column UNIQUE constraints.
    unique_columns: Vec<InsertUniqueColumn>,
    /// Multi-column UNIQUE constraints.
    multi_column_uniques: Vec<InsertMultiColumnUnique>,
    /// Primary key, modeled as a multi-column unique constraint when present.
    primary_key: Option<InsertMultiColumnUnique>,
}
114
/// Outcome of executing a single plan statement through the runtime.
///
/// Generic over the pager `P` because `Select` carries a pager-bound
/// [`SelectExecution`]; all other variants are plain data.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum RuntimeStatementResult<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// CREATE TABLE completed for `table_name`.
    CreateTable {
        table_name: String,
    },
    /// CREATE INDEX completed; `index_name` is `None` for unnamed indexes.
    CreateIndex {
        table_name: String,
        index_name: Option<String>,
    },
    /// Statement required no work (e.g. conditional DDL that was skipped).
    NoOp,
    /// INSERT completed with the given row count.
    Insert {
        table_name: String,
        rows_inserted: usize,
    },
    /// UPDATE completed with the given row count.
    Update {
        table_name: String,
        rows_updated: usize,
    },
    /// DELETE completed with the given row count.
    Delete {
        table_name: String,
        rows_deleted: usize,
    },
    /// SELECT result: schema plus a (possibly streaming) execution handle.
    Select {
        table_name: String,
        schema: Arc<Schema>,
        execution: SelectExecution<P>,
    },
    /// BEGIN/COMMIT/ROLLBACK acknowledged.
    Transaction {
        kind: TransactionKind,
    },
}
151
152impl<P> fmt::Debug for RuntimeStatementResult<P>
153where
154 P: Pager<Blob = EntryHandle> + Send + Sync,
155{
156 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157 match self {
158 RuntimeStatementResult::CreateTable { table_name } => f
159 .debug_struct("CreateTable")
160 .field("table_name", table_name)
161 .finish(),
162 RuntimeStatementResult::CreateIndex {
163 table_name,
164 index_name,
165 } => f
166 .debug_struct("CreateIndex")
167 .field("table_name", table_name)
168 .field("index_name", index_name)
169 .finish(),
170 RuntimeStatementResult::NoOp => f.debug_struct("NoOp").finish(),
171 RuntimeStatementResult::Insert {
172 table_name,
173 rows_inserted,
174 } => f
175 .debug_struct("Insert")
176 .field("table_name", table_name)
177 .field("rows_inserted", rows_inserted)
178 .finish(),
179 RuntimeStatementResult::Update {
180 table_name,
181 rows_updated,
182 } => f
183 .debug_struct("Update")
184 .field("table_name", table_name)
185 .field("rows_updated", rows_updated)
186 .finish(),
187 RuntimeStatementResult::Delete {
188 table_name,
189 rows_deleted,
190 } => f
191 .debug_struct("Delete")
192 .field("table_name", table_name)
193 .field("rows_deleted", rows_deleted)
194 .finish(),
195 RuntimeStatementResult::Select {
196 table_name, schema, ..
197 } => f
198 .debug_struct("Select")
199 .field("table_name", table_name)
200 .field("schema", schema)
201 .finish(),
202 RuntimeStatementResult::Transaction { kind } => {
203 f.debug_struct("Transaction").field("kind", kind).finish()
204 }
205 }
206 }
207}
208
209impl<P> RuntimeStatementResult<P>
210where
211 P: Pager<Blob = EntryHandle> + Send + Sync,
212{
213 #[allow(dead_code)]
216 pub(crate) fn convert_pager_type<Q>(self) -> Result<RuntimeStatementResult<Q>>
217 where
218 Q: Pager<Blob = EntryHandle> + Send + Sync,
219 {
220 match self {
221 RuntimeStatementResult::CreateTable { table_name } => {
222 Ok(RuntimeStatementResult::CreateTable { table_name })
223 }
224 RuntimeStatementResult::CreateIndex {
225 table_name,
226 index_name,
227 } => Ok(RuntimeStatementResult::CreateIndex {
228 table_name,
229 index_name,
230 }),
231 RuntimeStatementResult::NoOp => Ok(RuntimeStatementResult::NoOp),
232 RuntimeStatementResult::Insert {
233 table_name,
234 rows_inserted,
235 } => Ok(RuntimeStatementResult::Insert {
236 table_name,
237 rows_inserted,
238 }),
239 RuntimeStatementResult::Update {
240 table_name,
241 rows_updated,
242 } => Ok(RuntimeStatementResult::Update {
243 table_name,
244 rows_updated,
245 }),
246 RuntimeStatementResult::Delete {
247 table_name,
248 rows_deleted,
249 } => Ok(RuntimeStatementResult::Delete {
250 table_name,
251 rows_deleted,
252 }),
253 RuntimeStatementResult::Transaction { kind } => {
254 Ok(RuntimeStatementResult::Transaction { kind })
255 }
256 RuntimeStatementResult::Select { .. } => Err(Error::Internal(
257 "Cannot convert SELECT result between pager types in transaction".into(),
258 )),
259 }
260 }
261}
262
263pub fn statement_table_name(statement: &PlanStatement) -> Option<&str> {
270 match statement {
271 PlanStatement::CreateTable(plan) => Some(&plan.name),
272 PlanStatement::CreateIndex(plan) => Some(&plan.table),
273 PlanStatement::Insert(plan) => Some(&plan.table),
274 PlanStatement::Update(plan) => Some(&plan.table),
275 PlanStatement::Delete(plan) => Some(&plan.table),
276 PlanStatement::Select(plan) => {
277 if plan.tables.len() == 1 {
279 Some(&plan.tables[0].table)
280 } else {
281 None
282 }
283 }
284 PlanStatement::BeginTransaction
285 | PlanStatement::CommitTransaction
286 | PlanStatement::RollbackTransaction => None,
287 }
288}
289
/// Pairs a [`RuntimeContext`] with the MVCC snapshot that statements executed
/// through it should observe. The snapshot is swapped as transactions begin
/// and end (see `update_snapshot` / `current_snapshot`).
pub struct RuntimeContextWrapper<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    ctx: Arc<RuntimeContext<P>>,
    /// Currently visible snapshot; `RwLock` gives interior mutability across
    /// shared references.
    snapshot: RwLock<TransactionSnapshot>,
}
315
316impl<P> RuntimeContextWrapper<P>
317where
318 P: Pager<Blob = EntryHandle> + Send + Sync,
319{
320 fn new(ctx: Arc<RuntimeContext<P>>) -> Self {
321 let snapshot = ctx.default_snapshot();
322 Self {
323 ctx,
324 snapshot: RwLock::new(snapshot),
325 }
326 }
327
328 fn update_snapshot(&self, snapshot: TransactionSnapshot) {
329 let mut guard = self.snapshot.write().expect("snapshot lock poisoned");
330 *guard = snapshot;
331 }
332
333 fn current_snapshot(&self) -> TransactionSnapshot {
334 *self.snapshot.read().expect("snapshot lock poisoned")
335 }
336
337 fn context(&self) -> &Arc<RuntimeContext<P>> {
338 &self.ctx
339 }
340
341 fn ctx(&self) -> &RuntimeContext<P> {
342 &self.ctx
343 }
344}
345
/// Per-session storage namespaces: the shared persistent namespace plus an
/// optional session-private temporary namespace (backed by an in-memory
/// pager), and the registry that routes table names to one of them.
struct SessionNamespaces<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    persistent: Arc<PersistentNamespace<P>>,
    /// Session-scoped temporary tables; cleared on drop.
    temporary: Option<Arc<TemporaryNamespace<BoxedPager>>>,
    registry: Arc<RwLock<StorageNamespaceRegistry>>,
}
354
355impl<P> SessionNamespaces<P>
356where
357 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
358{
359 fn new(base_context: Arc<RuntimeContext<P>>) -> Self {
360 let persistent = Arc::new(PersistentNamespace::new(
361 storage_namespace::PERSISTENT_NAMESPACE_ID.to_string(),
362 Arc::clone(&base_context),
363 ));
364
365 let mut registry = StorageNamespaceRegistry::new(persistent.namespace_id().clone());
366 registry.register_namespace(Arc::clone(&persistent), Vec::<String>::new(), false);
367
368 let temporary = {
369 let temp_pager = Arc::new(BoxedPager::from_arc(Arc::new(MemPager::default())));
370 let temp_context = Arc::new(RuntimeContext::new(temp_pager));
371 let namespace = Arc::new(TemporaryNamespace::new(
372 storage_namespace::TEMPORARY_NAMESPACE_ID.to_string(),
373 temp_context,
374 ));
375 registry.register_namespace(
376 Arc::clone(&namespace),
377 vec![storage_namespace::TEMPORARY_NAMESPACE_ID.to_string()],
378 true,
379 );
380 namespace
381 };
382
383 Self {
384 persistent,
385 temporary: Some(temporary),
386 registry: Arc::new(RwLock::new(registry)),
387 }
388 }
389
390 fn persistent(&self) -> Arc<PersistentNamespace<P>> {
391 Arc::clone(&self.persistent)
392 }
393
394 fn temporary(&self) -> Option<Arc<TemporaryNamespace<BoxedPager>>> {
395 self.temporary.as_ref().map(Arc::clone)
396 }
397
398 fn registry(&self) -> Arc<RwLock<StorageNamespaceRegistry>> {
399 Arc::clone(&self.registry)
400 }
401}
402
403impl<P> Drop for SessionNamespaces<P>
404where
405 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
406{
407 fn drop(&mut self) {
408 if let Some(temp) = &self.temporary {
409 temp.clear_tables();
410 }
411 }
412}
413
/// A single client session: transaction state layered over the shared
/// runtime context, plus the session's storage namespaces.
///
/// The staging side of `inner` uses a `MemPager`-backed context so
/// transactional writes are buffered in memory until COMMIT.
pub struct RuntimeSession<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    inner: TransactionSession<RuntimeContextWrapper<P>, RuntimeContextWrapper<MemPager>>,
    namespaces: Arc<SessionNamespaces<P>>,
}
426
impl<P> RuntimeSession<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    /// Creates another handle onto the same logical session: transaction
    /// state is cloned via `TransactionSession::clone_session` and the
    /// namespace set is shared.
    pub(crate) fn clone_session(&self) -> Self {
        Self {
            inner: self.inner.clone_session(),
            namespaces: self.namespaces.clone(),
        }
    }

    /// Registry mapping canonical table names to storage namespaces.
    pub fn namespace_registry(&self) -> Arc<RwLock<StorageNamespaceRegistry>> {
        self.namespaces.registry()
    }

    /// Resolves which namespace (persistent or temporary) owns `canonical`.
    fn resolve_namespace_for_table(&self, canonical: &str) -> storage_namespace::NamespaceId {
        self.namespace_registry()
            .read()
            .expect("namespace registry poisoned")
            .namespace_for_table(canonical)
    }

    /// Namespace for a SELECT plan, but only when it reads exactly one
    /// table; multi-table plans return `None` and take the default path.
    fn namespace_for_select_plan(
        &self,
        plan: &SelectPlan,
    ) -> Option<storage_namespace::NamespaceId> {
        if plan.tables.len() != 1 {
            return None;
        }

        let qualified = plan.tables[0].qualified_name();
        let (_, canonical) = canonical_table_name(&qualified).ok()?;
        Some(self.resolve_namespace_for_table(&canonical))
    }

    /// Executes a SELECT against the temporary namespace and materializes
    /// the stream into a single in-memory batch, so the returned execution
    /// carries pager type `P` rather than the temporary `BoxedPager`.
    fn select_from_temporary(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
        let temp_namespace = self
            .temporary_namespace()
            .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;

        // Single-table plans report their qualified name; anything else gets
        // an empty name (same convention as `select`).
        let table_name = if plan.tables.len() == 1 {
            plan.tables[0].qualified_name()
        } else {
            String::new()
        };

        let execution = temp_namespace.context().execute_select(plan.clone())?;
        let schema = execution.schema();
        let batches = execution.collect()?;

        // Collapse all batches into one RecordBatch.
        let combined = if batches.is_empty() {
            RecordBatch::new_empty(Arc::clone(&schema))
        } else if batches.len() == 1 {
            batches.into_iter().next().unwrap()
        } else {
            let refs: Vec<&RecordBatch> = batches.iter().collect();
            arrow::compute::concat_batches(&schema, refs)?
        };

        let execution =
            SelectExecution::from_batch(table_name.clone(), Arc::clone(&schema), combined);

        Ok(RuntimeStatementResult::Select {
            execution,
            table_name,
            schema,
        })
    }

    /// Session handle to the shared persistent namespace.
    fn persistent_namespace(&self) -> Arc<PersistentNamespace<P>> {
        self.namespaces.persistent()
    }

    /// Session handle to the temporary namespace, if present.
    #[allow(dead_code)]
    fn temporary_namespace(&self) -> Option<Arc<TemporaryNamespace<BoxedPager>>> {
        self.namespaces.temporary()
    }

    /// BEGIN: creates a fresh in-memory staging context; writes are buffered
    /// there until COMMIT replays them against the base context.
    pub fn begin_transaction(&self) -> Result<RuntimeStatementResult<P>> {
        let staging_pager = Arc::new(MemPager::default());
        tracing::trace!(
            "BEGIN_TRANSACTION: Created staging pager at {:p}",
            &*staging_pager
        );
        let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));

        let staging_wrapper = Arc::new(RuntimeContextWrapper::new(staging_ctx));

        self.inner.begin_transaction(staging_wrapper)?;
        Ok(RuntimeStatementResult::Transaction {
            kind: TransactionKind::Begin,
        })
    }

    /// Marks the current transaction aborted; subsequent operations will be
    /// rejected until ROLLBACK.
    pub fn abort_transaction(&self) {
        self.inner.abort_transaction();
    }

    /// Whether a transaction is currently open on this session.
    pub fn has_active_transaction(&self) -> bool {
        let result = self.inner.has_active_transaction();
        tracing::trace!("SESSION: has_active_transaction() = {}", result);
        result
    }

    /// Whether the current transaction has been aborted.
    pub fn is_aborted(&self) -> bool {
        self.inner.is_aborted()
    }

    /// COMMIT: finalizes the staged transaction, then replays its buffered
    /// operations (CreateTable/Insert/Update/Delete) against the base
    /// context.
    ///
    /// Before replay, any write touching a table that a concurrent
    /// transaction dropped causes the commit to abort with an error.
    pub fn commit_transaction(&self) -> Result<RuntimeStatementResult<P>> {
        tracing::trace!("Session::commit_transaction called");
        let (tx_result, operations) = self.inner.commit_transaction()?;
        tracing::trace!(
            "Session::commit_transaction got {} operations",
            operations.len()
        );

        if !operations.is_empty() {
            // Conflict check: refuse to replay writes against tables dropped
            // since this transaction began.
            let dropped_tables = self
                .inner
                .context()
                .ctx()
                .dropped_tables
                .read()
                .unwrap()
                .clone();
            if !dropped_tables.is_empty() {
                for operation in &operations {
                    let table_name_opt = match operation {
                        PlanOperation::Insert(plan) => Some(plan.table.as_str()),
                        PlanOperation::Update(plan) => Some(plan.table.as_str()),
                        PlanOperation::Delete(plan) => Some(plan.table.as_str()),
                        _ => None,
                    };
                    if let Some(table_name) = table_name_opt {
                        let (_, canonical) = canonical_table_name(table_name)?;
                        if dropped_tables.contains(&canonical) {
                            self.abort_transaction();
                            return Err(Error::TransactionContextError(
                                "another transaction has dropped this table".into(),
                            ));
                        }
                    }
                }
            }
        }

        let kind = match tx_result {
            TransactionResult::Transaction { kind } => kind,
            _ => {
                return Err(Error::Internal(
                    "commit_transaction returned non-transaction result".into(),
                ));
            }
        };
        tracing::trace!("Session::commit_transaction kind={:?}", kind);

        // Replay the buffered operations, in order, against the base context.
        // NOTE(review): a failure mid-replay leaves earlier operations
        // applied — presumably acceptable given MVCC versioning; confirm.
        for operation in operations {
            match operation {
                PlanOperation::CreateTable(plan) => {
                    TransactionContext::create_table_plan(&**self.inner.context(), plan)?;
                }
                PlanOperation::Insert(plan) => {
                    TransactionContext::insert(&**self.inner.context(), plan)?;
                }
                PlanOperation::Update(plan) => {
                    TransactionContext::update(&**self.inner.context(), plan)?;
                }
                PlanOperation::Delete(plan) => {
                    TransactionContext::delete(&**self.inner.context(), plan)?;
                }
                _ => {}
            }
        }

        // Reset the base context's visibility back to its default snapshot.
        let base_ctx = self.inner.context();
        let default_snapshot = base_ctx.ctx().default_snapshot();
        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);

        // Persist the transaction-id watermark so ids survive restarts;
        // failure here is logged, not fatal.
        if matches!(kind, TransactionKind::Commit) {
            let ctx = base_ctx.ctx();
            let next_txn_id = ctx.txn_manager().current_next_txn_id();
            if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
                tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
            }
        }

        Ok(RuntimeStatementResult::Transaction { kind })
    }

    /// ROLLBACK: discards the staged transaction and restores the base
    /// context's default snapshot.
    pub fn rollback_transaction(&self) -> Result<RuntimeStatementResult<P>> {
        self.inner.rollback_transaction()?;
        let base_ctx = self.inner.context();
        let default_snapshot = base_ctx.ctx().default_snapshot();
        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
        Ok(RuntimeStatementResult::Transaction {
            kind: TransactionKind::Rollback,
        })
    }

    /// For CREATE TABLE AS SELECT: runs the SELECT now and replaces the
    /// plan's source with the materialized schema + batches.
    fn materialize_create_table_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
        if let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take() {
            let select_result = self.select(*select_plan)?;
            let (schema, batches) = match select_result {
                RuntimeStatementResult::Select {
                    schema, execution, ..
                } => {
                    let batches = execution.collect()?;
                    (schema, batches)
                }
                _ => {
                    return Err(Error::Internal(
                        "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
                    ));
                }
            };
            plan.source = Some(CreateTableSource::Batches { schema, batches });
        }
        Ok(plan)
    }

    /// CREATE TABLE: routes to the plan's namespace (persistent by default).
    /// Inside an active transaction the operation is staged; otherwise it is
    /// applied directly to the target namespace.
    pub fn create_table_plan(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
        let mut plan = self.materialize_create_table_plan(plan)?;
        let namespace_id = plan
            .namespace
            .clone()
            .unwrap_or_else(|| storage_namespace::PERSISTENT_NAMESPACE_ID.to_string());
        plan.namespace = Some(namespace_id.clone());

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace.create_table(plan)?.convert_pager_type::<P>()
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                if self.has_active_transaction() {
                    let table_name = plan.name.clone();
                    match self
                        .inner
                        .execute_operation(PlanOperation::CreateTable(plan))
                    {
                        Ok(_) => Ok(RuntimeStatementResult::CreateTable { table_name }),
                        Err(e) => {
                            // Any failure poisons the transaction.
                            self.abort_transaction();
                            Err(e)
                        }
                    }
                } else {
                    self.persistent_namespace().create_table(plan)
                }
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }

    /// DROP TABLE: routed to whichever namespace currently owns the table.
    pub fn drop_table(&self, name: &str, if_exists: bool) -> Result<()> {
        let (_, canonical_table) = canonical_table_name(name)?;
        let namespace_id = self.resolve_namespace_for_table(&canonical_table);

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace.drop_table(name, if_exists)
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                self.persistent_namespace().drop_table(name, if_exists)
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }
    /// CREATE INDEX: routed by the target table's namespace. Not supported
    /// inside an active transaction for persistent tables.
    pub fn create_index(&self, plan: CreateIndexPlan) -> Result<RuntimeStatementResult<P>> {
        let (_, canonical_table) = canonical_table_name(&plan.table)?;
        let namespace_id = self.resolve_namespace_for_table(&canonical_table);

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace.create_index(plan)?.convert_pager_type::<P>()
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                if self.has_active_transaction() {
                    return Err(Error::InvalidArgumentError(
                        "CREATE INDEX is not supported inside an active transaction".into(),
                    ));
                }

                self.persistent_namespace().create_index(plan)
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }

    /// Normalizes an INSERT's source to rows/batches and reports the row
    /// count. INSERT ... SELECT is evaluated eagerly here and rewritten to
    /// literal rows.
    fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
        let InsertPlan {
            table,
            columns,
            source,
        } = plan;

        match source {
            InsertSource::Rows(rows) => {
                let count = rows.len();
                Ok((
                    InsertPlan {
                        table,
                        columns,
                        source: InsertSource::Rows(rows),
                    },
                    count,
                ))
            }
            InsertSource::Batches(batches) => {
                let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
                Ok((
                    InsertPlan {
                        table,
                        columns,
                        source: InsertSource::Batches(batches),
                    },
                    count,
                ))
            }
            InsertSource::Select { plan: select_plan } => {
                let select_result = self.select(*select_plan)?;
                let rows = match select_result {
                    RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
                    _ => {
                        return Err(Error::Internal(
                            "expected Select result when executing INSERT ... SELECT".into(),
                        ));
                    }
                };
                let count = rows.len();
                Ok((
                    InsertPlan {
                        table,
                        columns,
                        source: InsertSource::Rows(rows),
                    },
                    count,
                ))
            }
        }
    }

    /// INSERT: routed by namespace. Inside an active transaction the write
    /// is staged; constraint violations abort the transaction. Outside one,
    /// the write goes straight to the base context under its default
    /// snapshot.
    pub fn insert(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
        tracing::trace!("Session::insert called for table={}", plan.table);
        let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
        let table_name = plan.table.clone();
        let (_, canonical_table) = canonical_table_name(&plan.table)?;
        let namespace_id = self.resolve_namespace_for_table(&canonical_table);

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace
                    .context()
                    .insert(plan)?
                    .convert_pager_type::<P>()?;
                Ok(RuntimeStatementResult::Insert {
                    rows_inserted,
                    table_name,
                })
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                if self.has_active_transaction() {
                    match self.inner.execute_operation(PlanOperation::Insert(plan)) {
                        Ok(_) => {
                            tracing::trace!("Session::insert succeeded for table={}", table_name);
                            Ok(RuntimeStatementResult::Insert {
                                rows_inserted,
                                table_name,
                            })
                        }
                        Err(e) => {
                            tracing::trace!(
                                "Session::insert failed for table={}, error={:?}",
                                table_name,
                                e
                            );
                            // Only constraint violations poison the
                            // transaction here; other errors leave it open.
                            if matches!(e, Error::ConstraintError(_)) {
                                tracing::trace!("Transaction is_aborted=true");
                                self.abort_transaction();
                            }
                            Err(e)
                        }
                    }
                } else {
                    let context = self.inner.context();
                    let default_snapshot = context.ctx().default_snapshot();
                    TransactionContext::set_snapshot(&**context, default_snapshot);
                    TransactionContext::insert(&**context, plan)?;
                    Ok(RuntimeStatementResult::Insert {
                        rows_inserted,
                        table_name,
                    })
                }
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }

    /// SELECT: temporary-namespace queries short-circuit to
    /// `select_from_temporary`. Inside a transaction the staged result is
    /// materialized into a single batch; otherwise the base context streams
    /// directly.
    pub fn select(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
        if let Some(namespace_id) = self.namespace_for_select_plan(&plan)
            && namespace_id == storage_namespace::TEMPORARY_NAMESPACE_ID
        {
            return self.select_from_temporary(plan);
        }

        if self.has_active_transaction() {
            let tx_result = match self
                .inner
                .execute_operation(PlanOperation::Select(plan.clone()))
            {
                Ok(result) => result,
                Err(e) => {
                    if matches!(e, Error::ConstraintError(_)) {
                        self.abort_transaction();
                    }
                    return Err(e);
                }
            };
            match tx_result {
                TransactionResult::Select {
                    table_name,
                    schema,
                    execution: staging_execution,
                } => {
                    // NOTE(review): collect errors are silently swallowed
                    // here and yield an empty result — presumably deliberate
                    // for staged reads, but worth confirming.
                    let batches = staging_execution.collect().unwrap_or_default();
                    let combined = if batches.is_empty() {
                        RecordBatch::new_empty(Arc::clone(&schema))
                    } else if batches.len() == 1 {
                        batches.into_iter().next().unwrap()
                    } else {
                        let refs: Vec<&RecordBatch> = batches.iter().collect();
                        arrow::compute::concat_batches(&schema, refs)?
                    };

                    let execution = SelectExecution::from_batch(
                        table_name.clone(),
                        Arc::clone(&schema),
                        combined,
                    );

                    Ok(RuntimeStatementResult::Select {
                        execution,
                        table_name,
                        schema,
                    })
                }
                _ => Err(Error::Internal("expected Select result".into())),
            }
        } else {
            let context = self.inner.context();
            let default_snapshot = context.ctx().default_snapshot();
            TransactionContext::set_snapshot(&**context, default_snapshot);
            let table_name = if plan.tables.len() == 1 {
                plan.tables[0].qualified_name()
            } else {
                String::new()
            };
            let execution = TransactionContext::execute_select(&**context, plan)?;
            let schema = execution.schema();
            Ok(RuntimeStatementResult::Select {
                execution,
                table_name,
                schema,
            })
        }
    }

    /// Convenience: SELECT * from `table` and collect all rows as plan
    /// values.
    pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
        let plan =
            SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
        match self.select(plan)? {
            RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
            other => Err(Error::Internal(format!(
                "expected Select result when reading table '{table}', got {:?}",
                other
            ))),
        }
    }

    /// UPDATE: routed by namespace; staged when a transaction is active
    /// (any failure aborts it), otherwise applied directly under the
    /// default snapshot.
    pub fn update(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
        let (_, canonical_table) = canonical_table_name(&plan.table)?;
        let namespace_id = self.resolve_namespace_for_table(&canonical_table);

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace
                    .context()
                    .update(plan)?
                    .convert_pager_type::<P>()
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                if self.has_active_transaction() {
                    let table_name = plan.table.clone();
                    let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
                        Ok(result) => result,
                        Err(e) => {
                            self.abort_transaction();
                            return Err(e);
                        }
                    };
                    match result {
                        TransactionResult::Update {
                            rows_matched: _,
                            rows_updated,
                        } => Ok(RuntimeStatementResult::Update {
                            rows_updated,
                            table_name,
                        }),
                        _ => Err(Error::Internal("expected Update result".into())),
                    }
                } else {
                    let context = self.inner.context();
                    let default_snapshot = context.ctx().default_snapshot();
                    TransactionContext::set_snapshot(&**context, default_snapshot);
                    let table_name = plan.table.clone();
                    let result = TransactionContext::update(&**context, plan)?;
                    match result {
                        TransactionResult::Update {
                            rows_matched: _,
                            rows_updated,
                        } => Ok(RuntimeStatementResult::Update {
                            rows_updated,
                            table_name,
                        }),
                        _ => Err(Error::Internal("expected Update result".into())),
                    }
                }
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }

    /// DELETE: same routing and transaction semantics as `update`.
    pub fn delete(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
        let (_, canonical_table) = canonical_table_name(&plan.table)?;
        let namespace_id = self.resolve_namespace_for_table(&canonical_table);

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace
                    .context()
                    .delete(plan)?
                    .convert_pager_type::<P>()
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                if self.has_active_transaction() {
                    let table_name = plan.table.clone();
                    let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
                        Ok(result) => result,
                        Err(e) => {
                            self.abort_transaction();
                            return Err(e);
                        }
                    };
                    match result {
                        TransactionResult::Delete { rows_deleted } => {
                            Ok(RuntimeStatementResult::Delete {
                                rows_deleted,
                                table_name,
                            })
                        }
                        _ => Err(Error::Internal("expected Delete result".into())),
                    }
                } else {
                    let context = self.inner.context();
                    let default_snapshot = context.ctx().default_snapshot();
                    TransactionContext::set_snapshot(&**context, default_snapshot);
                    let table_name = plan.table.clone();
                    let result = TransactionContext::delete(&**context, plan)?;
                    match result {
                        TransactionResult::Delete { rows_deleted } => {
                            Ok(RuntimeStatementResult::Delete {
                                rows_deleted,
                                table_name,
                            })
                        }
                        _ => Err(Error::Internal("expected Delete result".into())),
                    }
                }
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }
}
1084
/// Top-level entry point: a runtime context plus one session through which
/// plan statements are executed.
pub struct RuntimeEngine<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    context: Arc<RuntimeContext<P>>,
    session: RuntimeSession<P>,
}
1092
1093impl<P> Clone for RuntimeEngine<P>
1094where
1095 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1096{
1097 fn clone(&self) -> Self {
1098 tracing::debug!("[ENGINE] RuntimeEngine::clone() called - reusing same session");
1101 Self {
1102 context: Arc::clone(&self.context),
1103 session: self.session.clone_session(),
1104 }
1105 }
1106}
1107
1108impl<P> RuntimeEngine<P>
1109where
1110 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1111{
1112 pub fn new(pager: Arc<P>) -> Self {
1113 let context = Arc::new(RuntimeContext::new(pager));
1114 Self::from_context(context)
1115 }
1116
1117 pub fn from_context(context: Arc<RuntimeContext<P>>) -> Self {
1118 tracing::debug!("[ENGINE] RuntimeEngine::from_context - creating new session");
1119 let session = context.create_session();
1120 tracing::debug!("[ENGINE] RuntimeEngine::from_context - created session");
1121 Self { context, session }
1122 }
1123
1124 pub fn context(&self) -> Arc<RuntimeContext<P>> {
1125 Arc::clone(&self.context)
1126 }
1127
1128 pub fn session(&self) -> &RuntimeSession<P> {
1129 &self.session
1130 }
1131
1132 pub fn execute_statement(&self, statement: PlanStatement) -> Result<RuntimeStatementResult<P>> {
1133 match statement {
1134 PlanStatement::BeginTransaction => self.session.begin_transaction(),
1135 PlanStatement::CommitTransaction => self.session.commit_transaction(),
1136 PlanStatement::RollbackTransaction => self.session.rollback_transaction(),
1137 PlanStatement::CreateTable(plan) => self.session.create_table_plan(plan),
1138 PlanStatement::CreateIndex(plan) => self.session.create_index(plan),
1139 PlanStatement::Insert(plan) => self.session.insert(plan),
1140 PlanStatement::Update(plan) => self.session.update(plan),
1141 PlanStatement::Delete(plan) => self.session.delete(plan),
1142 PlanStatement::Select(plan) => self.session.select(plan),
1143 }
1144 }
1145
1146 pub fn execute_all<I>(&self, statements: I) -> Result<Vec<RuntimeStatementResult<P>>>
1147 where
1148 I: IntoIterator<Item = PlanStatement>,
1149 {
1150 let mut results = Vec::new();
1151 for statement in statements {
1152 results.push(self.execute_statement(statement)?);
1153 }
1154 Ok(results)
1155 }
1156}
1157
/// Shared runtime state over one pager: table cache, catalog/constraint
/// services, the column store, and transaction bookkeeping.
pub struct RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    pager: Arc<P>,
    /// Cache of executor tables keyed by canonical table name.
    tables: RwLock<FxHashMap<String, Arc<ExecutorTable<P>>>>,
    /// Canonical names of tables dropped since startup; consulted at commit
    /// time to reject replaying writes against dropped tables.
    dropped_tables: RwLock<FxHashSet<String>>,
    metadata: Arc<MetadataManager<P>>,
    constraint_service: ConstraintService<P>,
    catalog_service: CatalogManager<P>,
    catalog: Arc<TableCatalog>,
    store: Arc<ColumnStore<P>>,
    transaction_manager:
        TransactionManager<RuntimeContextWrapper<P>, RuntimeContextWrapper<MemPager>>,
    /// Allocator/tracker for transaction ids (persisted across restarts via
    /// `persist_next_txn_id`).
    txn_manager: Arc<TxnIdManager>,
    // Presumably tracks, per transaction, which tables received new rows —
    // TODO(review): confirm against the rest of the impl (not fully visible
    // in this chunk).
    txn_tables_with_new_rows: RwLock<FxHashMap<TxnId, FxHashSet<String>>>,
}
1192
1193impl<P> RuntimeContext<P>
1194where
1195 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1196{
    /// Creates a context with its own private [`TableCatalog`].
    pub fn new(pager: Arc<P>) -> Self {
        Self::new_with_catalog_inner(pager, None)
    }
1200
    /// Creates a context that registers its tables into an existing, shared
    /// [`TableCatalog`] (duplicate registrations are tolerated).
    pub fn new_with_catalog(pager: Arc<P>, catalog: Arc<TableCatalog>) -> Self {
        Self::new_with_catalog_inner(pager, Some(catalog))
    }
1204
    /// Shared constructor behind [`Self::new`] and [`Self::new_with_catalog`].
    ///
    /// Opens the column store over `pager`, restores persisted transaction
    /// counters and table metadata from the system catalog, registers known
    /// table names into the (possibly shared) [`TableCatalog`], and wires up
    /// the constraint/catalog/transaction services. Table bodies themselves
    /// are loaded lazily later.
    fn new_with_catalog_inner(pager: Arc<P>, shared_catalog: Option<Arc<TableCatalog>>) -> Self {
        tracing::trace!("RuntimeContext::new called, pager={:p}", &*pager);

        let store = ColumnStore::open(Arc::clone(&pager)).expect("failed to open ColumnStore");
        let catalog = SysCatalog::new(&store);

        // Restore the next transaction id persisted by a prior process; on a
        // fresh or unreadable catalog fall back to the first id past the
        // auto-commit sentinel.
        let next_txn_id = match catalog.get_next_txn_id() {
            Ok(Some(id)) => {
                tracing::debug!("[CONTEXT] Loaded next_txn_id={} from catalog", id);
                id
            }
            Ok(None) => {
                tracing::debug!("[CONTEXT] No persisted next_txn_id found, starting from default");
                TXN_ID_AUTO_COMMIT + 1
            }
            Err(e) => {
                tracing::warn!("[CONTEXT] Failed to load next_txn_id: {}, using default", e);
                TXN_ID_AUTO_COMMIT + 1
            }
        };

        // Likewise restore the last committed transaction id (the snapshot
        // watermark handed to new readers).
        let last_committed = match catalog.get_last_committed_txn_id() {
            Ok(Some(id)) => {
                tracing::debug!("[CONTEXT] Loaded last_committed={} from catalog", id);
                id
            }
            Ok(None) => {
                tracing::debug!(
                    "[CONTEXT] No persisted last_committed found, starting from default"
                );
                TXN_ID_AUTO_COMMIT
            }
            Err(e) => {
                tracing::warn!(
                    "[CONTEXT] Failed to load last_committed: {}, using default",
                    e
                );
                TXN_ID_AUTO_COMMIT
            }
        };

        let store_arc = Arc::new(store);
        let metadata = Arc::new(MetadataManager::new(Arc::clone(&store_arc)));

        // Only the metas are needed now, to seed the name -> id mapping; a
        // load failure degrades to an empty registry rather than aborting.
        let loaded_tables = match metadata.all_table_metas() {
            Ok(metas) => {
                tracing::debug!("[CONTEXT] Loaded {} table(s) from catalog", metas.len());
                metas
            }
            Err(e) => {
                tracing::warn!(
                    "[CONTEXT] Failed to load table metas: {}, starting with empty registry",
                    e
                );
                Vec::new()
            }
        };

        let transaction_manager =
            TransactionManager::new_with_initial_state(next_txn_id, last_committed);
        let txn_manager = transaction_manager.txn_manager();

        tracing::debug!(
            "[CONTEXT] Initialized with lazy loading for {} table(s)",
            loaded_tables.len()
        );

        // Use the caller-supplied catalog when present; otherwise create a
        // private one for this context.
        let (catalog, is_shared_catalog) = match shared_catalog {
            Some(existing) => (existing, true),
            None => (Arc::new(TableCatalog::new()), false),
        };
        for (table_id, table_meta) in &loaded_tables {
            if let Some(ref table_name) = table_meta.name
                && let Err(e) = catalog.register_table(table_name.as_str(), *table_id)
            {
                match e {
                    // A shared catalog may already know this table from a
                    // sibling context; that duplicate is expected and benign.
                    Error::CatalogError(ref msg)
                        if is_shared_catalog && msg.contains("already exists") =>
                    {
                        tracing::debug!(
                            "[CONTEXT] Shared catalog already contains table '{}' with id={}",
                            table_name,
                            table_id
                        );
                    }
                    other => {
                        tracing::warn!(
                            "[CONTEXT] Failed to register table '{}' (id={}) in catalog: {}",
                            table_name,
                            table_id,
                            other
                        );
                    }
                }
            }
        }
        tracing::debug!(
            "[CONTEXT] Catalog initialized with {} table(s)",
            catalog.table_count()
        );

        let constraint_service =
            ConstraintService::new(Arc::clone(&metadata), Arc::clone(&catalog));
        let catalog_service = CatalogManager::new(
            Arc::clone(&metadata),
            Arc::clone(&catalog),
            Arc::clone(&store_arc),
        );

        Self {
            pager,
            tables: RwLock::new(FxHashMap::default()),
            dropped_tables: RwLock::new(FxHashSet::default()),
            metadata,
            constraint_service,
            catalog_service,
            catalog,
            store: store_arc,
            transaction_manager,
            txn_manager,
            txn_tables_with_new_rows: RwLock::new(FxHashMap::default()),
        }
    }
1350
    /// Returns a shared handle to the transaction-id manager.
    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
        Arc::clone(&self.txn_manager)
    }
1355
1356 pub fn persist_next_txn_id(&self, next_txn_id: TxnId) -> Result<()> {
1358 let catalog = SysCatalog::new(&self.store);
1359 catalog.put_next_txn_id(next_txn_id)?;
1360 let last_committed = self.txn_manager.last_committed();
1361 catalog.put_last_committed_txn_id(last_committed)?;
1362 tracing::debug!(
1363 "[CONTEXT] Persisted next_txn_id={}, last_committed={}",
1364 next_txn_id,
1365 last_committed
1366 );
1367 Ok(())
1368 }
1369
1370 fn build_executor_multi_column_uniques(
1371 table: &ExecutorTable<P>,
1372 stored: &[MultiColumnUniqueEntryMeta],
1373 ) -> Vec<ExecutorMultiColumnUnique> {
1374 let mut results = Vec::with_capacity(stored.len());
1375
1376 'outer: for entry in stored {
1377 if entry.column_ids.is_empty() {
1378 continue;
1379 }
1380
1381 let mut column_indices = Vec::with_capacity(entry.column_ids.len());
1382 for field_id in &entry.column_ids {
1383 if let Some((idx, _)) = table
1384 .schema
1385 .columns
1386 .iter()
1387 .enumerate()
1388 .find(|(_, col)| &col.field_id == field_id)
1389 {
1390 column_indices.push(idx);
1391 } else {
1392 tracing::warn!(
1393 "[CATALOG] Skipping persisted multi-column UNIQUE {:?} for table_id={} missing field_id {}",
1394 entry.index_name,
1395 table.table.table_id(),
1396 field_id
1397 );
1398 continue 'outer;
1399 }
1400 }
1401
1402 results.push(ExecutorMultiColumnUnique {
1403 index_name: entry.index_name.clone(),
1404 column_indices,
1405 });
1406 }
1407
1408 results
1409 }
1410
    /// Snapshot used for auto-commit statements: the auto-commit sentinel
    /// txn id paired with the latest committed watermark.
    pub fn default_snapshot(&self) -> TransactionSnapshot {
        TransactionSnapshot {
            txn_id: TXN_ID_AUTO_COMMIT,
            snapshot_id: self.txn_manager.last_committed(),
        }
    }
1418
    /// Returns a shared handle to the in-memory table catalog.
    pub fn table_catalog(&self) -> Arc<TableCatalog> {
        Arc::clone(&self.catalog)
    }
1423
    /// Creates a new session bound to this context: per-session namespaces
    /// plus a transaction-manager session over a context wrapper.
    pub fn create_session(self: &Arc<Self>) -> RuntimeSession<P> {
        tracing::debug!("[SESSION] RuntimeContext::create_session called");
        let namespaces = Arc::new(SessionNamespaces::new(Arc::clone(self)));
        let wrapper = RuntimeContextWrapper::new(Arc::clone(self));
        let inner = self.transaction_manager.create_session(Arc::new(wrapper));
        tracing::debug!(
            "[SESSION] Created TransactionSession with session_id (will be logged by transaction manager)"
        );
        RuntimeSession { inner, namespaces }
    }
1436
    /// Opens a handle to the named table. See also [`Self::table_handle`],
    /// which is equivalent.
    pub fn table(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
        RuntimeTableHandle::new(Arc::clone(self), name)
    }
1441
    /// Reports whether the context-level transaction manager currently has
    /// an active transaction.
    #[deprecated(note = "Use session-based transactions instead")]
    pub fn has_active_transaction(&self) -> bool {
        self.transaction_manager.has_active_transaction()
    }
1447
1448 pub fn create_table<C, I>(
1449 self: &Arc<Self>,
1450 name: &str,
1451 columns: I,
1452 ) -> Result<RuntimeTableHandle<P>>
1453 where
1454 C: IntoColumnSpec,
1455 I: IntoIterator<Item = C>,
1456 {
1457 self.create_table_with_options(name, columns, false)
1458 }
1459
    /// Creates a table from column specs with `IF NOT EXISTS` semantics:
    /// succeeds (returning a handle) when the table already exists.
    pub fn create_table_if_not_exists<C, I>(
        self: &Arc<Self>,
        name: &str,
        columns: I,
    ) -> Result<RuntimeTableHandle<P>>
    where
        C: IntoColumnSpec,
        I: IntoIterator<Item = C>,
    {
        self.create_table_with_options(name, columns, true)
    }
1471
    /// Executes a CREATE TABLE plan against this context.
    ///
    /// Handles `IF NOT EXISTS` / `OR REPLACE` semantics, clears drop
    /// tombstones so a dropped name can be recreated, and dispatches to the
    /// batch-based or column-based creation path depending on the plan's
    /// source.
    ///
    /// # Errors
    /// `InvalidArgumentError` when the plan has neither columns nor a
    /// source; `CatalogError` when the table exists and neither
    /// `IF NOT EXISTS` nor `OR REPLACE` was requested; `Internal` for an
    /// unmaterialized SELECT source.
    pub fn create_table_plan(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
        if plan.columns.is_empty() && plan.source.is_none() {
            return Err(Error::InvalidArgumentError(
                "CREATE TABLE requires explicit columns or a source".into(),
            ));
        }

        let (display_name, canonical_name) = canonical_table_name(&plan.name)?;
        let CreateTablePlan {
            name: _,
            if_not_exists,
            or_replace,
            columns,
            source,
            namespace: _,
            foreign_keys,
        } = plan;

        tracing::trace!(
            "DEBUG create_table_plan: table='{}' if_not_exists={} columns={}",
            display_name,
            if_not_exists,
            columns.len()
        );
        for (idx, col) in columns.iter().enumerate() {
            tracing::trace!(
                " plan column[{}]: name='{}' primary_key={}",
                idx,
                col.name,
                col.primary_key
            );
        }
        // Read the cache and the drop tombstones together; a cached entry
        // that is tombstoned does not count as existing.
        let (exists, is_dropped) = {
            let tables = self.tables.read().unwrap();
            let in_cache = tables.contains_key(&canonical_name);
            let is_dropped = self
                .dropped_tables
                .read()
                .unwrap()
                .contains(&canonical_name);
            (in_cache && !is_dropped, is_dropped)
        };
        tracing::trace!(
            "DEBUG create_table_plan: exists={}, is_dropped={}",
            exists,
            is_dropped
        );

        // A previously dropped name may be recreated: purge the stale cache
        // entry and clear the tombstone first.
        if is_dropped {
            self.remove_table_entry(&canonical_name);
            self.dropped_tables.write().unwrap().remove(&canonical_name);
        }

        if exists {
            if or_replace {
                tracing::trace!(
                    "DEBUG create_table_plan: table '{}' exists and or_replace=true, removing existing table before recreation",
                    display_name
                );
                self.remove_table_entry(&canonical_name);
            } else if if_not_exists {
                tracing::trace!(
                    "DEBUG create_table_plan: table '{}' exists and if_not_exists=true, returning early WITHOUT creating",
                    display_name
                );
                return Ok(RuntimeStatementResult::CreateTable {
                    table_name: display_name,
                });
            } else {
                return Err(Error::CatalogError(format!(
                    "Catalog Error: Table '{}' already exists",
                    display_name
                )));
            }
        }

        match source {
            // CTAS from pre-materialized record batches.
            Some(CreateTableSource::Batches { schema, batches }) => self.create_table_from_batches(
                display_name,
                canonical_name,
                schema,
                batches,
                if_not_exists,
            ),
            // SELECT sources must be materialized by the planner first.
            Some(CreateTableSource::Select { .. }) => Err(Error::Internal(
                "CreateTableSource::Select should be materialized before reaching RuntimeContext::create_table_plan"
                    .into(),
            )),
            // Plain column-list definition.
            None => self.create_table_from_columns(
                display_name,
                canonical_name,
                columns,
                foreign_keys,
                if_not_exists,
            ),
        }
    }
1571
    /// Executes a CREATE INDEX plan.
    ///
    /// Supports single-column indexes (optionally UNIQUE) and multi-column
    /// UNIQUE indexes; only ASC / NULLS LAST orderings are accepted. For
    /// UNIQUE indexes, existing table data is validated against the
    /// constraint before the index is registered.
    pub fn create_index(&self, plan: CreateIndexPlan) -> Result<RuntimeStatementResult<P>> {
        if plan.columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE INDEX requires at least one column".into(),
            ));
        }

        // Ordering restriction: only ASC with NULLS LAST is implemented.
        for column_plan in &plan.columns {
            if !column_plan.ascending || column_plan.nulls_first {
                return Err(Error::InvalidArgumentError(
                    "only ASC indexes with NULLS LAST are supported".into(),
                ));
            }
        }

        let index_name = plan.name.clone();
        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
        let table = self.lookup_table(&canonical_name)?;

        // Resolve each named column to its schema slot, rejecting duplicates.
        let mut column_indices = Vec::with_capacity(plan.columns.len());
        let mut field_ids = Vec::with_capacity(plan.columns.len());
        let mut column_names = Vec::with_capacity(plan.columns.len());
        let mut seen_column_indices = FxHashSet::default();

        for column_plan in &plan.columns {
            // Schema lookup keys are lower-cased column names.
            let normalized = column_plan.name.to_ascii_lowercase();
            let col_idx = table
                .schema
                .lookup
                .get(&normalized)
                .copied()
                .ok_or_else(|| {
                    Error::InvalidArgumentError(format!(
                        "column '{}' does not exist in table '{}'",
                        column_plan.name, display_name
                    ))
                })?;
            if !seen_column_indices.insert(col_idx) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate column '{}' in CREATE INDEX",
                    column_plan.name
                )));
            }

            let column = &table.schema.columns[col_idx];
            column_indices.push(col_idx);
            field_ids.push(column.field_id);
            column_names.push(column.name.clone());
        }

        // Single-column path.
        if plan.columns.len() == 1 {
            let field_id = field_ids[0];
            let column_name = column_names[0].clone();

            // For UNIQUE, validate the data already in the column.
            if plan.unique {
                let snapshot = self.default_snapshot();
                let existing_values =
                    self.scan_column_values(table.as_ref(), field_id, snapshot)?;
                ensure_single_column_unique(&existing_values, &[], &column_name)?;
            }

            let created = self.catalog_service.register_single_column_index(
                &display_name,
                &canonical_name,
                &table.table,
                field_id,
                &column_name,
                plan.unique,
                plan.if_not_exists,
            )?;

            // `created == false`: the index already existed (tolerated by
            // the catalog service); nothing in the cache needs refreshing.
            if !created {
                return Ok(RuntimeStatementResult::CreateIndex {
                    table_name: display_name,
                    index_name,
                });
            }

            // Refresh the cached executor table so the new unique flag is
            // visible; if a rebuild is not possible, evict the entry so the
            // next lookup reloads it.
            if let Some(updated_table) =
                Self::rebuild_executor_table_with_unique(table.as_ref(), field_id)
            {
                self.tables
                    .write()
                    .unwrap()
                    .insert(canonical_name.clone(), Arc::clone(&updated_table));
            } else {
                self.remove_table_entry(&canonical_name);
            }

            drop(table);

            return Ok(RuntimeStatementResult::CreateIndex {
                table_name: display_name,
                index_name,
            });
        }

        // Multi-column indexes are only implemented for UNIQUE.
        if !plan.unique {
            return Err(Error::InvalidArgumentError(
                "multi-column CREATE INDEX currently supports UNIQUE indexes only".into(),
            ));
        }

        let table_id = table.table.table_id();

        // Validate existing rows against the composite constraint.
        let snapshot = self.default_snapshot();
        let existing_rows = self.scan_multi_column_values(table.as_ref(), &field_ids, snapshot)?;
        ensure_multi_column_unique(&existing_rows, &[], &column_names)?;

        let executor_entry = ExecutorMultiColumnUnique {
            index_name: index_name.clone(),
            column_indices: column_indices.clone(),
        };

        let registration = self.catalog_service.register_multi_column_unique_index(
            table_id,
            &field_ids,
            index_name.clone(),
        )?;

        match registration {
            MultiColumnUniqueRegistration::Created => {
                table.add_multi_column_unique(executor_entry);
            }
            MultiColumnUniqueRegistration::AlreadyExists {
                index_name: existing,
            } => {
                // IF NOT EXISTS tolerates the duplicate and reports the
                // pre-existing index's name.
                if plan.if_not_exists {
                    drop(table);
                    return Ok(RuntimeStatementResult::CreateIndex {
                        table_name: display_name,
                        index_name: existing,
                    });
                }
                return Err(Error::CatalogError(format!(
                    "Index already exists on columns '{}'",
                    column_names.join(", ")
                )));
            }
        }

        Ok(RuntimeStatementResult::CreateIndex {
            table_name: display_name,
            index_name,
        })
    }
1719
    /// Lists the table names currently registered in the catalog.
    pub fn table_names(self: &Arc<Self>) -> Vec<String> {
        self.catalog.table_names()
    }
1724
    /// Returns the catalog's view of the table with the given canonical name.
    pub fn table_view(&self, canonical_name: &str) -> Result<TableView> {
        self.catalog_service.table_view(canonical_name)
    }
1728
    /// Filters `row_ids` down to those visible under `snapshot`, delegating
    /// to the shared MVCC visibility helper.
    fn filter_visible_row_ids(
        &self,
        table: &ExecutorTable<P>,
        row_ids: Vec<RowId>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RowId>> {
        filter_row_ids_for_snapshot(table.table.as_ref(), row_ids, &self.txn_manager, snapshot)
    }
1737
    /// Starts a builder for a CREATE TABLE plan against this context.
    pub fn create_table_builder(&self, name: &str) -> RuntimeCreateTableBuilder<'_, P> {
        RuntimeCreateTableBuilder {
            ctx: self,
            plan: CreateTablePlan::new(name),
        }
    }
1744
    /// Returns the column specs of the named table (name is canonicalized
    /// first).
    pub fn table_column_specs(self: &Arc<Self>, name: &str) -> Result<Vec<ColumnSpec>> {
        let (_, canonical_name) = canonical_table_name(name)?;
        self.catalog_service.table_column_specs(&canonical_name)
    }
1749
    /// Returns the foreign-key definitions registered for the named table.
    pub fn foreign_key_views(self: &Arc<Self>, name: &str) -> Result<Vec<ForeignKeyView>> {
        let (_, canonical_name) = canonical_table_name(name)?;
        self.catalog_service.foreign_key_views(&canonical_name)
    }
1754
    /// Collects every row of the named table into a single in-memory batch
    /// via a lazy table scan.
    pub fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<RowBatch> {
        let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
        handle.lazy()?.collect_rows()
    }
1759
    /// Internal alias for [`Self::create_table_plan`].
    fn execute_create_table(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
        self.create_table_plan(plan)
    }
1763
1764 fn create_table_with_options<C, I>(
1765 self: &Arc<Self>,
1766 name: &str,
1767 columns: I,
1768 if_not_exists: bool,
1769 ) -> Result<RuntimeTableHandle<P>>
1770 where
1771 C: IntoColumnSpec,
1772 I: IntoIterator<Item = C>,
1773 {
1774 let mut plan = CreateTablePlan::new(name);
1775 plan.if_not_exists = if_not_exists;
1776 plan.columns = columns
1777 .into_iter()
1778 .map(|column| column.into_column_spec())
1779 .collect();
1780 let result = self.create_table_plan(plan)?;
1781 match result {
1782 RuntimeStatementResult::CreateTable { .. } => {
1783 RuntimeTableHandle::new(Arc::clone(self), name)
1784 }
1785 other => Err(Error::InvalidArgumentError(format!(
1786 "unexpected statement result {other:?} when creating table"
1787 ))),
1788 }
1789 }
1790
1791 pub fn insert(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
1792 let snapshot = TransactionSnapshot {
1795 txn_id: TXN_ID_AUTO_COMMIT,
1796 snapshot_id: self.txn_manager.last_committed(),
1797 };
1798 self.insert_with_snapshot(plan, snapshot)
1799 }
1800
    /// Inserts rows or batches into a table under the provided MVCC
    /// snapshot.
    ///
    /// Resolves the table, dispatches on the plan's source (literal rows or
    /// Arrow batches; SELECT sources must already be materialized), and
    /// panics if the insert path itself reports `NotFound` after the table
    /// lookup succeeded, since that indicates a runtime logic bug.
    ///
    /// The extra tracing guarded by `display_name == "keys"` is targeted
    /// debug logging for that specific table name.
    pub fn insert_with_snapshot(
        &self,
        plan: InsertPlan,
        snapshot: TransactionSnapshot,
    ) -> Result<RuntimeStatementResult<P>> {
        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
        let table = self.lookup_table(&canonical_name)?;

        if display_name == "keys" {
            tracing::trace!(
                "\n[KEYS] INSERT starting - table_id={}, context_pager={:p}",
                table.table.table_id(),
                &*self.pager
            );
            tracing::trace!(
                "[KEYS] Table has {} columns, primary_key columns: {:?}",
                table.schema.columns.len(),
                table
                    .schema
                    .columns
                    .iter()
                    .filter(|c| c.primary_key)
                    .map(|c| &c.name)
                    .collect::<Vec<_>>()
            );
        }

        // Dispatch on the insert source.
        let result = match plan.source {
            InsertSource::Rows(rows) => self.insert_rows(
                table.as_ref(),
                display_name.clone(),
                canonical_name.clone(),
                rows,
                plan.columns,
                snapshot,
            ),
            InsertSource::Batches(batches) => self.insert_batches(
                table.as_ref(),
                display_name.clone(),
                canonical_name.clone(),
                batches,
                plan.columns,
                snapshot,
            ),
            InsertSource::Select { .. } => Err(Error::Internal(
                "InsertSource::Select should be materialized before reaching RuntimeContext::insert"
                    .into(),
            )),
        };

        if display_name == "keys" {
            tracing::trace!(
                "[KEYS] INSERT completed: {:?}",
                result
                    .as_ref()
                    .map(|_| "OK")
                    .map_err(|e| format!("{:?}", e))
            );
        }

        // NotFound after a successful lookup is an internal invariant
        // violation, not a user error — fail loudly.
        if matches!(result, Err(Error::NotFound)) {
            panic!(
                "BUG: insert yielded Error::NotFound for table '{}'. \
                 This should never happen: insert should never return NotFound after successful table lookup. \
                 This indicates a logic error in the runtime.",
                display_name
            );
        }

        result
    }
1873
    /// Convenience wrapper over
    /// [`Self::get_batches_with_row_ids_with_snapshot`] using the
    /// auto-commit default snapshot.
    pub fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
    ) -> Result<Vec<RecordBatch>> {
        self.get_batches_with_row_ids_with_snapshot(table_name, filter, self.default_snapshot())
    }
1883
    /// Returns the visible rows of `table_name` (optionally restricted by
    /// `filter`) as record batches whose first column is the row id.
    ///
    /// Row ids matching the predicate are filtered for MVCC visibility
    /// under `snapshot` before the column data is gathered. The output
    /// schema is `row_id` followed by every user column, each carrying its
    /// field-id metadata.
    pub fn get_batches_with_row_ids_with_snapshot(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RecordBatch>> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

        // Translate the caller's predicate, or synthesize a full-table scan
        // over the first column when none was given.
        let filter_expr = match filter {
            Some(expr) => translate_predicate(expr, table.schema.as_ref())?,
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                full_table_scan_filter(field_id)
            }
        };

        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Drop rows not visible under the snapshot.
        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
        if visible_row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();

        // Output schema: row_id first, then each user column.
        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len());

        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for column in &table.schema.columns {
            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
            logical_fields.push(logical_field_id);
            let field = mvcc::build_field_with_metadata(
                &column.name,
                column.data_type.clone(),
                column.nullable,
                column.field_id,
            );
            fields.push(field);
        }

        let schema = Arc::new(Schema::new(fields));

        // Zero-column table: emit a single batch of row ids only.
        if logical_fields.is_empty() {
            let mut row_id_builder = UInt64Builder::with_capacity(visible_row_ids.len());
            for row_id in &visible_row_ids {
                row_id_builder.append_value(*row_id);
            }
            let arrays: Vec<ArrayRef> = vec![Arc::new(row_id_builder.finish()) as ArrayRef];
            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            return Ok(vec![batch]);
        }

        // Stream the column data for the visible rows, keeping nulls.
        let mut stream = table.table.stream_columns(
            Arc::from(logical_fields),
            visible_row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut batches = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(chunk.batch().num_columns() + 1);

            // Prepend this chunk's row ids as the first column.
            let mut row_id_builder = UInt64Builder::with_capacity(chunk.len());
            for row_id in chunk.row_ids() {
                row_id_builder.append_value(*row_id);
            }
            arrays.push(Arc::new(row_id_builder.finish()) as ArrayRef);

            let chunk_batch = chunk.into_batch();
            for column_array in chunk_batch.columns() {
                arrays.push(column_array.clone());
            }

            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            batches.push(batch);
        }

        Ok(batches)
    }
1977
1978 pub fn append_batches_with_row_ids(
1981 &self,
1982 table_name: &str,
1983 batches: Vec<RecordBatch>,
1984 ) -> Result<usize> {
1985 let (_, canonical_name) = canonical_table_name(table_name)?;
1986 let table = self.lookup_table(&canonical_name)?;
1987
1988 let mut total_rows = 0;
1989 for batch in batches {
1990 if batch.num_rows() == 0 {
1991 continue;
1992 }
1993
1994 let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
1996 Error::InvalidArgumentError(
1997 "batch must contain row_id column for direct append".into(),
1998 )
1999 })?;
2000
2001 table.table.append(&batch)?;
2003 total_rows += batch.num_rows();
2004 }
2005
2006 Ok(total_rows)
2007 }
2008
2009 pub fn update(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
2010 let snapshot = self.txn_manager.begin_transaction();
2011 match self.update_with_snapshot(plan, snapshot) {
2012 Ok(result) => {
2013 self.txn_manager.mark_committed(snapshot.txn_id);
2014 Ok(result)
2015 }
2016 Err(err) => {
2017 self.txn_manager.mark_aborted(snapshot.txn_id);
2018 Err(err)
2019 }
2020 }
2021 }
2022
2023 pub fn update_with_snapshot(
2024 &self,
2025 plan: UpdatePlan,
2026 snapshot: TransactionSnapshot,
2027 ) -> Result<RuntimeStatementResult<P>> {
2028 let UpdatePlan {
2029 table,
2030 assignments,
2031 filter,
2032 } = plan;
2033 let (display_name, canonical_name) = canonical_table_name(&table)?;
2034 let table = self.lookup_table(&canonical_name)?;
2035 if let Some(filter) = filter {
2036 self.update_filtered_rows(
2037 table.as_ref(),
2038 display_name,
2039 canonical_name,
2040 assignments,
2041 filter,
2042 snapshot,
2043 )
2044 } else {
2045 self.update_all_rows(
2046 table.as_ref(),
2047 display_name,
2048 canonical_name,
2049 assignments,
2050 snapshot,
2051 )
2052 }
2053 }
2054
2055 pub fn delete(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
2056 let snapshot = self.txn_manager.begin_transaction();
2057 match self.delete_with_snapshot(plan, snapshot) {
2058 Ok(result) => {
2059 self.txn_manager.mark_committed(snapshot.txn_id);
2060 Ok(result)
2061 }
2062 Err(err) => {
2063 self.txn_manager.mark_aborted(snapshot.txn_id);
2064 Err(err)
2065 }
2066 }
2067 }
2068
2069 pub fn delete_with_snapshot(
2070 &self,
2071 plan: DeletePlan,
2072 snapshot: TransactionSnapshot,
2073 ) -> Result<RuntimeStatementResult<P>> {
2074 let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
2075 let table = match self.tables.read().unwrap().get(&canonical_name) {
2077 Some(t) => Arc::clone(t),
2078 None => return Err(Error::NotFound),
2079 };
2080 match plan.filter {
2081 Some(filter) => self.delete_filtered_rows(
2082 table.as_ref(),
2083 display_name,
2084 canonical_name.clone(),
2085 filter,
2086 snapshot,
2087 ),
2088 None => self.delete_all_rows(table.as_ref(), display_name, canonical_name, snapshot),
2089 }
2090 }
2091
    /// Opens a handle to the named table (same as [`Self::table`]).
    pub fn table_handle(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
        RuntimeTableHandle::new(Arc::clone(self), name)
    }
2095
    /// Executes a SELECT using the auto-commit default snapshot.
    pub fn execute_select(self: &Arc<Self>, plan: SelectPlan) -> Result<SelectExecution<P>> {
        let snapshot = self.default_snapshot();
        self.execute_select_with_snapshot(plan, snapshot)
    }
2100
2101 pub fn execute_select_with_snapshot(
2102 self: &Arc<Self>,
2103 plan: SelectPlan,
2104 snapshot: TransactionSnapshot,
2105 ) -> Result<SelectExecution<P>> {
2106 if plan.tables.is_empty() {
2108 let provider: Arc<dyn TableProvider<P>> = Arc::new(ContextProvider {
2109 context: Arc::clone(self),
2110 });
2111 let executor = QueryExecutor::new(provider);
2112 return executor.execute_select_with_filter(plan, None);
2114 }
2115
2116 let mut canonical_tables = Vec::new();
2118 for table_ref in &plan.tables {
2119 let qualified = table_ref.qualified_name();
2120 let (_display, canonical) = canonical_table_name(&qualified)?;
2121 let parts: Vec<&str> = canonical.split('.').collect();
2123 let canon_ref = if parts.len() >= 2 {
2124 llkv_plan::TableRef::new(parts[0], parts[1])
2125 } else {
2126 llkv_plan::TableRef::new("", &canonical)
2127 };
2128 canonical_tables.push(canon_ref);
2129 }
2130
2131 let mut canonical_plan = plan.clone();
2132 canonical_plan.tables = canonical_tables;
2133
2134 let provider: Arc<dyn TableProvider<P>> = Arc::new(ContextProvider {
2135 context: Arc::clone(self),
2136 });
2137 let executor = QueryExecutor::new(provider);
2138
2139 let row_filter: Option<Arc<dyn RowIdFilter<P>>> = if canonical_plan.tables.len() == 1 {
2141 Some(Arc::new(MvccRowIdFilter::new(
2142 Arc::clone(&self.txn_manager),
2143 snapshot,
2144 )))
2145 } else {
2146 None
2148 };
2149
2150 executor.execute_select_with_filter(canonical_plan, row_filter)
2151 }
2152
    /// Creates a table from an explicit column list, registers it in the
    /// in-memory cache, and (when present) wires up its foreign keys.
    ///
    /// On a lost creation race (another thread inserted the same canonical
    /// name between our check and insert) or on foreign-key registration
    /// failure, the freshly created physical table is dropped again so the
    /// catalog is left consistent.
    fn create_table_from_columns(
        &self,
        display_name: String,
        canonical_name: String,
        columns: Vec<ColumnSpec>,
        foreign_keys: Vec<ForeignKeySpec>,
        if_not_exists: bool,
    ) -> Result<RuntimeStatementResult<P>> {
        tracing::trace!(
            "\n=== CREATE_TABLE_FROM_COLUMNS: table='{}' columns={} ===",
            display_name,
            columns.len()
        );
        for (idx, col) in columns.iter().enumerate() {
            tracing::trace!(
                " input column[{}]: name='{}' primary_key={}",
                idx,
                col.name,
                col.primary_key
            );
        }
        if columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE TABLE requires at least one column".into(),
            ));
        }

        // Fast pre-check under the read lock; a second, authoritative check
        // happens after creation under the write lock.
        {
            let tables = self.tables.read().unwrap();
            if tables.contains_key(&canonical_name) {
                if if_not_exists {
                    return Ok(RuntimeStatementResult::CreateTable {
                        table_name: display_name,
                    });
                }

                return Err(Error::CatalogError(format!(
                    "Catalog Error: Table '{}' already exists",
                    display_name
                )));
            }
        }

        // Create the physical table and its catalog entries.
        let CreateTableResult {
            table_id,
            table,
            table_columns,
            column_lookup,
        } = Table::create_from_columns(
            &display_name,
            &canonical_name,
            &columns,
            self.metadata.clone(),
            self.catalog.clone(),
            self.store.clone(),
        )?;

        tracing::trace!(
            "=== TABLE '{}' CREATED WITH table_id={} pager={:p} ===",
            display_name,
            table_id,
            &*self.pager
        );

        // Build the executor-side column descriptors.
        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
        for (idx, column) in table_columns.iter().enumerate() {
            tracing::trace!(
                "DEBUG create_table_from_columns[{}]: name='{}' data_type={:?} nullable={} primary_key={} unique={}",
                idx,
                column.name,
                column.data_type,
                column.nullable,
                column.primary_key,
                column.unique
            );
            column_defs.push(ExecutorColumn {
                name: column.name.clone(),
                data_type: column.data_type.clone(),
                nullable: column.nullable,
                primary_key: column.primary_key,
                unique: column.unique,
                field_id: column.field_id,
                check_expr: column.check_expr.clone(),
            });
        }

        let schema = Arc::new(ExecutorSchema {
            columns: column_defs.clone(),
            lookup: column_lookup,
        });
        let table_entry = Arc::new(ExecutorTable {
            table: Arc::clone(&table),
            schema,
            next_row_id: AtomicU64::new(0),
            total_rows: AtomicU64::new(0),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        // Authoritative existence check under the write lock. Losing the
        // race means another thread registered the name first: undo our
        // physical creation before reporting.
        let mut tables = self.tables.write().unwrap();
        if tables.contains_key(&canonical_name) {
            drop(tables);
            let field_ids: Vec<FieldId> =
                table_columns.iter().map(|column| column.field_id).collect();
            let _ = self
                .catalog_service
                .drop_table(&canonical_name, table_id, &field_ids);
            if if_not_exists {
                return Ok(RuntimeStatementResult::CreateTable {
                    table_name: display_name,
                });
            }
            return Err(Error::CatalogError(format!(
                "Catalog Error: Table '{}' already exists",
                display_name
            )));
        }
        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
        drop(tables);

        // Register foreign keys, resolving each referenced table through the
        // normal lookup path; on failure the new table is rolled back.
        if !foreign_keys.is_empty() {
            let fk_result = self.catalog_service.register_foreign_keys_for_new_table(
                table_id,
                &display_name,
                &canonical_name,
                &table_columns,
                &foreign_keys,
                |table_name| {
                    let (display, canonical) = canonical_table_name(table_name)?;
                    let referenced_table = self.lookup_table(&canonical).map_err(|_| {
                        Error::InvalidArgumentError(format!(
                            "referenced table '{}' does not exist",
                            table_name
                        ))
                    })?;

                    let columns = referenced_table
                        .schema
                        .columns
                        .iter()
                        .map(|column| ForeignKeyColumn {
                            name: column.name.clone(),
                            data_type: column.data_type.clone(),
                            nullable: column.nullable,
                            primary_key: column.primary_key,
                            unique: column.unique,
                            field_id: column.field_id,
                        })
                        .collect();

                    Ok(ForeignKeyTableInfo {
                        display_name: display,
                        canonical_name: canonical,
                        table_id: referenced_table.table.table_id(),
                        columns,
                    })
                },
                current_time_micros(),
            );

            if let Err(err) = fk_result {
                let field_ids: Vec<FieldId> =
                    table_columns.iter().map(|column| column.field_id).collect();
                let _ = self
                    .catalog_service
                    .drop_table(&canonical_name, table_id, &field_ids);
                self.remove_table_entry(&canonical_name);
                return Err(err);
            }
        }

        Ok(RuntimeStatementResult::CreateTable {
            table_name: display_name,
        })
    }
2328
    /// Create a table and populate it from pre-built Arrow batches (the
    /// CREATE TABLE ... AS SELECT path).
    ///
    /// The batches are appended under a dedicated transaction that is marked
    /// committed on success; on append failure the transaction is aborted and
    /// the freshly created physical table is dropped. `if_not_exists`
    /// downgrades the duplicate-name error to a no-op success.
    fn create_table_from_batches(
        &self,
        display_name: String,
        canonical_name: String,
        schema: Arc<Schema>,
        batches: Vec<RecordBatch>,
        if_not_exists: bool,
    ) -> Result<RuntimeStatementResult<P>> {
        // Fast-path duplicate check under the read lock before doing any
        // physical work.
        {
            let tables = self.tables.read().unwrap();
            if tables.contains_key(&canonical_name) {
                if if_not_exists {
                    return Ok(RuntimeStatementResult::CreateTable {
                        table_name: display_name,
                    });
                }
                return Err(Error::CatalogError(format!(
                    "Catalog Error: Table '{}' already exists",
                    display_name
                )));
            }
        }

        // Register the table shape in the catalog and obtain its storage handle.
        let CreateTableResult {
            table_id,
            table,
            table_columns,
            column_lookup,
        } = self.catalog_service.create_table_from_schema(
            &display_name,
            &canonical_name,
            &schema,
        )?;

        tracing::trace!(
            "=== CTAS table '{}' created with table_id={} pager={:p} ===",
            display_name,
            table_id,
            &*self.pager
        );

        // Mirror the catalog column metadata into the executor-side schema.
        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
        for column in &table_columns {
            column_defs.push(ExecutorColumn {
                name: column.name.clone(),
                data_type: column.data_type.clone(),
                nullable: column.nullable,
                primary_key: column.primary_key,
                unique: column.unique,
                field_id: column.field_id,
                check_expr: column.check_expr.clone(),
            });
        }

        let schema_arc = Arc::new(ExecutorSchema {
            columns: column_defs.clone(),
            lookup: column_lookup,
        });
        let table_entry = Arc::new(ExecutorTable {
            table: Arc::clone(&table),
            schema: schema_arc,
            next_row_id: AtomicU64::new(0),
            total_rows: AtomicU64::new(0),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        // Append the CTAS rows under their own transaction so the MVCC
        // metadata records a real creator txn id.
        let creator_snapshot = self.txn_manager.begin_transaction();
        let creator_txn_id = creator_snapshot.txn_id;
        let (next_row_id, total_rows) = match self.catalog_service.append_batches_with_mvcc(
            table.as_ref(),
            &table_columns,
            &batches,
            creator_txn_id,
            TXN_ID_NONE,
            0,
        ) {
            Ok(result) => {
                self.txn_manager.mark_committed(creator_txn_id);
                result
            }
            Err(err) => {
                // Roll back: abort the txn and drop the half-built table.
                self.txn_manager.mark_aborted(creator_txn_id);
                let field_ids: Vec<FieldId> =
                    table_columns.iter().map(|column| column.field_id).collect();
                let _ = self
                    .catalog_service
                    .drop_table(&canonical_name, table_id, &field_ids);
                return Err(err);
            }
        };

        table_entry.next_row_id.store(next_row_id, Ordering::SeqCst);
        table_entry.total_rows.store(total_rows, Ordering::SeqCst);

        // Re-check under the write lock: another thread may have registered
        // the same name while the batches were being appended.
        // NOTE(review): on this race path the table built above is neither
        // registered nor dropped, so its storage appears to be orphaned —
        // confirm whether catalog-level cleanup handles it elsewhere.
        let mut tables = self.tables.write().unwrap();
        if tables.contains_key(&canonical_name) {
            if if_not_exists {
                return Ok(RuntimeStatementResult::CreateTable {
                    table_name: display_name,
                });
            }
            return Err(Error::CatalogError(format!(
                "Catalog Error: Table '{}' already exists",
                display_name
            )));
        }
        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
        drop(tables);
        Ok(RuntimeStatementResult::CreateTable {
            table_name: display_name,
        })
    }
2442
2443 fn rebuild_executor_table_with_unique(
2444 table: &ExecutorTable<P>,
2445 field_id: FieldId,
2446 ) -> Option<Arc<ExecutorTable<P>>> {
2447 let mut columns = table.schema.columns.clone();
2448 let mut found = false;
2449 for column in &mut columns {
2450 if column.field_id == field_id {
2451 column.unique = true;
2452 found = true;
2453 break;
2454 }
2455 }
2456 if !found {
2457 return None;
2458 }
2459
2460 let schema = Arc::new(ExecutorSchema {
2461 columns,
2462 lookup: table.schema.lookup.clone(),
2463 });
2464
2465 let next_row_id = table.next_row_id.load(Ordering::SeqCst);
2466 let total_rows = table.total_rows.load(Ordering::SeqCst);
2467 let uniques = table.multi_column_uniques();
2468
2469 Some(Arc::new(ExecutorTable {
2470 table: Arc::clone(&table.table),
2471 schema,
2472 next_row_id: AtomicU64::new(next_row_id),
2473 total_rows: AtomicU64::new(total_rows),
2474 multi_column_uniques: RwLock::new(uniques),
2475 }))
2476 }
2477
2478 fn record_table_with_new_rows(&self, txn_id: TxnId, canonical_name: String) {
2479 if txn_id == TXN_ID_AUTO_COMMIT {
2480 return;
2481 }
2482
2483 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
2484 guard.entry(txn_id).or_default().insert(canonical_name);
2485 }
2486
    /// Collect the full row values for every row that transaction `txn_id`
    /// inserted and that carries no deletion marker.
    ///
    /// Used by commit-time primary-key validation: the returned rows are the
    /// candidate set that must be re-checked against committed state. Returns
    /// an empty vector for auto-commit statements, empty schemas, or tables
    /// with no matching rows.
    fn collect_rows_created_by_txn(
        &self,
        table: &ExecutorTable<P>,
        txn_id: TxnId,
    ) -> Result<Vec<Vec<PlanValue>>> {
        // Auto-commit rows are validated inline; nothing to re-check here.
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(Vec::new());
        }

        if table.schema.columns.is_empty() {
            return Ok(Vec::new());
        }

        let Some(first_field_id) = table.schema.first_field_id() else {
            return Ok(Vec::new());
        };
        // Match every row; the MVCC metadata below decides which ones count.
        let filter_expr = full_table_scan_filter(first_field_id);

        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Stream the MVCC bookkeeping columns (created_by, deleted_by) ahead
        // of the user columns so each batch row can be classified in place.
        let table_id = table.table.table_id();
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len() + 2);
        logical_fields.push(LogicalFieldId::for_mvcc_created_by(table_id));
        logical_fields.push(LogicalFieldId::for_mvcc_deleted_by(table_id));
        for column in &table.schema.columns {
            logical_fields.push(LogicalFieldId::for_user(table_id, column.field_id));
        }

        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let mut stream = table.table.stream_columns(
            Arc::clone(&logical_fields),
            row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut rows = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            // Defensive: skip batches that are missing expected columns.
            if batch.num_columns() < table.schema.columns.len() + 2 {
                continue;
            }

            let created_col = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing created_by column in MVCC data".into()))?;
            let deleted_col = batch
                .column(1)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing deleted_by column in MVCC data".into()))?;

            for row_idx in 0..batch.num_rows() {
                // A null created_by marker is treated as auto-commit data.
                let created_by = if created_col.is_null(row_idx) {
                    TXN_ID_AUTO_COMMIT
                } else {
                    created_col.value(row_idx)
                };
                if created_by != txn_id {
                    continue;
                }

                // Skip rows that carry any deletion marker.
                let deleted_by = if deleted_col.is_null(row_idx) {
                    TXN_ID_NONE
                } else {
                    deleted_col.value(row_idx)
                };
                if deleted_by != TXN_ID_NONE {
                    continue;
                }

                // User columns start at offset 2, after the MVCC pair.
                let mut row_values = Vec::with_capacity(table.schema.columns.len());
                for col_idx in 0..table.schema.columns.len() {
                    let array = batch.column(col_idx + 2);
                    let value = llkv_plan::plan_value_from_array(array, row_idx)?;
                    row_values.push(value);
                }
                rows.push(row_values);
            }
        }

        Ok(rows)
    }
2575
    /// Re-validate PRIMARY KEY uniqueness for every table `txn_id` inserted
    /// into, immediately before the transaction commits.
    ///
    /// The per-transaction table set recorded by `record_table_with_new_rows`
    /// is consumed (removed) here regardless of the outcome. Validation runs
    /// against a snapshot of the latest committed state so rows committed by
    /// concurrent transactions are taken into account.
    fn validate_primary_keys_for_commit(&self, txn_id: TxnId) -> Result<()> {
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(());
        }

        // Take (and clear) the set of tables this transaction inserted into.
        let tables = {
            let mut guard = self.txn_tables_with_new_rows.write().unwrap();
            guard.remove(&txn_id)
        };

        let Some(tables) = tables else {
            return Ok(());
        };

        if tables.is_empty() {
            return Ok(());
        }

        // Read snapshot of the latest committed state, independent of this txn.
        let snapshot = TransactionSnapshot {
            txn_id: TXN_ID_AUTO_COMMIT,
            snapshot_id: self.txn_manager.last_committed(),
        };

        for table_name in tables {
            let table = self.lookup_table(&table_name)?;
            let constraint_ctx = self.build_table_constraint_context(table.as_ref())?;
            // Tables without a primary key need no commit-time re-check.
            let Some(primary_key) = constraint_ctx.primary_key.as_ref() else {
                continue;
            };

            let new_rows = self.collect_rows_created_by_txn(table.as_ref(), txn_id)?;
            if new_rows.is_empty() {
                continue;
            }

            // Rows come back in full schema order, so the column-order map is
            // the identity permutation.
            let column_order: Vec<usize> = (0..table.schema.columns.len()).collect();
            self.constraint_service.validate_primary_key_rows(
                &constraint_ctx.schema_field_ids,
                primary_key,
                &column_order,
                &new_rows,
                |field_ids| self.scan_multi_column_values(table.as_ref(), field_ids, snapshot),
            )?;
        }

        Ok(())
    }
2623
2624 fn clear_transaction_state(&self, txn_id: TxnId) {
2625 if txn_id == TXN_ID_AUTO_COMMIT {
2626 return;
2627 }
2628
2629 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
2630 guard.remove(&txn_id);
2631 }
2632
2633 fn coerce_plan_value_for_column(
2634 &self,
2635 value: PlanValue,
2636 column: &ExecutorColumn,
2637 ) -> Result<PlanValue> {
2638 match value {
2639 PlanValue::Null => Ok(PlanValue::Null),
2640 PlanValue::Integer(v) => match &column.data_type {
2641 DataType::Int64 => Ok(PlanValue::Integer(v)),
2642 DataType::Float64 => Ok(PlanValue::Float(v as f64)),
2643 DataType::Boolean => Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 })),
2644 DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
2645 DataType::Date32 => {
2646 let casted = i32::try_from(v).map_err(|_| {
2647 Error::InvalidArgumentError(format!(
2648 "integer literal out of range for DATE column '{}'",
2649 column.name
2650 ))
2651 })?;
2652 Ok(PlanValue::Integer(casted as i64))
2653 }
2654 DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
2655 "cannot assign integer to STRUCT column '{}'",
2656 column.name
2657 ))),
2658 _ => Ok(PlanValue::Integer(v)),
2659 },
2660 PlanValue::Float(v) => match &column.data_type {
2661 DataType::Int64 => Ok(PlanValue::Integer(v as i64)),
2662 DataType::Float64 => Ok(PlanValue::Float(v)),
2663 DataType::Boolean => Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 })),
2664 DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
2665 DataType::Date32 => Err(Error::InvalidArgumentError(format!(
2666 "cannot assign floating-point value to DATE column '{}'",
2667 column.name
2668 ))),
2669 DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
2670 "cannot assign floating-point value to STRUCT column '{}'",
2671 column.name
2672 ))),
2673 _ => Ok(PlanValue::Float(v)),
2674 },
2675 PlanValue::String(s) => match &column.data_type {
2676 DataType::Boolean => {
2677 let normalized = s.trim().to_ascii_lowercase();
2678 match normalized.as_str() {
2679 "true" | "t" | "1" => Ok(PlanValue::Integer(1)),
2680 "false" | "f" | "0" => Ok(PlanValue::Integer(0)),
2681 _ => Err(Error::InvalidArgumentError(format!(
2682 "cannot assign string '{}' to BOOLEAN column '{}'",
2683 s, column.name
2684 ))),
2685 }
2686 }
2687 DataType::Utf8 => Ok(PlanValue::String(s)),
2688 DataType::Date32 => {
2689 let days = parse_date32_literal(&s)?;
2690 Ok(PlanValue::Integer(days as i64))
2691 }
2692 DataType::Int64 | DataType::Float64 => Err(Error::InvalidArgumentError(format!(
2693 "cannot assign string '{}' to numeric column '{}'",
2694 s, column.name
2695 ))),
2696 DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
2697 "cannot assign string to STRUCT column '{}'",
2698 column.name
2699 ))),
2700 _ => Ok(PlanValue::String(s)),
2701 },
2702 PlanValue::Struct(map) => match &column.data_type {
2703 DataType::Struct(_) => Ok(PlanValue::Struct(map)),
2704 _ => Err(Error::InvalidArgumentError(format!(
2705 "cannot assign struct value to column '{}'",
2706 column.name
2707 ))),
2708 },
2709 }
2710 }
2711
2712 fn scan_column_values(
2714 &self,
2715 table: &ExecutorTable<P>,
2716 field_id: FieldId,
2717 snapshot: TransactionSnapshot,
2718 ) -> Result<Vec<PlanValue>> {
2719 let table_id = table.table.table_id();
2720 use llkv_expr::{Expr, Filter, Operator};
2721 use std::ops::Bound;
2722
2723 let match_all_filter = Filter {
2725 field_id,
2726 op: Operator::Range {
2727 lower: Bound::Unbounded,
2728 upper: Bound::Unbounded,
2729 },
2730 };
2731 let filter_expr = Expr::Pred(match_all_filter);
2732
2733 let row_ids = match table.table.filter_row_ids(&filter_expr) {
2735 Ok(ids) => ids,
2736 Err(Error::NotFound) => return Ok(Vec::new()),
2737 Err(e) => return Err(e),
2738 };
2739
2740 let row_ids = filter_row_ids_for_snapshot(
2742 table.table.as_ref(),
2743 row_ids,
2744 &self.txn_manager,
2745 snapshot,
2746 )?;
2747
2748 if row_ids.is_empty() {
2749 return Ok(Vec::new());
2750 }
2751
2752 let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
2754 let row_count = row_ids.len();
2755 let mut stream = match table.table.stream_columns(
2756 vec![logical_field_id],
2757 row_ids,
2758 GatherNullPolicy::IncludeNulls,
2759 ) {
2760 Ok(stream) => stream,
2761 Err(Error::NotFound) => return Ok(Vec::new()),
2762 Err(e) => return Err(e),
2763 };
2764
2765 let mut values = Vec::with_capacity(row_count);
2767 while let Some(chunk) = stream.next_batch()? {
2768 let batch = chunk.batch();
2769 if batch.num_columns() == 0 {
2770 continue;
2771 }
2772 let array = batch.column(0);
2773 for row_idx in 0..batch.num_rows() {
2774 if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
2775 values.push(value);
2776 }
2777 }
2778 }
2779
2780 Ok(values)
2781 }
2782
2783 fn scan_multi_column_values(
2785 &self,
2786 table: &ExecutorTable<P>,
2787 field_ids: &[FieldId],
2788 snapshot: TransactionSnapshot,
2789 ) -> Result<Vec<Vec<PlanValue>>> {
2790 if field_ids.is_empty() {
2791 return Ok(Vec::new());
2792 }
2793
2794 let table_id = table.table.table_id();
2795 use llkv_expr::{Expr, Filter, Operator};
2796 use std::ops::Bound;
2797
2798 let match_all_filter = Filter {
2799 field_id: field_ids[0],
2800 op: Operator::Range {
2801 lower: Bound::Unbounded,
2802 upper: Bound::Unbounded,
2803 },
2804 };
2805 let filter_expr = Expr::Pred(match_all_filter);
2806
2807 let row_ids = match table.table.filter_row_ids(&filter_expr) {
2808 Ok(ids) => ids,
2809 Err(Error::NotFound) => return Ok(Vec::new()),
2810 Err(e) => return Err(e),
2811 };
2812
2813 let row_ids = filter_row_ids_for_snapshot(
2814 table.table.as_ref(),
2815 row_ids,
2816 &self.txn_manager,
2817 snapshot,
2818 )?;
2819
2820 if row_ids.is_empty() {
2821 return Ok(Vec::new());
2822 }
2823
2824 let logical_field_ids: Vec<_> = field_ids
2825 .iter()
2826 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
2827 .collect();
2828
2829 let total_rows = row_ids.len();
2830 let mut stream = match table.table.stream_columns(
2831 logical_field_ids,
2832 row_ids,
2833 GatherNullPolicy::IncludeNulls,
2834 ) {
2835 Ok(stream) => stream,
2836 Err(Error::NotFound) => return Ok(Vec::new()),
2837 Err(e) => return Err(e),
2838 };
2839
2840 let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
2841 while let Some(chunk) = stream.next_batch()? {
2842 let batch = chunk.batch();
2843 if batch.num_columns() == 0 {
2844 continue;
2845 }
2846
2847 let base = chunk.row_offset();
2848 let local_len = batch.num_rows();
2849 for col_idx in 0..batch.num_columns() {
2850 let array = batch.column(col_idx);
2851 for local_idx in 0..local_len {
2852 let target_index = base + local_idx;
2853 debug_assert!(
2854 target_index < rows.len(),
2855 "stream chunk produced out-of-bounds row index"
2856 );
2857 if let Some(row) = rows.get_mut(target_index) {
2858 match llkv_plan::plan_value_from_array(array, local_idx) {
2859 Ok(value) => row.push(value),
2860 Err(_) => row.push(PlanValue::Null),
2861 }
2862 }
2863 }
2864 }
2865 }
2866
2867 Ok(rows)
2868 }
2869
2870 fn collect_row_values_for_ids(
2871 &self,
2872 table: &ExecutorTable<P>,
2873 row_ids: &[RowId],
2874 field_ids: &[FieldId],
2875 ) -> Result<Vec<Vec<PlanValue>>> {
2876 if row_ids.is_empty() || field_ids.is_empty() {
2877 return Ok(Vec::new());
2878 }
2879
2880 let table_id = table.table.table_id();
2881 let logical_field_ids: Vec<LogicalFieldId> = field_ids
2882 .iter()
2883 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
2884 .collect();
2885
2886 let mut stream = match table.table.stream_columns(
2887 logical_field_ids.clone(),
2888 row_ids.to_vec(),
2889 GatherNullPolicy::IncludeNulls,
2890 ) {
2891 Ok(stream) => stream,
2892 Err(Error::NotFound) => return Ok(Vec::new()),
2893 Err(e) => return Err(e),
2894 };
2895
2896 let mut rows = vec![Vec::with_capacity(field_ids.len()); row_ids.len()];
2897 while let Some(chunk) = stream.next_batch()? {
2898 let batch = chunk.batch();
2899 let base = chunk.row_offset();
2900 let local_len = batch.num_rows();
2901 for col_idx in 0..batch.num_columns() {
2902 let array = batch.column(col_idx);
2903 for local_idx in 0..local_len {
2904 let target_index = base + local_idx;
2905 if let Some(row) = rows.get_mut(target_index) {
2906 let value = llkv_plan::plan_value_from_array(array, local_idx)?;
2907 row.push(value);
2908 }
2909 }
2910 }
2911 }
2912
2913 Ok(rows)
2914 }
2915
2916 fn collect_visible_child_rows(
2917 &self,
2918 table: &ExecutorTable<P>,
2919 field_ids: &[FieldId],
2920 snapshot: TransactionSnapshot,
2921 ) -> Result<Vec<(RowId, Vec<PlanValue>)>> {
2922 if field_ids.is_empty() {
2923 return Ok(Vec::new());
2924 }
2925
2926 let anchor_field = field_ids[0];
2927 let filter_expr = full_table_scan_filter(anchor_field);
2928 let raw_row_ids = match table.table.filter_row_ids(&filter_expr) {
2929 Ok(ids) => ids,
2930 Err(Error::NotFound) => return Ok(Vec::new()),
2931 Err(e) => return Err(e),
2932 };
2933
2934 let visible_row_ids = filter_row_ids_for_snapshot(
2935 table.table.as_ref(),
2936 raw_row_ids,
2937 &self.txn_manager,
2938 snapshot,
2939 )?;
2940
2941 if visible_row_ids.is_empty() {
2942 return Ok(Vec::new());
2943 }
2944
2945 let table_id = table.table.table_id();
2946 let logical_field_ids: Vec<LogicalFieldId> = field_ids
2947 .iter()
2948 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
2949 .collect();
2950
2951 let mut stream = match table.table.stream_columns(
2952 logical_field_ids.clone(),
2953 visible_row_ids.clone(),
2954 GatherNullPolicy::IncludeNulls,
2955 ) {
2956 Ok(stream) => stream,
2957 Err(Error::NotFound) => return Ok(Vec::new()),
2958 Err(e) => return Err(e),
2959 };
2960
2961 let mut rows = vec![Vec::with_capacity(field_ids.len()); visible_row_ids.len()];
2962 while let Some(chunk) = stream.next_batch()? {
2963 let batch = chunk.batch();
2964 let base = chunk.row_offset();
2965 let local_len = batch.num_rows();
2966 for col_idx in 0..batch.num_columns() {
2967 let array = batch.column(col_idx);
2968 for local_idx in 0..local_len {
2969 let target_index = base + local_idx;
2970 if let Some(row) = rows.get_mut(target_index) {
2971 let value = llkv_plan::plan_value_from_array(array, local_idx)?;
2972 row.push(value);
2973 }
2974 }
2975 }
2976 }
2977
2978 Ok(visible_row_ids.into_iter().zip(rows).collect())
2979 }
2980
2981 fn build_table_constraint_context(
2982 &self,
2983 table: &ExecutorTable<P>,
2984 ) -> Result<TableConstraintContext> {
2985 let schema_field_ids: Vec<FieldId> = table
2986 .schema
2987 .columns
2988 .iter()
2989 .map(|column| column.field_id)
2990 .collect();
2991
2992 let column_constraints: Vec<InsertColumnConstraint> = table
2993 .schema
2994 .columns
2995 .iter()
2996 .enumerate()
2997 .map(|(idx, column)| InsertColumnConstraint {
2998 schema_index: idx,
2999 column: ConstraintColumnInfo {
3000 name: column.name.clone(),
3001 field_id: column.field_id,
3002 data_type: column.data_type.clone(),
3003 nullable: column.nullable,
3004 check_expr: column.check_expr.clone(),
3005 },
3006 })
3007 .collect();
3008
3009 let unique_columns: Vec<InsertUniqueColumn> = table
3010 .schema
3011 .columns
3012 .iter()
3013 .enumerate()
3014 .filter(|(_, column)| column.unique && !column.primary_key)
3015 .map(|(idx, column)| InsertUniqueColumn {
3016 schema_index: idx,
3017 field_id: column.field_id,
3018 name: column.name.clone(),
3019 })
3020 .collect();
3021
3022 let mut multi_column_uniques: Vec<InsertMultiColumnUnique> = Vec::new();
3023 for constraint in table.multi_column_uniques() {
3024 if constraint.column_indices.is_empty() {
3025 continue;
3026 }
3027
3028 let mut schema_indices = Vec::with_capacity(constraint.column_indices.len());
3029 let mut field_ids = Vec::with_capacity(constraint.column_indices.len());
3030 let mut column_names = Vec::with_capacity(constraint.column_indices.len());
3031 for &col_idx in &constraint.column_indices {
3032 let column = table.schema.columns.get(col_idx).ok_or_else(|| {
3033 Error::Internal(format!(
3034 "multi-column UNIQUE constraint references invalid column index {}",
3035 col_idx
3036 ))
3037 })?;
3038 schema_indices.push(col_idx);
3039 field_ids.push(column.field_id);
3040 column_names.push(column.name.clone());
3041 }
3042
3043 multi_column_uniques.push(InsertMultiColumnUnique {
3044 schema_indices,
3045 field_ids,
3046 column_names,
3047 });
3048 }
3049
3050 let primary_indices: Vec<usize> = table
3051 .schema
3052 .columns
3053 .iter()
3054 .enumerate()
3055 .filter(|(_, column)| column.primary_key)
3056 .map(|(idx, _)| idx)
3057 .collect();
3058
3059 let primary_key = if primary_indices.is_empty() {
3060 None
3061 } else {
3062 let mut field_ids = Vec::with_capacity(primary_indices.len());
3063 let mut column_names = Vec::with_capacity(primary_indices.len());
3064 for &idx in &primary_indices {
3065 let column = table.schema.columns.get(idx).ok_or_else(|| {
3066 Error::Internal(format!(
3067 "primary key references invalid column index {}",
3068 idx
3069 ))
3070 })?;
3071 field_ids.push(column.field_id);
3072 column_names.push(column.name.clone());
3073 }
3074
3075 Some(InsertMultiColumnUnique {
3076 schema_indices: primary_indices.clone(),
3077 field_ids,
3078 column_names,
3079 })
3080 };
3081
3082 Ok(TableConstraintContext {
3083 schema_field_ids,
3084 column_constraints,
3085 unique_columns,
3086 multi_column_uniques,
3087 primary_key,
3088 })
3089 }
3090
    /// Insert literal rows into `table`, enforcing all constraints and
    /// stamping MVCC metadata from `snapshot`.
    ///
    /// `columns` names the INSERT target columns (empty means full schema
    /// order); each row in `rows` must match its length. Values are normalized
    /// to their destination column types, constraints (row-level, UNIQUE,
    /// PRIMARY KEY, foreign keys) are validated, and the rows are appended as
    /// a single Arrow batch with `row_id`/`created_by`/`deleted_by` columns.
    fn insert_rows(
        &self,
        table: &ExecutorTable<P>,
        display_name: String,
        canonical_name: String,
        mut rows: Vec<Vec<PlanValue>>,
        columns: Vec<String>,
        snapshot: TransactionSnapshot,
    ) -> Result<RuntimeStatementResult<P>> {
        if rows.is_empty() {
            return Err(Error::InvalidArgumentError(
                "INSERT requires at least one row".into(),
            ));
        }

        // Map INSERT position -> schema column index, then check row arity.
        let column_order = resolve_insert_columns(&columns, table.schema.as_ref())?;
        let expected_len = column_order.len();
        for row in &rows {
            if row.len() != expected_len {
                return Err(Error::InvalidArgumentError(format!(
                    "expected {} values in INSERT row, found {}",
                    expected_len,
                    row.len()
                )));
            }
        }

        // Normalize each literal to its destination column's type, in place.
        for row in rows.iter_mut() {
            for (position, value) in row.iter_mut().enumerate() {
                let schema_index = column_order
                    .get(position)
                    .copied()
                    .ok_or_else(|| Error::Internal("invalid INSERT column index mapping".into()))?;
                let column = table.schema.columns.get(schema_index).ok_or_else(|| {
                    Error::Internal(format!(
                        "INSERT column index {} out of bounds for table '{}'",
                        schema_index, display_name
                    ))
                })?;
                let normalized = normalize_insert_value_for_column(column, value.clone())?;
                *value = normalized;
            }
        }

        let constraint_ctx = self.build_table_constraint_context(table)?;
        let primary_key_spec = constraint_ctx.primary_key.as_ref();

        // Trace-level debugging specific to tables named "keys" — presumably
        // a test fixture; TODO confirm whether this can be removed.
        if display_name == "keys" {
            tracing::trace!(
                "[KEYS] Validating constraints for {} row(s) before insert",
                rows.len()
            );
            for (i, row) in rows.iter().enumerate() {
                tracing::trace!("[KEYS] row[{}]: {:?}", i, row);
            }
        }

        // Row-level plus uniqueness constraints, using snapshot-visible scans
        // of existing data for the UNIQUE / PRIMARY KEY comparisons.
        self.constraint_service.validate_insert_constraints(
            &constraint_ctx.schema_field_ids,
            &constraint_ctx.column_constraints,
            &constraint_ctx.unique_columns,
            &constraint_ctx.multi_column_uniques,
            primary_key_spec,
            &column_order,
            &rows,
            |field_id| self.scan_column_values(table, field_id, snapshot),
            |field_ids| self.scan_multi_column_values(table, field_ids, snapshot),
        )?;

        self.check_foreign_keys_on_insert(table, &display_name, &rows, &column_order, snapshot)?;

        // Pivot row-major literals into per-column vectors in schema order.
        let row_count = rows.len();
        let mut column_values: Vec<Vec<PlanValue>> =
            vec![Vec::with_capacity(row_count); table.schema.columns.len()];
        for row in rows {
            for (idx, value) in row.into_iter().enumerate() {
                let dest_index = column_order[idx];
                column_values[dest_index].push(value);
            }
        }

        // Reserve a contiguous row-id range starting at the current high-water
        // mark. NOTE(review): the load here and the store after the append are
        // separate atomic operations — assumes writers to one table are
        // externally serialized; confirm.
        let start_row = table.next_row_id.load(Ordering::SeqCst);

        // MVCC bookkeeping columns: row_id, created_by (= this txn), deleted_by.
        let (row_id_array, created_by_array, deleted_by_array) =
            mvcc::build_insert_mvcc_columns(row_count, start_row, snapshot.txn_id, TXN_ID_NONE);

        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(column_values.len() + 3);
        arrays.push(row_id_array);
        arrays.push(created_by_array);
        arrays.push(deleted_by_array);

        let mut fields: Vec<Field> = Vec::with_capacity(column_values.len() + 3);
        fields.extend(mvcc::build_mvcc_fields());

        // User columns follow the three MVCC columns, in schema order.
        for (column, values) in table.schema.columns.iter().zip(column_values.into_iter()) {
            let array = build_array_for_column(&column.data_type, &values)?;
            let field = mvcc::build_field_with_metadata(
                &column.name,
                column.data_type.clone(),
                column.nullable,
                column.field_id,
            );
            arrays.push(array);
            fields.push(field);
        }

        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays)?;
        tracing::trace!(
            table_name = %display_name,
            store_ptr = ?std::ptr::addr_of!(*table.table.store()),
            "About to call table.append"
        );
        table.table.append(&batch)?;
        // Publish the new high-water mark and row count only after the append
        // succeeded.
        table
            .next_row_id
            .store(start_row + row_count as u64, Ordering::SeqCst);
        table
            .total_rows
            .fetch_add(row_count as u64, Ordering::SeqCst);

        // Register for commit-time primary-key re-validation.
        self.record_table_with_new_rows(snapshot.txn_id, canonical_name);

        Ok(RuntimeStatementResult::Insert {
            table_name: display_name,
            rows_inserted: row_count,
        })
    }
3219
3220 fn insert_batches(
3221 &self,
3222 table: &ExecutorTable<P>,
3223 display_name: String,
3224 canonical_name: String,
3225 batches: Vec<RecordBatch>,
3226 columns: Vec<String>,
3227 snapshot: TransactionSnapshot,
3228 ) -> Result<RuntimeStatementResult<P>> {
3229 if batches.is_empty() {
3230 return Ok(RuntimeStatementResult::Insert {
3231 table_name: display_name,
3232 rows_inserted: 0,
3233 });
3234 }
3235
3236 let expected_len = if columns.is_empty() {
3237 table.schema.columns.len()
3238 } else {
3239 columns.len()
3240 };
3241 let mut total_rows_inserted = 0usize;
3242
3243 for batch in batches {
3244 if batch.num_columns() != expected_len {
3245 return Err(Error::InvalidArgumentError(format!(
3246 "expected {} columns in INSERT batch, found {}",
3247 expected_len,
3248 batch.num_columns()
3249 )));
3250 }
3251 let row_count = batch.num_rows();
3252 if row_count == 0 {
3253 continue;
3254 }
3255 let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(row_count);
3256 for row_idx in 0..row_count {
3257 let mut row: Vec<PlanValue> = Vec::with_capacity(expected_len);
3258 for col_idx in 0..expected_len {
3259 let array = batch.column(col_idx);
3260 row.push(llkv_plan::plan_value_from_array(array, row_idx)?);
3261 }
3262 rows.push(row);
3263 }
3264
3265 match self.insert_rows(
3266 table,
3267 display_name.clone(),
3268 canonical_name.clone(),
3269 rows,
3270 columns.clone(),
3271 snapshot,
3272 )? {
3273 RuntimeStatementResult::Insert { rows_inserted, .. } => {
3274 total_rows_inserted += rows_inserted;
3275 }
3276 _ => unreachable!("insert_rows must return Insert result"),
3277 }
3278 }
3279
3280 Ok(RuntimeStatementResult::Insert {
3281 table_name: display_name,
3282 rows_inserted: total_rows_inserted,
3283 })
3284 }
3285
    /// UPDATE rows matching `filter` by rewriting them: the current images of
    /// the matching rows are read, assignments are applied and re-validated,
    /// and the rows are then deleted and re-inserted so MVCC history records
    /// the change under `snapshot`'s transaction.
    fn update_filtered_rows(
        &self,
        table: &ExecutorTable<P>,
        display_name: String,
        canonical_name: String,
        assignments: Vec<ColumnAssignment>,
        filter: LlkvExpr<'static, String>,
        snapshot: TransactionSnapshot,
    ) -> Result<RuntimeStatementResult<P>> {
        if assignments.is_empty() {
            return Err(Error::InvalidArgumentError(
                "UPDATE requires at least one assignment".into(),
            ));
        }

        let schema = table.schema.as_ref();
        // Resolve column names in the WHERE clause to field ids.
        let filter_expr = translate_predicate(filter, schema)?;

        // An assignment's right-hand side is either a literal or an index into
        // the batch of scalar expressions evaluated per matching row.
        enum PreparedValue {
            Literal(PlanValue),
            Expression { expr_index: usize },
        }

        let mut seen_columns: FxHashSet<String> =
            FxHashSet::with_capacity_and_hasher(assignments.len(), Default::default());
        let mut prepared: Vec<(ExecutorColumn, PreparedValue)> =
            Vec::with_capacity(assignments.len());
        let mut scalar_exprs: Vec<ScalarExpr<FieldId>> = Vec::new();

        for assignment in assignments {
            // Reject assigning the same column twice (case-insensitive match).
            let normalized = assignment.column.to_ascii_lowercase();
            if !seen_columns.insert(normalized.clone()) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate column '{}' in UPDATE assignments",
                    assignment.column
                )));
            }
            let column = table.schema.resolve(&assignment.column).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "unknown column '{}' in UPDATE",
                    assignment.column
                ))
            })?;

            match assignment.value {
                AssignmentValue::Literal(value) => {
                    prepared.push((column.clone(), PreparedValue::Literal(value)));
                }
                AssignmentValue::Expression(expr) => {
                    let translated = translate_scalar(&expr, schema)?;
                    let expr_index = scalar_exprs.len();
                    scalar_exprs.push(translated);
                    prepared.push((column.clone(), PreparedValue::Expression { expr_index }));
                }
            }
        }

        // Find the target rows and evaluate all assignment expressions against
        // their current values in one pass.
        let (row_ids, mut expr_values) =
            self.collect_update_rows(table, &filter_expr, &scalar_exprs, snapshot)?;

        if row_ids.is_empty() {
            return Ok(RuntimeStatementResult::Update {
                table_name: display_name,
                rows_updated: 0,
            });
        }

        // Materialize the full current image of every targeted row.
        let row_count = row_ids.len();
        let table_id = table.table.table_id();
        let logical_fields: Vec<LogicalFieldId> = table
            .schema
            .columns
            .iter()
            .map(|column| LogicalFieldId::for_user(table_id, column.field_id))
            .collect();

        let mut stream = table.table.stream_columns(
            logical_fields.clone(),
            row_ids.clone(),
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut new_rows: Vec<Vec<PlanValue>> =
            vec![Vec::with_capacity(table.schema.columns.len()); row_count];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            // `row_offset` positions this chunk within the overall result.
            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < new_rows.len(),
                        "column stream produced out-of-range row index"
                    );
                    if let Some(row) = new_rows.get_mut(target_index) {
                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
                        row.push(value);
                    }
                }
            }
        }
        debug_assert!(
            new_rows
                .iter()
                .all(|row| row.len() == table.schema.columns.len())
        );

        tracing::trace!(
            table = %display_name,
            row_count,
            rows = ?new_rows,
            "update_filtered_rows captured source rows"
        );

        // Capture each row's primary-key value *before* assignments are
        // applied so rows keeping their key are not flagged as duplicates.
        let constraint_ctx = self.build_table_constraint_context(table)?;
        let primary_key_spec = constraint_ctx.primary_key.as_ref();
        let mut original_primary_key_keys: Vec<Option<UniqueKey>> = Vec::new();
        if let Some(pk) = primary_key_spec {
            original_primary_key_keys.reserve(row_count);
            for row in &new_rows {
                let mut values = Vec::with_capacity(pk.schema_indices.len());
                for &idx in &pk.schema_indices {
                    let value = row.get(idx).cloned().unwrap_or(PlanValue::Null);
                    values.push(value);
                }
                let key = build_composite_unique_key(&values, &pk.column_names)?;
                original_primary_key_keys.push(key);
            }
        }

        let column_positions: FxHashMap<FieldId, usize> = FxHashMap::from_iter(
            table
                .schema
                .columns
                .iter()
                .enumerate()
                .map(|(idx, column)| (column.field_id, idx)),
        );

        // Overlay the assignment results onto the captured row images.
        for (column, value) in prepared {
            let column_index =
                column_positions
                    .get(&column.field_id)
                    .copied()
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "column '{}' missing in table schema during UPDATE",
                            column.name
                        ))
                    })?;

            let values = match value {
                PreparedValue::Literal(lit) => vec![lit; row_count],
                PreparedValue::Expression { expr_index } => {
                    let column_values = expr_values.get_mut(expr_index).ok_or_else(|| {
                        Error::InvalidArgumentError(
                            "expression assignment value missing during UPDATE".into(),
                        )
                    })?;
                    if column_values.len() != row_count {
                        return Err(Error::InvalidArgumentError(
                            "expression result count did not match targeted row count".into(),
                        ));
                    }
                    // Move the evaluated column out; each is consumed once.
                    mem::take(column_values)
                }
            };

            for (row_idx, new_value) in values.into_iter().enumerate() {
                if let Some(row) = new_rows.get_mut(row_idx) {
                    let coerced = self.coerce_plan_value_for_column(new_value, &column)?;
                    row[column_index] = coerced;
                }
            }
        }

        // Re-validate row-level constraints on the rewritten rows; the column
        // order here is the full schema order.
        let column_names: Vec<String> = table
            .schema
            .columns
            .iter()
            .map(|column| column.name.clone())
            .collect();
        let column_order = resolve_insert_columns(&column_names, table.schema.as_ref())?;
        self.constraint_service.validate_row_level_constraints(
            &constraint_ctx.schema_field_ids,
            &constraint_ctx.column_constraints,
            &column_order,
            &new_rows,
        )?;

        if let Some(pk) = primary_key_spec {
            self.constraint_service.validate_update_primary_keys(
                &constraint_ctx.schema_field_ids,
                pk,
                &column_order,
                &new_rows,
                &original_primary_key_keys,
                |field_ids| self.scan_multi_column_values(table, field_ids, snapshot),
            )?;
        }

        // Apply the rewrite as delete-then-insert under the same snapshot.
        // NOTE(review): the trailing `false` presumably suppresses some
        // DELETE-side behavior — confirm at the `apply_delete` definition.
        let _ = self.apply_delete(
            table,
            display_name.clone(),
            canonical_name.clone(),
            row_ids.clone(),
            snapshot,
            false,
        )?;

        let _ = self.insert_rows(
            table,
            display_name.clone(),
            canonical_name,
            new_rows,
            column_names,
            snapshot,
        )?;

        Ok(RuntimeStatementResult::Update {
            table_name: display_name,
            rows_updated: row_count,
        })
    }
3513
    /// Apply `assignments` to every visible row of `table`.
    ///
    /// UPDATE is implemented as delete-then-reinsert: the current row images
    /// are gathered, mutated in memory, tombstoned via `apply_delete`, and
    /// appended back through `insert_rows` under the same snapshot.
    ///
    /// # Errors
    /// Returns `InvalidArgumentError` for empty/duplicate/unknown assignment
    /// columns, mismatched expression result counts, or row counts outside
    /// the supported range; constraint violations surface from the
    /// constraint-service validators.
    fn update_all_rows(
        &self,
        table: &ExecutorTable<P>,
        display_name: String,
        canonical_name: String,
        assignments: Vec<ColumnAssignment>,
        snapshot: TransactionSnapshot,
    ) -> Result<RuntimeStatementResult<P>> {
        if assignments.is_empty() {
            return Err(Error::InvalidArgumentError(
                "UPDATE requires at least one assignment".into(),
            ));
        }

        // Fast path: nothing stored, report zero rows updated.
        let total_rows = table.total_rows.load(Ordering::SeqCst);
        let total_rows_usize = usize::try_from(total_rows).map_err(|_| {
            Error::InvalidArgumentError("table row count exceeds supported range".into())
        })?;
        if total_rows_usize == 0 {
            return Ok(RuntimeStatementResult::Update {
                table_name: display_name,
                rows_updated: 0,
            });
        }

        let schema = table.schema.as_ref();

        // Literal assignments are broadcast to all rows; expression
        // assignments are evaluated per row and referenced by index.
        enum PreparedValue {
            Literal(PlanValue),
            Expression { expr_index: usize },
        }

        let mut seen_columns: FxHashSet<String> =
            FxHashSet::with_capacity_and_hasher(assignments.len(), Default::default());
        let mut prepared: Vec<(ExecutorColumn, PreparedValue)> =
            Vec::with_capacity(assignments.len());
        let mut scalar_exprs: Vec<ScalarExpr<FieldId>> = Vec::new();
        let mut first_field_id: Option<FieldId> = None;

        // Validate and normalize each assignment; column names are compared
        // case-insensitively for duplicate detection.
        for assignment in assignments {
            let normalized = assignment.column.to_ascii_lowercase();
            if !seen_columns.insert(normalized.clone()) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate column '{}' in UPDATE assignments",
                    assignment.column
                )));
            }
            let column = table.schema.resolve(&assignment.column).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "unknown column '{}' in UPDATE",
                    assignment.column
                ))
            })?;
            if first_field_id.is_none() {
                // Remember some target column to anchor the full-table scan on.
                first_field_id = Some(column.field_id);
            }

            match assignment.value {
                AssignmentValue::Literal(value) => {
                    prepared.push((column.clone(), PreparedValue::Literal(value)));
                }
                AssignmentValue::Expression(expr) => {
                    // Lower the SQL-level expression to field-id form now so
                    // errors surface before any rows are touched.
                    let translated = translate_scalar(&expr, schema)?;
                    let expr_index = scalar_exprs.len();
                    scalar_exprs.push(translated);
                    prepared.push((column.clone(), PreparedValue::Expression { expr_index }));
                }
            }
        }

        let anchor_field = first_field_id.ok_or_else(|| {
            Error::InvalidArgumentError("UPDATE requires at least one target column".into())
        })?;

        // Collect the visible row ids plus one evaluated value vector per
        // expression assignment, aligned with those row ids.
        let filter_expr = full_table_scan_filter(anchor_field);
        let (row_ids, mut expr_values) =
            self.collect_update_rows(table, &filter_expr, &scalar_exprs, snapshot)?;

        if row_ids.is_empty() {
            return Ok(RuntimeStatementResult::Update {
                table_name: display_name,
                rows_updated: 0,
            });
        }

        let row_count = row_ids.len();
        let table_id = table.table.table_id();
        let logical_fields: Vec<LogicalFieldId> = table
            .schema
            .columns
            .iter()
            .map(|column| LogicalFieldId::for_user(table_id, column.field_id))
            .collect();

        // Stream the full current image of every targeted row so the
        // reinsert carries all columns, not just the assigned ones.
        let mut stream = table.table.stream_columns(
            logical_fields.clone(),
            row_ids.clone(),
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut new_rows: Vec<Vec<PlanValue>> =
            vec![Vec::with_capacity(table.schema.columns.len()); row_count];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            // `row_offset` maps this chunk's local rows back into `new_rows`.
            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < new_rows.len(),
                        "column stream produced out-of-range row index"
                    );
                    if let Some(row) = new_rows.get_mut(target_index) {
                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
                        // Columns arrive in schema order, so pushing keeps
                        // each row aligned with `table.schema.columns`.
                        row.push(value);
                    }
                }
            }
        }
        debug_assert!(
            new_rows
                .iter()
                .all(|row| row.len() == table.schema.columns.len())
        );

        // Capture each row's primary-key value BEFORE applying assignments so
        // the PK validator can tell moved keys apart from unchanged ones.
        let constraint_ctx = self.build_table_constraint_context(table)?;
        let primary_key_spec = constraint_ctx.primary_key.as_ref();
        let mut original_primary_key_keys: Vec<Option<UniqueKey>> = Vec::new();
        if let Some(pk) = primary_key_spec {
            original_primary_key_keys.reserve(row_count);
            for row in &new_rows {
                let mut values = Vec::with_capacity(pk.schema_indices.len());
                for &idx in &pk.schema_indices {
                    let value = row.get(idx).cloned().unwrap_or(PlanValue::Null);
                    values.push(value);
                }
                let key = build_composite_unique_key(&values, &pk.column_names)?;
                original_primary_key_keys.push(key);
            }
        }

        let column_positions: FxHashMap<FieldId, usize> = FxHashMap::from_iter(
            table
                .schema
                .columns
                .iter()
                .enumerate()
                .map(|(idx, column)| (column.field_id, idx)),
        );

        // Overwrite the assigned columns in every captured row.
        for (column, value) in prepared {
            let column_index =
                column_positions
                    .get(&column.field_id)
                    .copied()
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "column '{}' missing in table schema during UPDATE",
                            column.name
                        ))
                    })?;

            let values = match value {
                PreparedValue::Literal(lit) => vec![lit; row_count],
                PreparedValue::Expression { expr_index } => {
                    let column_values = expr_values.get_mut(expr_index).ok_or_else(|| {
                        Error::InvalidArgumentError(
                            "expression assignment value missing during UPDATE".into(),
                        )
                    })?;
                    if column_values.len() != row_count {
                        return Err(Error::InvalidArgumentError(
                            "expression result count did not match targeted row count".into(),
                        ));
                    }
                    // Move the evaluated values out; each expression slot is
                    // consumed exactly once (duplicate columns were rejected).
                    mem::take(column_values)
                }
            };

            for (row_idx, new_value) in values.into_iter().enumerate() {
                if let Some(row) = new_rows.get_mut(row_idx) {
                    let coerced = self.coerce_plan_value_for_column(new_value, &column)?;
                    row[column_index] = coerced;
                }
            }
        }

        // Validate the mutated rows (NOT NULL / CHECK / etc.) and primary
        // keys before any storage mutation happens.
        let column_names: Vec<String> = table
            .schema
            .columns
            .iter()
            .map(|column| column.name.clone())
            .collect();
        let column_order = resolve_insert_columns(&column_names, table.schema.as_ref())?;
        self.constraint_service.validate_row_level_constraints(
            &constraint_ctx.schema_field_ids,
            &constraint_ctx.column_constraints,
            &column_order,
            &new_rows,
        )?;

        if let Some(pk) = primary_key_spec {
            self.constraint_service.validate_update_primary_keys(
                &constraint_ctx.schema_field_ids,
                pk,
                &column_order,
                &new_rows,
                &original_primary_key_keys,
                |field_ids| self.scan_multi_column_values(table, field_ids, snapshot),
            )?;
        }

        // Delete-then-reinsert. FK enforcement is disabled for the delete
        // half — presumably because the same rows are immediately re-inserted
        // below; TODO confirm referencing keys cannot be left dangling.
        let _ = self.apply_delete(
            table,
            display_name.clone(),
            canonical_name.clone(),
            row_ids.clone(),
            snapshot,
            false,
        )?;

        let _ = self.insert_rows(
            table,
            display_name.clone(),
            canonical_name,
            new_rows,
            column_names,
            snapshot,
        )?;

        Ok(RuntimeStatementResult::Update {
            table_name: display_name,
            rows_updated: row_count,
        })
    }
3752
3753 fn delete_filtered_rows(
3754 &self,
3755 table: &ExecutorTable<P>,
3756 display_name: String,
3757 canonical_name: String,
3758 filter: LlkvExpr<'static, String>,
3759 snapshot: TransactionSnapshot,
3760 ) -> Result<RuntimeStatementResult<P>> {
3761 let schema = table.schema.as_ref();
3762 let filter_expr = translate_predicate(filter, schema)?;
3763 let row_ids = table.table.filter_row_ids(&filter_expr)?;
3764 let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
3765 tracing::trace!(
3766 table = %display_name,
3767 rows = row_ids.len(),
3768 "delete_filtered_rows collected row ids"
3769 );
3770 self.apply_delete(table, display_name, canonical_name, row_ids, snapshot, true)
3771 }
3772
3773 fn delete_all_rows(
3774 &self,
3775 table: &ExecutorTable<P>,
3776 display_name: String,
3777 canonical_name: String,
3778 snapshot: TransactionSnapshot,
3779 ) -> Result<RuntimeStatementResult<P>> {
3780 let total_rows = table.total_rows.load(Ordering::SeqCst);
3781 if total_rows == 0 {
3782 return Ok(RuntimeStatementResult::Delete {
3783 table_name: display_name,
3784 rows_deleted: 0,
3785 });
3786 }
3787
3788 let anchor_field = table.schema.first_field_id().ok_or_else(|| {
3789 Error::InvalidArgumentError("DELETE requires a table with at least one column".into())
3790 })?;
3791 let filter_expr = full_table_scan_filter(anchor_field);
3792 let row_ids = table.table.filter_row_ids(&filter_expr)?;
3793 let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
3794 self.apply_delete(table, display_name, canonical_name, row_ids, snapshot, true)
3795 }
3796
3797 fn apply_delete(
3798 &self,
3799 table: &ExecutorTable<P>,
3800 display_name: String,
3801 canonical_name: String,
3802 row_ids: Vec<RowId>,
3803 snapshot: TransactionSnapshot,
3804 enforce_foreign_keys: bool,
3805 ) -> Result<RuntimeStatementResult<P>> {
3806 if row_ids.is_empty() {
3807 return Ok(RuntimeStatementResult::Delete {
3808 table_name: display_name,
3809 rows_deleted: 0,
3810 });
3811 }
3812
3813 if enforce_foreign_keys {
3814 self.check_foreign_keys_on_delete(
3815 table,
3816 &display_name,
3817 &canonical_name,
3818 &row_ids,
3819 snapshot,
3820 )?;
3821 }
3822
3823 self.detect_delete_conflicts(table, &display_name, &row_ids, snapshot)?;
3824
3825 let removed = row_ids.len();
3826
3827 let batch = mvcc::build_delete_batch(row_ids.clone(), snapshot.txn_id)?;
3829 table.table.append(&batch)?;
3830
3831 let removed_u64 = u64::try_from(removed)
3832 .map_err(|_| Error::InvalidArgumentError("row count exceeds supported range".into()))?;
3833 table.total_rows.fetch_sub(removed_u64, Ordering::SeqCst);
3834
3835 Ok(RuntimeStatementResult::Delete {
3836 table_name: display_name,
3837 rows_deleted: removed,
3838 })
3839 }
3840
3841 fn check_foreign_keys_on_delete(
3842 &self,
3843 table: &ExecutorTable<P>,
3844 _display_name: &str,
3845 _canonical_name: &str,
3846 row_ids: &[RowId],
3847 snapshot: TransactionSnapshot,
3848 ) -> Result<()> {
3849 if row_ids.is_empty() {
3850 return Ok(());
3851 }
3852
3853 self.constraint_service.validate_delete_foreign_keys(
3854 table.table.table_id(),
3855 row_ids,
3856 |request| {
3857 self.collect_row_values_for_ids(
3858 table,
3859 request.referenced_row_ids,
3860 request.referenced_field_ids,
3861 )
3862 },
3863 |request| {
3864 let child_table = self.lookup_table(request.referencing_table_canonical)?;
3865 self.collect_visible_child_rows(
3866 child_table.as_ref(),
3867 request.referencing_field_ids,
3868 snapshot,
3869 )
3870 },
3871 )
3872 }
3873
3874 fn check_foreign_keys_on_insert(
3875 &self,
3876 table: &ExecutorTable<P>,
3877 _display_name: &str,
3878 rows: &[Vec<PlanValue>],
3879 column_order: &[usize],
3880 snapshot: TransactionSnapshot,
3881 ) -> Result<()> {
3882 if rows.is_empty() {
3883 return Ok(());
3884 }
3885
3886 let schema_field_ids: Vec<FieldId> = table
3887 .schema
3888 .columns
3889 .iter()
3890 .map(|column| column.field_id)
3891 .collect();
3892
3893 self.constraint_service.validate_insert_foreign_keys(
3894 table.table.table_id(),
3895 &schema_field_ids,
3896 column_order,
3897 rows,
3898 |request| {
3899 let parent_table = self.lookup_table(request.referenced_table_canonical)?;
3900 self.scan_multi_column_values(
3901 parent_table.as_ref(),
3902 request.referenced_field_ids,
3903 snapshot,
3904 )
3905 },
3906 )
3907 }
3908
    /// Detect write-write conflicts before deleting `row_ids`: fail if any of
    /// the rows already carries a `deleted_by` stamp from a *different* and
    /// still-*active* transaction.
    ///
    /// # Errors
    /// Returns `TransactionContextError` on a conflict; storage errors are
    /// propagated.
    fn detect_delete_conflicts(
        &self,
        table: &ExecutorTable<P>,
        display_name: &str,
        row_ids: &[RowId],
        snapshot: TransactionSnapshot,
    ) -> Result<()> {
        if row_ids.is_empty() {
            return Ok(());
        }

        // Only the MVCC `deleted_by` column is needed for this check.
        let table_id = table.table.table_id();
        let deleted_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
        let logical_fields: Arc<[LogicalFieldId]> = Arc::from([deleted_lfid]);

        // If the MVCC column does not exist yet, no tombstones exist either,
        // so there is nothing to conflict with.
        if let Err(err) = table
            .table
            .store()
            .prepare_gather_context(logical_fields.as_ref())
        {
            match err {
                Error::NotFound => return Ok(()),
                other => return Err(other),
            }
        }

        // Nulls are included so every requested row id yields one slot.
        let mut stream = table.table.stream_columns(
            Arc::clone(&logical_fields),
            row_ids.to_vec(),
            GatherNullPolicy::IncludeNulls,
        )?;

        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            let window = chunk.row_ids();
            let deleted_column = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| {
                    Error::Internal(
                        "failed to read MVCC deleted_by column for conflict detection".into(),
                    )
                })?;

            for (idx, row_id) in window.iter().enumerate() {
                // A null stamp means the row was never deleted.
                let deleted_by = if deleted_column.is_null(idx) {
                    TXN_ID_NONE
                } else {
                    deleted_column.value(idx)
                };

                // Unstamped rows and our own tombstones are not conflicts.
                if deleted_by == TXN_ID_NONE || deleted_by == snapshot.txn_id {
                    continue;
                }

                // A committed or aborted deleter is resolved elsewhere by
                // visibility filtering; only an in-flight deleter conflicts.
                let status = self.txn_manager.status(deleted_by);
                if !status.is_active() {
                    continue;
                }

                tracing::debug!(
                    "[MVCC] delete conflict: table='{}' row_id={} deleted_by={} status={:?} current_txn={}",
                    display_name,
                    row_id,
                    deleted_by,
                    status,
                    snapshot.txn_id
                );

                return Err(Error::TransactionContextError(format!(
                    "transaction conflict on table '{}' for row {}: row locked by transaction {} ({:?})",
                    display_name, row_id, deleted_by, status
                )));
            }
        }

        Ok(())
    }
3988
3989 fn collect_update_rows(
3990 &self,
3991 table: &ExecutorTable<P>,
3992 filter_expr: &LlkvExpr<'static, FieldId>,
3993 expressions: &[ScalarExpr<FieldId>],
3994 snapshot: TransactionSnapshot,
3995 ) -> Result<(Vec<RowId>, Vec<Vec<PlanValue>>)> {
3996 let row_ids = table.table.filter_row_ids(filter_expr)?;
3997 let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
3998 if row_ids.is_empty() {
3999 return Ok((row_ids, vec![Vec::new(); expressions.len()]));
4000 }
4001
4002 if expressions.is_empty() {
4003 return Ok((row_ids, Vec::new()));
4004 }
4005
4006 let mut projections: Vec<ScanProjection> = Vec::with_capacity(expressions.len());
4007 for (idx, expr) in expressions.iter().enumerate() {
4008 let alias = format!("__expr_{idx}");
4009 projections.push(ScanProjection::computed(expr.clone(), alias));
4010 }
4011
4012 let mut expr_values: Vec<Vec<PlanValue>> =
4013 vec![Vec::with_capacity(row_ids.len()); expressions.len()];
4014 let mut error: Option<Error> = None;
4015 let row_filter: Arc<dyn RowIdFilter<P>> = Arc::new(MvccRowIdFilter::new(
4016 Arc::clone(&self.txn_manager),
4017 snapshot,
4018 ));
4019 let options = ScanStreamOptions {
4020 include_nulls: true,
4021 order: None,
4022 row_id_filter: Some(row_filter),
4023 };
4024
4025 table
4026 .table
4027 .scan_stream_with_exprs(&projections, filter_expr, options, |batch| {
4028 if error.is_some() {
4029 return;
4030 }
4031 if let Err(err) = Self::collect_expression_values(&mut expr_values, batch) {
4032 error = Some(err);
4033 }
4034 })?;
4035
4036 if let Some(err) = error {
4037 return Err(err);
4038 }
4039
4040 for values in &expr_values {
4041 if values.len() != row_ids.len() {
4042 return Err(Error::InvalidArgumentError(
4043 "expression result count did not match targeted row count".into(),
4044 ));
4045 }
4046 }
4047
4048 Ok((row_ids, expr_values))
4049 }
4050
4051 fn collect_expression_values(
4052 expr_values: &mut [Vec<PlanValue>],
4053 batch: RecordBatch,
4054 ) -> Result<()> {
4055 for row_idx in 0..batch.num_rows() {
4056 for (expr_index, values) in expr_values.iter_mut().enumerate() {
4057 let value = llkv_plan::plan_value_from_array(batch.column(expr_index), row_idx)?;
4058 values.push(value);
4059 }
4060 }
4061
4062 Ok(())
4063 }
4064
    /// Resolve `canonical_name` to an [`ExecutorTable`], serving from the
    /// in-memory cache when possible and lazily loading from the catalog
    /// otherwise. The loaded entry is inserted into the cache before return.
    ///
    /// # Errors
    /// `Error::NotFound` if the table was dropped in this context;
    /// `InvalidArgumentError` if the catalog has no such table.
    pub fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        // Fast path: serve a cached entry, unless the table has since been
        // marked dropped in this context.
        {
            let tables = self.tables.read().unwrap();
            if let Some(table) = tables.get(canonical_name) {
                if self.dropped_tables.read().unwrap().contains(canonical_name) {
                    return Err(Error::NotFound);
                }
                tracing::trace!(
                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
                    canonical_name,
                    table.table.table_id(),
                    table.schema.columns.len(),
                    &*self.pager
                );
                return Ok(Arc::clone(table));
            }
        }
        tracing::debug!(
            "[LAZY_LOAD] Loading table '{}' from catalog",
            canonical_name
        );

        let catalog_table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;

        let table_id = catalog_table_id;
        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
        let store = table.store();
        // Sort by field id so column order is deterministic across loads.
        let mut logical_fields = store.user_field_ids_for_table(table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());
        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let summary = self
            .catalog_service
            .table_constraint_summary(canonical_name)?;
        let TableConstraintSummaryView {
            table_meta,
            column_metas,
            constraint_records,
            multi_column_uniques,
        } = summary;
        // The metadata row must exist even though its contents are unused here.
        let _table_meta = table_meta.ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;
        let catalog_field_resolver = self.catalog.field_resolver(catalog_table_id);
        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
        // Track whether constraint records exist at all; when they do not,
        // the per-field catalog constraints below are used as a fallback.
        let mut has_primary_key_records = false;
        let mut has_single_unique_records = false;

        for record in constraint_records
            .iter()
            .filter(|record| record.is_active())
        {
            match &record.kind {
                ConstraintKind::PrimaryKey(pk) => {
                    has_primary_key_records = true;
                    for field_id in &pk.field_ids {
                        metadata_primary_keys.insert(*field_id);
                        // Primary-key fields are implicitly unique.
                        metadata_unique_fields.insert(*field_id);
                    }
                }
                ConstraintKind::Unique(unique) => {
                    // Multi-column UNIQUE constraints are handled separately
                    // via `multi_column_uniques`; only track single-column ones.
                    if unique.field_ids.len() == 1 {
                        has_single_unique_records = true;
                        metadata_unique_fields.insert(unique.field_ids[0]);
                    }
                }
                _ => {}
            }
        }

        let mut executor_columns = Vec::new();
        let mut lookup = FxHashMap::with_capacity_and_hasher(field_ids.len(), Default::default());

        for (idx, lfid) in logical_fields.iter().enumerate() {
            let field_id = lfid.field_id();
            let normalized_index = executor_columns.len();

            // NOTE(review): this assumes `column_metas` is aligned with the
            // sorted `logical_fields` order — confirm against the catalog
            // service; missing metadata falls back to a synthetic name.
            let column_name = column_metas
                .get(idx)
                .and_then(|meta| meta.as_ref())
                .and_then(|meta| meta.name.clone())
                .unwrap_or_else(|| format!("col_{}", field_id));

            // Schema lookups are case-insensitive via lowercase keys.
            let normalized = column_name.to_ascii_lowercase();
            lookup.insert(normalized, normalized_index);

            let fallback_constraints: FieldConstraints = catalog_field_resolver
                .as_ref()
                .and_then(|resolver| resolver.field_constraints_by_name(&column_name))
                .unwrap_or_default();

            // Prefer constraint records when any exist; otherwise fall back
            // to the per-field catalog constraints.
            let metadata_primary = metadata_primary_keys.contains(&field_id);
            let primary_key = if has_primary_key_records {
                metadata_primary
            } else {
                fallback_constraints.primary_key
            };

            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
            let unique = if has_primary_key_records || has_single_unique_records {
                metadata_unique
            } else {
                fallback_constraints.primary_key || fallback_constraints.unique
            };

            let data_type = store.data_type(*lfid)?;
            // Primary-key columns are implicitly NOT NULL.
            let nullable = !primary_key;

            executor_columns.push(ExecutorColumn {
                name: column_name,
                data_type,
                nullable,
                primary_key,
                unique,
                field_id,
                check_expr: fallback_constraints.check_expr.clone(),
            });
        }

        let exec_schema = Arc::new(ExecutorSchema {
            columns: executor_columns,
            lookup,
        });

        // Scan the row-id column to recover the highest allocated row id so
        // new inserts do not collide with existing rows.
        let max_row_id = {
            use arrow::array::UInt64Array;
            use llkv_column_map::store::rowid_fid;
            use llkv_column_map::store::scan::{
                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
            };

            // Tracks the maximum u64 seen across all scanned chunks.
            struct MaxRowIdVisitor {
                max: RowId,
            }

            impl PrimitiveVisitor for MaxRowIdVisitor {
                fn u64_chunk(&mut self, values: &UInt64Array) {
                    for i in 0..values.len() {
                        let val = values.value(i);
                        if val > self.max {
                            self.max = val;
                        }
                    }
                }
            }

            // Only the plain-chunk callback is needed; the other visitor
            // traits use their default (no-op) implementations.
            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}

            // NOTE(review): field id 1 is hard-coded as the anchor for the
            // row-id shadow column — confirm every user table has field 1.
            let row_id_field = rowid_fid(LogicalFieldId::for_user(table_id, 1));
            let mut visitor = MaxRowIdVisitor { max: 0 };

            match ScanBuilder::new(table.store(), row_id_field)
                .options(ScanOptions::default())
                .run(&mut visitor)
            {
                Ok(_) => visitor.max,
                // No row-id column yet means the table has no rows.
                Err(llkv_result::Error::NotFound) => 0,
                Err(e) => {
                    // Best effort: log and start from 0 rather than failing
                    // the whole lookup.
                    tracing::warn!(
                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
                        canonical_name,
                        e
                    );
                    0
                }
            }
        };

        // NOTE(review): a single existing row with id 0 is indistinguishable
        // from an empty table here (both yield next_row_id == 0) — confirm
        // row ids are allocated starting at 1.
        let next_row_id = if max_row_id > 0 {
            max_row_id.saturating_add(1)
        } else {
            0
        };

        let total_rows = table.total_rows().unwrap_or(0);

        let executor_table = Arc::new(ExecutorTable {
            table: Arc::new(table),
            schema: exec_schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        if !multi_column_uniques.is_empty() {
            let executor_uniques =
                Self::build_executor_multi_column_uniques(&executor_table, &multi_column_uniques);
            executor_table.set_multi_column_uniques(executor_uniques);
        }

        // Publish the loaded entry so subsequent lookups hit the cache.
        {
            let mut tables = self.tables.write().unwrap();
            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
        }

        // Re-register field definitions with the catalog resolver; failures
        // are deliberately ignored (fields may already be registered).
        if let Some(field_resolver) = self.catalog.field_resolver(catalog_table_id) {
            for col in &executor_table.schema.columns {
                let definition = FieldDefinition::new(&col.name)
                    .with_primary_key(col.primary_key)
                    .with_unique(col.unique)
                    .with_check_expr(col.check_expr.clone());
                let _ = field_resolver.register_field(definition);
            }
            tracing::debug!(
                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
                executor_table.schema.columns.len(),
                canonical_name
            );
        }

        tracing::debug!(
            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
            canonical_name,
            table_id,
            field_ids.len(),
            next_row_id
        );

        Ok(executor_table)
    }
4304
4305 fn remove_table_entry(&self, canonical_name: &str) {
4306 let mut tables = self.tables.write().unwrap();
4307 if tables.remove(canonical_name).is_some() {
4308 tracing::trace!(
4309 "remove_table_entry: removed table '{}' from context cache",
4310 canonical_name
4311 );
4312 }
4313 }
4314
4315 pub fn drop_table_immediate(&self, name: &str, if_exists: bool) -> Result<()> {
4316 let (display_name, canonical_name) = canonical_table_name(name)?;
4317 let (table_id, column_field_ids) = {
4318 let tables = self.tables.read().unwrap();
4319 let Some(entry) = tables.get(&canonical_name) else {
4320 if if_exists {
4321 return Ok(());
4322 } else {
4323 return Err(Error::CatalogError(format!(
4324 "Catalog Error: Table '{}' does not exist",
4325 display_name
4326 )));
4327 }
4328 };
4329
4330 let field_ids = entry
4331 .schema
4332 .columns
4333 .iter()
4334 .map(|col| col.field_id)
4335 .collect::<Vec<_>>();
4336 (entry.table.table_id(), field_ids)
4337 };
4338
4339 let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;
4340
4341 for detail in referencing {
4342 if detail.referencing_table_canonical == canonical_name {
4343 continue;
4344 }
4345
4346 if self.is_table_marked_dropped(&detail.referencing_table_canonical) {
4347 continue;
4348 }
4349
4350 let constraint_label = detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
4351 return Err(Error::ConstraintError(format!(
4352 "Cannot drop table '{}' because it is referenced by foreign key constraint '{}' on table '{}'",
4353 display_name, constraint_label, detail.referencing_table_display
4354 )));
4355 }
4356
4357 self.catalog_service
4358 .drop_table(&canonical_name, table_id, &column_field_ids)?;
4359 tracing::debug!(
4360 "[CATALOG] Unregistered table '{}' (table_id={}) from catalog",
4361 canonical_name,
4362 table_id
4363 );
4364
4365 self.dropped_tables
4366 .write()
4367 .unwrap()
4368 .insert(canonical_name.clone());
4369 Ok(())
4370 }
4371
4372 pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
4373 self.dropped_tables.read().unwrap().contains(canonical_name)
4374 }
4375}
4376
/// Bridges the transaction crate's [`TransactionContext`] trait onto the
/// runtime context: most methods delegate to `RuntimeContext`, threading the
/// wrapper's current snapshot through so statements observe MVCC visibility.
impl<P> TransactionContext for RuntimeContextWrapper<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    type Pager = P;

    fn set_snapshot(&self, snapshot: TransactionSnapshot) {
        self.update_snapshot(snapshot);
    }

    fn snapshot(&self) -> TransactionSnapshot {
        self.current_snapshot()
    }

    fn table_column_specs(&self, table_name: &str) -> llkv_result::Result<Vec<ColumnSpec>> {
        RuntimeContext::table_column_specs(self.context(), table_name)
    }

    fn export_table_rows(
        &self,
        table_name: &str,
    ) -> llkv_result::Result<llkv_transaction::RowBatch> {
        // Repackage the runtime RowBatch into the transaction crate's type.
        let batch = RuntimeContext::export_table_rows(self.context(), table_name)?;
        Ok(llkv_transaction::RowBatch {
            columns: batch.columns,
            rows: batch.rows,
        })
    }

    fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
    ) -> llkv_result::Result<Vec<RecordBatch>> {
        RuntimeContext::get_batches_with_row_ids_with_snapshot(
            self.context(),
            table_name,
            filter,
            self.snapshot(),
        )
    }

    fn execute_select(
        &self,
        plan: SelectPlan,
    ) -> llkv_result::Result<SelectExecution<Self::Pager>> {
        RuntimeContext::execute_select_with_snapshot(self.context(), plan, self.snapshot())
    }

    fn create_table_plan(
        &self,
        plan: CreateTablePlan,
    ) -> llkv_result::Result<TransactionResult<P>> {
        let result = RuntimeContext::create_table_plan(self.context(), plan)?;
        Ok(convert_statement_result(result))
    }

    fn insert(&self, plan: InsertPlan) -> llkv_result::Result<TransactionResult<P>> {
        tracing::trace!(
            "[WRAPPER] TransactionContext::insert called - plan.table='{}', wrapper_context_pager={:p}",
            plan.table,
            &*self.ctx.pager
        );
        // Auto-commit statements take the plain path; explicit transactions
        // carry their snapshot so MVCC stamps use the right txn id.
        let snapshot = self.current_snapshot();
        let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
            self.ctx().insert(plan)?
        } else {
            RuntimeContext::insert_with_snapshot(self.context(), plan, snapshot)?
        };
        Ok(convert_statement_result(result))
    }

    fn update(&self, plan: UpdatePlan) -> llkv_result::Result<TransactionResult<P>> {
        // Same auto-commit vs. explicit-transaction split as `insert`.
        let snapshot = self.current_snapshot();
        let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
            self.ctx().update(plan)?
        } else {
            RuntimeContext::update_with_snapshot(self.context(), plan, snapshot)?
        };
        Ok(convert_statement_result(result))
    }

    fn delete(&self, plan: DeletePlan) -> llkv_result::Result<TransactionResult<P>> {
        // Same auto-commit vs. explicit-transaction split as `insert`.
        let snapshot = self.current_snapshot();
        let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
            self.ctx().delete(plan)?
        } else {
            RuntimeContext::delete_with_snapshot(self.context(), plan, snapshot)?
        };
        Ok(convert_statement_result(result))
    }

    fn create_index(&self, plan: CreateIndexPlan) -> llkv_result::Result<TransactionResult<P>> {
        let result = RuntimeContext::create_index(self.context(), plan)?;
        Ok(convert_statement_result(result))
    }

    fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> llkv_result::Result<usize> {
        RuntimeContext::append_batches_with_row_ids(self.context(), table_name, batches)
    }

    fn table_names(&self) -> Vec<String> {
        RuntimeContext::table_names(self.context())
    }

    fn table_id(&self, table_name: &str) -> llkv_result::Result<llkv_table::types::TableId> {
        let ctx = self.context();
        // Reject tables dropped earlier in this context before touching the
        // lookup cache.
        if ctx.is_table_marked_dropped(table_name) {
            return Err(Error::InvalidArgumentError(format!(
                "table '{}' has been dropped",
                table_name
            )));
        }

        let table = ctx.lookup_table(table_name)?;
        Ok(table.table.table_id())
    }

    fn catalog_snapshot(&self) -> llkv_table::catalog::TableCatalogSnapshot {
        let ctx = self.context();
        ctx.catalog.snapshot()
    }

    fn validate_commit_constraints(&self, txn_id: TxnId) -> llkv_result::Result<()> {
        self.ctx.validate_primary_keys_for_commit(txn_id)
    }

    fn clear_transaction_state(&self, txn_id: TxnId) {
        self.ctx.clear_transaction_state(txn_id);
    }
}
4516
4517fn convert_statement_result<P>(result: RuntimeStatementResult<P>) -> TransactionResult<P>
4519where
4520 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
4521{
4522 use llkv_transaction::TransactionResult as TxResult;
4523 match result {
4524 RuntimeStatementResult::CreateTable { table_name } => TxResult::CreateTable { table_name },
4525 RuntimeStatementResult::CreateIndex {
4526 table_name,
4527 index_name,
4528 } => TxResult::CreateIndex {
4529 table_name,
4530 index_name,
4531 },
4532 RuntimeStatementResult::Insert { rows_inserted, .. } => TxResult::Insert { rows_inserted },
4533 RuntimeStatementResult::Update { rows_updated, .. } => TxResult::Update {
4534 rows_matched: rows_updated,
4535 rows_updated,
4536 },
4537 RuntimeStatementResult::Delete { rows_deleted, .. } => TxResult::Delete { rows_deleted },
4538 RuntimeStatementResult::Transaction { kind } => TxResult::Transaction { kind },
4539 _ => panic!("unsupported StatementResult conversion"),
4540 }
4541}
4542
/// Reduce `row_ids` to the subset visible under `snapshot`.
///
/// Visibility is decided per row from the table's MVCC `created_by` /
/// `deleted_by` columns via `RowVersion::is_visible_for`. Tables that have
/// no MVCC columns (gather preparation reports `NotFound`) are treated as
/// fully visible, as are chunks whose version columns cannot be read.
fn filter_row_ids_for_snapshot<P>(
    table: &Table<P>,
    row_ids: Vec<RowId>,
    txn_manager: &TxnIdManager,
    snapshot: TransactionSnapshot,
) -> Result<Vec<RowId>>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    tracing::debug!(
        "[FILTER_ROWS] Filtering {} row IDs for snapshot txn_id={}, snapshot_id={}",
        row_ids.len(),
        snapshot.txn_id,
        snapshot.snapshot_id
    );

    if row_ids.is_empty() {
        return Ok(row_ids);
    }

    // The two MVCC bookkeeping columns for this table, gathered together.
    let table_id = table.table_id();
    let created_lfid = LogicalFieldId::for_mvcc_created_by(table_id);
    let deleted_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
    let logical_fields: Arc<[LogicalFieldId]> = Arc::from([created_lfid, deleted_lfid]);

    if let Err(err) = table
        .store()
        .prepare_gather_context(logical_fields.as_ref())
    {
        match err {
            // No MVCC columns exist: nothing was ever versioned, so every
            // requested row is visible.
            Error::NotFound => {
                tracing::trace!(
                    "[FILTER_ROWS] MVCC columns not found for table_id={}, treating all rows as visible",
                    table_id
                );
                return Ok(row_ids);
            }
            other => {
                tracing::error!(
                    "[FILTER_ROWS] Failed to prepare gather context: {:?}",
                    other
                );
                return Err(other);
            }
        }
    }

    let total_rows = row_ids.len();
    // IncludeNulls: rows lacking version data must still appear in the
    // stream; their nulls are mapped to sentinel txn ids below.
    let mut stream = match table.stream_columns(
        Arc::clone(&logical_fields),
        row_ids,
        GatherNullPolicy::IncludeNulls,
    ) {
        Ok(stream) => stream,
        Err(err) => {
            tracing::error!("[FILTER_ROWS] stream_columns error: {:?}", err);
            return Err(err);
        }
    };

    let mut visible = Vec::with_capacity(total_rows);

    while let Some(chunk) = stream.next_batch()? {
        let batch = chunk.batch();
        let window = chunk.row_ids();

        // Defensive fallback: if the version batch is malformed, keep the
        // whole window visible rather than dropping rows.
        if batch.num_columns() < 2 {
            tracing::debug!(
                "[FILTER_ROWS] version_batch has < 2 columns for table_id={}, returning window rows unfiltered",
                table_id
            );
            visible.extend_from_slice(window);
            continue;
        }

        let created_column = batch.column(0).as_any().downcast_ref::<UInt64Array>();
        let deleted_column = batch.column(1).as_any().downcast_ref::<UInt64Array>();

        // Same fallback when the columns are not u64 arrays as expected.
        if created_column.is_none() || deleted_column.is_none() {
            tracing::debug!(
                "[FILTER_ROWS] Failed to downcast MVCC columns for table_id={}, returning window rows unfiltered",
                table_id
            );
            visible.extend_from_slice(window);
            continue;
        }

        let created_column = created_column.unwrap();
        let deleted_column = deleted_column.unwrap();

        for (idx, row_id) in window.iter().enumerate() {
            // Null creator means the row predates MVCC tracking: treat it as
            // auto-committed. Null deleter means the row was never deleted.
            let created_by = if created_column.is_null(idx) {
                TXN_ID_AUTO_COMMIT
            } else {
                created_column.value(idx)
            };
            let deleted_by = if deleted_column.is_null(idx) {
                TXN_ID_NONE
            } else {
                deleted_column.value(idx)
            };

            let version = RowVersion {
                created_by,
                deleted_by,
            };
            let is_visible = version.is_visible_for(txn_manager, snapshot);
            tracing::trace!(
                "[FILTER_ROWS] row_id={}: created_by={}, deleted_by={}, is_visible={}",
                row_id,
                created_by,
                deleted_by,
                is_visible
            );
            if is_visible {
                visible.push(*row_id);
            }
        }
    }

    tracing::debug!(
        "[FILTER_ROWS] Filtered from {} to {} visible rows",
        total_rows,
        visible.len()
    );
    Ok(visible)
}
4670
/// `RowIdFilter` implementation that drops row versions invisible to a given
/// transaction snapshot (delegates to `filter_row_ids_for_snapshot`).
struct MvccRowIdFilter<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    txn_manager: Arc<TxnIdManager>,
    snapshot: TransactionSnapshot,
    // Ties the filter to a pager type without storing one; the `fn(P)`
    // wrapper keeps the marker `Send`/`Sync` independent of `P`.
    _marker: PhantomData<fn(P)>,
}
4679
impl<P> MvccRowIdFilter<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Create a filter bound to `snapshot`; `txn_manager` is consulted for
    /// transaction state during visibility checks.
    fn new(txn_manager: Arc<TxnIdManager>, snapshot: TransactionSnapshot) -> Self {
        Self {
            txn_manager,
            snapshot,
            _marker: PhantomData,
        }
    }
}
4692
4693impl<P> RowIdFilter<P> for MvccRowIdFilter<P>
4694where
4695 P: Pager<Blob = EntryHandle> + Send + Sync,
4696{
4697 fn filter(&self, table: &Table<P>, row_ids: Vec<RowId>) -> Result<Vec<RowId>> {
4698 tracing::trace!(
4699 "[MVCC_FILTER] filter() called with row_ids {:?}, snapshot txn={}, snapshot_id={}",
4700 row_ids,
4701 self.snapshot.txn_id,
4702 self.snapshot.snapshot_id
4703 );
4704 let result = filter_row_ids_for_snapshot(table, row_ids, &self.txn_manager, self.snapshot);
4705 if let Ok(ref visible) = result {
4706 tracing::trace!(
4707 "[MVCC_FILTER] filter() returning visible row_ids: {:?}",
4708 visible
4709 );
4710 }
4711 result
4712 }
4713}
4714
/// Adapter that exposes a shared `RuntimeContext` through the
/// `TableProvider` trait.
struct ContextProvider<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    context: Arc<RuntimeContext<P>>,
}
4722
impl<P> TableProvider<P> for ContextProvider<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Look up `canonical_name` in the wrapped context's table registry.
    fn get_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        self.context.lookup_table(canonical_name)
    }
}
4731
/// Deferred SELECT builder: accumulates a `SelectPlan` through chained calls
/// and only executes it when one of the `collect*` methods runs.
pub struct RuntimeLazyFrame<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    context: Arc<RuntimeContext<P>>,
    plan: SelectPlan,
}
4740
4741impl<P> RuntimeLazyFrame<P>
4742where
4743 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
4744{
4745 pub fn scan(context: Arc<RuntimeContext<P>>, table: &str) -> Result<Self> {
4746 let (display, canonical) = canonical_table_name(table)?;
4747 context.lookup_table(&canonical)?;
4748 Ok(Self {
4749 context,
4750 plan: SelectPlan::new(display),
4751 })
4752 }
4753
4754 pub fn filter(mut self, predicate: LlkvExpr<'static, String>) -> Self {
4755 self.plan.filter = Some(predicate);
4756 self
4757 }
4758
4759 pub fn select_all(mut self) -> Self {
4760 self.plan.projections = vec![SelectProjection::AllColumns];
4761 self
4762 }
4763
4764 pub fn select_columns<S>(mut self, columns: impl IntoIterator<Item = S>) -> Self
4765 where
4766 S: AsRef<str>,
4767 {
4768 self.plan.projections = columns
4769 .into_iter()
4770 .map(|name| SelectProjection::Column {
4771 name: name.as_ref().to_string(),
4772 alias: None,
4773 })
4774 .collect();
4775 self
4776 }
4777
4778 pub fn select(mut self, projections: Vec<SelectProjection>) -> Self {
4779 self.plan.projections = projections;
4780 self
4781 }
4782
4783 pub fn aggregate(mut self, aggregates: Vec<AggregateExpr>) -> Self {
4784 self.plan.aggregates = aggregates;
4785 self
4786 }
4787
4788 pub fn collect(self) -> Result<SelectExecution<P>> {
4789 self.context.execute_select(self.plan)
4790 }
4791
4792 pub fn collect_rows(self) -> Result<RowBatch> {
4793 let execution = self.context.execute_select(self.plan)?;
4794 execution.collect_rows()
4795 }
4796
4797 pub fn collect_rows_vec(self) -> Result<Vec<Vec<PlanValue>>> {
4798 Ok(self.collect_rows()?.rows)
4799 }
4800}
4801
/// Wall-clock time as microseconds since the Unix epoch.
///
/// A system clock set before 1970 makes `duration_since` fail; that case
/// reports 0 instead of panicking, mirroring `unwrap_or_default`.
fn current_time_micros() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as u64,
        Err(_) => 0,
    }
}
4808
4809pub fn resolve_insert_columns(columns: &[String], schema: &ExecutorSchema) -> Result<Vec<usize>> {
4810 if columns.is_empty() {
4811 return Ok((0..schema.columns.len()).collect());
4812 }
4813 let mut resolved = Vec::with_capacity(columns.len());
4814 for column in columns {
4815 let normalized = column.to_ascii_lowercase();
4816 let index = schema.lookup.get(&normalized).ok_or_else(|| {
4817 Error::InvalidArgumentError(format!(
4818 "Binder Error: does not have a column named '{}'",
4819 column
4820 ))
4821 })?;
4822 resolved.push(*index);
4823 }
4824 Ok(resolved)
4825}
4826
/// Coerce one INSERT literal to a value storable in `column`'s Arrow type.
///
/// NULL is accepted for every column type. Other coercions mirror the array
/// builders in `build_array_for_column`; any unsupported pairing yields an
/// `InvalidArgumentError` naming the column.
fn normalize_insert_value_for_column(
    column: &ExecutorColumn,
    value: PlanValue,
) -> Result<PlanValue> {
    match (&column.data_type, value) {
        // NULL is type-agnostic.
        (_, PlanValue::Null) => Ok(PlanValue::Null),
        (DataType::Int64, PlanValue::Integer(v)) => Ok(PlanValue::Integer(v)),
        // Floats are truncated toward zero by the `as` cast.
        (DataType::Int64, PlanValue::Float(v)) => Ok(PlanValue::Integer(v as i64)),
        (DataType::Int64, other) => Err(Error::InvalidArgumentError(format!(
            "cannot insert {other:?} into INT column '{}'",
            column.name
        ))),
        // Booleans are represented as 0/1 integers.
        (DataType::Boolean, PlanValue::Integer(v)) => {
            Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 }))
        }
        (DataType::Boolean, PlanValue::Float(v)) => {
            Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 }))
        }
        // Textual booleans accept true/t/1 and false/f/0, case-insensitive,
        // with surrounding whitespace ignored.
        (DataType::Boolean, PlanValue::String(s)) => {
            let normalized = s.trim().to_ascii_lowercase();
            let value = match normalized.as_str() {
                "true" | "t" | "1" => 1,
                "false" | "f" | "0" => 0,
                _ => {
                    return Err(Error::InvalidArgumentError(format!(
                        "cannot insert string '{}' into BOOLEAN column '{}'",
                        s, column.name
                    )));
                }
            };
            Ok(PlanValue::Integer(value))
        }
        (DataType::Boolean, PlanValue::Struct(_)) => Err(Error::InvalidArgumentError(format!(
            "cannot insert struct into BOOLEAN column '{}'",
            column.name
        ))),
        (DataType::Float64, PlanValue::Integer(v)) => Ok(PlanValue::Float(v as f64)),
        (DataType::Float64, PlanValue::Float(v)) => Ok(PlanValue::Float(v)),
        (DataType::Float64, other) => Err(Error::InvalidArgumentError(format!(
            "cannot insert {other:?} into DOUBLE column '{}'",
            column.name
        ))),
        // Numbers are stringified for UTF8 columns.
        (DataType::Utf8, PlanValue::Integer(v)) => Ok(PlanValue::String(v.to_string())),
        (DataType::Utf8, PlanValue::Float(v)) => Ok(PlanValue::String(v.to_string())),
        (DataType::Utf8, PlanValue::String(s)) => Ok(PlanValue::String(s)),
        (DataType::Utf8, PlanValue::Struct(_)) => Err(Error::InvalidArgumentError(format!(
            "cannot insert struct into STRING column '{}'",
            column.name
        ))),
        // DATE integers are day counts and must fit the Date32 physical
        // type (i32); the value is kept as an i64 after range checking.
        (DataType::Date32, PlanValue::Integer(days)) => {
            let casted = i32::try_from(days).map_err(|_| {
                Error::InvalidArgumentError(format!(
                    "integer literal out of range for DATE column '{}'",
                    column.name
                ))
            })?;
            Ok(PlanValue::Integer(casted as i64))
        }
        // DATE strings must be YYYY-MM-DD literals.
        (DataType::Date32, PlanValue::String(text)) => {
            let days = parse_date32_literal(&text)?;
            Ok(PlanValue::Integer(days as i64))
        }
        (DataType::Date32, other) => Err(Error::InvalidArgumentError(format!(
            "cannot insert {other:?} into DATE column '{}'",
            column.name
        ))),
        (DataType::Struct(_), PlanValue::Struct(map)) => Ok(PlanValue::Struct(map)),
        (DataType::Struct(_), other) => Err(Error::InvalidArgumentError(format!(
            "expected struct value for struct column '{}', got {other:?}",
            column.name
        ))),
        (other_type, other_value) => Err(Error::InvalidArgumentError(format!(
            "unsupported Arrow data type {:?} for INSERT value {:?} in column '{}'",
            other_type, other_value, column.name
        ))),
    }
}
4904
/// Build an Arrow array of `dtype` from one `PlanValue` per row.
///
/// Accepts the same per-type coercions as `normalize_insert_value_for_column`
/// (numbers stringified for UTF8, 0/1 and textual true/false for booleans,
/// YYYY-MM-DD strings for dates). Struct columns recurse per child field,
/// substituting NULL for keys missing from a row's struct value.
pub fn build_array_for_column(dtype: &DataType, values: &[PlanValue]) -> Result<ArrayRef> {
    match dtype {
        DataType::Int64 => {
            let mut builder = Int64Builder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    PlanValue::Integer(v) => builder.append_value(*v),
                    // Truncates toward zero, matching the normalization path.
                    PlanValue::Float(v) => builder.append_value(*v as i64),
                    PlanValue::String(_) | PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert non-integer into INT column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Boolean => {
            let mut builder = BooleanBuilder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    PlanValue::Integer(v) => builder.append_value(*v != 0),
                    PlanValue::Float(v) => builder.append_value(*v != 0.0),
                    // Same textual forms as the normalization path.
                    PlanValue::String(s) => {
                        let normalized = s.trim().to_ascii_lowercase();
                        match normalized.as_str() {
                            "true" | "t" | "1" => builder.append_value(true),
                            "false" | "f" | "0" => builder.append_value(false),
                            _ => {
                                return Err(Error::InvalidArgumentError(format!(
                                    "cannot insert string '{}' into BOOLEAN column",
                                    s
                                )));
                            }
                        }
                    }
                    PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert struct into BOOLEAN column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Float64 => {
            let mut builder = Float64Builder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    PlanValue::Integer(v) => builder.append_value(*v as f64),
                    PlanValue::Float(v) => builder.append_value(*v),
                    PlanValue::String(_) | PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert non-numeric into DOUBLE column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Utf8 => {
            let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 8);
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    PlanValue::Integer(v) => builder.append_value(v.to_string()),
                    PlanValue::Float(v) => builder.append_value(v.to_string()),
                    PlanValue::String(s) => builder.append_value(s),
                    PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert struct into STRING column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Date32 => {
            let mut builder = Date32Builder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    // Integers are day counts; Date32 stores them as i32.
                    PlanValue::Integer(days) => {
                        let casted = i32::try_from(*days).map_err(|_| {
                            Error::InvalidArgumentError(
                                "integer literal out of range for DATE column".into(),
                            )
                        })?;
                        builder.append_value(casted);
                    }
                    PlanValue::Float(_) | PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert non-date value into DATE column".into(),
                        ));
                    }
                    PlanValue::String(text) => {
                        let days = parse_date32_literal(text)?;
                        builder.append_value(days);
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Struct(fields) => {
            use arrow::array::StructArray;
            let mut field_arrays: Vec<(FieldRef, ArrayRef)> = Vec::with_capacity(fields.len());

            // Decompose row-major struct values into one child column per
            // field, then recurse to build each child array.
            for field in fields.iter() {
                let field_name = field.name();
                let field_type = field.data_type();
                let mut field_values = Vec::with_capacity(values.len());

                for value in values {
                    match value {
                        PlanValue::Null => field_values.push(PlanValue::Null),
                        PlanValue::Struct(map) => {
                            // Missing keys become NULL in the child column.
                            let field_value =
                                map.get(field_name).cloned().unwrap_or(PlanValue::Null);
                            field_values.push(field_value);
                        }
                        _ => {
                            return Err(Error::InvalidArgumentError(format!(
                                "expected struct value for struct column, got {:?}",
                                value
                            )));
                        }
                    }
                }

                let field_array = build_array_for_column(field_type, &field_values)?;
                field_arrays.push((Arc::clone(field), field_array));
            }

            Ok(Arc::new(StructArray::from(field_arrays)))
        }
        other => Err(Error::InvalidArgumentError(format!(
            "unsupported Arrow data type for INSERT: {other:?}"
        ))),
    }
}
5048
5049fn parse_date32_literal(text: &str) -> Result<i32> {
5050 let mut parts = text.split('-');
5051 let year_str = parts
5052 .next()
5053 .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
5054 let month_str = parts
5055 .next()
5056 .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
5057 let day_str = parts
5058 .next()
5059 .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
5060 if parts.next().is_some() {
5061 return Err(Error::InvalidArgumentError(format!(
5062 "invalid DATE literal '{text}'"
5063 )));
5064 }
5065
5066 let year = year_str.parse::<i32>().map_err(|_| {
5067 Error::InvalidArgumentError(format!("invalid year in DATE literal '{text}'"))
5068 })?;
5069 let month_num = month_str.parse::<u8>().map_err(|_| {
5070 Error::InvalidArgumentError(format!("invalid month in DATE literal '{text}'"))
5071 })?;
5072 let day = day_str.parse::<u8>().map_err(|_| {
5073 Error::InvalidArgumentError(format!("invalid day in DATE literal '{text}'"))
5074 })?;
5075
5076 let month = Month::try_from(month_num).map_err(|_| {
5077 Error::InvalidArgumentError(format!("invalid month in DATE literal '{text}'"))
5078 })?;
5079
5080 let date = Date::from_calendar_date(year, month, day).map_err(|err| {
5081 Error::InvalidArgumentError(format!("invalid DATE literal '{text}': {err}"))
5082 })?;
5083 let days = date.to_julian_day() - epoch_julian_day();
5084 Ok(days)
5085}
5086
5087fn epoch_julian_day() -> i32 {
5088 Date::from_calendar_date(1970, Month::January, 1)
5089 .expect("1970-01-01 is a valid date")
5090 .to_julian_day()
5091}
5092
5093fn full_table_scan_filter(field_id: FieldId) -> LlkvExpr<'static, FieldId> {
5096 LlkvExpr::Pred(Filter {
5097 field_id,
5098 op: Operator::Range {
5099 lower: Bound::Unbounded,
5100 upper: Bound::Unbounded,
5101 },
5102 })
5103}
5104
5105fn resolve_field_id_from_schema(schema: &ExecutorSchema, name: &str) -> Result<FieldId> {
5106 if name.eq_ignore_ascii_case(ROW_ID_COLUMN_NAME) {
5107 return Ok(ROW_ID_FIELD_ID);
5108 }
5109
5110 schema
5111 .resolve(name)
5112 .map(|column| column.field_id)
5113 .ok_or_else(|| {
5114 Error::InvalidArgumentError(format!(
5115 "Binder Error: does not have a column named '{name}'"
5116 ))
5117 })
5118}
5119
5120fn translate_predicate(
5121 expr: LlkvExpr<'static, String>,
5122 schema: &ExecutorSchema,
5123) -> Result<LlkvExpr<'static, FieldId>> {
5124 match expr {
5125 LlkvExpr::And(list) => {
5126 let mut converted = Vec::with_capacity(list.len());
5127 for item in list {
5128 converted.push(translate_predicate(item, schema)?);
5129 }
5130 Ok(LlkvExpr::And(converted))
5131 }
5132 LlkvExpr::Or(list) => {
5133 let mut converted = Vec::with_capacity(list.len());
5134 for item in list {
5135 converted.push(translate_predicate(item, schema)?);
5136 }
5137 Ok(LlkvExpr::Or(converted))
5138 }
5139 LlkvExpr::Not(inner) => Ok(LlkvExpr::Not(Box::new(translate_predicate(
5140 *inner, schema,
5141 )?))),
5142 LlkvExpr::Pred(Filter { field_id, op }) => {
5143 let resolved = resolve_field_id_from_schema(schema, &field_id)?;
5144 Ok(LlkvExpr::Pred(Filter {
5145 field_id: resolved,
5146 op,
5147 }))
5148 }
5149 LlkvExpr::Compare { left, op, right } => {
5150 let left = translate_scalar(&left, schema)?;
5151 let right = translate_scalar(&right, schema)?;
5152 Ok(LlkvExpr::Compare { left, op, right })
5153 }
5154 }
5155}
5156
5157fn translate_scalar(
5158 expr: &ScalarExpr<String>,
5159 schema: &ExecutorSchema,
5160) -> Result<ScalarExpr<FieldId>> {
5161 match expr {
5162 ScalarExpr::Column(name) => {
5163 let field_id = resolve_field_id_from_schema(schema, name)?;
5164 Ok(ScalarExpr::column(field_id))
5165 }
5166 ScalarExpr::Literal(lit) => Ok(ScalarExpr::Literal(lit.clone())),
5167 ScalarExpr::Binary { left, op, right } => {
5168 let left_expr = translate_scalar(left, schema)?;
5169 let right_expr = translate_scalar(right, schema)?;
5170 Ok(ScalarExpr::Binary {
5171 left: Box::new(left_expr),
5172 op: *op,
5173 right: Box::new(right_expr),
5174 })
5175 }
5176 ScalarExpr::Aggregate(agg) => {
5177 use llkv_expr::expr::AggregateCall;
5179 let translated_agg = match agg {
5180 AggregateCall::CountStar => AggregateCall::CountStar,
5181 AggregateCall::Count(name) => {
5182 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5183 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5184 })?;
5185 AggregateCall::Count(field_id)
5186 }
5187 AggregateCall::Sum(name) => {
5188 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5189 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5190 })?;
5191 AggregateCall::Sum(field_id)
5192 }
5193 AggregateCall::Min(name) => {
5194 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5195 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5196 })?;
5197 AggregateCall::Min(field_id)
5198 }
5199 AggregateCall::Max(name) => {
5200 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5201 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5202 })?;
5203 AggregateCall::Max(field_id)
5204 }
5205 AggregateCall::CountNulls(name) => {
5206 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5207 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5208 })?;
5209 AggregateCall::CountNulls(field_id)
5210 }
5211 };
5212 Ok(ScalarExpr::Aggregate(translated_agg))
5213 }
5214 ScalarExpr::GetField { base, field_name } => {
5215 let base_expr = translate_scalar(base, schema)?;
5216 Ok(ScalarExpr::GetField {
5217 base: Box::new(base_expr),
5218 field_name: field_name.clone(),
5219 })
5220 }
5221 }
5222}
5223
5224fn plan_value_from_sql_expr(expr: &SqlExpr) -> Result<PlanValue> {
5225 match expr {
5226 SqlExpr::Value(value) => plan_value_from_sql_value(value),
5227 SqlExpr::UnaryOp {
5228 op: UnaryOperator::Minus,
5229 expr,
5230 } => match plan_value_from_sql_expr(expr)? {
5231 PlanValue::Integer(v) => Ok(PlanValue::Integer(-v)),
5232 PlanValue::Float(v) => Ok(PlanValue::Float(-v)),
5233 PlanValue::Null | PlanValue::String(_) | PlanValue::Struct(_) => Err(
5234 Error::InvalidArgumentError("cannot negate non-numeric literal".into()),
5235 ),
5236 },
5237 SqlExpr::UnaryOp {
5238 op: UnaryOperator::Plus,
5239 expr,
5240 } => plan_value_from_sql_expr(expr),
5241 SqlExpr::Nested(inner) => plan_value_from_sql_expr(inner),
5242 SqlExpr::Dictionary(fields) => {
5243 let mut map = std::collections::HashMap::new();
5244 for field in fields {
5245 let key = field.key.value.clone();
5246 let value = plan_value_from_sql_expr(&field.value)?;
5247 map.insert(key, value);
5248 }
5249 Ok(PlanValue::Struct(map))
5250 }
5251 other => Err(Error::InvalidArgumentError(format!(
5252 "unsupported literal expression: {other:?}"
5253 ))),
5254 }
5255}
5256
5257fn plan_value_from_sql_value(value: &ValueWithSpan) -> Result<PlanValue> {
5258 match &value.value {
5259 Value::Null => Ok(PlanValue::Null),
5260 Value::Number(text, _) => {
5261 if text.contains(['.', 'e', 'E']) {
5262 let parsed = text.parse::<f64>().map_err(|err| {
5263 Error::InvalidArgumentError(format!("invalid float literal: {err}"))
5264 })?;
5265 Ok(PlanValue::Float(parsed))
5266 } else {
5267 let parsed = text.parse::<i64>().map_err(|err| {
5268 Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
5269 })?;
5270 Ok(PlanValue::Integer(parsed))
5271 }
5272 }
5273 Value::Boolean(_) => Err(Error::InvalidArgumentError(
5274 "BOOLEAN literals are not supported yet".into(),
5275 )),
5276 other => {
5277 if let Some(text) = other.clone().into_string() {
5278 Ok(PlanValue::String(text))
5279 } else {
5280 Err(Error::InvalidArgumentError(format!(
5281 "unsupported literal: {other:?}"
5282 )))
5283 }
5284 }
5285 }
5286}
5287
5288fn group_by_is_empty(expr: &GroupByExpr) -> bool {
5289 matches!(
5290 expr,
5291 GroupByExpr::Expressions(exprs, modifiers)
5292 if exprs.is_empty() && modifiers.is_empty()
5293 )
5294}
5295
/// Pre-computed result rows produced for a range() SELECT fast path.
#[derive(Clone)]
pub struct RuntimeRangeSelectRows {
    rows: Vec<Vec<PlanValue>>,
}
5300
impl RuntimeRangeSelectRows {
    /// Consume the wrapper and return the row data.
    pub fn into_rows(self) -> Vec<Vec<PlanValue>> {
        self.rows
    }
}
5306
/// How one projection of a range() SELECT is produced per row: either the
/// generated counter value or a fixed literal repeated on every row.
#[derive(Clone)]
enum RangeProjection {
    Column,
    Literal(PlanValue),
}
5312
/// Parsed description of a `range(...)` table-function reference.
#[derive(Clone)]
pub struct RuntimeRangeSpec {
    // First generated value (inclusive).
    start: i64,
    // One past the last generated value; currently unused (dead_code).
    #[allow(dead_code)] end: i64,
    // Number of rows to emit (`end - start`).
    row_count: usize,
    // Lowercased output column name ("range" unless aliased).
    column_name_lower: String,
    // Lowercased table alias, when one was given.
    table_alias_lower: Option<String>,
}
5322
5323impl RuntimeRangeSpec {
5324 fn matches_identifier(&self, ident: &str) -> bool {
5325 let lower = ident.to_ascii_lowercase();
5326 lower == self.column_name_lower || lower == "range"
5327 }
5328
5329 fn matches_table_alias(&self, ident: &str) -> bool {
5330 let lower = ident.to_ascii_lowercase();
5331 match &self.table_alias_lower {
5332 Some(alias) => lower == *alias,
5333 None => lower == "range",
5334 }
5335 }
5336
5337 fn matches_object_name(&self, name: &ObjectName) -> bool {
5338 if name.0.len() != 1 {
5339 return false;
5340 }
5341 match &name.0[0] {
5342 ObjectNamePart::Identifier(ident) => self.matches_table_alias(&ident.value),
5343 _ => false,
5344 }
5345 }
5346}
5347
/// Materialize result rows for `SELECT ... FROM range(...)` statements.
///
/// Returns `Ok(None)` when the FROM clause is not a `range()` table
/// function. Returns an error when range() is combined with SELECT features
/// this fast path does not implement (WHERE, GROUP BY, HAVING, DISTINCT,
/// TOP, named windows, QUALIFY, INTO, PREWHERE, lateral views, ...).
pub fn extract_rows_from_range(select: &Select) -> Result<Option<RuntimeRangeSelectRows>> {
    let spec = match parse_range_spec(select)? {
        Some(spec) => spec,
        None => return Ok(None),
    };

    if select.selection.is_some() {
        return Err(Error::InvalidArgumentError(
            "WHERE clauses are not supported for range() SELECT statements".into(),
        ));
    }
    // Reject every advanced clause up front; only a plain projection list
    // over the generated sequence is supported.
    if select.having.is_some()
        || !select.named_window.is_empty()
        || select.qualify.is_some()
        || select.distinct.is_some()
        || select.top.is_some()
        || select.into.is_some()
        || select.prewhere.is_some()
        || !select.lateral_views.is_empty()
        || select.value_table_mode.is_some()
        || !group_by_is_empty(&select.group_by)
    {
        return Err(Error::InvalidArgumentError(
            "advanced SELECT clauses are not supported for range() SELECT statements".into(),
        ));
    }

    let mut projections: Vec<RangeProjection> = Vec::with_capacity(select.projection.len());

    // An empty projection list behaves like selecting the counter column.
    if select.projection.is_empty() {
        projections.push(RangeProjection::Column);
    } else {
        for item in &select.projection {
            let projection = match item {
                SelectItem::Wildcard(_) => RangeProjection::Column,
                SelectItem::QualifiedWildcard(kind, _) => match kind {
                    SelectItemQualifiedWildcardKind::ObjectName(object_name) => {
                        if spec.matches_object_name(object_name) {
                            RangeProjection::Column
                        } else {
                            return Err(Error::InvalidArgumentError(
                                "qualified wildcard must reference the range() source".into(),
                            ));
                        }
                    }
                    SelectItemQualifiedWildcardKind::Expr(_) => {
                        return Err(Error::InvalidArgumentError(
                            "expression-qualified wildcards are not supported for range() SELECT statements".into(),
                        ));
                    }
                },
                // Aliases do not affect row generation, so both expression
                // forms share one code path.
                SelectItem::UnnamedExpr(expr) => build_range_projection_expr(expr, &spec)?,
                SelectItem::ExprWithAlias { expr, .. } => build_range_projection_expr(expr, &spec)?,
            };
            projections.push(projection);
        }
    }

    // Emit one row per counter value; literal projections are replicated
    // across all rows.
    let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(spec.row_count);
    for idx in 0..spec.row_count {
        let mut row: Vec<PlanValue> = Vec::with_capacity(projections.len());
        let value = spec.start + (idx as i64);
        for projection in &projections {
            match projection {
                RangeProjection::Column => row.push(PlanValue::Integer(value)),
                RangeProjection::Literal(value) => row.push(value.clone()),
            }
        }
        rows.push(row);
    }

    Ok(Some(RuntimeRangeSelectRows { rows }))
}
5422
5423fn build_range_projection_expr(expr: &SqlExpr, spec: &RuntimeRangeSpec) -> Result<RangeProjection> {
5424 match expr {
5425 SqlExpr::Identifier(ident) => {
5426 if spec.matches_identifier(&ident.value) {
5427 Ok(RangeProjection::Column)
5428 } else {
5429 Err(Error::InvalidArgumentError(format!(
5430 "unknown column '{}' in range() SELECT",
5431 ident.value
5432 )))
5433 }
5434 }
5435 SqlExpr::CompoundIdentifier(parts) => {
5436 if parts.len() == 2
5437 && spec.matches_table_alias(&parts[0].value)
5438 && spec.matches_identifier(&parts[1].value)
5439 {
5440 Ok(RangeProjection::Column)
5441 } else {
5442 Err(Error::InvalidArgumentError(
5443 "compound identifiers must reference the range() source".into(),
5444 ))
5445 }
5446 }
5447 SqlExpr::Wildcard(_) | SqlExpr::QualifiedWildcard(_, _) => unreachable!(),
5448 other => Ok(RangeProjection::Literal(plan_value_from_sql_expr(other)?)),
5449 }
5450}
5451
/// Detect a single-table `range(...)` FROM clause and parse it into a spec.
///
/// Returns `Ok(None)` for anything that is not a range() reference; errors
/// when range() appears alongside unsupported clauses (JOIN, LATERAL,
/// WITH ORDINALITY, SETTINGS).
fn parse_range_spec(select: &Select) -> Result<Option<RuntimeRangeSpec>> {
    if select.from.len() != 1 {
        return Ok(None);
    }
    let item = &select.from[0];
    if !item.joins.is_empty() {
        return Err(Error::InvalidArgumentError(
            "JOIN clauses are not supported for range() SELECT statements".into(),
        ));
    }

    match &item.relation {
        // `FROM range(...)` parsed as a table function.
        TableFactor::Function {
            lateral,
            name,
            args,
            alias,
        } => {
            if *lateral {
                return Err(Error::InvalidArgumentError(
                    "LATERAL range() is not supported".into(),
                ));
            }
            parse_range_spec_from_args(name, args, alias)
        }
        // `FROM range(...)` parsed as a table name carrying arguments.
        TableFactor::Table {
            name,
            alias,
            args: Some(table_args),
            with_ordinality,
            ..
        } => {
            if *with_ordinality {
                return Err(Error::InvalidArgumentError(
                    "WITH ORDINALITY is not supported for range()".into(),
                ));
            }
            if table_args.settings.is_some() {
                return Err(Error::InvalidArgumentError(
                    "range() SETTINGS clause is not supported".into(),
                ));
            }
            parse_range_spec_from_args(name, &table_args.args, alias)
        }
        _ => Ok(None),
    }
}
5499
/// Validate a `range(...)` call's arguments and derive the generation spec.
///
/// `range(count)` produces values `0..count`; `range(start, end)` produces
/// `start..end` (end exclusive). Both arguments must be integer literals.
/// The generated column's name comes from the alias's first column when
/// present, otherwise "range".
fn parse_range_spec_from_args(
    name: &ObjectName,
    args: &[FunctionArg],
    alias: &Option<TableAlias>,
) -> Result<Option<RuntimeRangeSpec>> {
    // Only a bare single-part identifier named `range` is the builtin.
    if name.0.len() != 1 {
        return Ok(None);
    }
    let func_name = match &name.0[0] {
        ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
        _ => return Ok(None),
    };
    if func_name != "range" {
        return Ok(None);
    }

    if args.is_empty() || args.len() > 2 {
        return Err(Error::InvalidArgumentError(
            "range() requires one or two arguments".into(),
        ));
    }

    // Accept only plain (unnamed, non-wildcard) integer-literal arguments.
    let extract_int = |arg: &FunctionArg| -> Result<i64> {
        let arg_expr = match arg {
            FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
            FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_))
            | FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
                return Err(Error::InvalidArgumentError(
                    "range() argument must be an integer literal".into(),
                ));
            }
            FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
                return Err(Error::InvalidArgumentError(
                    "named arguments are not supported for range()".into(),
                ));
            }
        };

        let value = plan_value_from_sql_expr(arg_expr)?;
        match value {
            PlanValue::Integer(v) => Ok(v),
            _ => Err(Error::InvalidArgumentError(
                "range() argument must be an integer literal".into(),
            )),
        }
    };

    let (start, end, row_count) = if args.len() == 1 {
        // One argument: count form, starting at 0.
        let count = extract_int(&args[0])?;
        if count < 0 {
            return Err(Error::InvalidArgumentError(
                "range() argument must be non-negative".into(),
            ));
        }
        (0, count, count as usize)
    } else {
        // Two arguments: half-open [start, end) interval.
        let start = extract_int(&args[0])?;
        let end = extract_int(&args[1])?;
        if end < start {
            return Err(Error::InvalidArgumentError(
                "range() end must be >= start".into(),
            ));
        }
        let row_count = (end - start) as usize;
        (start, end, row_count)
    };

    // Output column name: first aliased column if provided, else "range".
    let column_name_lower = alias
        .as_ref()
        .and_then(|a| {
            a.columns
                .first()
                .map(|col| col.name.value.to_ascii_lowercase())
        })
        .unwrap_or_else(|| "range".to_string());
    let table_alias_lower = alias.as_ref().map(|a| a.name.value.to_ascii_lowercase());

    Ok(Some(RuntimeRangeSpec {
        start,
        end,
        row_count,
        column_name_lower,
        table_alias_lower,
    }))
}
5588
/// Fluent builder for CREATE TABLE plans executed against a
/// `RuntimeContext`; add columns with the `with_*` methods and execute the
/// accumulated plan via `finish`.
pub struct RuntimeCreateTableBuilder<'ctx, P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    ctx: &'ctx RuntimeContext<P>,
    plan: CreateTablePlan,
}
5596
5597impl<'ctx, P> RuntimeCreateTableBuilder<'ctx, P>
5598where
5599 P: Pager<Blob = EntryHandle> + Send + Sync,
5600{
5601 pub fn if_not_exists(mut self) -> Self {
5602 self.plan.if_not_exists = true;
5603 self
5604 }
5605
5606 pub fn or_replace(mut self) -> Self {
5607 self.plan.or_replace = true;
5608 self
5609 }
5610
5611 pub fn with_column(mut self, name: impl Into<String>, data_type: DataType) -> Self {
5612 self.plan
5613 .columns
5614 .push(ColumnSpec::new(name.into(), data_type, true));
5615 self
5616 }
5617
5618 pub fn with_not_null_column(mut self, name: impl Into<String>, data_type: DataType) -> Self {
5619 self.plan
5620 .columns
5621 .push(ColumnSpec::new(name.into(), data_type, false));
5622 self
5623 }
5624
5625 pub fn with_column_spec(mut self, spec: ColumnSpec) -> Self {
5626 self.plan.columns.push(spec);
5627 self
5628 }
5629
5630 pub fn finish(self) -> Result<RuntimeStatementResult<P>> {
5631 self.ctx.execute_create_table(self.plan)
5632 }
5633}
5634
/// An insert row expressed as ordered `(column name, value)` pairs.
///
/// Built fluently via [`RuntimeRow::with`] / [`RuntimeRow::set`]; the order in
/// which columns are first assigned is preserved.
#[derive(Clone, Debug, Default)]
pub struct RuntimeRow {
    // Ordered (column name, value) pairs; `set` keeps names unique.
    values: Vec<(String, PlanValue)>,
}
5639
5640impl RuntimeRow {
5641 pub fn new() -> Self {
5642 Self { values: Vec::new() }
5643 }
5644
5645 pub fn with(mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> Self {
5646 self.set(name, value);
5647 self
5648 }
5649
5650 pub fn set(&mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> &mut Self {
5651 let name = name.into();
5652 let value = value.into();
5653 if let Some((_, existing)) = self.values.iter_mut().find(|(n, _)| *n == name) {
5654 *existing = value;
5655 } else {
5656 self.values.push((name, value));
5657 }
5658 self
5659 }
5660
5661 fn columns(&self) -> Vec<String> {
5662 self.values.iter().map(|(n, _)| n.clone()).collect()
5663 }
5664
5665 fn values_for_columns(&self, columns: &[String]) -> Result<Vec<PlanValue>> {
5666 let mut out = Vec::with_capacity(columns.len());
5667 for column in columns {
5668 let value = self
5669 .values
5670 .iter()
5671 .find(|(name, _)| name == column)
5672 .ok_or_else(|| {
5673 Error::InvalidArgumentError(format!(
5674 "insert row missing value for column '{}'",
5675 column
5676 ))
5677 })?;
5678 out.push(value.1.clone());
5679 }
5680 Ok(out)
5681 }
5682}
5683
5684pub fn row() -> RuntimeRow {
5685 RuntimeRow::new()
5686}
5687
/// Normalized form of a user-supplied insert row, produced by
/// [`IntoInsertRow::into_insert_row`].
#[doc(hidden)]
pub enum RuntimeInsertRowKind {
    /// Row with explicit column names; `columns` and `values` are parallel.
    Named {
        columns: Vec<String>,
        values: Vec<PlanValue>,
    },
    /// Row whose values map positionally onto the table schema's columns.
    Positional(Vec<PlanValue>),
}
5696
/// Conversion of user-supplied row shapes (tuples, vectors, arrays,
/// [`RuntimeRow`]) into the normalized [`RuntimeInsertRowKind`].
pub trait IntoInsertRow {
    /// Convert into a named or positional insert row; errors on an empty row.
    fn into_insert_row(self) -> Result<RuntimeInsertRowKind>;
}
5700
5701impl IntoInsertRow for RuntimeRow {
5702 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
5703 let row = self;
5704 if row.values.is_empty() {
5705 return Err(Error::InvalidArgumentError(
5706 "insert requires at least one column".into(),
5707 ));
5708 }
5709 let columns = row.columns();
5710 let values = row.values_for_columns(&columns)?;
5711 Ok(RuntimeInsertRowKind::Named { columns, values })
5712 }
5713}
5714
5715impl<T> IntoInsertRow for Vec<T>
5720where
5721 T: Into<PlanValue>,
5722{
5723 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
5724 if self.is_empty() {
5725 return Err(Error::InvalidArgumentError(
5726 "insert requires at least one column".into(),
5727 ));
5728 }
5729 Ok(RuntimeInsertRowKind::Positional(
5730 self.into_iter().map(Into::into).collect(),
5731 ))
5732 }
5733}
5734
5735impl<T, const N: usize> IntoInsertRow for [T; N]
5736where
5737 T: Into<PlanValue>,
5738{
5739 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
5740 if N == 0 {
5741 return Err(Error::InvalidArgumentError(
5742 "insert requires at least one column".into(),
5743 ));
5744 }
5745 Ok(RuntimeInsertRowKind::Positional(
5746 self.into_iter().map(Into::into).collect(),
5747 ))
5748 }
5749}
5750
/// Implements [`IntoInsertRow`] for a tuple type, mapping each element
/// positionally into a [`RuntimeInsertRowKind::Positional`] row.
///
/// Each `$type => $value` pair names one tuple element's generic type
/// parameter and the binding used to destructure it; every element must
/// satisfy `Into<PlanValue>`. Tuples are never empty, so no empty-row check
/// is needed here (unlike the `Vec`/array impls).
macro_rules! impl_into_insert_row_tuple {
    ($($type:ident => $value:ident),+) => {
        impl<$($type,)+> IntoInsertRow for ($($type,)+)
        where
            $($type: Into<PlanValue>,)+
        {
            fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
                let ($($value,)+) = self;
                Ok(RuntimeInsertRowKind::Positional(vec![$($value.into(),)+]))
            }
        }
    };
}
5764
// Positional insert rows are supported for tuple arities 1 through 8.
impl_into_insert_row_tuple!(T1 => v1);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5, T6 => v6);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7
);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7,
    T8 => v8
);
5790
/// Handle to an existing table within a [`RuntimeContext`], offering insert
/// and lazy-scan entry points.
pub struct RuntimeTableHandle<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Owning context used to resolve the table and execute statements.
    context: Arc<RuntimeContext<P>>,
    // Name as the user wrote it; used for display and plan construction.
    display_name: String,
    // Canonicalized name used for catalog lookups.
    _canonical_name: String,
}
5799
impl<P> RuntimeTableHandle<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    /// Resolve `name` against `context` and return a handle.
    ///
    /// The lookup result is discarded; the call serves only to verify the
    /// table exists, so creation fails early for unknown tables.
    pub fn new(context: Arc<RuntimeContext<P>>, name: &str) -> Result<Self> {
        let (display_name, canonical_name) = canonical_table_name(name)?;
        context.lookup_table(&canonical_name)?;
        Ok(Self {
            context,
            display_name,
            _canonical_name: canonical_name,
        })
    }

    /// Start a lazy scan over this table.
    pub fn lazy(&self) -> Result<RuntimeLazyFrame<P>> {
        RuntimeLazyFrame::scan(Arc::clone(&self.context), &self.display_name)
    }

    /// Insert a sequence of rows, each convertible via [`IntoInsertRow`].
    ///
    /// All rows must use the same shape: either every row is named (same
    /// column set, same order) or every row is positional (one value per
    /// schema column, in schema order). Mixing shapes, duplicate columns in
    /// the first named row, arity mismatches, and empty input are all
    /// rejected with `InvalidArgumentError`.
    pub fn insert_rows<R>(
        &self,
        rows: impl IntoIterator<Item = R>,
    ) -> Result<RuntimeStatementResult<P>>
    where
        R: IntoInsertRow,
    {
        // Tracks which row shape the first row committed us to.
        enum InsertMode {
            Named,
            Positional,
        }

        let table = self.context.lookup_table(&self._canonical_name)?;
        let schema = table.schema.as_ref();
        let schema_column_names: Vec<String> =
            schema.columns.iter().map(|col| col.name.clone()).collect();
        let mut normalized_rows: Vec<Vec<PlanValue>> = Vec::new();
        let mut mode: Option<InsertMode> = None;
        // Column list shared by all rows: the first named row's columns, or
        // the schema's columns for positional rows.
        let mut column_names: Option<Vec<String>> = None;
        let mut row_count = 0usize;

        for row in rows.into_iter() {
            row_count += 1;
            match row.into_insert_row()? {
                RuntimeInsertRowKind::Named { columns, values } => {
                    if let Some(existing) = &mode {
                        if !matches!(existing, InsertMode::Named) {
                            return Err(Error::InvalidArgumentError(
                                "cannot mix positional and named insert rows".into(),
                            ));
                        }
                    } else {
                        // First row fixes the mode and the expected columns;
                        // duplicates are only possible here, since later rows
                        // must match this list exactly.
                        mode = Some(InsertMode::Named);
                        let mut seen =
                            FxHashSet::with_capacity_and_hasher(columns.len(), Default::default());
                        for column in &columns {
                            if !seen.insert(column.clone()) {
                                return Err(Error::InvalidArgumentError(format!(
                                    "duplicate column '{}' in insert row",
                                    column
                                )));
                            }
                        }
                        column_names = Some(columns.clone());
                    }

                    let expected = column_names
                        .as_ref()
                        .expect("column names must be initialized for named insert");
                    if columns != *expected {
                        return Err(Error::InvalidArgumentError(
                            "insert rows must specify the same columns".into(),
                        ));
                    }
                    if values.len() != expected.len() {
                        return Err(Error::InvalidArgumentError(format!(
                            "insert row expected {} values, found {}",
                            expected.len(),
                            values.len()
                        )));
                    }
                    normalized_rows.push(values);
                }
                RuntimeInsertRowKind::Positional(values) => {
                    if let Some(existing) = &mode {
                        if !matches!(existing, InsertMode::Positional) {
                            return Err(Error::InvalidArgumentError(
                                "cannot mix positional and named insert rows".into(),
                            ));
                        }
                    } else {
                        mode = Some(InsertMode::Positional);
                        column_names = Some(schema_column_names.clone());
                    }

                    // Positional rows must cover every schema column.
                    if values.len() != schema.columns.len() {
                        return Err(Error::InvalidArgumentError(format!(
                            "insert row expected {} values, found {}",
                            schema.columns.len(),
                            values.len()
                        )));
                    }
                    normalized_rows.push(values);
                }
            }
        }

        if row_count == 0 {
            return Err(Error::InvalidArgumentError(
                "insert requires at least one row".into(),
            ));
        }

        let columns = column_names.unwrap_or_else(|| schema_column_names.clone());
        self.insert_row_batch(RowBatch {
            columns,
            rows: normalized_rows,
        })
    }

    /// Insert a pre-assembled [`RowBatch`], validating that it is non-empty
    /// and rectangular (every row has one value per column).
    pub fn insert_row_batch(&self, batch: RowBatch) -> Result<RuntimeStatementResult<P>> {
        if batch.rows.is_empty() {
            return Err(Error::InvalidArgumentError(
                "insert requires at least one row".into(),
            ));
        }
        if batch.columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "insert requires at least one column".into(),
            ));
        }
        for row in &batch.rows {
            if row.len() != batch.columns.len() {
                return Err(Error::InvalidArgumentError(
                    "insert rows must have values for every column".into(),
                ));
            }
        }

        let plan = InsertPlan {
            table: self.display_name.clone(),
            columns: batch.columns,
            source: InsertSource::Rows(batch.rows),
        };
        self.context.insert(plan)
    }

    /// Insert Arrow [`RecordBatch`]es directly.
    ///
    /// `columns` is left empty; the batches carry their own schema, and
    /// column resolution is handled by the context's insert path.
    pub fn insert_batches(&self, batches: Vec<RecordBatch>) -> Result<RuntimeStatementResult<P>> {
        let plan = InsertPlan {
            table: self.display_name.clone(),
            columns: Vec::new(),
            source: InsertSource::Batches(batches),
        };
        self.context.insert(plan)
    }

    /// Materialize a lazy frame and insert its rows into this table.
    pub fn insert_lazy(&self, frame: RuntimeLazyFrame<P>) -> Result<RuntimeStatementResult<P>> {
        let RowBatch { columns, rows } = frame.collect_rows()?;
        self.insert_row_batch(RowBatch { columns, rows })
    }

    /// The table name as originally supplied by the user.
    pub fn name(&self) -> &str {
        &self.display_name
    }
}
5963
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use llkv_storage::pager::MemPager;
    use std::sync::Arc;

    // End-to-end smoke test: create a table, insert positional tuple rows,
    // and read them back through a lazy scan.
    #[test]
    fn create_insert_select_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let context = Arc::new(RuntimeContext::new(pager));

        let table = context
            .create_table(
                "people",
                [
                    ("id", DataType::Int64, NotNull),
                    ("name", DataType::Utf8, Nullable),
                ],
            )
            .expect("create table");
        table
            .insert_rows([(1_i64, "alice"), (2_i64, "bob")])
            .expect("insert rows");

        let execution = table.lazy().expect("lazy scan");
        let select = execution.collect().expect("build select execution");
        let batches = select.collect().expect("collect batches");
        assert_eq!(batches.len(), 1);
        // Column index 1 is "name"; both inserted rows should be present.
        let column = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(column.len(), 2);
    }

    // Verifies the count_nulls aggregate: two of three inserted values are
    // NULL, so the aggregate should report 2.
    #[test]
    fn aggregate_count_nulls() {
        let pager = Arc::new(MemPager::default());
        let context = Arc::new(RuntimeContext::new(pager));

        let table = context
            .create_table("ints", [("i", DataType::Int64)])
            .expect("create table");
        table
            .insert_rows([
                (PlanValue::Null,),
                (PlanValue::Integer(1),),
                (PlanValue::Null,),
            ])
            .expect("insert rows");

        let plan =
            SelectPlan::new("ints").with_aggregates(vec![AggregateExpr::count_nulls("i", "nulls")]);
        let execution = context.execute_select(plan).expect("select");
        let batches = execution.collect().expect("collect batches");
        let column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int column");
        assert_eq!(column.value(0), 2);
    }
}