1#![forbid(unsafe_code)]
30
31pub mod storage_namespace;
32
33use std::fmt;
34use std::marker::PhantomData;
35use std::mem;
36use std::ops::Bound;
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::sync::{Arc, RwLock};
39use std::time::{SystemTime, UNIX_EPOCH};
40
41use rustc_hash::{FxHashMap, FxHashSet};
42
43use arrow::array::{
44 Array, ArrayRef, BooleanBuilder, Date32Builder, Float64Builder, Int64Builder, StringBuilder,
45 UInt64Array, UInt64Builder,
46};
47use arrow::datatypes::{DataType, Field, FieldRef, Schema};
48use arrow::record_batch::RecordBatch;
49use llkv_column_map::ColumnStore;
50use llkv_column_map::store::{GatherNullPolicy, ROW_ID_COLUMN_NAME};
51use llkv_column_map::types::LogicalFieldId;
52use llkv_expr::expr::{Expr as LlkvExpr, Filter, Operator, ScalarExpr};
53use llkv_result::Error;
54use llkv_storage::pager::{BoxedPager, MemPager, Pager};
55use llkv_table::catalog::{FieldConstraints, FieldDefinition, TableCatalog};
56use llkv_table::table::{RowIdFilter, ScanProjection, ScanStreamOptions, Table};
57use llkv_table::types::{FieldId, ROW_ID_FIELD_ID, RowId};
58use llkv_table::{
59 CatalogManager, ConstraintColumnInfo, ConstraintService, CreateTableResult, ForeignKeyColumn,
60 ForeignKeyTableInfo, ForeignKeyView, InsertColumnConstraint, InsertMultiColumnUnique,
61 InsertUniqueColumn, MetadataManager, MultiColumnUniqueEntryMeta, MultiColumnUniqueRegistration,
62 SysCatalog, TableConstraintSummaryView, TableView, UniqueKey, build_composite_unique_key,
63 canonical_table_name, constraints::ConstraintKind, ensure_multi_column_unique,
64 ensure_single_column_unique,
65};
66use simd_r_drive_entry_handle::EntryHandle;
67use sqlparser::ast::{
68 Expr as SqlExpr, FunctionArg, FunctionArgExpr, GroupByExpr, ObjectName, ObjectNamePart, Select,
69 SelectItem, SelectItemQualifiedWildcardKind, TableAlias, TableFactor, UnaryOperator, Value,
70 ValueWithSpan,
71};
72use time::{Date, Month};
73
/// Crate-wide result alias over the shared `llkv_result` error type.
pub type Result<T> = llkv_result::Result<T>;
75
76pub use llkv_plan::{
78 AggregateExpr, AggregateFunction, AssignmentValue, ColumnAssignment, ColumnNullability,
79 ColumnSpec, CreateIndexPlan, CreateTablePlan, CreateTableSource, DeletePlan, ForeignKeyAction,
80 ForeignKeySpec, IndexColumnPlan, InsertPlan, InsertSource, IntoColumnSpec, NotNull, Nullable,
81 OrderByPlan, OrderSortType, OrderTarget, PlanOperation, PlanStatement, PlanValue, SelectPlan,
82 SelectProjection, UpdatePlan,
83};
84
85use llkv_executor::{ExecutorColumn, ExecutorMultiColumnUnique, ExecutorSchema, ExecutorTable};
87pub use llkv_executor::{QueryExecutor, RowBatch, SelectExecution, TableProvider};
88
89use crate::storage_namespace::{
90 PersistentNamespace, StorageNamespace, StorageNamespaceRegistry, TemporaryNamespace,
91};
92
93pub use llkv_transaction::TransactionKind;
95use llkv_transaction::{
96 RowVersion, TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionContext, TransactionManager,
97 TransactionResult, TxnId, TxnIdManager, mvcc::TransactionSnapshot,
98};
99
100use llkv_transaction::TransactionSession;
102
103use llkv_table::mvcc;
106
/// Constraint metadata gathered for one table before executing writes,
/// grouping per-column constraints with single- and multi-column
/// uniqueness requirements.
struct TableConstraintContext {
    // Field ids of the table's schema columns, in schema order.
    schema_field_ids: Vec<FieldId>,
    // Per-column constraints to enforce on inserted values.
    column_constraints: Vec<InsertColumnConstraint>,
    // Columns carrying a single-column UNIQUE constraint.
    unique_columns: Vec<InsertUniqueColumn>,
    // Composite (multi-column) UNIQUE constraints.
    multi_column_uniques: Vec<InsertMultiColumnUnique>,
    // Primary key, if declared, modeled as a composite unique constraint.
    primary_key: Option<InsertMultiColumnUnique>,
}
114
/// Outcome of executing one planned statement against the runtime.
///
/// The enum is generic over the pager `P` only because `Select` carries a
/// live [`SelectExecution`]; every other variant is a plain summary of the
/// statement's effect.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum RuntimeStatementResult<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// A table named `table_name` was created.
    CreateTable {
        table_name: String,
    },
    /// An index was created on `table_name`; `index_name` is `None` for
    /// unnamed indexes.
    CreateIndex {
        table_name: String,
        index_name: Option<String>,
    },
    /// The statement had no observable effect.
    NoOp,
    /// Rows were inserted into `table_name`.
    Insert {
        table_name: String,
        rows_inserted: usize,
    },
    /// Rows in `table_name` were updated.
    Update {
        table_name: String,
        rows_updated: usize,
    },
    /// Rows were deleted from `table_name`.
    Delete {
        table_name: String,
        rows_deleted: usize,
    },
    /// A SELECT with its result schema and a streamable execution.
    /// `table_name` is empty for multi-table plans.
    Select {
        table_name: String,
        schema: Arc<Schema>,
        execution: SelectExecution<P>,
    },
    /// A transaction control statement (BEGIN/COMMIT/ROLLBACK) completed.
    Transaction {
        kind: TransactionKind,
    },
}
151
152impl<P> fmt::Debug for RuntimeStatementResult<P>
153where
154 P: Pager<Blob = EntryHandle> + Send + Sync,
155{
156 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157 match self {
158 RuntimeStatementResult::CreateTable { table_name } => f
159 .debug_struct("CreateTable")
160 .field("table_name", table_name)
161 .finish(),
162 RuntimeStatementResult::CreateIndex {
163 table_name,
164 index_name,
165 } => f
166 .debug_struct("CreateIndex")
167 .field("table_name", table_name)
168 .field("index_name", index_name)
169 .finish(),
170 RuntimeStatementResult::NoOp => f.debug_struct("NoOp").finish(),
171 RuntimeStatementResult::Insert {
172 table_name,
173 rows_inserted,
174 } => f
175 .debug_struct("Insert")
176 .field("table_name", table_name)
177 .field("rows_inserted", rows_inserted)
178 .finish(),
179 RuntimeStatementResult::Update {
180 table_name,
181 rows_updated,
182 } => f
183 .debug_struct("Update")
184 .field("table_name", table_name)
185 .field("rows_updated", rows_updated)
186 .finish(),
187 RuntimeStatementResult::Delete {
188 table_name,
189 rows_deleted,
190 } => f
191 .debug_struct("Delete")
192 .field("table_name", table_name)
193 .field("rows_deleted", rows_deleted)
194 .finish(),
195 RuntimeStatementResult::Select {
196 table_name, schema, ..
197 } => f
198 .debug_struct("Select")
199 .field("table_name", table_name)
200 .field("schema", schema)
201 .finish(),
202 RuntimeStatementResult::Transaction { kind } => {
203 f.debug_struct("Transaction").field("kind", kind).finish()
204 }
205 }
206 }
207}
208
/// An UPDATE assignment value after preparation: either a resolved literal
/// or a reference into a list of expressions evaluated per row.
enum PreparedAssignmentValue {
    /// Constant value assigned directly.
    Literal(PlanValue),
    /// Index of the expression to evaluate for this assignment.
    /// NOTE(review): the owning expression list lives elsewhere in this
    /// file — confirm against the UPDATE execution path.
    Expression { expr_index: usize },
}
214
215impl<P> RuntimeStatementResult<P>
216where
217 P: Pager<Blob = EntryHandle> + Send + Sync,
218{
219 #[allow(dead_code)]
222 pub(crate) fn convert_pager_type<Q>(self) -> Result<RuntimeStatementResult<Q>>
223 where
224 Q: Pager<Blob = EntryHandle> + Send + Sync,
225 {
226 match self {
227 RuntimeStatementResult::CreateTable { table_name } => {
228 Ok(RuntimeStatementResult::CreateTable { table_name })
229 }
230 RuntimeStatementResult::CreateIndex {
231 table_name,
232 index_name,
233 } => Ok(RuntimeStatementResult::CreateIndex {
234 table_name,
235 index_name,
236 }),
237 RuntimeStatementResult::NoOp => Ok(RuntimeStatementResult::NoOp),
238 RuntimeStatementResult::Insert {
239 table_name,
240 rows_inserted,
241 } => Ok(RuntimeStatementResult::Insert {
242 table_name,
243 rows_inserted,
244 }),
245 RuntimeStatementResult::Update {
246 table_name,
247 rows_updated,
248 } => Ok(RuntimeStatementResult::Update {
249 table_name,
250 rows_updated,
251 }),
252 RuntimeStatementResult::Delete {
253 table_name,
254 rows_deleted,
255 } => Ok(RuntimeStatementResult::Delete {
256 table_name,
257 rows_deleted,
258 }),
259 RuntimeStatementResult::Transaction { kind } => {
260 Ok(RuntimeStatementResult::Transaction { kind })
261 }
262 RuntimeStatementResult::Select { .. } => Err(Error::Internal(
263 "Cannot convert SELECT result between pager types in transaction".into(),
264 )),
265 }
266 }
267}
268
269pub fn statement_table_name(statement: &PlanStatement) -> Option<&str> {
276 match statement {
277 PlanStatement::CreateTable(plan) => Some(&plan.name),
278 PlanStatement::CreateIndex(plan) => Some(&plan.table),
279 PlanStatement::Insert(plan) => Some(&plan.table),
280 PlanStatement::Update(plan) => Some(&plan.table),
281 PlanStatement::Delete(plan) => Some(&plan.table),
282 PlanStatement::Select(plan) => {
283 if plan.tables.len() == 1 {
285 Some(&plan.tables[0].table)
286 } else {
287 None
288 }
289 }
290 PlanStatement::BeginTransaction
291 | PlanStatement::CommitTransaction
292 | PlanStatement::RollbackTransaction => None,
293 }
294}
295
/// Pairs a [`RuntimeContext`] with the MVCC snapshot currently applied to
/// it, so transaction code can swap snapshots without replacing the context.
pub struct RuntimeContextWrapper<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    ctx: Arc<RuntimeContext<P>>,
    // Snapshot in effect for operations routed through this wrapper;
    // guarded so concurrent holders observe updates.
    snapshot: RwLock<TransactionSnapshot>,
}
321
322impl<P> RuntimeContextWrapper<P>
323where
324 P: Pager<Blob = EntryHandle> + Send + Sync,
325{
326 fn new(ctx: Arc<RuntimeContext<P>>) -> Self {
327 let snapshot = ctx.default_snapshot();
328 Self {
329 ctx,
330 snapshot: RwLock::new(snapshot),
331 }
332 }
333
334 fn update_snapshot(&self, snapshot: TransactionSnapshot) {
335 let mut guard = self.snapshot.write().expect("snapshot lock poisoned");
336 *guard = snapshot;
337 }
338
339 fn current_snapshot(&self) -> TransactionSnapshot {
340 *self.snapshot.read().expect("snapshot lock poisoned")
341 }
342
343 fn context(&self) -> &Arc<RuntimeContext<P>> {
344 &self.ctx
345 }
346
347 fn ctx(&self) -> &RuntimeContext<P> {
348 &self.ctx
349 }
350}
351
/// The storage namespaces visible to one session: the shared persistent
/// namespace, an optional session-local temporary namespace, and the
/// registry that resolves table names to namespaces.
struct SessionNamespaces<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    persistent: Arc<PersistentNamespace<P>>,
    // Backed by a session-private in-memory pager; cleared on drop.
    temporary: Option<Arc<TemporaryNamespace<BoxedPager>>>,
    registry: Arc<RwLock<StorageNamespaceRegistry>>,
}
360
361impl<P> SessionNamespaces<P>
362where
363 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
364{
365 fn new(base_context: Arc<RuntimeContext<P>>) -> Self {
366 let persistent = Arc::new(PersistentNamespace::new(
367 storage_namespace::PERSISTENT_NAMESPACE_ID.to_string(),
368 Arc::clone(&base_context),
369 ));
370
371 let mut registry = StorageNamespaceRegistry::new(persistent.namespace_id().clone());
372 registry.register_namespace(Arc::clone(&persistent), Vec::<String>::new(), false);
373
374 let temporary = {
375 let temp_pager = Arc::new(BoxedPager::from_arc(Arc::new(MemPager::default())));
376 let temp_context = Arc::new(RuntimeContext::new(temp_pager));
377 let namespace = Arc::new(TemporaryNamespace::new(
378 storage_namespace::TEMPORARY_NAMESPACE_ID.to_string(),
379 temp_context,
380 ));
381 registry.register_namespace(
382 Arc::clone(&namespace),
383 vec![storage_namespace::TEMPORARY_NAMESPACE_ID.to_string()],
384 true,
385 );
386 namespace
387 };
388
389 Self {
390 persistent,
391 temporary: Some(temporary),
392 registry: Arc::new(RwLock::new(registry)),
393 }
394 }
395
396 fn persistent(&self) -> Arc<PersistentNamespace<P>> {
397 Arc::clone(&self.persistent)
398 }
399
400 fn temporary(&self) -> Option<Arc<TemporaryNamespace<BoxedPager>>> {
401 self.temporary.as_ref().map(Arc::clone)
402 }
403
404 fn registry(&self) -> Arc<RwLock<StorageNamespaceRegistry>> {
405 Arc::clone(&self.registry)
406 }
407}
408
409impl<P> Drop for SessionNamespaces<P>
410where
411 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
412{
413 fn drop(&mut self) {
414 if let Some(temp) = &self.temporary {
415 temp.clear_tables();
416 }
417 }
418}
419
/// One client session over the runtime.
///
/// Wraps a [`TransactionSession`] (persistent base context plus an
/// in-memory staging context while an explicit transaction is open) and the
/// session's storage namespaces.
pub struct RuntimeSession<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    // Transaction-aware session core; staged operations are replayed onto
    // the persistent context on commit (see `commit_transaction`).
    inner: TransactionSession<RuntimeContextWrapper<P>, RuntimeContextWrapper<MemPager>>,
    // Persistent + temporary namespaces shared by clones of this session.
    namespaces: Arc<SessionNamespaces<P>>,
}
434
impl<P> RuntimeSession<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    /// Produces another handle onto the same session state (shared inner
    /// transaction session and namespace set).
    pub(crate) fn clone_session(&self) -> Self {
        Self {
            inner: self.inner.clone_session(),
            namespaces: self.namespaces.clone(),
        }
    }

    /// Registry mapping canonical table names to storage namespaces.
    pub fn namespace_registry(&self) -> Arc<RwLock<StorageNamespaceRegistry>> {
        self.namespaces.registry()
    }

    /// Resolves which namespace owns the table named `canonical`.
    fn resolve_namespace_for_table(&self, canonical: &str) -> storage_namespace::NamespaceId {
        self.namespace_registry()
            .read()
            .expect("namespace registry poisoned")
            .namespace_for_table(canonical)
    }

    /// Namespace for a SELECT plan, but only when the plan references
    /// exactly one table; multi-table plans return `None`.
    fn namespace_for_select_plan(
        &self,
        plan: &SelectPlan,
    ) -> Option<storage_namespace::NamespaceId> {
        if plan.tables.len() != 1 {
            return None;
        }

        let qualified = plan.tables[0].qualified_name();
        let (_, canonical) = canonical_table_name(&qualified).ok()?;
        Some(self.resolve_namespace_for_table(&canonical))
    }

    /// Runs a SELECT against the temporary namespace, materializing the
    /// result into a single batch so the returned execution no longer
    /// depends on the temporary context.
    fn select_from_temporary(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
        let temp_namespace = self
            .temporary_namespace()
            .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;

        // Single-table plans report their qualified name; otherwise the
        // result carries an empty table name.
        let table_name = if plan.tables.len() == 1 {
            plan.tables[0].qualified_name()
        } else {
            String::new()
        };

        let execution = temp_namespace.context().execute_select(plan.clone())?;
        let schema = execution.schema();
        let batches = execution.collect()?;

        // Collapse the batch stream into exactly one RecordBatch.
        let combined = if batches.is_empty() {
            RecordBatch::new_empty(Arc::clone(&schema))
        } else if batches.len() == 1 {
            batches.into_iter().next().unwrap()
        } else {
            let refs: Vec<&RecordBatch> = batches.iter().collect();
            arrow::compute::concat_batches(&schema, refs)?
        };

        let execution =
            SelectExecution::from_batch(table_name.clone(), Arc::clone(&schema), combined);

        Ok(RuntimeStatementResult::Select {
            execution,
            table_name,
            schema,
        })
    }

    /// Shared handle to the persistent namespace.
    fn persistent_namespace(&self) -> Arc<PersistentNamespace<P>> {
        self.namespaces.persistent()
    }

    /// Shared handle to the temporary namespace, if one exists.
    #[allow(dead_code)]
    fn temporary_namespace(&self) -> Option<Arc<TemporaryNamespace<BoxedPager>>> {
        self.namespaces.temporary()
    }

    /// Begins an explicit transaction, staging all subsequent writes on a
    /// fresh in-memory context until COMMIT or ROLLBACK.
    pub fn begin_transaction(&self) -> Result<RuntimeStatementResult<P>> {
        let staging_pager = Arc::new(MemPager::default());
        tracing::trace!(
            "BEGIN_TRANSACTION: Created staging pager at {:p}",
            &*staging_pager
        );
        let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));

        let staging_wrapper = Arc::new(RuntimeContextWrapper::new(staging_ctx));

        self.inner.begin_transaction(staging_wrapper)?;
        Ok(RuntimeStatementResult::Transaction {
            kind: TransactionKind::Begin,
        })
    }

    /// Marks the active transaction as aborted; staged work is discarded.
    pub fn abort_transaction(&self) {
        self.inner.abort_transaction();
    }

    /// Whether an explicit transaction is currently open on this session.
    pub fn has_active_transaction(&self) -> bool {
        let result = self.inner.has_active_transaction();
        tracing::trace!("SESSION: has_active_transaction() = {}", result);
        result
    }

    /// Whether the active transaction has been aborted.
    pub fn is_aborted(&self) -> bool {
        self.inner.is_aborted()
    }

    /// Commits the active transaction: validates staged operations against
    /// concurrently dropped tables, replays them on the persistent context,
    /// restores the default snapshot, and persists the transaction-id
    /// high-water mark.
    pub fn commit_transaction(&self) -> Result<RuntimeStatementResult<P>> {
        tracing::trace!("Session::commit_transaction called");
        let (tx_result, operations) = self.inner.commit_transaction()?;
        tracing::trace!(
            "Session::commit_transaction got {} operations",
            operations.len()
        );

        // Refuse to replay writes that touch a table another transaction
        // dropped while this one was open.
        if !operations.is_empty() {
            let dropped_tables = self
                .inner
                .context()
                .ctx()
                .dropped_tables
                .read()
                .unwrap()
                .clone();
            if !dropped_tables.is_empty() {
                for operation in &operations {
                    let table_name_opt = match operation {
                        PlanOperation::Insert(plan) => Some(plan.table.as_str()),
                        PlanOperation::Update(plan) => Some(plan.table.as_str()),
                        PlanOperation::Delete(plan) => Some(plan.table.as_str()),
                        _ => None,
                    };
                    if let Some(table_name) = table_name_opt {
                        let (_, canonical) = canonical_table_name(table_name)?;
                        if dropped_tables.contains(&canonical) {
                            self.abort_transaction();
                            return Err(Error::TransactionContextError(
                                "another transaction has dropped this table".into(),
                            ));
                        }
                    }
                }
            }
        }

        let kind = match tx_result {
            TransactionResult::Transaction { kind } => kind,
            _ => {
                return Err(Error::Internal(
                    "commit_transaction returned non-transaction result".into(),
                ));
            }
        };
        tracing::trace!("Session::commit_transaction kind={:?}", kind);

        // Replay the staged DDL/DML against the persistent base context, in
        // the order the transaction issued it.
        for operation in operations {
            match operation {
                PlanOperation::CreateTable(plan) => {
                    TransactionContext::create_table_plan(&**self.inner.context(), plan)?;
                }
                PlanOperation::Insert(plan) => {
                    TransactionContext::insert(&**self.inner.context(), plan)?;
                }
                PlanOperation::Update(plan) => {
                    TransactionContext::update(&**self.inner.context(), plan)?;
                }
                PlanOperation::Delete(plan) => {
                    TransactionContext::delete(&**self.inner.context(), plan)?;
                }
                _ => {}
            }
        }

        // Leave the base context back on its default snapshot.
        let base_ctx = self.inner.context();
        let default_snapshot = base_ctx.ctx().default_snapshot();
        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);

        // On a real commit, durably record the next transaction id (it is
        // reloaded from the catalog at context startup). Failure here is
        // logged but does not fail the commit.
        if matches!(kind, TransactionKind::Commit) {
            let ctx = base_ctx.ctx();
            let next_txn_id = ctx.txn_manager().current_next_txn_id();
            if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
                tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
            }
        }

        Ok(RuntimeStatementResult::Transaction { kind })
    }

    /// Rolls back the active transaction, discarding staged operations and
    /// restoring the default snapshot on the base context.
    pub fn rollback_transaction(&self) -> Result<RuntimeStatementResult<P>> {
        self.inner.rollback_transaction()?;
        let base_ctx = self.inner.context();
        let default_snapshot = base_ctx.ctx().default_snapshot();
        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
        Ok(RuntimeStatementResult::Transaction {
            kind: TransactionKind::Rollback,
        })
    }

    /// Resolves CREATE TABLE AS SELECT by running the SELECT now and
    /// rewriting the plan's source into concrete record batches.
    fn materialize_create_table_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
        if let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take() {
            let select_result = self.select(*select_plan)?;
            let (schema, batches) = match select_result {
                RuntimeStatementResult::Select {
                    schema, execution, ..
                } => {
                    let batches = execution.collect()?;
                    (schema, batches)
                }
                _ => {
                    return Err(Error::Internal(
                        "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
                    ));
                }
            };
            plan.source = Some(CreateTableSource::Batches { schema, batches });
        }
        Ok(plan)
    }

    /// Creates a table in the namespace the plan names (defaulting to the
    /// persistent namespace). Inside an active transaction, persistent
    /// creations are staged; errors there abort the transaction.
    pub fn create_table_plan(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
        let mut plan = self.materialize_create_table_plan(plan)?;
        let namespace_id = plan
            .namespace
            .clone()
            .unwrap_or_else(|| storage_namespace::PERSISTENT_NAMESPACE_ID.to_string());
        plan.namespace = Some(namespace_id.clone());

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace.create_table(plan)?.convert_pager_type::<P>()
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                if self.has_active_transaction() {
                    let table_name = plan.name.clone();
                    match self
                        .inner
                        .execute_operation(PlanOperation::CreateTable(plan))
                    {
                        Ok(_) => Ok(RuntimeStatementResult::CreateTable { table_name }),
                        Err(e) => {
                            self.abort_transaction();
                            Err(e)
                        }
                    }
                } else {
                    self.persistent_namespace().create_table(plan)
                }
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }

    /// Drops `name` from whichever namespace owns it. `if_exists`
    /// suppresses the missing-table error.
    pub fn drop_table(&self, name: &str, if_exists: bool) -> Result<()> {
        let (_, canonical_table) = canonical_table_name(name)?;
        let namespace_id = self.resolve_namespace_for_table(&canonical_table);

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace.drop_table(name, if_exists)
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                self.persistent_namespace().drop_table(name, if_exists)
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }

    /// Creates an index on the plan's table. Not supported while an
    /// explicit transaction is open on the persistent namespace.
    pub fn create_index(&self, plan: CreateIndexPlan) -> Result<RuntimeStatementResult<P>> {
        let (_, canonical_table) = canonical_table_name(&plan.table)?;
        let namespace_id = self.resolve_namespace_for_table(&canonical_table);

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace.create_index(plan)?.convert_pager_type::<P>()
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                if self.has_active_transaction() {
                    return Err(Error::InvalidArgumentError(
                        "CREATE INDEX is not supported inside an active transaction".into(),
                    ));
                }

                self.persistent_namespace().create_index(plan)
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }

    /// Normalizes an INSERT's source to concrete rows or batches and
    /// returns the row count. INSERT ... SELECT is executed eagerly here
    /// and rewritten into rows.
    fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
        let InsertPlan {
            table,
            columns,
            source,
        } = plan;

        match source {
            InsertSource::Rows(rows) => {
                let count = rows.len();
                Ok((
                    InsertPlan {
                        table,
                        columns,
                        source: InsertSource::Rows(rows),
                    },
                    count,
                ))
            }
            InsertSource::Batches(batches) => {
                let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
                Ok((
                    InsertPlan {
                        table,
                        columns,
                        source: InsertSource::Batches(batches),
                    },
                    count,
                ))
            }
            InsertSource::Select { plan: select_plan } => {
                let select_result = self.select(*select_plan)?;
                let rows = match select_result {
                    RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
                    _ => {
                        return Err(Error::Internal(
                            "expected Select result when executing INSERT ... SELECT".into(),
                        ));
                    }
                };
                let count = rows.len();
                Ok((
                    InsertPlan {
                        table,
                        columns,
                        source: InsertSource::Rows(rows),
                    },
                    count,
                ))
            }
        }
    }

    /// Inserts rows into the plan's table. Inside an active transaction on
    /// the persistent namespace the write is staged, and constraint
    /// violations abort the transaction; outside one it runs auto-commit on
    /// the default snapshot.
    pub fn insert(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
        tracing::trace!("Session::insert called for table={}", plan.table);
        let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
        let table_name = plan.table.clone();
        let (_, canonical_table) = canonical_table_name(&plan.table)?;
        let namespace_id = self.resolve_namespace_for_table(&canonical_table);

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace
                    .context()
                    .insert(plan)?
                    .convert_pager_type::<P>()?;
                Ok(RuntimeStatementResult::Insert {
                    rows_inserted,
                    table_name,
                })
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                if self.has_active_transaction() {
                    match self.inner.execute_operation(PlanOperation::Insert(plan)) {
                        Ok(_) => {
                            tracing::trace!("Session::insert succeeded for table={}", table_name);
                            Ok(RuntimeStatementResult::Insert {
                                rows_inserted,
                                table_name,
                            })
                        }
                        Err(e) => {
                            tracing::trace!(
                                "Session::insert failed for table={}, error={:?}",
                                table_name,
                                e
                            );
                            // Only constraint violations poison the
                            // transaction; other errors leave it usable.
                            if matches!(e, Error::ConstraintError(_)) {
                                tracing::trace!("Transaction is_aborted=true");
                                self.abort_transaction();
                            }
                            Err(e)
                        }
                    }
                } else {
                    // Auto-commit path: refresh to the default snapshot and
                    // write directly to the persistent context.
                    let context = self.inner.context();
                    let default_snapshot = context.ctx().default_snapshot();
                    TransactionContext::set_snapshot(&**context, default_snapshot);
                    TransactionContext::insert(&**context, plan)?;
                    Ok(RuntimeStatementResult::Insert {
                        rows_inserted,
                        table_name,
                    })
                }
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }

    /// Executes a SELECT, routing single-table temporary-namespace reads to
    /// the temporary context, transactional reads through the staging
    /// session, and everything else to the persistent context under the
    /// default snapshot.
    pub fn select(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
        if let Some(namespace_id) = self.namespace_for_select_plan(&plan)
            && namespace_id == storage_namespace::TEMPORARY_NAMESPACE_ID
        {
            return self.select_from_temporary(plan);
        }

        if self.has_active_transaction() {
            let tx_result = match self
                .inner
                .execute_operation(PlanOperation::Select(plan.clone()))
            {
                Ok(result) => result,
                Err(e) => {
                    if matches!(e, Error::ConstraintError(_)) {
                        self.abort_transaction();
                    }
                    return Err(e);
                }
            };
            match tx_result {
                TransactionResult::Select {
                    table_name,
                    schema,
                    execution: staging_execution,
                } => {
                    // Materialize the staged execution into one batch so the
                    // result outlives the staging context. Collection errors
                    // degrade to an empty result here.
                    let batches = staging_execution.collect().unwrap_or_default();
                    let combined = if batches.is_empty() {
                        RecordBatch::new_empty(Arc::clone(&schema))
                    } else if batches.len() == 1 {
                        batches.into_iter().next().unwrap()
                    } else {
                        let refs: Vec<&RecordBatch> = batches.iter().collect();
                        arrow::compute::concat_batches(&schema, refs)?
                    };

                    let execution = SelectExecution::from_batch(
                        table_name.clone(),
                        Arc::clone(&schema),
                        combined,
                    );

                    Ok(RuntimeStatementResult::Select {
                        execution,
                        table_name,
                        schema,
                    })
                }
                _ => Err(Error::Internal("expected Select result".into())),
            }
        } else {
            // Auto-commit path: read through the persistent context on a
            // fresh default snapshot.
            let context = self.inner.context();
            let default_snapshot = context.ctx().default_snapshot();
            TransactionContext::set_snapshot(&**context, default_snapshot);
            let table_name = if plan.tables.len() == 1 {
                plan.tables[0].qualified_name()
            } else {
                String::new()
            };
            let execution = TransactionContext::execute_select(&**context, plan)?;
            let schema = execution.schema();
            Ok(RuntimeStatementResult::Select {
                execution,
                table_name,
                schema,
            })
        }
    }

    /// Convenience: SELECT * from `table` and collect all rows as plan
    /// values.
    pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
        let plan =
            SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
        match self.select(plan)? {
            RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
            other => Err(Error::Internal(format!(
                "expected Select result when reading table '{table}', got {:?}",
                other
            ))),
        }
    }

    /// Updates rows in the plan's table. Staged when a transaction is
    /// active (errors abort it); otherwise runs auto-commit on the default
    /// snapshot.
    pub fn update(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
        let (_, canonical_table) = canonical_table_name(&plan.table)?;
        let namespace_id = self.resolve_namespace_for_table(&canonical_table);

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace
                    .context()
                    .update(plan)?
                    .convert_pager_type::<P>()
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                if self.has_active_transaction() {
                    let table_name = plan.table.clone();
                    let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
                        Ok(result) => result,
                        Err(e) => {
                            self.abort_transaction();
                            return Err(e);
                        }
                    };
                    match result {
                        TransactionResult::Update {
                            rows_matched: _,
                            rows_updated,
                        } => Ok(RuntimeStatementResult::Update {
                            rows_updated,
                            table_name,
                        }),
                        _ => Err(Error::Internal("expected Update result".into())),
                    }
                } else {
                    let context = self.inner.context();
                    let default_snapshot = context.ctx().default_snapshot();
                    TransactionContext::set_snapshot(&**context, default_snapshot);
                    let table_name = plan.table.clone();
                    let result = TransactionContext::update(&**context, plan)?;
                    match result {
                        TransactionResult::Update {
                            rows_matched: _,
                            rows_updated,
                        } => Ok(RuntimeStatementResult::Update {
                            rows_updated,
                            table_name,
                        }),
                        _ => Err(Error::Internal("expected Update result".into())),
                    }
                }
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }

    /// Deletes rows from the plan's table. Staged when a transaction is
    /// active (errors abort it); otherwise runs auto-commit on the default
    /// snapshot.
    pub fn delete(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
        let (_, canonical_table) = canonical_table_name(&plan.table)?;
        let namespace_id = self.resolve_namespace_for_table(&canonical_table);

        match namespace_id.as_str() {
            storage_namespace::TEMPORARY_NAMESPACE_ID => {
                let temp_namespace = self
                    .temporary_namespace()
                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
                temp_namespace
                    .context()
                    .delete(plan)?
                    .convert_pager_type::<P>()
            }
            storage_namespace::PERSISTENT_NAMESPACE_ID => {
                if self.has_active_transaction() {
                    let table_name = plan.table.clone();
                    let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
                        Ok(result) => result,
                        Err(e) => {
                            self.abort_transaction();
                            return Err(e);
                        }
                    };
                    match result {
                        TransactionResult::Delete { rows_deleted } => {
                            Ok(RuntimeStatementResult::Delete {
                                rows_deleted,
                                table_name,
                            })
                        }
                        _ => Err(Error::Internal("expected Delete result".into())),
                    }
                } else {
                    let context = self.inner.context();
                    let default_snapshot = context.ctx().default_snapshot();
                    TransactionContext::set_snapshot(&**context, default_snapshot);
                    let table_name = plan.table.clone();
                    let result = TransactionContext::delete(&**context, plan)?;
                    match result {
                        TransactionResult::Delete { rows_deleted } => {
                            Ok(RuntimeStatementResult::Delete {
                                rows_deleted,
                                table_name,
                            })
                        }
                        _ => Err(Error::Internal("expected Delete result".into())),
                    }
                }
            }
            other => Err(Error::InvalidArgumentError(format!(
                "Unknown storage namespace '{}'",
                other
            ))),
        }
    }
}
1092
/// Convenience façade pairing a [`RuntimeContext`] with a single
/// [`RuntimeSession`] for executing planned statements.
pub struct RuntimeEngine<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    context: Arc<RuntimeContext<P>>,
    session: RuntimeSession<P>,
}
1100
1101impl<P> Clone for RuntimeEngine<P>
1102where
1103 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1104{
1105 fn clone(&self) -> Self {
1106 tracing::debug!("[ENGINE] RuntimeEngine::clone() called - reusing same session");
1109 Self {
1110 context: Arc::clone(&self.context),
1111 session: self.session.clone_session(),
1112 }
1113 }
1114}
1115
1116impl<P> RuntimeEngine<P>
1117where
1118 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1119{
1120 pub fn new(pager: Arc<P>) -> Self {
1121 let context = Arc::new(RuntimeContext::new(pager));
1122 Self::from_context(context)
1123 }
1124
1125 pub fn from_context(context: Arc<RuntimeContext<P>>) -> Self {
1126 tracing::debug!("[ENGINE] RuntimeEngine::from_context - creating new session");
1127 let session = context.create_session();
1128 tracing::debug!("[ENGINE] RuntimeEngine::from_context - created session");
1129 Self { context, session }
1130 }
1131
1132 pub fn context(&self) -> Arc<RuntimeContext<P>> {
1133 Arc::clone(&self.context)
1134 }
1135
1136 pub fn session(&self) -> &RuntimeSession<P> {
1137 &self.session
1138 }
1139
1140 pub fn execute_statement(&self, statement: PlanStatement) -> Result<RuntimeStatementResult<P>> {
1141 match statement {
1142 PlanStatement::BeginTransaction => self.session.begin_transaction(),
1143 PlanStatement::CommitTransaction => self.session.commit_transaction(),
1144 PlanStatement::RollbackTransaction => self.session.rollback_transaction(),
1145 PlanStatement::CreateTable(plan) => self.session.create_table_plan(plan),
1146 PlanStatement::CreateIndex(plan) => self.session.create_index(plan),
1147 PlanStatement::Insert(plan) => self.session.insert(plan),
1148 PlanStatement::Update(plan) => self.session.update(plan),
1149 PlanStatement::Delete(plan) => self.session.delete(plan),
1150 PlanStatement::Select(plan) => self.session.select(plan),
1151 }
1152 }
1153
1154 pub fn execute_all<I>(&self, statements: I) -> Result<Vec<RuntimeStatementResult<P>>>
1155 where
1156 I: IntoIterator<Item = PlanStatement>,
1157 {
1158 let mut results = Vec::new();
1159 for statement in statements {
1160 results.push(self.execute_statement(statement)?);
1161 }
1162 Ok(results)
1163 }
1164}
1165
/// Shared runtime state over one pager: the table cache, catalog and
/// constraint services, the column store, and transaction bookkeeping.
pub struct RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Pager backing the column store.
    pager: Arc<P>,
    // Cached executor tables keyed by table name.
    tables: RwLock<FxHashMap<String, Arc<ExecutorTable<P>>>>,
    // Canonical names of dropped tables; commits replaying writes against
    // any of these are rejected (see `RuntimeSession::commit_transaction`).
    dropped_tables: RwLock<FxHashSet<String>>,
    metadata: Arc<MetadataManager<P>>,
    constraint_service: ConstraintService<P>,
    catalog_service: CatalogManager<P>,
    catalog: Arc<TableCatalog>,
    store: Arc<ColumnStore<P>>,
    // Drives explicit transactions: persistent base context plus an
    // in-memory (MemPager) staging context.
    transaction_manager:
        TransactionManager<RuntimeContextWrapper<P>, RuntimeContextWrapper<MemPager>>,
    // Allocates transaction ids; the high-water mark is persisted in the
    // system catalog and reloaded at startup.
    txn_manager: Arc<TxnIdManager>,
    // Per-transaction set of tables that received new rows.
    // NOTE(review): exact consumer is outside this chunk — confirm.
    txn_tables_with_new_rows: RwLock<FxHashMap<TxnId, FxHashSet<String>>>,
}
1200
1201impl<P> RuntimeContext<P>
1202where
1203 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
1204{
    /// Creates a context with its own private [`TableCatalog`].
    pub fn new(pager: Arc<P>) -> Self {
        Self::new_with_catalog_inner(pager, None)
    }
1208
    /// Creates a context that shares an existing [`TableCatalog`] (e.g. with
    /// another context over the same storage); duplicate registrations of
    /// already-known tables are tolerated during startup.
    pub fn new_with_catalog(pager: Arc<P>, catalog: Arc<TableCatalog>) -> Self {
        Self::new_with_catalog_inner(pager, Some(catalog))
    }
1212
1213 fn new_with_catalog_inner(pager: Arc<P>, shared_catalog: Option<Arc<TableCatalog>>) -> Self {
1214 tracing::trace!("RuntimeContext::new called, pager={:p}", &*pager);
1215
1216 let store = ColumnStore::open(Arc::clone(&pager)).expect("failed to open ColumnStore");
1217 let catalog = SysCatalog::new(&store);
1218
1219 let next_txn_id = match catalog.get_next_txn_id() {
1220 Ok(Some(id)) => {
1221 tracing::debug!("[CONTEXT] Loaded next_txn_id={} from catalog", id);
1222 id
1223 }
1224 Ok(None) => {
1225 tracing::debug!("[CONTEXT] No persisted next_txn_id found, starting from default");
1226 TXN_ID_AUTO_COMMIT + 1
1227 }
1228 Err(e) => {
1229 tracing::warn!("[CONTEXT] Failed to load next_txn_id: {}, using default", e);
1230 TXN_ID_AUTO_COMMIT + 1
1231 }
1232 };
1233
1234 let last_committed = match catalog.get_last_committed_txn_id() {
1235 Ok(Some(id)) => {
1236 tracing::debug!("[CONTEXT] Loaded last_committed={} from catalog", id);
1237 id
1238 }
1239 Ok(None) => {
1240 tracing::debug!(
1241 "[CONTEXT] No persisted last_committed found, starting from default"
1242 );
1243 TXN_ID_AUTO_COMMIT
1244 }
1245 Err(e) => {
1246 tracing::warn!(
1247 "[CONTEXT] Failed to load last_committed: {}, using default",
1248 e
1249 );
1250 TXN_ID_AUTO_COMMIT
1251 }
1252 };
1253
1254 let store_arc = Arc::new(store);
1255 let metadata = Arc::new(MetadataManager::new(Arc::clone(&store_arc)));
1256
1257 let loaded_tables = match metadata.all_table_metas() {
1258 Ok(metas) => {
1259 tracing::debug!("[CONTEXT] Loaded {} table(s) from catalog", metas.len());
1260 metas
1261 }
1262 Err(e) => {
1263 tracing::warn!(
1264 "[CONTEXT] Failed to load table metas: {}, starting with empty registry",
1265 e
1266 );
1267 Vec::new()
1268 }
1269 };
1270
1271 let transaction_manager =
1272 TransactionManager::new_with_initial_state(next_txn_id, last_committed);
1273 let txn_manager = transaction_manager.txn_manager();
1274
1275 tracing::debug!(
1297 "[CONTEXT] Initialized with lazy loading for {} table(s)",
1298 loaded_tables.len()
1299 );
1300
1301 let (catalog, is_shared_catalog) = match shared_catalog {
1303 Some(existing) => (existing, true),
1304 None => (Arc::new(TableCatalog::new()), false),
1305 };
1306 for (table_id, table_meta) in &loaded_tables {
1307 if let Some(ref table_name) = table_meta.name
1308 && let Err(e) = catalog.register_table(table_name.as_str(), *table_id)
1309 {
1310 match e {
1311 Error::CatalogError(ref msg)
1312 if is_shared_catalog && msg.contains("already exists") =>
1313 {
1314 tracing::debug!(
1315 "[CONTEXT] Shared catalog already contains table '{}' with id={}",
1316 table_name,
1317 table_id
1318 );
1319 }
1320 other => {
1321 tracing::warn!(
1322 "[CONTEXT] Failed to register table '{}' (id={}) in catalog: {}",
1323 table_name,
1324 table_id,
1325 other
1326 );
1327 }
1328 }
1329 }
1330 }
1331 tracing::debug!(
1332 "[CONTEXT] Catalog initialized with {} table(s)",
1333 catalog.table_count()
1334 );
1335
1336 let constraint_service =
1337 ConstraintService::new(Arc::clone(&metadata), Arc::clone(&catalog));
1338 let catalog_service = CatalogManager::new(
1339 Arc::clone(&metadata),
1340 Arc::clone(&catalog),
1341 Arc::clone(&store_arc),
1342 );
1343
1344 Self {
1345 pager,
1346 tables: RwLock::new(FxHashMap::default()), dropped_tables: RwLock::new(FxHashSet::default()),
1348 metadata,
1349 constraint_service,
1350 catalog_service,
1351 catalog,
1352 store: store_arc,
1353 transaction_manager,
1354 txn_manager,
1355 txn_tables_with_new_rows: RwLock::new(FxHashMap::default()),
1356 }
1357 }
1358
1359 pub fn txn_manager(&self) -> Arc<TxnIdManager> {
1361 Arc::clone(&self.txn_manager)
1362 }
1363
1364 pub fn persist_next_txn_id(&self, next_txn_id: TxnId) -> Result<()> {
1366 let catalog = SysCatalog::new(&self.store);
1367 catalog.put_next_txn_id(next_txn_id)?;
1368 let last_committed = self.txn_manager.last_committed();
1369 catalog.put_last_committed_txn_id(last_committed)?;
1370 tracing::debug!(
1371 "[CONTEXT] Persisted next_txn_id={}, last_committed={}",
1372 next_txn_id,
1373 last_committed
1374 );
1375 Ok(())
1376 }
1377
1378 fn build_executor_multi_column_uniques(
1379 table: &ExecutorTable<P>,
1380 stored: &[MultiColumnUniqueEntryMeta],
1381 ) -> Vec<ExecutorMultiColumnUnique> {
1382 let mut results = Vec::with_capacity(stored.len());
1383
1384 'outer: for entry in stored {
1385 if entry.column_ids.is_empty() {
1386 continue;
1387 }
1388
1389 let mut column_indices = Vec::with_capacity(entry.column_ids.len());
1390 for field_id in &entry.column_ids {
1391 if let Some((idx, _)) = table
1392 .schema
1393 .columns
1394 .iter()
1395 .enumerate()
1396 .find(|(_, col)| &col.field_id == field_id)
1397 {
1398 column_indices.push(idx);
1399 } else {
1400 tracing::warn!(
1401 "[CATALOG] Skipping persisted multi-column UNIQUE {:?} for table_id={} missing field_id {}",
1402 entry.index_name,
1403 table.table.table_id(),
1404 field_id
1405 );
1406 continue 'outer;
1407 }
1408 }
1409
1410 results.push(ExecutorMultiColumnUnique {
1411 index_name: entry.index_name.clone(),
1412 column_indices,
1413 });
1414 }
1415
1416 results
1417 }
1418
1419 pub fn default_snapshot(&self) -> TransactionSnapshot {
1421 TransactionSnapshot {
1422 txn_id: TXN_ID_AUTO_COMMIT,
1423 snapshot_id: self.txn_manager.last_committed(),
1424 }
1425 }
1426
1427 pub fn table_catalog(&self) -> Arc<TableCatalog> {
1429 Arc::clone(&self.catalog)
1430 }
1431
1432 pub fn create_session(self: &Arc<Self>) -> RuntimeSession<P> {
1435 tracing::debug!("[SESSION] RuntimeContext::create_session called");
1436 let namespaces = Arc::new(SessionNamespaces::new(Arc::clone(self)));
1437 let wrapper = RuntimeContextWrapper::new(Arc::clone(self));
1438 let inner = self.transaction_manager.create_session(Arc::new(wrapper));
1439 tracing::debug!(
1440 "[SESSION] Created TransactionSession with session_id (will be logged by transaction manager)"
1441 );
1442 RuntimeSession { inner, namespaces }
1443 }
1444
1445 pub fn table(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
1447 RuntimeTableHandle::new(Arc::clone(self), name)
1448 }
1449
    /// Reports whether the context-level transaction manager has an active
    /// transaction. Deprecated: transactions are now tracked per session.
    #[deprecated(note = "Use session-based transactions instead")]
    pub fn has_active_transaction(&self) -> bool {
        self.transaction_manager.has_active_transaction()
    }
1455
    /// Creates a table from column specs, erroring if it already exists.
    /// Convenience wrapper over `create_table_with_options` with
    /// `if_not_exists = false`.
    pub fn create_table<C, I>(
        self: &Arc<Self>,
        name: &str,
        columns: I,
    ) -> Result<RuntimeTableHandle<P>>
    where
        C: IntoColumnSpec,
        I: IntoIterator<Item = C>,
    {
        self.create_table_with_options(name, columns, false)
    }
1467
    /// Creates a table from column specs, returning the existing table's
    /// handle if one with the same name already exists (IF NOT EXISTS
    /// semantics).
    pub fn create_table_if_not_exists<C, I>(
        self: &Arc<Self>,
        name: &str,
        columns: I,
    ) -> Result<RuntimeTableHandle<P>>
    where
        C: IntoColumnSpec,
        I: IntoIterator<Item = C>,
    {
        self.create_table_with_options(name, columns, true)
    }
1479
    /// Executes a planned CREATE TABLE: validates the plan, resolves name
    /// collisions per IF NOT EXISTS / OR REPLACE, then delegates to the
    /// batch-sourced or column-defined creation path.
    ///
    /// Note: `CreateTableSource::Select` must be materialized into batches by
    /// the caller before reaching this method.
    pub fn create_table_plan(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
        if plan.columns.is_empty() && plan.source.is_none() {
            return Err(Error::InvalidArgumentError(
                "CREATE TABLE requires explicit columns or a source".into(),
            ));
        }

        let (display_name, canonical_name) = canonical_table_name(&plan.name)?;
        let CreateTablePlan {
            name: _,
            if_not_exists,
            or_replace,
            columns,
            source,
            namespace: _,
            foreign_keys,
        } = plan;

        tracing::trace!(
            "DEBUG create_table_plan: table='{}' if_not_exists={} columns={}",
            display_name,
            if_not_exists,
            columns.len()
        );
        for (idx, col) in columns.iter().enumerate() {
            tracing::trace!(
                " plan column[{}]: name='{}' primary_key={}",
                idx,
                col.name,
                col.primary_key
            );
        }
        // A table counts as "existing" only if it is cached AND not tombstoned
        // in dropped_tables; a tombstoned entry is purged below before
        // recreation.
        let (exists, is_dropped) = {
            let tables = self.tables.read().unwrap();
            let in_cache = tables.contains_key(&canonical_name);
            let is_dropped = self
                .dropped_tables
                .read()
                .unwrap()
                .contains(&canonical_name);
            (in_cache && !is_dropped, is_dropped)
        };
        tracing::trace!(
            "DEBUG create_table_plan: exists={}, is_dropped={}",
            exists,
            is_dropped
        );

        if is_dropped {
            // Clear the stale cache entry and its tombstone so the name can
            // be reused.
            self.remove_table_entry(&canonical_name);
            self.dropped_tables.write().unwrap().remove(&canonical_name);
        }

        if exists {
            if or_replace {
                tracing::trace!(
                    "DEBUG create_table_plan: table '{}' exists and or_replace=true, removing existing table before recreation",
                    display_name
                );
                self.remove_table_entry(&canonical_name);
            } else if if_not_exists {
                tracing::trace!(
                    "DEBUG create_table_plan: table '{}' exists and if_not_exists=true, returning early WITHOUT creating",
                    display_name
                );
                return Ok(RuntimeStatementResult::CreateTable {
                    table_name: display_name,
                });
            } else {
                return Err(Error::CatalogError(format!(
                    "Catalog Error: Table '{}' already exists",
                    display_name
                )));
            }
        }

        match source {
            Some(CreateTableSource::Batches { schema, batches }) => self.create_table_from_batches(
                display_name,
                canonical_name,
                schema,
                batches,
                if_not_exists,
            ),
            Some(CreateTableSource::Select { .. }) => Err(Error::Internal(
                "CreateTableSource::Select should be materialized before reaching RuntimeContext::create_table_plan"
                    .into(),
            )),
            None => self.create_table_from_columns(
                display_name,
                canonical_name,
                columns,
                foreign_keys,
                if_not_exists,
            ),
        }
    }
1579
    /// Executes a planned CREATE INDEX.
    ///
    /// Supported forms: single-column indexes (optionally UNIQUE) and
    /// multi-column UNIQUE indexes. Only ascending, NULLS LAST ordering is
    /// accepted. UNIQUE indexes are validated against existing visible data
    /// before registration.
    pub fn create_index(&self, plan: CreateIndexPlan) -> Result<RuntimeStatementResult<P>> {
        if plan.columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE INDEX requires at least one column".into(),
            ));
        }

        // Reject orderings the storage layer cannot serve.
        for column_plan in &plan.columns {
            if !column_plan.ascending || column_plan.nulls_first {
                return Err(Error::InvalidArgumentError(
                    "only ASC indexes with NULLS LAST are supported".into(),
                ));
            }
        }

        let index_name = plan.name.clone();
        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
        let table = self.lookup_table(&canonical_name)?;

        let mut column_indices = Vec::with_capacity(plan.columns.len());
        let mut field_ids = Vec::with_capacity(plan.columns.len());
        let mut column_names = Vec::with_capacity(plan.columns.len());
        let mut seen_column_indices = FxHashSet::default();

        // Resolve each requested column to its schema index/field id,
        // rejecting unknown or duplicated columns.
        for column_plan in &plan.columns {
            let normalized = column_plan.name.to_ascii_lowercase();
            let col_idx = table
                .schema
                .lookup
                .get(&normalized)
                .copied()
                .ok_or_else(|| {
                    Error::InvalidArgumentError(format!(
                        "column '{}' does not exist in table '{}'",
                        column_plan.name, display_name
                    ))
                })?;
            if !seen_column_indices.insert(col_idx) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate column '{}' in CREATE INDEX",
                    column_plan.name
                )));
            }

            let column = &table.schema.columns[col_idx];
            column_indices.push(col_idx);
            field_ids.push(column.field_id);
            column_names.push(column.name.clone());
        }

        // Single-column path: register with the catalog service and, for
        // UNIQUE, rebuild the cached executor table so the flag is visible.
        if plan.columns.len() == 1 {
            let field_id = field_ids[0];
            let column_name = column_names[0].clone();

            if plan.unique {
                // Verify existing data does not already violate uniqueness.
                let snapshot = self.default_snapshot();
                let existing_values =
                    self.scan_column_values(table.as_ref(), field_id, snapshot)?;
                ensure_single_column_unique(&existing_values, &[], &column_name)?;
            }

            let created = self.catalog_service.register_single_column_index(
                &display_name,
                &canonical_name,
                &table.table,
                field_id,
                &column_name,
                plan.unique,
                plan.if_not_exists,
            )?;

            if !created {
                // Index already existed and IF NOT EXISTS was set upstream.
                return Ok(RuntimeStatementResult::CreateIndex {
                    table_name: display_name,
                    index_name,
                });
            }

            if let Some(updated_table) =
                Self::rebuild_executor_table_with_unique(table.as_ref(), field_id)
            {
                self.tables
                    .write()
                    .unwrap()
                    .insert(canonical_name.clone(), Arc::clone(&updated_table));
            } else {
                // Rebuild failed; evict so the next lookup reloads fresh state.
                self.remove_table_entry(&canonical_name);
            }

            drop(table);

            return Ok(RuntimeStatementResult::CreateIndex {
                table_name: display_name,
                index_name,
            });
        }

        // Multi-column path: only UNIQUE indexes are implemented.
        if !plan.unique {
            return Err(Error::InvalidArgumentError(
                "multi-column CREATE INDEX currently supports UNIQUE indexes only".into(),
            ));
        }

        let table_id = table.table.table_id();

        // Verify existing composite values are unique before registering.
        let snapshot = self.default_snapshot();
        let existing_rows = self.scan_multi_column_values(table.as_ref(), &field_ids, snapshot)?;
        ensure_multi_column_unique(&existing_rows, &[], &column_names)?;

        let executor_entry = ExecutorMultiColumnUnique {
            index_name: index_name.clone(),
            column_indices: column_indices.clone(),
        };

        let registration = self.catalog_service.register_multi_column_unique_index(
            table_id,
            &field_ids,
            index_name.clone(),
        )?;

        match registration {
            MultiColumnUniqueRegistration::Created => {
                table.add_multi_column_unique(executor_entry);
            }
            MultiColumnUniqueRegistration::AlreadyExists {
                index_name: existing,
            } => {
                if plan.if_not_exists {
                    drop(table);
                    return Ok(RuntimeStatementResult::CreateIndex {
                        table_name: display_name,
                        index_name: existing,
                    });
                }
                return Err(Error::CatalogError(format!(
                    "Index already exists on columns '{}'",
                    column_names.join(", ")
                )));
            }
        }

        Ok(RuntimeStatementResult::CreateIndex {
            table_name: display_name,
            index_name,
        })
    }
1727
1728 pub fn table_names(self: &Arc<Self>) -> Vec<String> {
1730 self.catalog.table_names()
1732 }
1733
    /// Returns the catalog's view of the table identified by its canonical
    /// name (the caller must canonicalize first).
    pub fn table_view(&self, canonical_name: &str) -> Result<TableView> {
        self.catalog_service.table_view(canonical_name)
    }
1738
    /// Narrows `row_ids` to the rows visible under `snapshot` according to
    /// MVCC rules (delegates to the shared snapshot-filter helper).
    fn filter_visible_row_ids(
        &self,
        table: &ExecutorTable<P>,
        row_ids: Vec<RowId>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RowId>> {
        filter_row_ids_for_snapshot(table.table.as_ref(), row_ids, &self.txn_manager, snapshot)
    }
1747
1748 pub fn create_table_builder(&self, name: &str) -> RuntimeCreateTableBuilder<'_, P> {
1750 RuntimeCreateTableBuilder {
1751 ctx: self,
1752 plan: CreateTablePlan::new(name),
1753 }
1754 }
1755
1756 pub fn table_column_specs(self: &Arc<Self>, name: &str) -> Result<Vec<ColumnSpec>> {
1758 let (_, canonical_name) = canonical_table_name(name)?;
1759 self.catalog_service.table_column_specs(&canonical_name)
1760 }
1761
1762 pub fn foreign_key_views(self: &Arc<Self>, name: &str) -> Result<Vec<ForeignKeyView>> {
1764 let (_, canonical_name) = canonical_table_name(name)?;
1765 self.catalog_service.foreign_key_views(&canonical_name)
1766 }
1767
1768 pub fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<RowBatch> {
1770 let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
1771 handle.lazy()?.collect_rows()
1772 }
1773
    /// Internal alias for `create_table_plan` used by statement dispatch.
    fn execute_create_table(&self, plan: CreateTablePlan) -> Result<RuntimeStatementResult<P>> {
        self.create_table_plan(plan)
    }
1777
1778 fn create_table_with_options<C, I>(
1779 self: &Arc<Self>,
1780 name: &str,
1781 columns: I,
1782 if_not_exists: bool,
1783 ) -> Result<RuntimeTableHandle<P>>
1784 where
1785 C: IntoColumnSpec,
1786 I: IntoIterator<Item = C>,
1787 {
1788 let mut plan = CreateTablePlan::new(name);
1789 plan.if_not_exists = if_not_exists;
1790 plan.columns = columns
1791 .into_iter()
1792 .map(|column| column.into_column_spec())
1793 .collect();
1794 let result = self.create_table_plan(plan)?;
1795 match result {
1796 RuntimeStatementResult::CreateTable { .. } => {
1797 RuntimeTableHandle::new(Arc::clone(self), name)
1798 }
1799 other => Err(Error::InvalidArgumentError(format!(
1800 "unexpected statement result {other:?} when creating table"
1801 ))),
1802 }
1803 }
1804
1805 pub fn insert(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
1806 let snapshot = TransactionSnapshot {
1809 txn_id: TXN_ID_AUTO_COMMIT,
1810 snapshot_id: self.txn_manager.last_committed(),
1811 };
1812 self.insert_with_snapshot(plan, snapshot)
1813 }
1814
    /// Inserts rows or batches into a table under the supplied snapshot.
    ///
    /// The `display_name == "keys"` branches are targeted debug tracing for a
    /// specific table; they do not affect behavior. An `Err(Error::NotFound)`
    /// after a successful table lookup is treated as an internal invariant
    /// violation and panics.
    pub fn insert_with_snapshot(
        &self,
        plan: InsertPlan,
        snapshot: TransactionSnapshot,
    ) -> Result<RuntimeStatementResult<P>> {
        let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
        let table = self.lookup_table(&canonical_name)?;

        // Extra tracing for the "keys" table only (debugging aid).
        if display_name == "keys" {
            tracing::trace!(
                "\n[KEYS] INSERT starting - table_id={}, context_pager={:p}",
                table.table.table_id(),
                &*self.pager
            );
            tracing::trace!(
                "[KEYS] Table has {} columns, primary_key columns: {:?}",
                table.schema.columns.len(),
                table
                    .schema
                    .columns
                    .iter()
                    .filter(|c| c.primary_key)
                    .map(|c| &c.name)
                    .collect::<Vec<_>>()
            );
        }

        // Dispatch on the insert source. SELECT sources must have been
        // materialized into rows/batches by the planner already.
        let result = match plan.source {
            InsertSource::Rows(rows) => self.insert_rows(
                table.as_ref(),
                display_name.clone(),
                canonical_name.clone(),
                rows,
                plan.columns,
                snapshot,
            ),
            InsertSource::Batches(batches) => self.insert_batches(
                table.as_ref(),
                display_name.clone(),
                canonical_name.clone(),
                batches,
                plan.columns,
                snapshot,
            ),
            InsertSource::Select { .. } => Err(Error::Internal(
                "InsertSource::Select should be materialized before reaching RuntimeContext::insert"
                    .into(),
            )),
        };

        if display_name == "keys" {
            tracing::trace!(
                "[KEYS] INSERT completed: {:?}",
                result
                    .as_ref()
                    .map(|_| "OK")
                    .map_err(|e| format!("{:?}", e))
            );
        }

        // NotFound here cannot mean "table missing" (lookup succeeded above),
        // so it signals a runtime logic bug — fail loudly.
        if matches!(result, Err(Error::NotFound)) {
            panic!(
                "BUG: insert yielded Error::NotFound for table '{}'. \
                 This should never happen: insert should never return NotFound after successful table lookup. \
                 This indicates a logic error in the runtime.",
                display_name
            );
        }

        result
    }
1887
1888 pub fn get_batches_with_row_ids(
1891 &self,
1892 table_name: &str,
1893 filter: Option<LlkvExpr<'static, String>>,
1894 ) -> Result<Vec<RecordBatch>> {
1895 self.get_batches_with_row_ids_with_snapshot(table_name, filter, self.default_snapshot())
1896 }
1897
    /// Scans `table_name` under `snapshot` and returns record batches whose
    /// first column is the row id, followed by all user columns.
    ///
    /// With no filter, a full-table scan is performed over the first column.
    /// Rows not visible under the snapshot are excluded.
    pub fn get_batches_with_row_ids_with_snapshot(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RecordBatch>> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

        // Translate the caller's predicate, or synthesize a full-table scan
        // anchored on the first column when none was given.
        let filter_expr = match filter {
            Some(expr) => translate_predicate(expr, table.schema.as_ref())?,
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                full_table_scan_filter(field_id)
            }
        };

        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Apply MVCC visibility before gathering any column data.
        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
        if visible_row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();

        // Output schema: row_id column first, then every user column.
        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len());

        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for column in &table.schema.columns {
            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
            logical_fields.push(logical_field_id);
            let field = mvcc::build_field_with_metadata(
                &column.name,
                column.data_type.clone(),
                column.nullable,
                column.field_id,
            );
            fields.push(field);
        }

        let schema = Arc::new(Schema::new(fields));

        // Zero-column table: emit a single batch containing only row ids.
        if logical_fields.is_empty() {
            let mut row_id_builder = UInt64Builder::with_capacity(visible_row_ids.len());
            for row_id in &visible_row_ids {
                row_id_builder.append_value(*row_id);
            }
            let arrays: Vec<ArrayRef> = vec![Arc::new(row_id_builder.finish()) as ArrayRef];
            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            return Ok(vec![batch]);
        }

        // Stream column data for the visible rows, keeping nulls in place so
        // column arrays stay aligned with the row-id array.
        let mut stream = table.table.stream_columns(
            Arc::from(logical_fields),
            visible_row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut batches = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(chunk.batch().num_columns() + 1);

            // Prepend the row ids for this chunk.
            let mut row_id_builder = UInt64Builder::with_capacity(chunk.len());
            for row_id in chunk.row_ids() {
                row_id_builder.append_value(*row_id);
            }
            arrays.push(Arc::new(row_id_builder.finish()) as ArrayRef);

            let chunk_batch = chunk.into_batch();
            for column_array in chunk_batch.columns() {
                arrays.push(column_array.clone());
            }

            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            batches.push(batch);
        }

        Ok(batches)
    }
1991
1992 pub fn append_batches_with_row_ids(
1995 &self,
1996 table_name: &str,
1997 batches: Vec<RecordBatch>,
1998 ) -> Result<usize> {
1999 let (_, canonical_name) = canonical_table_name(table_name)?;
2000 let table = self.lookup_table(&canonical_name)?;
2001
2002 let mut total_rows = 0;
2003 for batch in batches {
2004 if batch.num_rows() == 0 {
2005 continue;
2006 }
2007
2008 let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
2010 Error::InvalidArgumentError(
2011 "batch must contain row_id column for direct append".into(),
2012 )
2013 })?;
2014
2015 table.table.append(&batch)?;
2017 total_rows += batch.num_rows();
2018 }
2019
2020 Ok(total_rows)
2021 }
2022
2023 pub fn update(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
2024 let snapshot = self.txn_manager.begin_transaction();
2025 match self.update_with_snapshot(plan, snapshot) {
2026 Ok(result) => {
2027 self.txn_manager.mark_committed(snapshot.txn_id);
2028 Ok(result)
2029 }
2030 Err(err) => {
2031 self.txn_manager.mark_aborted(snapshot.txn_id);
2032 Err(err)
2033 }
2034 }
2035 }
2036
2037 pub fn update_with_snapshot(
2038 &self,
2039 plan: UpdatePlan,
2040 snapshot: TransactionSnapshot,
2041 ) -> Result<RuntimeStatementResult<P>> {
2042 let UpdatePlan {
2043 table,
2044 assignments,
2045 filter,
2046 } = plan;
2047 let (display_name, canonical_name) = canonical_table_name(&table)?;
2048 let table = self.lookup_table(&canonical_name)?;
2049 if let Some(filter) = filter {
2050 self.update_filtered_rows(
2051 table.as_ref(),
2052 display_name,
2053 canonical_name,
2054 assignments,
2055 filter,
2056 snapshot,
2057 )
2058 } else {
2059 self.update_all_rows(
2060 table.as_ref(),
2061 display_name,
2062 canonical_name,
2063 assignments,
2064 snapshot,
2065 )
2066 }
2067 }
2068
2069 pub fn delete(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
2070 let snapshot = self.txn_manager.begin_transaction();
2071 match self.delete_with_snapshot(plan, snapshot) {
2072 Ok(result) => {
2073 self.txn_manager.mark_committed(snapshot.txn_id);
2074 Ok(result)
2075 }
2076 Err(err) => {
2077 self.txn_manager.mark_aborted(snapshot.txn_id);
2078 Err(err)
2079 }
2080 }
2081 }
2082
2083 pub fn delete_with_snapshot(
2084 &self,
2085 plan: DeletePlan,
2086 snapshot: TransactionSnapshot,
2087 ) -> Result<RuntimeStatementResult<P>> {
2088 let (display_name, canonical_name) = canonical_table_name(&plan.table)?;
2089 let table = match self.tables.read().unwrap().get(&canonical_name) {
2091 Some(t) => Arc::clone(t),
2092 None => return Err(Error::NotFound),
2093 };
2094 match plan.filter {
2095 Some(filter) => self.delete_filtered_rows(
2096 table.as_ref(),
2097 display_name,
2098 canonical_name.clone(),
2099 filter,
2100 snapshot,
2101 ),
2102 None => self.delete_all_rows(table.as_ref(), display_name, canonical_name, snapshot),
2103 }
2104 }
2105
2106 pub fn table_handle(self: &Arc<Self>, name: &str) -> Result<RuntimeTableHandle<P>> {
2107 RuntimeTableHandle::new(Arc::clone(self), name)
2108 }
2109
2110 pub fn execute_select(self: &Arc<Self>, plan: SelectPlan) -> Result<SelectExecution<P>> {
2111 let snapshot = self.default_snapshot();
2112 self.execute_select_with_snapshot(plan, snapshot)
2113 }
2114
2115 pub fn execute_select_with_snapshot(
2116 self: &Arc<Self>,
2117 plan: SelectPlan,
2118 snapshot: TransactionSnapshot,
2119 ) -> Result<SelectExecution<P>> {
2120 if plan.tables.is_empty() {
2122 let provider: Arc<dyn TableProvider<P>> = Arc::new(ContextProvider {
2123 context: Arc::clone(self),
2124 });
2125 let executor = QueryExecutor::new(provider);
2126 return executor.execute_select_with_filter(plan, None);
2128 }
2129
2130 let mut canonical_tables = Vec::new();
2132 for table_ref in &plan.tables {
2133 let qualified = table_ref.qualified_name();
2134 let (_display, canonical) = canonical_table_name(&qualified)?;
2135 let parts: Vec<&str> = canonical.split('.').collect();
2137 let canon_ref = if parts.len() >= 2 {
2138 llkv_plan::TableRef::new(parts[0], parts[1])
2139 } else {
2140 llkv_plan::TableRef::new("", &canonical)
2141 };
2142 canonical_tables.push(canon_ref);
2143 }
2144
2145 let mut canonical_plan = plan.clone();
2146 canonical_plan.tables = canonical_tables;
2147
2148 let provider: Arc<dyn TableProvider<P>> = Arc::new(ContextProvider {
2149 context: Arc::clone(self),
2150 });
2151 let executor = QueryExecutor::new(provider);
2152
2153 let row_filter: Option<Arc<dyn RowIdFilter<P>>> = if canonical_plan.tables.len() == 1 {
2155 Some(Arc::new(MvccRowIdFilter::new(
2156 Arc::clone(&self.txn_manager),
2157 snapshot,
2158 )))
2159 } else {
2160 None
2163 };
2164
2165 executor.execute_select_with_filter(canonical_plan, row_filter)
2166 }
2167
    /// Creates a table from explicit column definitions.
    ///
    /// Sequence: check the cache for a name collision, create the table
    /// through the catalog, build the executor-side schema/table entry,
    /// re-check for a concurrent creation (rolling back ours if another
    /// writer won), then register any foreign keys — also rolling back the
    /// whole creation if FK registration fails.
    fn create_table_from_columns(
        &self,
        display_name: String,
        canonical_name: String,
        columns: Vec<ColumnSpec>,
        foreign_keys: Vec<ForeignKeySpec>,
        if_not_exists: bool,
    ) -> Result<RuntimeStatementResult<P>> {
        tracing::trace!(
            "\n=== CREATE_TABLE_FROM_COLUMNS: table='{}' columns={} ===",
            display_name,
            columns.len()
        );
        for (idx, col) in columns.iter().enumerate() {
            tracing::trace!(
                " input column[{}]: name='{}' primary_key={}",
                idx,
                col.name,
                col.primary_key
            );
        }
        if columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE TABLE requires at least one column".into(),
            ));
        }

        // Fast-path collision check under the read lock; a second check is
        // performed under the write lock after the catalog insert below.
        {
            let tables = self.tables.read().unwrap();
            if tables.contains_key(&canonical_name) {
                if if_not_exists {
                    return Ok(RuntimeStatementResult::CreateTable {
                        table_name: display_name,
                    });
                }

                return Err(Error::CatalogError(format!(
                    "Catalog Error: Table '{}' already exists",
                    display_name
                )));
            }
        }

        let CreateTableResult {
            table_id,
            table,
            table_columns,
            column_lookup,
        } = Table::create_from_columns(
            &display_name,
            &canonical_name,
            &columns,
            self.metadata.clone(),
            self.catalog.clone(),
            self.store.clone(),
        )?;

        tracing::trace!(
            "=== TABLE '{}' CREATED WITH table_id={} pager={:p} ===",
            display_name,
            table_id,
            &*self.pager
        );

        // Mirror the catalog's column definitions into the executor schema.
        let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
        for (idx, column) in table_columns.iter().enumerate() {
            tracing::trace!(
                "DEBUG create_table_from_columns[{}]: name='{}' data_type={:?} nullable={} primary_key={} unique={}",
                idx,
                column.name,
                column.data_type,
                column.nullable,
                column.primary_key,
                column.unique
            );
            column_defs.push(ExecutorColumn {
                name: column.name.clone(),
                data_type: column.data_type.clone(),
                nullable: column.nullable,
                primary_key: column.primary_key,
                unique: column.unique,
                field_id: column.field_id,
                check_expr: column.check_expr.clone(),
            });
        }

        let schema = Arc::new(ExecutorSchema {
            columns: column_defs.clone(),
            lookup: column_lookup,
        });
        let table_entry = Arc::new(ExecutorTable {
            table: Arc::clone(&table),
            schema,
            next_row_id: AtomicU64::new(0),
            total_rows: AtomicU64::new(0),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        // Second collision check under the write lock: if a concurrent
        // creation slipped in, undo our catalog entry before reporting.
        let mut tables = self.tables.write().unwrap();
        if tables.contains_key(&canonical_name) {
            drop(tables);
            let field_ids: Vec<FieldId> =
                table_columns.iter().map(|column| column.field_id).collect();
            let _ = self
                .catalog_service
                .drop_table(&canonical_name, table_id, &field_ids);
            if if_not_exists {
                return Ok(RuntimeStatementResult::CreateTable {
                    table_name: display_name,
                });
            }
            return Err(Error::CatalogError(format!(
                "Catalog Error: Table '{}' already exists",
                display_name
            )));
        }
        tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
        drop(tables);

        if !foreign_keys.is_empty() {
            // The closure resolves each referenced table so the catalog
            // service can validate the FK target columns.
            let fk_result = self.catalog_service.register_foreign_keys_for_new_table(
                table_id,
                &display_name,
                &canonical_name,
                &table_columns,
                &foreign_keys,
                |table_name| {
                    let (display, canonical) = canonical_table_name(table_name)?;
                    let referenced_table = self.lookup_table(&canonical).map_err(|_| {
                        Error::InvalidArgumentError(format!(
                            "referenced table '{}' does not exist",
                            table_name
                        ))
                    })?;

                    let columns = referenced_table
                        .schema
                        .columns
                        .iter()
                        .map(|column| ForeignKeyColumn {
                            name: column.name.clone(),
                            data_type: column.data_type.clone(),
                            nullable: column.nullable,
                            primary_key: column.primary_key,
                            unique: column.unique,
                            field_id: column.field_id,
                        })
                        .collect();

                    Ok(ForeignKeyTableInfo {
                        display_name: display,
                        canonical_name: canonical,
                        table_id: referenced_table.table.table_id(),
                        columns,
                    })
                },
                current_time_micros(),
            );

            // FK registration failed: roll back the catalog entry and evict
            // the cached table so the creation leaves no trace.
            if let Err(err) = fk_result {
                let field_ids: Vec<FieldId> =
                    table_columns.iter().map(|column| column.field_id).collect();
                let _ = self
                    .catalog_service
                    .drop_table(&canonical_name, table_id, &field_ids);
                self.remove_table_entry(&canonical_name);
                return Err(err);
            }
        }

        Ok(RuntimeStatementResult::CreateTable {
            table_name: display_name,
        })
    }
2343
2344 fn create_table_from_batches(
2345 &self,
2346 display_name: String,
2347 canonical_name: String,
2348 schema: Arc<Schema>,
2349 batches: Vec<RecordBatch>,
2350 if_not_exists: bool,
2351 ) -> Result<RuntimeStatementResult<P>> {
2352 {
2353 let tables = self.tables.read().unwrap();
2354 if tables.contains_key(&canonical_name) {
2355 if if_not_exists {
2356 return Ok(RuntimeStatementResult::CreateTable {
2357 table_name: display_name,
2358 });
2359 }
2360 return Err(Error::CatalogError(format!(
2361 "Catalog Error: Table '{}' already exists",
2362 display_name
2363 )));
2364 }
2365 }
2366
2367 let CreateTableResult {
2368 table_id,
2369 table,
2370 table_columns,
2371 column_lookup,
2372 } = self.catalog_service.create_table_from_schema(
2373 &display_name,
2374 &canonical_name,
2375 &schema,
2376 )?;
2377
2378 tracing::trace!(
2379 "=== CTAS table '{}' created with table_id={} pager={:p} ===",
2380 display_name,
2381 table_id,
2382 &*self.pager
2383 );
2384
2385 let mut column_defs: Vec<ExecutorColumn> = Vec::with_capacity(table_columns.len());
2386 for column in &table_columns {
2387 column_defs.push(ExecutorColumn {
2388 name: column.name.clone(),
2389 data_type: column.data_type.clone(),
2390 nullable: column.nullable,
2391 primary_key: column.primary_key,
2392 unique: column.unique,
2393 field_id: column.field_id,
2394 check_expr: column.check_expr.clone(),
2395 });
2396 }
2397
2398 let schema_arc = Arc::new(ExecutorSchema {
2399 columns: column_defs.clone(),
2400 lookup: column_lookup,
2401 });
2402 let table_entry = Arc::new(ExecutorTable {
2403 table: Arc::clone(&table),
2404 schema: schema_arc,
2405 next_row_id: AtomicU64::new(0),
2406 total_rows: AtomicU64::new(0),
2407 multi_column_uniques: RwLock::new(Vec::new()),
2408 });
2409
2410 let creator_snapshot = self.txn_manager.begin_transaction();
2411 let creator_txn_id = creator_snapshot.txn_id;
2412 let (next_row_id, total_rows) = match self.catalog_service.append_batches_with_mvcc(
2413 table.as_ref(),
2414 &table_columns,
2415 &batches,
2416 creator_txn_id,
2417 TXN_ID_NONE,
2418 0,
2419 ) {
2420 Ok(result) => {
2421 self.txn_manager.mark_committed(creator_txn_id);
2422 result
2423 }
2424 Err(err) => {
2425 self.txn_manager.mark_aborted(creator_txn_id);
2426 let field_ids: Vec<FieldId> =
2427 table_columns.iter().map(|column| column.field_id).collect();
2428 let _ = self
2429 .catalog_service
2430 .drop_table(&canonical_name, table_id, &field_ids);
2431 return Err(err);
2432 }
2433 };
2434
2435 table_entry.next_row_id.store(next_row_id, Ordering::SeqCst);
2436 table_entry.total_rows.store(total_rows, Ordering::SeqCst);
2437
2438 let mut tables = self.tables.write().unwrap();
2439 if tables.contains_key(&canonical_name) {
2440 if if_not_exists {
2441 return Ok(RuntimeStatementResult::CreateTable {
2442 table_name: display_name,
2443 });
2444 }
2445 return Err(Error::CatalogError(format!(
2446 "Catalog Error: Table '{}' already exists",
2447 display_name
2448 )));
2449 }
2450 tables.insert(canonical_name.clone(), Arc::clone(&table_entry));
2451 drop(tables); Ok(RuntimeStatementResult::CreateTable {
2454 table_name: display_name,
2455 })
2456 }
2457
2458 fn rebuild_executor_table_with_unique(
2459 table: &ExecutorTable<P>,
2460 field_id: FieldId,
2461 ) -> Option<Arc<ExecutorTable<P>>> {
2462 let mut columns = table.schema.columns.clone();
2463 let mut found = false;
2464 for column in &mut columns {
2465 if column.field_id == field_id {
2466 column.unique = true;
2467 found = true;
2468 break;
2469 }
2470 }
2471 if !found {
2472 return None;
2473 }
2474
2475 let schema = Arc::new(ExecutorSchema {
2476 columns,
2477 lookup: table.schema.lookup.clone(),
2478 });
2479
2480 let next_row_id = table.next_row_id.load(Ordering::SeqCst);
2481 let total_rows = table.total_rows.load(Ordering::SeqCst);
2482 let uniques = table.multi_column_uniques();
2483
2484 Some(Arc::new(ExecutorTable {
2485 table: Arc::clone(&table.table),
2486 schema,
2487 next_row_id: AtomicU64::new(next_row_id),
2488 total_rows: AtomicU64::new(total_rows),
2489 multi_column_uniques: RwLock::new(uniques),
2490 }))
2491 }
2492
2493 fn record_table_with_new_rows(&self, txn_id: TxnId, canonical_name: String) {
2494 if txn_id == TXN_ID_AUTO_COMMIT {
2495 return;
2496 }
2497
2498 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
2499 guard.entry(txn_id).or_default().insert(canonical_name);
2500 }
2501
    /// Return the still-live rows that transaction `txn_id` inserted into
    /// `table`, as row-major `PlanValue`s in schema column order.
    ///
    /// Streams the MVCC `created_by`/`deleted_by` columns alongside every
    /// user column and keeps only rows where `created_by == txn_id` and
    /// `deleted_by == TXN_ID_NONE`. Used at commit time to re-validate
    /// primary keys for rows created inside the transaction.
    fn collect_rows_created_by_txn(
        &self,
        table: &ExecutorTable<P>,
        txn_id: TxnId,
    ) -> Result<Vec<Vec<PlanValue>>> {
        // Auto-commit statements are validated inline; nothing to collect.
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(Vec::new());
        }

        if table.schema.columns.is_empty() {
            return Ok(Vec::new());
        }

        // Anchor a full-table scan on the first user column.
        let Some(first_field_id) = table.schema.first_field_id() else {
            return Ok(Vec::new());
        };
        let filter_expr = full_table_scan_filter(first_field_id);

        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Request MVCC bookkeeping columns first (batch positions 0 and 1),
        // then every user column in schema order; the loop below relies on
        // this layout.
        let table_id = table.table.table_id();
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len() + 2);
        logical_fields.push(LogicalFieldId::for_mvcc_created_by(table_id));
        logical_fields.push(LogicalFieldId::for_mvcc_deleted_by(table_id));
        for column in &table.schema.columns {
            logical_fields.push(LogicalFieldId::for_user(table_id, column.field_id));
        }

        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let mut stream = table.table.stream_columns(
            Arc::clone(&logical_fields),
            row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut rows = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            // Defensive: skip chunks missing the expected column count.
            if batch.num_columns() < table.schema.columns.len() + 2 {
                continue;
            }

            let created_col = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing created_by column in MVCC data".into()))?;
            let deleted_col = batch
                .column(1)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing deleted_by column in MVCC data".into()))?;

            for row_idx in 0..batch.num_rows() {
                // A null created_by is treated as auto-commit provenance.
                let created_by = if created_col.is_null(row_idx) {
                    TXN_ID_AUTO_COMMIT
                } else {
                    created_col.value(row_idx)
                };
                if created_by != txn_id {
                    continue;
                }

                // Skip rows that have since been deleted (by any txn).
                let deleted_by = if deleted_col.is_null(row_idx) {
                    TXN_ID_NONE
                } else {
                    deleted_col.value(row_idx)
                };
                if deleted_by != TXN_ID_NONE {
                    continue;
                }

                // User columns start at batch index 2 (after the two MVCC
                // columns pushed above).
                let mut row_values = Vec::with_capacity(table.schema.columns.len());
                for col_idx in 0..table.schema.columns.len() {
                    let array = batch.column(col_idx + 2);
                    let value = llkv_plan::plan_value_from_array(array, row_idx)?;
                    row_values.push(value);
                }
                rows.push(row_values);
            }
        }

        Ok(rows)
    }
2590
2591 fn validate_primary_keys_for_commit(&self, txn_id: TxnId) -> Result<()> {
2592 if txn_id == TXN_ID_AUTO_COMMIT {
2593 return Ok(());
2594 }
2595
2596 let tables = {
2597 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
2598 guard.remove(&txn_id)
2599 };
2600
2601 let Some(tables) = tables else {
2602 return Ok(());
2603 };
2604
2605 if tables.is_empty() {
2606 return Ok(());
2607 }
2608
2609 let snapshot = TransactionSnapshot {
2610 txn_id: TXN_ID_AUTO_COMMIT,
2611 snapshot_id: self.txn_manager.last_committed(),
2612 };
2613
2614 for table_name in tables {
2615 let table = self.lookup_table(&table_name)?;
2616 let constraint_ctx = self.build_table_constraint_context(table.as_ref())?;
2617 let Some(primary_key) = constraint_ctx.primary_key.as_ref() else {
2618 continue;
2619 };
2620
2621 let new_rows = self.collect_rows_created_by_txn(table.as_ref(), txn_id)?;
2622 if new_rows.is_empty() {
2623 continue;
2624 }
2625
2626 let column_order: Vec<usize> = (0..table.schema.columns.len()).collect();
2627 self.constraint_service.validate_primary_key_rows(
2628 &constraint_ctx.schema_field_ids,
2629 primary_key,
2630 &column_order,
2631 &new_rows,
2632 |field_ids| self.scan_multi_column_values(table.as_ref(), field_ids, snapshot),
2633 )?;
2634 }
2635
2636 Ok(())
2637 }
2638
2639 fn clear_transaction_state(&self, txn_id: TxnId) {
2640 if txn_id == TXN_ID_AUTO_COMMIT {
2641 return;
2642 }
2643
2644 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
2645 guard.remove(&txn_id);
2646 }
2647
    /// Coerce a literal `value` to the Arrow type of `column`, or return
    /// `InvalidArgumentError` when the conversion is not meaningful.
    ///
    /// Booleans are encoded as 0/1 integers and DATE values as day counts,
    /// matching the executor's internal `PlanValue` representation.
    /// Unlisted target types pass the value through unchanged.
    fn coerce_plan_value_for_column(
        &self,
        value: PlanValue,
        column: &ExecutorColumn,
    ) -> Result<PlanValue> {
        match value {
            // NULL is accepted for every type; nullability is enforced by
            // separate constraint checks.
            PlanValue::Null => Ok(PlanValue::Null),
            PlanValue::Integer(v) => match &column.data_type {
                DataType::Int64 => Ok(PlanValue::Integer(v)),
                DataType::Float64 => Ok(PlanValue::Float(v as f64)),
                // Any nonzero integer is truthy.
                DataType::Boolean => Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 })),
                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
                DataType::Date32 => {
                    // Integers assigned to DATE are interpreted as day counts
                    // and must fit in i32 (the Date32 physical type).
                    let casted = i32::try_from(v).map_err(|_| {
                        Error::InvalidArgumentError(format!(
                            "integer literal out of range for DATE column '{}'",
                            column.name
                        ))
                    })?;
                    Ok(PlanValue::Integer(casted as i64))
                }
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign integer to STRUCT column '{}'",
                    column.name
                ))),
                _ => Ok(PlanValue::Integer(v)),
            },
            PlanValue::Float(v) => match &column.data_type {
                // NOTE(review): `as i64` saturates on overflow/NaN — presumably
                // intentional truncation toward zero; confirm against SQL
                // casting semantics expected by callers.
                DataType::Int64 => Ok(PlanValue::Integer(v as i64)),
                DataType::Float64 => Ok(PlanValue::Float(v)),
                DataType::Boolean => Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 })),
                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign floating-point value to DATE column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign floating-point value to STRUCT column '{}'",
                    column.name
                ))),
                _ => Ok(PlanValue::Float(v)),
            },
            PlanValue::String(s) => match &column.data_type {
                DataType::Boolean => {
                    // Accept the common textual boolean spellings,
                    // case-insensitively.
                    let normalized = s.trim().to_ascii_lowercase();
                    match normalized.as_str() {
                        "true" | "t" | "1" => Ok(PlanValue::Integer(1)),
                        "false" | "f" | "0" => Ok(PlanValue::Integer(0)),
                        _ => Err(Error::InvalidArgumentError(format!(
                            "cannot assign string '{}' to BOOLEAN column '{}'",
                            s, column.name
                        ))),
                    }
                }
                DataType::Utf8 => Ok(PlanValue::String(s)),
                DataType::Date32 => {
                    // Parse a date literal into days-since-epoch.
                    let days = parse_date32_literal(&s)?;
                    Ok(PlanValue::Integer(days as i64))
                }
                DataType::Int64 | DataType::Float64 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign string '{}' to numeric column '{}'",
                    s, column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign string to STRUCT column '{}'",
                    column.name
                ))),
                _ => Ok(PlanValue::String(s)),
            },
            // Structs only ever map onto struct columns.
            PlanValue::Struct(map) => match &column.data_type {
                DataType::Struct(_) => Ok(PlanValue::Struct(map)),
                _ => Err(Error::InvalidArgumentError(format!(
                    "cannot assign struct value to column '{}'",
                    column.name
                ))),
            },
        }
    }
2726
2727 fn scan_column_values(
2732 &self,
2733 table: &ExecutorTable<P>,
2734 field_id: FieldId,
2735 snapshot: TransactionSnapshot,
2736 ) -> Result<Vec<PlanValue>> {
2737 let table_id = table.table.table_id();
2738 use llkv_expr::{Expr, Filter, Operator};
2739 use std::ops::Bound;
2740
2741 let match_all_filter = Filter {
2743 field_id,
2744 op: Operator::Range {
2745 lower: Bound::Unbounded,
2746 upper: Bound::Unbounded,
2747 },
2748 };
2749 let filter_expr = Expr::Pred(match_all_filter);
2750
2751 let row_ids = match table.table.filter_row_ids(&filter_expr) {
2753 Ok(ids) => ids,
2754 Err(Error::NotFound) => return Ok(Vec::new()),
2755 Err(e) => return Err(e),
2756 };
2757
2758 let row_ids = filter_row_ids_for_snapshot(
2760 table.table.as_ref(),
2761 row_ids,
2762 &self.txn_manager,
2763 snapshot,
2764 )?;
2765
2766 if row_ids.is_empty() {
2767 return Ok(Vec::new());
2768 }
2769
2770 let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
2772 let row_count = row_ids.len();
2773 let mut stream = match table.table.stream_columns(
2774 vec![logical_field_id],
2775 row_ids,
2776 GatherNullPolicy::IncludeNulls,
2777 ) {
2778 Ok(stream) => stream,
2779 Err(Error::NotFound) => return Ok(Vec::new()),
2780 Err(e) => return Err(e),
2781 };
2782
2783 let mut values = Vec::with_capacity(row_count);
2787 while let Some(chunk) = stream.next_batch()? {
2788 let batch = chunk.batch();
2789 if batch.num_columns() == 0 {
2790 continue;
2791 }
2792 let array = batch.column(0);
2793 for row_idx in 0..batch.num_rows() {
2794 if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
2795 values.push(value);
2796 }
2797 }
2798 }
2799
2800 Ok(values)
2801 }
2802
2803 fn scan_multi_column_values(
2809 &self,
2810 table: &ExecutorTable<P>,
2811 field_ids: &[FieldId],
2812 snapshot: TransactionSnapshot,
2813 ) -> Result<Vec<Vec<PlanValue>>> {
2814 if field_ids.is_empty() {
2815 return Ok(Vec::new());
2816 }
2817
2818 let table_id = table.table.table_id();
2819 use llkv_expr::{Expr, Filter, Operator};
2820 use std::ops::Bound;
2821
2822 let match_all_filter = Filter {
2823 field_id: field_ids[0],
2824 op: Operator::Range {
2825 lower: Bound::Unbounded,
2826 upper: Bound::Unbounded,
2827 },
2828 };
2829 let filter_expr = Expr::Pred(match_all_filter);
2830
2831 let row_ids = match table.table.filter_row_ids(&filter_expr) {
2832 Ok(ids) => ids,
2833 Err(Error::NotFound) => return Ok(Vec::new()),
2834 Err(e) => return Err(e),
2835 };
2836
2837 let row_ids = filter_row_ids_for_snapshot(
2838 table.table.as_ref(),
2839 row_ids,
2840 &self.txn_manager,
2841 snapshot,
2842 )?;
2843
2844 if row_ids.is_empty() {
2845 return Ok(Vec::new());
2846 }
2847
2848 let logical_field_ids: Vec<_> = field_ids
2849 .iter()
2850 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
2851 .collect();
2852
2853 let total_rows = row_ids.len();
2854 let mut stream = match table.table.stream_columns(
2855 logical_field_ids,
2856 row_ids,
2857 GatherNullPolicy::IncludeNulls,
2858 ) {
2859 Ok(stream) => stream,
2860 Err(Error::NotFound) => return Ok(Vec::new()),
2861 Err(e) => return Err(e),
2862 };
2863
2864 let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
2865 while let Some(chunk) = stream.next_batch()? {
2866 let batch = chunk.batch();
2867 if batch.num_columns() == 0 {
2868 continue;
2869 }
2870
2871 let base = chunk.row_offset();
2872 let local_len = batch.num_rows();
2873 for col_idx in 0..batch.num_columns() {
2874 let array = batch.column(col_idx);
2875 for local_idx in 0..local_len {
2876 let target_index = base + local_idx;
2877 debug_assert!(
2878 target_index < rows.len(),
2879 "stream chunk produced out-of-bounds row index"
2880 );
2881 if let Some(row) = rows.get_mut(target_index) {
2882 match llkv_plan::plan_value_from_array(array, local_idx) {
2883 Ok(value) => row.push(value),
2884 Err(_) => row.push(PlanValue::Null),
2885 }
2886 }
2887 }
2888 }
2889 }
2890
2891 Ok(rows)
2892 }
2893
2894 fn collect_row_values_for_ids(
2895 &self,
2896 table: &ExecutorTable<P>,
2897 row_ids: &[RowId],
2898 field_ids: &[FieldId],
2899 ) -> Result<Vec<Vec<PlanValue>>> {
2900 if row_ids.is_empty() || field_ids.is_empty() {
2901 return Ok(Vec::new());
2902 }
2903
2904 let table_id = table.table.table_id();
2905 let logical_field_ids: Vec<LogicalFieldId> = field_ids
2906 .iter()
2907 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
2908 .collect();
2909
2910 let mut stream = match table.table.stream_columns(
2911 logical_field_ids.clone(),
2912 row_ids.to_vec(),
2913 GatherNullPolicy::IncludeNulls,
2914 ) {
2915 Ok(stream) => stream,
2916 Err(Error::NotFound) => return Ok(Vec::new()),
2917 Err(e) => return Err(e),
2918 };
2919
2920 let mut rows = vec![Vec::with_capacity(field_ids.len()); row_ids.len()];
2921 while let Some(chunk) = stream.next_batch()? {
2922 let batch = chunk.batch();
2923 let base = chunk.row_offset();
2924 let local_len = batch.num_rows();
2925 for col_idx in 0..batch.num_columns() {
2926 let array = batch.column(col_idx);
2927 for local_idx in 0..local_len {
2928 let target_index = base + local_idx;
2929 if let Some(row) = rows.get_mut(target_index) {
2930 let value = llkv_plan::plan_value_from_array(array, local_idx)?;
2931 row.push(value);
2932 }
2933 }
2934 }
2935 }
2936
2937 Ok(rows)
2938 }
2939
2940 fn collect_visible_child_rows(
2941 &self,
2942 table: &ExecutorTable<P>,
2943 field_ids: &[FieldId],
2944 snapshot: TransactionSnapshot,
2945 ) -> Result<Vec<(RowId, Vec<PlanValue>)>> {
2946 if field_ids.is_empty() {
2947 return Ok(Vec::new());
2948 }
2949
2950 let anchor_field = field_ids[0];
2951 let filter_expr = full_table_scan_filter(anchor_field);
2952 let raw_row_ids = match table.table.filter_row_ids(&filter_expr) {
2953 Ok(ids) => ids,
2954 Err(Error::NotFound) => return Ok(Vec::new()),
2955 Err(e) => return Err(e),
2956 };
2957
2958 let visible_row_ids = filter_row_ids_for_snapshot(
2959 table.table.as_ref(),
2960 raw_row_ids,
2961 &self.txn_manager,
2962 snapshot,
2963 )?;
2964
2965 if visible_row_ids.is_empty() {
2966 return Ok(Vec::new());
2967 }
2968
2969 let table_id = table.table.table_id();
2970 let logical_field_ids: Vec<LogicalFieldId> = field_ids
2971 .iter()
2972 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
2973 .collect();
2974
2975 let mut stream = match table.table.stream_columns(
2976 logical_field_ids.clone(),
2977 visible_row_ids.clone(),
2978 GatherNullPolicy::IncludeNulls,
2979 ) {
2980 Ok(stream) => stream,
2981 Err(Error::NotFound) => return Ok(Vec::new()),
2982 Err(e) => return Err(e),
2983 };
2984
2985 let mut rows = vec![Vec::with_capacity(field_ids.len()); visible_row_ids.len()];
2986 while let Some(chunk) = stream.next_batch()? {
2987 let batch = chunk.batch();
2988 let base = chunk.row_offset();
2989 let local_len = batch.num_rows();
2990 for col_idx in 0..batch.num_columns() {
2991 let array = batch.column(col_idx);
2992 for local_idx in 0..local_len {
2993 let target_index = base + local_idx;
2994 if let Some(row) = rows.get_mut(target_index) {
2995 let value = llkv_plan::plan_value_from_array(array, local_idx)?;
2996 row.push(value);
2997 }
2998 }
2999 }
3000 }
3001
3002 Ok(visible_row_ids.into_iter().zip(rows).collect())
3003 }
3004
    /// Assemble the constraint metadata (row-level checks, single- and
    /// multi-column UNIQUE constraints, and the primary key) needed to
    /// validate INSERT/UPDATE operations against `table`.
    ///
    /// # Errors
    /// Returns `Error::Internal` if a stored constraint references a column
    /// index that is out of bounds for the current schema.
    fn build_table_constraint_context(
        &self,
        table: &ExecutorTable<P>,
    ) -> Result<TableConstraintContext> {
        // Field ids in schema order; validators index into this.
        let schema_field_ids: Vec<FieldId> = table
            .schema
            .columns
            .iter()
            .map(|column| column.field_id)
            .collect();

        // Per-column row-level constraints (type, nullability, CHECK).
        let column_constraints: Vec<InsertColumnConstraint> = table
            .schema
            .columns
            .iter()
            .enumerate()
            .map(|(idx, column)| InsertColumnConstraint {
                schema_index: idx,
                column: ConstraintColumnInfo {
                    name: column.name.clone(),
                    field_id: column.field_id,
                    data_type: column.data_type.clone(),
                    nullable: column.nullable,
                    check_expr: column.check_expr.clone(),
                },
            })
            .collect();

        // Single-column UNIQUE constraints; primary-key columns are excluded
        // because the primary key is validated separately below.
        let unique_columns: Vec<InsertUniqueColumn> = table
            .schema
            .columns
            .iter()
            .enumerate()
            .filter(|(_, column)| column.unique && !column.primary_key)
            .map(|(idx, column)| InsertUniqueColumn {
                schema_index: idx,
                field_id: column.field_id,
                name: column.name.clone(),
            })
            .collect();

        // Expand each registered multi-column UNIQUE constraint into the
        // parallel index/field-id/name vectors the validator expects.
        let mut multi_column_uniques: Vec<InsertMultiColumnUnique> = Vec::new();
        for constraint in table.multi_column_uniques() {
            if constraint.column_indices.is_empty() {
                continue;
            }

            let mut schema_indices = Vec::with_capacity(constraint.column_indices.len());
            let mut field_ids = Vec::with_capacity(constraint.column_indices.len());
            let mut column_names = Vec::with_capacity(constraint.column_indices.len());
            for &col_idx in &constraint.column_indices {
                let column = table.schema.columns.get(col_idx).ok_or_else(|| {
                    Error::Internal(format!(
                        "multi-column UNIQUE constraint references invalid column index {}",
                        col_idx
                    ))
                })?;
                schema_indices.push(col_idx);
                field_ids.push(column.field_id);
                column_names.push(column.name.clone());
            }

            multi_column_uniques.push(InsertMultiColumnUnique {
                schema_indices,
                field_ids,
                column_names,
            });
        }

        // The primary key (possibly composite) is modelled as one more
        // multi-column unique spec over the PK-flagged columns.
        let primary_indices: Vec<usize> = table
            .schema
            .columns
            .iter()
            .enumerate()
            .filter(|(_, column)| column.primary_key)
            .map(|(idx, _)| idx)
            .collect();

        let primary_key = if primary_indices.is_empty() {
            None
        } else {
            let mut field_ids = Vec::with_capacity(primary_indices.len());
            let mut column_names = Vec::with_capacity(primary_indices.len());
            for &idx in &primary_indices {
                let column = table.schema.columns.get(idx).ok_or_else(|| {
                    Error::Internal(format!(
                        "primary key references invalid column index {}",
                        idx
                    ))
                })?;
                field_ids.push(column.field_id);
                column_names.push(column.name.clone());
            }

            Some(InsertMultiColumnUnique {
                schema_indices: primary_indices.clone(),
                field_ids,
                column_names,
            })
        };

        Ok(TableConstraintContext {
            schema_field_ids,
            column_constraints,
            unique_columns,
            multi_column_uniques,
            primary_key,
        })
    }
3114
3115 fn insert_rows(
3116 &self,
3117 table: &ExecutorTable<P>,
3118 display_name: String,
3119 canonical_name: String,
3120 mut rows: Vec<Vec<PlanValue>>,
3121 columns: Vec<String>,
3122 snapshot: TransactionSnapshot,
3123 ) -> Result<RuntimeStatementResult<P>> {
3124 if rows.is_empty() {
3125 return Err(Error::InvalidArgumentError(
3126 "INSERT requires at least one row".into(),
3127 ));
3128 }
3129
3130 let column_order = resolve_insert_columns(&columns, table.schema.as_ref())?;
3131 let expected_len = column_order.len();
3132 for row in &rows {
3133 if row.len() != expected_len {
3134 return Err(Error::InvalidArgumentError(format!(
3135 "expected {} values in INSERT row, found {}",
3136 expected_len,
3137 row.len()
3138 )));
3139 }
3140 }
3141
3142 for row in rows.iter_mut() {
3143 for (position, value) in row.iter_mut().enumerate() {
3144 let schema_index = column_order
3145 .get(position)
3146 .copied()
3147 .ok_or_else(|| Error::Internal("invalid INSERT column index mapping".into()))?;
3148 let column = table.schema.columns.get(schema_index).ok_or_else(|| {
3149 Error::Internal(format!(
3150 "INSERT column index {} out of bounds for table '{}'",
3151 schema_index, display_name
3152 ))
3153 })?;
3154 let normalized = normalize_insert_value_for_column(column, value.clone())?;
3155 *value = normalized;
3156 }
3157 }
3158
3159 let constraint_ctx = self.build_table_constraint_context(table)?;
3160 let primary_key_spec = constraint_ctx.primary_key.as_ref();
3161
3162 if display_name == "keys" {
3163 tracing::trace!(
3164 "[KEYS] Validating constraints for {} row(s) before insert",
3165 rows.len()
3166 );
3167 for (i, row) in rows.iter().enumerate() {
3168 tracing::trace!("[KEYS] row[{}]: {:?}", i, row);
3169 }
3170 }
3171
3172 self.constraint_service.validate_insert_constraints(
3173 &constraint_ctx.schema_field_ids,
3174 &constraint_ctx.column_constraints,
3175 &constraint_ctx.unique_columns,
3176 &constraint_ctx.multi_column_uniques,
3177 primary_key_spec,
3178 &column_order,
3179 &rows,
3180 |field_id| self.scan_column_values(table, field_id, snapshot),
3181 |field_ids| self.scan_multi_column_values(table, field_ids, snapshot),
3182 )?;
3183
3184 self.check_foreign_keys_on_insert(table, &display_name, &rows, &column_order, snapshot)?;
3185
3186 let row_count = rows.len();
3187 let mut column_values: Vec<Vec<PlanValue>> =
3188 vec![Vec::with_capacity(row_count); table.schema.columns.len()];
3189 for row in rows {
3190 for (idx, value) in row.into_iter().enumerate() {
3191 let dest_index = column_order[idx];
3192 column_values[dest_index].push(value);
3193 }
3194 }
3195
3196 let start_row = table.next_row_id.load(Ordering::SeqCst);
3197
3198 let (row_id_array, created_by_array, deleted_by_array) =
3200 mvcc::build_insert_mvcc_columns(row_count, start_row, snapshot.txn_id, TXN_ID_NONE);
3201
3202 let mut arrays: Vec<ArrayRef> = Vec::with_capacity(column_values.len() + 3);
3203 arrays.push(row_id_array);
3204 arrays.push(created_by_array);
3205 arrays.push(deleted_by_array);
3206
3207 let mut fields: Vec<Field> = Vec::with_capacity(column_values.len() + 3);
3208 fields.extend(mvcc::build_mvcc_fields());
3209
3210 for (column, values) in table.schema.columns.iter().zip(column_values.into_iter()) {
3211 let array = build_array_for_column(&column.data_type, &values)?;
3212 let field = mvcc::build_field_with_metadata(
3213 &column.name,
3214 column.data_type.clone(),
3215 column.nullable,
3216 column.field_id,
3217 );
3218 arrays.push(array);
3219 fields.push(field);
3220 }
3221
3222 let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays)?;
3223 tracing::trace!(
3224 table_name = %display_name,
3225 store_ptr = ?std::ptr::addr_of!(*table.table.store()),
3226 "About to call table.append"
3227 );
3228 table.table.append(&batch)?;
3229 table
3230 .next_row_id
3231 .store(start_row + row_count as u64, Ordering::SeqCst);
3232 table
3233 .total_rows
3234 .fetch_add(row_count as u64, Ordering::SeqCst);
3235
3236 self.record_table_with_new_rows(snapshot.txn_id, canonical_name);
3237
3238 Ok(RuntimeStatementResult::Insert {
3239 table_name: display_name,
3240 rows_inserted: row_count,
3241 })
3242 }
3243
3244 fn insert_batches(
3245 &self,
3246 table: &ExecutorTable<P>,
3247 display_name: String,
3248 canonical_name: String,
3249 batches: Vec<RecordBatch>,
3250 columns: Vec<String>,
3251 snapshot: TransactionSnapshot,
3252 ) -> Result<RuntimeStatementResult<P>> {
3253 if batches.is_empty() {
3254 return Ok(RuntimeStatementResult::Insert {
3255 table_name: display_name,
3256 rows_inserted: 0,
3257 });
3258 }
3259
3260 let expected_len = if columns.is_empty() {
3261 table.schema.columns.len()
3262 } else {
3263 columns.len()
3264 };
3265 let mut total_rows_inserted = 0usize;
3266
3267 for batch in batches {
3268 if batch.num_columns() != expected_len {
3269 return Err(Error::InvalidArgumentError(format!(
3270 "expected {} columns in INSERT batch, found {}",
3271 expected_len,
3272 batch.num_columns()
3273 )));
3274 }
3275 let row_count = batch.num_rows();
3276 if row_count == 0 {
3277 continue;
3278 }
3279 let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(row_count);
3280 for row_idx in 0..row_count {
3281 let mut row: Vec<PlanValue> = Vec::with_capacity(expected_len);
3282 for col_idx in 0..expected_len {
3283 let array = batch.column(col_idx);
3284 row.push(llkv_plan::plan_value_from_array(array, row_idx)?);
3285 }
3286 rows.push(row);
3287 }
3288
3289 match self.insert_rows(
3290 table,
3291 display_name.clone(),
3292 canonical_name.clone(),
3293 rows,
3294 columns.clone(),
3295 snapshot,
3296 )? {
3297 RuntimeStatementResult::Insert { rows_inserted, .. } => {
3298 total_rows_inserted += rows_inserted;
3299 }
3300 _ => unreachable!("insert_rows must return Insert result"),
3301 }
3302 }
3303
3304 Ok(RuntimeStatementResult::Insert {
3305 table_name: display_name,
3306 rows_inserted: total_rows_inserted,
3307 })
3308 }
3309
    /// Apply UPDATE `assignments` to every row of `table` matching `filter`.
    ///
    /// Implemented as read-modify-delete-reinsert: the current values of the
    /// matched rows are materialized, assignments are applied in memory,
    /// constraints are re-validated, then the old versions are deleted and
    /// the new versions inserted via `insert_rows` (which re-runs constraint
    /// and foreign-key checks). Statement order is significant throughout.
    ///
    /// # Errors
    /// `InvalidArgumentError` for duplicate/unknown assignment columns or
    /// constraint violations; storage errors are propagated.
    fn update_filtered_rows(
        &self,
        table: &ExecutorTable<P>,
        display_name: String,
        canonical_name: String,
        assignments: Vec<ColumnAssignment>,
        filter: LlkvExpr<'static, String>,
        snapshot: TransactionSnapshot,
    ) -> Result<RuntimeStatementResult<P>> {
        if assignments.is_empty() {
            return Err(Error::InvalidArgumentError(
                "UPDATE requires at least one assignment".into(),
            ));
        }

        let schema = table.schema.as_ref();
        // Resolve column-name predicates to field-id predicates.
        let filter_expr = translate_predicate(filter, schema)?;

        // Validate assignments: reject duplicates (case-insensitive) and
        // unknown columns; pre-translate expression assignments so their
        // values can be batch-evaluated during the row scan.
        let mut seen_columns: FxHashSet<String> =
            FxHashSet::with_capacity_and_hasher(assignments.len(), Default::default());
        let mut prepared: Vec<(ExecutorColumn, PreparedAssignmentValue)> =
            Vec::with_capacity(assignments.len());
        let mut scalar_exprs: Vec<ScalarExpr<FieldId>> = Vec::new();

        for assignment in assignments {
            let normalized = assignment.column.to_ascii_lowercase();
            if !seen_columns.insert(normalized.clone()) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate column '{}' in UPDATE assignments",
                    assignment.column
                )));
            }
            let column = table.schema.resolve(&assignment.column).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "unknown column '{}' in UPDATE",
                    assignment.column
                ))
            })?;

            match assignment.value {
                AssignmentValue::Literal(value) => {
                    prepared.push((column.clone(), PreparedAssignmentValue::Literal(value)));
                }
                AssignmentValue::Expression(expr) => {
                    // Expressions are evaluated per-row later; remember the
                    // slot index into `expr_values`.
                    let translated = translate_scalar(&expr, schema)?;
                    let expr_index = scalar_exprs.len();
                    scalar_exprs.push(translated);
                    prepared.push((
                        column.clone(),
                        PreparedAssignmentValue::Expression { expr_index },
                    ));
                }
            }
        }

        // Find the target rows and evaluate assignment expressions against
        // their current values in one pass.
        let (row_ids, mut expr_values) =
            self.collect_update_rows(table, &filter_expr, &scalar_exprs, snapshot)?;

        if row_ids.is_empty() {
            return Ok(RuntimeStatementResult::Update {
                table_name: display_name,
                rows_updated: 0,
            });
        }

        // Materialize the full current row contents for every matched row;
        // these become the bases that assignments are applied onto.
        let row_count = row_ids.len();
        let table_id = table.table.table_id();
        let logical_fields: Vec<LogicalFieldId> = table
            .schema
            .columns
            .iter()
            .map(|column| LogicalFieldId::for_user(table_id, column.field_id))
            .collect();

        let mut stream = table.table.stream_columns(
            logical_fields.clone(),
            row_ids.clone(),
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut new_rows: Vec<Vec<PlanValue>> =
            vec![Vec::with_capacity(table.schema.columns.len()); row_count];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < new_rows.len(),
                        "column stream produced out-of-range row index"
                    );
                    if let Some(row) = new_rows.get_mut(target_index) {
                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
                        row.push(value);
                    }
                }
            }
        }
        debug_assert!(
            new_rows
                .iter()
                .all(|row| row.len() == table.schema.columns.len())
        );

        tracing::trace!(
            table = %display_name,
            row_count,
            rows = ?new_rows,
            "update_filtered_rows captured source rows"
        );

        // Snapshot each row's primary-key value BEFORE applying assignments,
        // so PK validation can tell "unchanged key" from "moved onto an
        // existing key".
        let constraint_ctx = self.build_table_constraint_context(table)?;
        let primary_key_spec = constraint_ctx.primary_key.as_ref();
        let mut original_primary_key_keys: Vec<Option<UniqueKey>> = Vec::new();
        if let Some(pk) = primary_key_spec {
            original_primary_key_keys.reserve(row_count);
            for row in &new_rows {
                let mut values = Vec::with_capacity(pk.schema_indices.len());
                for &idx in &pk.schema_indices {
                    let value = row.get(idx).cloned().unwrap_or(PlanValue::Null);
                    values.push(value);
                }
                let key = build_composite_unique_key(&values, &pk.column_names)?;
                original_primary_key_keys.push(key);
            }
        }

        let column_positions: FxHashMap<FieldId, usize> = FxHashMap::from_iter(
            table
                .schema
                .columns
                .iter()
                .enumerate()
                .map(|(idx, column)| (column.field_id, idx)),
        );

        // Overwrite the assigned columns in each captured row with the
        // literal or per-row expression result, coerced to the column type.
        for (column, value) in prepared {
            let column_index =
                column_positions
                    .get(&column.field_id)
                    .copied()
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "column '{}' missing in table schema during UPDATE",
                            column.name
                        ))
                    })?;

            let values = match value {
                PreparedAssignmentValue::Literal(lit) => vec![lit; row_count],
                PreparedAssignmentValue::Expression { expr_index } => {
                    let column_values = expr_values.get_mut(expr_index).ok_or_else(|| {
                        Error::InvalidArgumentError(
                            "expression assignment value missing during UPDATE".into(),
                        )
                    })?;
                    if column_values.len() != row_count {
                        return Err(Error::InvalidArgumentError(
                            "expression result count did not match targeted row count".into(),
                        ));
                    }
                    // Take ownership; each expression slot is consumed once.
                    mem::take(column_values)
                }
            };

            for (row_idx, new_value) in values.into_iter().enumerate() {
                if let Some(row) = new_rows.get_mut(row_idx) {
                    let coerced = self.coerce_plan_value_for_column(new_value, &column)?;
                    row[column_index] = coerced;
                }
            }
        }

        // Re-validate row-level constraints (and PK movement, if any) on the
        // fully-updated rows before mutating storage.
        let column_names: Vec<String> = table
            .schema
            .columns
            .iter()
            .map(|column| column.name.clone())
            .collect();
        let column_order = resolve_insert_columns(&column_names, table.schema.as_ref())?;
        self.constraint_service.validate_row_level_constraints(
            &constraint_ctx.schema_field_ids,
            &constraint_ctx.column_constraints,
            &column_order,
            &new_rows,
        )?;

        if let Some(pk) = primary_key_spec {
            self.constraint_service.validate_update_primary_keys(
                &constraint_ctx.schema_field_ids,
                pk,
                &column_order,
                &new_rows,
                &original_primary_key_keys,
                |field_ids| self.scan_multi_column_values(table, field_ids, snapshot),
            )?;
        }

        // Delete the old row versions, then insert the updated rows.
        // insert_rows re-runs unique/PK/FK validation on the new versions.
        let _ = self.apply_delete(
            table,
            display_name.clone(),
            canonical_name.clone(),
            row_ids.clone(),
            snapshot,
            false,
        )?;

        let _ = self.insert_rows(
            table,
            display_name.clone(),
            canonical_name,
            new_rows,
            column_names,
            snapshot,
        )?;

        Ok(RuntimeStatementResult::Update {
            table_name: display_name,
            rows_updated: row_count,
        })
    }
3534
    /// Apply UPDATE assignments to every visible row in `table`.
    ///
    /// Implemented as read-modify-rewrite: gather full current rows, patch the
    /// assigned columns, validate constraints, then delete the old row ids and
    /// re-insert the patched rows under the same snapshot. Returns the number
    /// of rows rewritten.
    fn update_all_rows(
        &self,
        table: &ExecutorTable<P>,
        display_name: String,
        canonical_name: String,
        assignments: Vec<ColumnAssignment>,
        snapshot: TransactionSnapshot,
    ) -> Result<RuntimeStatementResult<P>> {
        if assignments.is_empty() {
            return Err(Error::InvalidArgumentError(
                "UPDATE requires at least one assignment".into(),
            ));
        }

        // Fast path: an empty table updates zero rows.
        let total_rows = table.total_rows.load(Ordering::SeqCst);
        let total_rows_usize = usize::try_from(total_rows).map_err(|_| {
            Error::InvalidArgumentError("table row count exceeds supported range".into())
        })?;
        if total_rows_usize == 0 {
            return Ok(RuntimeStatementResult::Update {
                table_name: display_name,
                rows_updated: 0,
            });
        }

        let schema = table.schema.as_ref();

        // Validate assignments: reject duplicates (case-insensitively) and
        // unknown columns; split literal values from scalar expressions, which
        // are evaluated in bulk later via `collect_update_rows`.
        let mut seen_columns: FxHashSet<String> =
            FxHashSet::with_capacity_and_hasher(assignments.len(), Default::default());
        let mut prepared: Vec<(ExecutorColumn, PreparedAssignmentValue)> =
            Vec::with_capacity(assignments.len());
        let mut scalar_exprs: Vec<ScalarExpr<FieldId>> = Vec::new();
        // First assigned column doubles as the anchor for the full-table scan.
        let mut first_field_id: Option<FieldId> = None;

        for assignment in assignments {
            let normalized = assignment.column.to_ascii_lowercase();
            if !seen_columns.insert(normalized.clone()) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate column '{}' in UPDATE assignments",
                    assignment.column
                )));
            }
            let column = table.schema.resolve(&assignment.column).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "unknown column '{}' in UPDATE",
                    assignment.column
                ))
            })?;
            if first_field_id.is_none() {
                first_field_id = Some(column.field_id);
            }

            match assignment.value {
                AssignmentValue::Literal(value) => {
                    prepared.push((column.clone(), PreparedAssignmentValue::Literal(value)));
                }
                AssignmentValue::Expression(expr) => {
                    // Translate to field-id form; remember its slot so the
                    // evaluated column can be matched back up below.
                    let translated = translate_scalar(&expr, schema)?;
                    let expr_index = scalar_exprs.len();
                    scalar_exprs.push(translated);
                    prepared.push((
                        column.clone(),
                        PreparedAssignmentValue::Expression { expr_index },
                    ));
                }
            }
        }

        let anchor_field = first_field_id.ok_or_else(|| {
            Error::InvalidArgumentError("UPDATE requires at least one target column".into())
        })?;

        // Collect the snapshot-visible row ids plus one evaluated value column
        // per assignment expression.
        let filter_expr = full_table_scan_filter(anchor_field);
        let (row_ids, mut expr_values) =
            self.collect_update_rows(table, &filter_expr, &scalar_exprs, snapshot)?;

        if row_ids.is_empty() {
            return Ok(RuntimeStatementResult::Update {
                table_name: display_name,
                rows_updated: 0,
            });
        }

        let row_count = row_ids.len();
        let table_id = table.table.table_id();
        let logical_fields: Vec<LogicalFieldId> = table
            .schema
            .columns
            .iter()
            .map(|column| LogicalFieldId::for_user(table_id, column.field_id))
            .collect();

        // Stream the full current contents of the targeted rows; nulls are
        // included so every row materializes with all schema columns.
        let mut stream = table.table.stream_columns(
            logical_fields.clone(),
            row_ids.clone(),
            GatherNullPolicy::IncludeNulls,
        )?;

        // Rows are rebuilt column-by-column: each batch contributes one value
        // per (row, column), pushed in schema column order.
        let mut new_rows: Vec<Vec<PlanValue>> =
            vec![Vec::with_capacity(table.schema.columns.len()); row_count];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < new_rows.len(),
                        "column stream produced out-of-range row index"
                    );
                    if let Some(row) = new_rows.get_mut(target_index) {
                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
                        row.push(value);
                    }
                }
            }
        }
        // Each rebuilt row must now be schema-width.
        debug_assert!(
            new_rows
                .iter()
                .all(|row| row.len() == table.schema.columns.len())
        );

        // Capture the PRE-update primary-key values so key changes can be
        // detected and validated after patching.
        let constraint_ctx = self.build_table_constraint_context(table)?;
        let primary_key_spec = constraint_ctx.primary_key.as_ref();
        let mut original_primary_key_keys: Vec<Option<UniqueKey>> = Vec::new();
        if let Some(pk) = primary_key_spec {
            original_primary_key_keys.reserve(row_count);
            for row in &new_rows {
                let mut values = Vec::with_capacity(pk.schema_indices.len());
                for &idx in &pk.schema_indices {
                    let value = row.get(idx).cloned().unwrap_or(PlanValue::Null);
                    values.push(value);
                }
                let key = build_composite_unique_key(&values, &pk.column_names)?;
                original_primary_key_keys.push(key);
            }
        }

        let column_positions: FxHashMap<FieldId, usize> = FxHashMap::from_iter(
            table
                .schema
                .columns
                .iter()
                .enumerate()
                .map(|(idx, column)| (column.field_id, idx)),
        );

        // Patch the assigned columns into the rebuilt rows.
        for (column, value) in prepared {
            let column_index =
                column_positions
                    .get(&column.field_id)
                    .copied()
                    .ok_or_else(|| {
                        Error::InvalidArgumentError(format!(
                            "column '{}' missing in table schema during UPDATE",
                            column.name
                        ))
                    })?;

            let values = match value {
                // Literal assignments fan the same value out to every row.
                PreparedAssignmentValue::Literal(lit) => vec![lit; row_count],
                PreparedAssignmentValue::Expression { expr_index } => {
                    let column_values = expr_values.get_mut(expr_index).ok_or_else(|| {
                        Error::InvalidArgumentError(
                            "expression assignment value missing during UPDATE".into(),
                        )
                    })?;
                    if column_values.len() != row_count {
                        return Err(Error::InvalidArgumentError(
                            "expression result count did not match targeted row count".into(),
                        ));
                    }
                    // Move the evaluated column out; each expr slot is
                    // consumed exactly once.
                    mem::take(column_values)
                }
            };

            for (row_idx, new_value) in values.into_iter().enumerate() {
                if let Some(row) = new_rows.get_mut(row_idx) {
                    // Coerce to the column's declared type before storing.
                    let coerced = self.coerce_plan_value_for_column(new_value, &column)?;
                    row[column_index] = coerced;
                }
            }
        }

        // Validate the patched rows (CHECK/NOT NULL etc.) and, when a primary
        // key exists, that no key change collides with surviving rows.
        let column_names: Vec<String> = table
            .schema
            .columns
            .iter()
            .map(|column| column.name.clone())
            .collect();
        let column_order = resolve_insert_columns(&column_names, table.schema.as_ref())?;
        self.constraint_service.validate_row_level_constraints(
            &constraint_ctx.schema_field_ids,
            &constraint_ctx.column_constraints,
            &column_order,
            &new_rows,
        )?;

        if let Some(pk) = primary_key_spec {
            self.constraint_service.validate_update_primary_keys(
                &constraint_ctx.schema_field_ids,
                pk,
                &column_order,
                &new_rows,
                &original_primary_key_keys,
                |field_ids| self.scan_multi_column_values(table, field_ids, snapshot),
            )?;
        }

        // Rewrite: delete the old versions (foreign-key enforcement off, the
        // rows come right back) then insert the patched rows.
        let _ = self.apply_delete(
            table,
            display_name.clone(),
            canonical_name.clone(),
            row_ids.clone(),
            snapshot,
            false,
        )?;

        let _ = self.insert_rows(
            table,
            display_name.clone(),
            canonical_name,
            new_rows,
            column_names,
            snapshot,
        )?;

        Ok(RuntimeStatementResult::Update {
            table_name: display_name,
            rows_updated: row_count,
        })
    }
3770
3771 fn delete_filtered_rows(
3772 &self,
3773 table: &ExecutorTable<P>,
3774 display_name: String,
3775 canonical_name: String,
3776 filter: LlkvExpr<'static, String>,
3777 snapshot: TransactionSnapshot,
3778 ) -> Result<RuntimeStatementResult<P>> {
3779 let schema = table.schema.as_ref();
3780 let filter_expr = translate_predicate(filter, schema)?;
3781 let row_ids = table.table.filter_row_ids(&filter_expr)?;
3782 let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
3783 tracing::trace!(
3784 table = %display_name,
3785 rows = row_ids.len(),
3786 "delete_filtered_rows collected row ids"
3787 );
3788 self.apply_delete(table, display_name, canonical_name, row_ids, snapshot, true)
3789 }
3790
3791 fn delete_all_rows(
3792 &self,
3793 table: &ExecutorTable<P>,
3794 display_name: String,
3795 canonical_name: String,
3796 snapshot: TransactionSnapshot,
3797 ) -> Result<RuntimeStatementResult<P>> {
3798 let total_rows = table.total_rows.load(Ordering::SeqCst);
3799 if total_rows == 0 {
3800 return Ok(RuntimeStatementResult::Delete {
3801 table_name: display_name,
3802 rows_deleted: 0,
3803 });
3804 }
3805
3806 let anchor_field = table.schema.first_field_id().ok_or_else(|| {
3807 Error::InvalidArgumentError("DELETE requires a table with at least one column".into())
3808 })?;
3809 let filter_expr = full_table_scan_filter(anchor_field);
3810 let row_ids = table.table.filter_row_ids(&filter_expr)?;
3811 let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
3812 self.apply_delete(table, display_name, canonical_name, row_ids, snapshot, true)
3813 }
3814
3815 fn apply_delete(
3816 &self,
3817 table: &ExecutorTable<P>,
3818 display_name: String,
3819 canonical_name: String,
3820 row_ids: Vec<RowId>,
3821 snapshot: TransactionSnapshot,
3822 enforce_foreign_keys: bool,
3823 ) -> Result<RuntimeStatementResult<P>> {
3824 if row_ids.is_empty() {
3825 return Ok(RuntimeStatementResult::Delete {
3826 table_name: display_name,
3827 rows_deleted: 0,
3828 });
3829 }
3830
3831 if enforce_foreign_keys {
3832 self.check_foreign_keys_on_delete(
3833 table,
3834 &display_name,
3835 &canonical_name,
3836 &row_ids,
3837 snapshot,
3838 )?;
3839 }
3840
3841 self.detect_delete_conflicts(table, &display_name, &row_ids, snapshot)?;
3842
3843 let removed = row_ids.len();
3844
3845 let batch = mvcc::build_delete_batch(row_ids.clone(), snapshot.txn_id)?;
3847 table.table.append(&batch)?;
3848
3849 let removed_u64 = u64::try_from(removed)
3850 .map_err(|_| Error::InvalidArgumentError("row count exceeds supported range".into()))?;
3851 table.total_rows.fetch_sub(removed_u64, Ordering::SeqCst);
3852
3853 Ok(RuntimeStatementResult::Delete {
3854 table_name: display_name,
3855 rows_deleted: removed,
3856 })
3857 }
3858
3859 fn check_foreign_keys_on_delete(
3860 &self,
3861 table: &ExecutorTable<P>,
3862 _display_name: &str,
3863 _canonical_name: &str,
3864 row_ids: &[RowId],
3865 snapshot: TransactionSnapshot,
3866 ) -> Result<()> {
3867 if row_ids.is_empty() {
3868 return Ok(());
3869 }
3870
3871 self.constraint_service.validate_delete_foreign_keys(
3872 table.table.table_id(),
3873 row_ids,
3874 |request| {
3875 self.collect_row_values_for_ids(
3876 table,
3877 request.referenced_row_ids,
3878 request.referenced_field_ids,
3879 )
3880 },
3881 |request| {
3882 let child_table = self.lookup_table(request.referencing_table_canonical)?;
3883 self.collect_visible_child_rows(
3884 child_table.as_ref(),
3885 request.referencing_field_ids,
3886 snapshot,
3887 )
3888 },
3889 )
3890 }
3891
3892 fn check_foreign_keys_on_insert(
3893 &self,
3894 table: &ExecutorTable<P>,
3895 _display_name: &str,
3896 rows: &[Vec<PlanValue>],
3897 column_order: &[usize],
3898 snapshot: TransactionSnapshot,
3899 ) -> Result<()> {
3900 if rows.is_empty() {
3901 return Ok(());
3902 }
3903
3904 let schema_field_ids: Vec<FieldId> = table
3905 .schema
3906 .columns
3907 .iter()
3908 .map(|column| column.field_id)
3909 .collect();
3910
3911 self.constraint_service.validate_insert_foreign_keys(
3912 table.table.table_id(),
3913 &schema_field_ids,
3914 column_order,
3915 rows,
3916 |request| {
3917 let parent_table = self.lookup_table(request.referenced_table_canonical)?;
3918 self.scan_multi_column_values(
3919 parent_table.as_ref(),
3920 request.referenced_field_ids,
3921 snapshot,
3922 )
3923 },
3924 )
3925 }
3926
3927 fn detect_delete_conflicts(
3928 &self,
3929 table: &ExecutorTable<P>,
3930 display_name: &str,
3931 row_ids: &[RowId],
3932 snapshot: TransactionSnapshot,
3933 ) -> Result<()> {
3934 if row_ids.is_empty() {
3935 return Ok(());
3936 }
3937
3938 let table_id = table.table.table_id();
3939 let deleted_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
3940 let logical_fields: Arc<[LogicalFieldId]> = Arc::from([deleted_lfid]);
3941
3942 if let Err(err) = table
3943 .table
3944 .store()
3945 .prepare_gather_context(logical_fields.as_ref())
3946 {
3947 match err {
3948 Error::NotFound => return Ok(()),
3949 other => return Err(other),
3950 }
3951 }
3952
3953 let mut stream = table.table.stream_columns(
3954 Arc::clone(&logical_fields),
3955 row_ids.to_vec(),
3956 GatherNullPolicy::IncludeNulls,
3957 )?;
3958
3959 while let Some(chunk) = stream.next_batch()? {
3960 let batch = chunk.batch();
3961 let window = chunk.row_ids();
3962 let deleted_column = batch
3963 .column(0)
3964 .as_any()
3965 .downcast_ref::<UInt64Array>()
3966 .ok_or_else(|| {
3967 Error::Internal(
3968 "failed to read MVCC deleted_by column for conflict detection".into(),
3969 )
3970 })?;
3971
3972 for (idx, row_id) in window.iter().enumerate() {
3973 let deleted_by = if deleted_column.is_null(idx) {
3974 TXN_ID_NONE
3975 } else {
3976 deleted_column.value(idx)
3977 };
3978
3979 if deleted_by == TXN_ID_NONE || deleted_by == snapshot.txn_id {
3980 continue;
3981 }
3982
3983 let status = self.txn_manager.status(deleted_by);
3984 if !status.is_active() {
3985 continue;
3986 }
3987
3988 tracing::debug!(
3989 "[MVCC] delete conflict: table='{}' row_id={} deleted_by={} status={:?} current_txn={}",
3990 display_name,
3991 row_id,
3992 deleted_by,
3993 status,
3994 snapshot.txn_id
3995 );
3996
3997 return Err(Error::TransactionContextError(format!(
3998 "transaction conflict on table '{}' for row {}: row locked by transaction {} ({:?})",
3999 display_name, row_id, deleted_by, status
4000 )));
4001 }
4002 }
4003
4004 Ok(())
4005 }
4006
4007 fn collect_update_rows(
4008 &self,
4009 table: &ExecutorTable<P>,
4010 filter_expr: &LlkvExpr<'static, FieldId>,
4011 expressions: &[ScalarExpr<FieldId>],
4012 snapshot: TransactionSnapshot,
4013 ) -> Result<(Vec<RowId>, Vec<Vec<PlanValue>>)> {
4014 let row_ids = table.table.filter_row_ids(filter_expr)?;
4015 let row_ids = self.filter_visible_row_ids(table, row_ids, snapshot)?;
4016 if row_ids.is_empty() {
4017 return Ok((row_ids, vec![Vec::new(); expressions.len()]));
4018 }
4019
4020 if expressions.is_empty() {
4021 return Ok((row_ids, Vec::new()));
4022 }
4023
4024 let mut projections: Vec<ScanProjection> = Vec::with_capacity(expressions.len());
4025 for (idx, expr) in expressions.iter().enumerate() {
4026 let alias = format!("__expr_{idx}");
4027 projections.push(ScanProjection::computed(expr.clone(), alias));
4028 }
4029
4030 let mut expr_values: Vec<Vec<PlanValue>> =
4031 vec![Vec::with_capacity(row_ids.len()); expressions.len()];
4032 let mut error: Option<Error> = None;
4033 let row_filter: Arc<dyn RowIdFilter<P>> = Arc::new(MvccRowIdFilter::new(
4034 Arc::clone(&self.txn_manager),
4035 snapshot,
4036 ));
4037 let options = ScanStreamOptions {
4038 include_nulls: true,
4039 order: None,
4040 row_id_filter: Some(row_filter),
4041 };
4042
4043 table
4044 .table
4045 .scan_stream_with_exprs(&projections, filter_expr, options, |batch| {
4046 if error.is_some() {
4047 return;
4048 }
4049 if let Err(err) = Self::collect_expression_values(&mut expr_values, batch) {
4050 error = Some(err);
4051 }
4052 })?;
4053
4054 if let Some(err) = error {
4055 return Err(err);
4056 }
4057
4058 for values in &expr_values {
4059 if values.len() != row_ids.len() {
4060 return Err(Error::InvalidArgumentError(
4061 "expression result count did not match targeted row count".into(),
4062 ));
4063 }
4064 }
4065
4066 Ok((row_ids, expr_values))
4067 }
4068
4069 fn collect_expression_values(
4070 expr_values: &mut [Vec<PlanValue>],
4071 batch: RecordBatch,
4072 ) -> Result<()> {
4073 for row_idx in 0..batch.num_rows() {
4074 for (expr_index, values) in expr_values.iter_mut().enumerate() {
4075 let value = llkv_plan::plan_value_from_array(batch.column(expr_index), row_idx)?;
4076 values.push(value);
4077 }
4078 }
4079
4080 Ok(())
4081 }
4082
    /// Resolve `canonical_name` to an [`ExecutorTable`], lazily loading it
    /// from the catalog (and caching it) on first access.
    ///
    /// Returns `Error::NotFound` if the table is cached but has been marked
    /// dropped, and `Error::InvalidArgumentError` if the catalog has no entry
    /// for the name.
    pub fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        // Fast path: serve from the in-memory cache. The extra scope bounds
        // the read-lock lifetime before the slow path takes the write lock.
        {
            let tables = self.tables.read().unwrap();
            if let Some(table) = tables.get(canonical_name) {
                // A cached entry may belong to a table dropped after caching.
                if self.dropped_tables.read().unwrap().contains(canonical_name) {
                    return Err(Error::NotFound);
                }
                tracing::trace!(
                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
                    canonical_name,
                    table.table.table_id(),
                    table.schema.columns.len(),
                    &*self.pager
                );
                return Ok(Arc::clone(table));
            }
        } tracing::debug!(
            "[LAZY_LOAD] Loading table '{}' from catalog",
            canonical_name
        );

        // Slow path: rebuild the executor-side view of the table from the
        // catalog and column store.
        let catalog_table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;

        let table_id = catalog_table_id;
        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
        let store = table.store();
        // Sort by field id so column order is deterministic across loads.
        let mut logical_fields = store.user_field_ids_for_table(table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());
        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let summary = self
            .catalog_service
            .table_constraint_summary(canonical_name)?;
        let TableConstraintSummaryView {
            table_meta,
            column_metas,
            constraint_records,
            multi_column_uniques,
        } = summary;
        // Missing table metadata means the name is not a known table even if
        // the store has columns for the id.
        let _table_meta = table_meta.ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;
        let catalog_field_resolver = self.catalog.field_resolver(catalog_table_id);
        // Collect PK/UNIQUE facts from active constraint records. The
        // `has_*_records` flags track whether metadata-backed records exist
        // at all; when they don't, per-field catalog constraints are used as
        // a fallback below.
        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
        let mut has_primary_key_records = false;
        let mut has_single_unique_records = false;

        for record in constraint_records
            .iter()
            .filter(|record| record.is_active())
        {
            match &record.kind {
                ConstraintKind::PrimaryKey(pk) => {
                    has_primary_key_records = true;
                    for field_id in &pk.field_ids {
                        metadata_primary_keys.insert(*field_id);
                        // Primary-key columns are implicitly unique.
                        metadata_unique_fields.insert(*field_id);
                    }
                }
                ConstraintKind::Unique(unique) => {
                    // Multi-column uniques are installed separately below.
                    if unique.field_ids.len() == 1 {
                        has_single_unique_records = true;
                        metadata_unique_fields.insert(unique.field_ids[0]);
                    }
                }
                _ => {}
            }
        }

        // Build the executor column list plus a lowercase name -> index map.
        let mut executor_columns = Vec::new();
        let mut lookup = FxHashMap::with_capacity_and_hasher(field_ids.len(), Default::default());

        for (idx, lfid) in logical_fields.iter().enumerate() {
            let field_id = lfid.field_id();
            let normalized_index = executor_columns.len();

            // Prefer the catalog column name; fall back to a synthetic one.
            let column_name = column_metas
                .get(idx)
                .and_then(|meta| meta.as_ref())
                .and_then(|meta| meta.name.clone())
                .unwrap_or_else(|| format!("col_{}", field_id));

            let normalized = column_name.to_ascii_lowercase();
            lookup.insert(normalized, normalized_index);

            let fallback_constraints: FieldConstraints = catalog_field_resolver
                .as_ref()
                .and_then(|resolver| resolver.field_constraints_by_name(&column_name))
                .unwrap_or_default();

            // Constraint records win over catalog fallbacks when present.
            let metadata_primary = metadata_primary_keys.contains(&field_id);
            let primary_key = if has_primary_key_records {
                metadata_primary
            } else {
                fallback_constraints.primary_key
            };

            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
            let unique = if has_primary_key_records || has_single_unique_records {
                metadata_unique
            } else {
                fallback_constraints.primary_key || fallback_constraints.unique
            };

            let data_type = store.data_type(*lfid)?;
            // NOTE(review): nullability is derived solely from primary-key
            // status here; a plain NOT NULL column would be reconstructed as
            // nullable — confirm against the create-table path.
            let nullable = !primary_key;

            executor_columns.push(ExecutorColumn {
                name: column_name,
                data_type,
                nullable,
                primary_key,
                unique,
                field_id,
                check_expr: fallback_constraints.check_expr.clone(),
            });
        }

        let exec_schema = Arc::new(ExecutorSchema {
            columns: executor_columns,
            lookup,
        });

        // Recover the high-water row id by scanning the row-id shadow column
        // so new inserts do not reuse ids after a reload.
        let max_row_id = {
            use arrow::array::UInt64Array;
            use llkv_column_map::store::rowid_fid;
            use llkv_column_map::store::scan::{
                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
            };

            // Visitor that tracks the maximum row id seen across chunks.
            struct MaxRowIdVisitor {
                max: RowId,
            }

            impl PrimitiveVisitor for MaxRowIdVisitor {
                fn u64_chunk(&mut self, values: &UInt64Array) {
                    for i in 0..values.len() {
                        let val = values.value(i);
                        if val > self.max {
                            self.max = val;
                        }
                    }
                }
            }

            // Only the plain primitive visitor is needed; the rest default to
            // no-ops.
            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}

            // NOTE(review): assumes user field id 1 anchors the row-id shadow
            // column — confirm this invariant for tables whose first user
            // field id differs.
            let row_id_field = rowid_fid(LogicalFieldId::for_user(table_id, 1));
            let mut visitor = MaxRowIdVisitor { max: 0 };

            match ScanBuilder::new(table.store(), row_id_field)
                .options(ScanOptions::default())
                .run(&mut visitor)
            {
                Ok(_) => visitor.max,
                // No row-id column yet means the table is empty.
                Err(llkv_result::Error::NotFound) => 0,
                Err(e) => {
                    // Best-effort: log and fall back to 0 rather than fail
                    // the whole lookup.
                    tracing::warn!(
                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
                        canonical_name,
                        e
                    );
                    0
                }
            }
        };

        let next_row_id = if max_row_id > 0 {
            max_row_id.saturating_add(1)
        } else {
            0
        };

        let total_rows = table.total_rows().unwrap_or(0);

        let executor_table = Arc::new(ExecutorTable {
            table: Arc::new(table),
            schema: exec_schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        if !multi_column_uniques.is_empty() {
            let executor_uniques =
                Self::build_executor_multi_column_uniques(&executor_table, &multi_column_uniques);
            executor_table.set_multi_column_uniques(executor_uniques);
        }

        // Publish to the cache. Concurrent loaders may race here; last write
        // wins, which is benign because the entries are equivalent.
        {
            let mut tables = self.tables.write().unwrap();
            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
        }

        // Mirror the reconstructed column constraints back into the catalog
        // field resolver; registration failures are intentionally ignored.
        if let Some(field_resolver) = self.catalog.field_resolver(catalog_table_id) {
            for col in &executor_table.schema.columns {
                let definition = FieldDefinition::new(&col.name)
                    .with_primary_key(col.primary_key)
                    .with_unique(col.unique)
                    .with_check_expr(col.check_expr.clone());
                let _ = field_resolver.register_field(definition); }
            tracing::debug!(
                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
                executor_table.schema.columns.len(),
                canonical_name
            );
        }

        tracing::debug!(
            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
            canonical_name,
            table_id,
            field_ids.len(),
            next_row_id
        );

        Ok(executor_table)
    }
4324
4325 fn remove_table_entry(&self, canonical_name: &str) {
4326 let mut tables = self.tables.write().unwrap();
4327 if tables.remove(canonical_name).is_some() {
4328 tracing::trace!(
4329 "remove_table_entry: removed table '{}' from context cache",
4330 canonical_name
4331 );
4332 }
4333 }
4334
    /// Drop `name` immediately (outside any transaction), unregistering it
    /// from the catalog and marking it dropped for this context.
    ///
    /// With `if_exists` set, a missing table is a no-op; otherwise it is a
    /// catalog error. Fails if another live table still references this one
    /// through a foreign key.
    pub fn drop_table_immediate(&self, name: &str, if_exists: bool) -> Result<()> {
        let (display_name, canonical_name) = canonical_table_name(name)?;
        // NOTE(review): existence is checked against the in-memory cache
        // only; a table present in the catalog but never lazily loaded would
        // be reported as missing here — confirm callers always load first.
        let (table_id, column_field_ids) = {
            let tables = self.tables.read().unwrap();
            let Some(entry) = tables.get(&canonical_name) else {
                if if_exists {
                    return Ok(());
                } else {
                    return Err(Error::CatalogError(format!(
                        "Catalog Error: Table '{}' does not exist",
                        display_name
                    )));
                }
            };

            let field_ids = entry
                .schema
                .columns
                .iter()
                .map(|col| col.field_id)
                .collect::<Vec<_>>();
            (entry.table.table_id(), field_ids)
        };

        // Refuse the drop while other live tables reference this one.
        let referencing = self.constraint_service.referencing_foreign_keys(table_id)?;

        for detail in referencing {
            // Self-references do not block the drop.
            if detail.referencing_table_canonical == canonical_name {
                continue;
            }

            // Tables already dropped in this context no longer count.
            if self.is_table_marked_dropped(&detail.referencing_table_canonical) {
                continue;
            }

            let constraint_label = detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
            return Err(Error::ConstraintError(format!(
                "Cannot drop table '{}' because it is referenced by foreign key constraint '{}' on table '{}'",
                display_name, constraint_label, detail.referencing_table_display
            )));
        }

        self.catalog_service
            .drop_table(&canonical_name, table_id, &column_field_ids)?;
        tracing::debug!(
            "[CATALOG] Unregistered table '{}' (table_id={}) from catalog",
            canonical_name,
            table_id
        );

        // Tombstone the name so cached lookups start failing immediately.
        self.dropped_tables
            .write()
            .unwrap()
            .insert(canonical_name.clone());
        Ok(())
    }
4391
4392 pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
4393 self.dropped_tables.read().unwrap().contains(canonical_name)
4394 }
4395}
4396
4397impl<P> TransactionContext for RuntimeContextWrapper<P>
4399where
4400 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
4401{
4402 type Pager = P;
4403
4404 fn set_snapshot(&self, snapshot: TransactionSnapshot) {
4405 self.update_snapshot(snapshot);
4406 }
4407
4408 fn snapshot(&self) -> TransactionSnapshot {
4409 self.current_snapshot()
4410 }
4411
4412 fn table_column_specs(&self, table_name: &str) -> llkv_result::Result<Vec<ColumnSpec>> {
4413 RuntimeContext::table_column_specs(self.context(), table_name)
4414 }
4415
4416 fn export_table_rows(
4417 &self,
4418 table_name: &str,
4419 ) -> llkv_result::Result<llkv_transaction::RowBatch> {
4420 let batch = RuntimeContext::export_table_rows(self.context(), table_name)?;
4421 Ok(llkv_transaction::RowBatch {
4423 columns: batch.columns,
4424 rows: batch.rows,
4425 })
4426 }
4427
4428 fn get_batches_with_row_ids(
4429 &self,
4430 table_name: &str,
4431 filter: Option<LlkvExpr<'static, String>>,
4432 ) -> llkv_result::Result<Vec<RecordBatch>> {
4433 RuntimeContext::get_batches_with_row_ids_with_snapshot(
4434 self.context(),
4435 table_name,
4436 filter,
4437 self.snapshot(),
4438 )
4439 }
4440
4441 fn execute_select(
4442 &self,
4443 plan: SelectPlan,
4444 ) -> llkv_result::Result<SelectExecution<Self::Pager>> {
4445 RuntimeContext::execute_select_with_snapshot(self.context(), plan, self.snapshot())
4446 }
4447
4448 fn create_table_plan(
4449 &self,
4450 plan: CreateTablePlan,
4451 ) -> llkv_result::Result<TransactionResult<P>> {
4452 let result = RuntimeContext::create_table_plan(self.context(), plan)?;
4453 Ok(convert_statement_result(result))
4454 }
4455
4456 fn insert(&self, plan: InsertPlan) -> llkv_result::Result<TransactionResult<P>> {
4457 tracing::trace!(
4458 "[WRAPPER] TransactionContext::insert called - plan.table='{}', wrapper_context_pager={:p}",
4459 plan.table,
4460 &*self.ctx.pager
4461 );
4462 let snapshot = self.current_snapshot();
4463 let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
4464 self.ctx().insert(plan)?
4465 } else {
4466 RuntimeContext::insert_with_snapshot(self.context(), plan, snapshot)?
4467 };
4468 Ok(convert_statement_result(result))
4469 }
4470
4471 fn update(&self, plan: UpdatePlan) -> llkv_result::Result<TransactionResult<P>> {
4472 let snapshot = self.current_snapshot();
4473 let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
4474 self.ctx().update(plan)?
4475 } else {
4476 RuntimeContext::update_with_snapshot(self.context(), plan, snapshot)?
4477 };
4478 Ok(convert_statement_result(result))
4479 }
4480
4481 fn delete(&self, plan: DeletePlan) -> llkv_result::Result<TransactionResult<P>> {
4482 let snapshot = self.current_snapshot();
4483 let result = if snapshot.txn_id == TXN_ID_AUTO_COMMIT {
4484 self.ctx().delete(plan)?
4485 } else {
4486 RuntimeContext::delete_with_snapshot(self.context(), plan, snapshot)?
4487 };
4488 Ok(convert_statement_result(result))
4489 }
4490
4491 fn create_index(&self, plan: CreateIndexPlan) -> llkv_result::Result<TransactionResult<P>> {
4492 let result = RuntimeContext::create_index(self.context(), plan)?;
4493 Ok(convert_statement_result(result))
4494 }
4495
4496 fn append_batches_with_row_ids(
4497 &self,
4498 table_name: &str,
4499 batches: Vec<RecordBatch>,
4500 ) -> llkv_result::Result<usize> {
4501 RuntimeContext::append_batches_with_row_ids(self.context(), table_name, batches)
4502 }
4503
4504 fn table_names(&self) -> Vec<String> {
4505 RuntimeContext::table_names(self.context())
4506 }
4507
4508 fn table_id(&self, table_name: &str) -> llkv_result::Result<llkv_table::types::TableId> {
4509 let ctx = self.context();
4512 if ctx.is_table_marked_dropped(table_name) {
4513 return Err(Error::InvalidArgumentError(format!(
4514 "table '{}' has been dropped",
4515 table_name
4516 )));
4517 }
4518
4519 let table = ctx.lookup_table(table_name)?;
4520 Ok(table.table.table_id())
4521 }
4522
4523 fn catalog_snapshot(&self) -> llkv_table::catalog::TableCatalogSnapshot {
4524 let ctx = self.context();
4525 ctx.catalog.snapshot()
4526 }
4527
4528 fn validate_commit_constraints(&self, txn_id: TxnId) -> llkv_result::Result<()> {
4529 self.ctx.validate_primary_keys_for_commit(txn_id)
4530 }
4531
4532 fn clear_transaction_state(&self, txn_id: TxnId) {
4533 self.ctx.clear_transaction_state(txn_id);
4534 }
4535}
4536
4537fn convert_statement_result<P>(result: RuntimeStatementResult<P>) -> TransactionResult<P>
4539where
4540 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
4541{
4542 use llkv_transaction::TransactionResult as TxResult;
4543 match result {
4544 RuntimeStatementResult::CreateTable { table_name } => TxResult::CreateTable { table_name },
4545 RuntimeStatementResult::CreateIndex {
4546 table_name,
4547 index_name,
4548 } => TxResult::CreateIndex {
4549 table_name,
4550 index_name,
4551 },
4552 RuntimeStatementResult::Insert { rows_inserted, .. } => TxResult::Insert { rows_inserted },
4553 RuntimeStatementResult::Update { rows_updated, .. } => TxResult::Update {
4554 rows_matched: rows_updated,
4555 rows_updated,
4556 },
4557 RuntimeStatementResult::Delete { rows_deleted, .. } => TxResult::Delete { rows_deleted },
4558 RuntimeStatementResult::Transaction { kind } => TxResult::Transaction { kind },
4559 _ => panic!("unsupported StatementResult conversion"),
4560 }
4561}
4562
/// Reduce `row_ids` to the subset visible under `snapshot` by reading the
/// table's MVCC version columns (`created_by` / `deleted_by`).
///
/// Tables without MVCC columns (never written transactionally) are treated as
/// fully visible. Within a batch, a NULL `created_by` is attributed to the
/// auto-commit pseudo-transaction and a NULL `deleted_by` means "not deleted".
fn filter_row_ids_for_snapshot<P>(
    table: &Table<P>,
    row_ids: Vec<RowId>,
    txn_manager: &TxnIdManager,
    snapshot: TransactionSnapshot,
) -> Result<Vec<RowId>>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    tracing::debug!(
        "[FILTER_ROWS] Filtering {} row IDs for snapshot txn_id={}, snapshot_id={}",
        row_ids.len(),
        snapshot.txn_id,
        snapshot.snapshot_id
    );

    if row_ids.is_empty() {
        return Ok(row_ids);
    }

    // Locate the per-table MVCC bookkeeping columns.
    let table_id = table.table_id();
    let created_lfid = LogicalFieldId::for_mvcc_created_by(table_id);
    let deleted_lfid = LogicalFieldId::for_mvcc_deleted_by(table_id);
    let logical_fields: Arc<[LogicalFieldId]> = Arc::from([created_lfid, deleted_lfid]);

    // NotFound here means the table has no MVCC columns at all, so every
    // requested row is visible; any other error is propagated.
    if let Err(err) = table
        .store()
        .prepare_gather_context(logical_fields.as_ref())
    {
        match err {
            Error::NotFound => {
                tracing::trace!(
                    "[FILTER_ROWS] MVCC columns not found for table_id={}, treating all rows as visible",
                    table_id
                );
                return Ok(row_ids);
            }
            other => {
                tracing::error!(
                    "[FILTER_ROWS] Failed to prepare gather context: {:?}",
                    other
                );
                return Err(other);
            }
        }
    }

    let total_rows = row_ids.len();
    // Stream both version columns for the requested rows. NULLs are kept so
    // batch positions stay aligned with the row-id window.
    let mut stream = match table.stream_columns(
        Arc::clone(&logical_fields),
        row_ids,
        GatherNullPolicy::IncludeNulls,
    ) {
        Ok(stream) => stream,
        Err(err) => {
            tracing::error!("[FILTER_ROWS] stream_columns error: {:?}", err);
            return Err(err);
        }
    };

    let mut visible = Vec::with_capacity(total_rows);

    while let Some(chunk) = stream.next_batch()? {
        let batch = chunk.batch();
        let window = chunk.row_ids();

        // Defensive fallback: without both MVCC columns in the batch we
        // cannot evaluate visibility, so pass the window through unfiltered.
        if batch.num_columns() < 2 {
            tracing::debug!(
                "[FILTER_ROWS] version_batch has < 2 columns for table_id={}, returning window rows unfiltered",
                table_id
            );
            visible.extend_from_slice(window);
            continue;
        }

        let created_column = batch.column(0).as_any().downcast_ref::<UInt64Array>();
        let deleted_column = batch.column(1).as_any().downcast_ref::<UInt64Array>();

        // Same fallback if the columns are not the expected UInt64 arrays.
        if created_column.is_none() || deleted_column.is_none() {
            tracing::debug!(
                "[FILTER_ROWS] Failed to downcast MVCC columns for table_id={}, returning window rows unfiltered",
                table_id
            );
            visible.extend_from_slice(window);
            continue;
        }

        let created_column = created_column.unwrap();
        let deleted_column = deleted_column.unwrap();

        for (idx, row_id) in window.iter().enumerate() {
            // NULL created_by: row predates MVCC tracking; treat as written
            // by the auto-commit pseudo-transaction.
            let created_by = if created_column.is_null(idx) {
                TXN_ID_AUTO_COMMIT
            } else {
                created_column.value(idx)
            };
            // NULL deleted_by: row has not been deleted.
            let deleted_by = if deleted_column.is_null(idx) {
                TXN_ID_NONE
            } else {
                deleted_column.value(idx)
            };

            let version = RowVersion {
                created_by,
                deleted_by,
            };
            let is_visible = version.is_visible_for(txn_manager, snapshot);
            tracing::trace!(
                "[FILTER_ROWS] row_id={}: created_by={}, deleted_by={}, is_visible={}",
                row_id,
                created_by,
                deleted_by,
                is_visible
            );
            if is_visible {
                visible.push(*row_id);
            }
        }
    }

    tracing::debug!(
        "[FILTER_ROWS] Filtered from {} to {} visible rows",
        total_rows,
        visible.len()
    );
    Ok(visible)
}
4690
/// `RowIdFilter` implementation that hides rows not visible under a given
/// MVCC transaction snapshot.
struct MvccRowIdFilter<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Authority consulted for transaction commit/abort state.
    txn_manager: Arc<TxnIdManager>,
    /// Snapshot that visibility is evaluated against.
    snapshot: TransactionSnapshot,
    /// Ties the filter to a pager type without storing one.
    _marker: PhantomData<fn(P)>,
}
4699
impl<P> MvccRowIdFilter<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Build a filter that evaluates row visibility against `snapshot`.
    fn new(txn_manager: Arc<TxnIdManager>, snapshot: TransactionSnapshot) -> Self {
        Self {
            txn_manager,
            snapshot,
            _marker: PhantomData,
        }
    }
}
4712
4713impl<P> RowIdFilter<P> for MvccRowIdFilter<P>
4714where
4715 P: Pager<Blob = EntryHandle> + Send + Sync,
4716{
4717 fn filter(&self, table: &Table<P>, row_ids: Vec<RowId>) -> Result<Vec<RowId>> {
4718 tracing::trace!(
4719 "[MVCC_FILTER] filter() called with row_ids {:?}, snapshot txn={}, snapshot_id={}",
4720 row_ids,
4721 self.snapshot.txn_id,
4722 self.snapshot.snapshot_id
4723 );
4724 let result = filter_row_ids_for_snapshot(table, row_ids, &self.txn_manager, self.snapshot);
4725 if let Ok(ref visible) = result {
4726 tracing::trace!(
4727 "[MVCC_FILTER] filter() returning visible row_ids: {:?}",
4728 visible
4729 );
4730 }
4731 result
4732 }
4733}
4734
/// Adapter exposing a `RuntimeContext` through the `TableProvider` trait.
struct ContextProvider<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Shared runtime context the lookups are delegated to.
    context: Arc<RuntimeContext<P>>,
}
4742
impl<P> TableProvider<P> for ContextProvider<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Resolve `canonical_name` via the wrapped runtime context.
    fn get_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        self.context.lookup_table(canonical_name)
    }
}
4751
/// Lazily-built SELECT over a single table: builder methods accumulate a
/// `SelectPlan`, and nothing executes until one of the `collect*` methods runs.
pub struct RuntimeLazyFrame<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Context the plan will execute against.
    context: Arc<RuntimeContext<P>>,
    /// Accumulated query plan.
    plan: SelectPlan,
}
4760
4761impl<P> RuntimeLazyFrame<P>
4762where
4763 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
4764{
4765 pub fn scan(context: Arc<RuntimeContext<P>>, table: &str) -> Result<Self> {
4766 let (display, canonical) = canonical_table_name(table)?;
4767 context.lookup_table(&canonical)?;
4768 Ok(Self {
4769 context,
4770 plan: SelectPlan::new(display),
4771 })
4772 }
4773
4774 pub fn filter(mut self, predicate: LlkvExpr<'static, String>) -> Self {
4775 self.plan.filter = Some(predicate);
4776 self
4777 }
4778
4779 pub fn select_all(mut self) -> Self {
4780 self.plan.projections = vec![SelectProjection::AllColumns];
4781 self
4782 }
4783
4784 pub fn select_columns<S>(mut self, columns: impl IntoIterator<Item = S>) -> Self
4785 where
4786 S: AsRef<str>,
4787 {
4788 self.plan.projections = columns
4789 .into_iter()
4790 .map(|name| SelectProjection::Column {
4791 name: name.as_ref().to_string(),
4792 alias: None,
4793 })
4794 .collect();
4795 self
4796 }
4797
4798 pub fn select(mut self, projections: Vec<SelectProjection>) -> Self {
4799 self.plan.projections = projections;
4800 self
4801 }
4802
4803 pub fn aggregate(mut self, aggregates: Vec<AggregateExpr>) -> Self {
4804 self.plan.aggregates = aggregates;
4805 self
4806 }
4807
4808 pub fn collect(self) -> Result<SelectExecution<P>> {
4809 self.context.execute_select(self.plan)
4810 }
4811
4812 pub fn collect_rows(self) -> Result<RowBatch> {
4813 let execution = self.context.execute_select(self.plan)?;
4814 execution.collect_rows()
4815 }
4816
4817 pub fn collect_rows_vec(self) -> Result<Vec<Vec<PlanValue>>> {
4818 Ok(self.collect_rows()?.rows)
4819 }
4820}
4821
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn current_time_micros() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as u64,
        Err(_) => 0,
    }
}
4828
4829pub fn resolve_insert_columns(columns: &[String], schema: &ExecutorSchema) -> Result<Vec<usize>> {
4830 if columns.is_empty() {
4831 return Ok((0..schema.columns.len()).collect());
4832 }
4833 let mut resolved = Vec::with_capacity(columns.len());
4834 for column in columns {
4835 let normalized = column.to_ascii_lowercase();
4836 let index = schema.lookup.get(&normalized).ok_or_else(|| {
4837 Error::InvalidArgumentError(format!(
4838 "Binder Error: does not have a column named '{}'",
4839 column
4840 ))
4841 })?;
4842 resolved.push(*index);
4843 }
4844 Ok(resolved)
4845}
4846
/// Coerce a single INSERT `value` to the Arrow type of `column`, applying the
/// executor's lenient conversions (numeric widening/truncation, boolean
/// string parsing, DATE literal parsing, struct pass-through).
///
/// NULL is accepted by every column type. Returns `InvalidArgumentError`
/// when the value cannot be represented in the column's type.
fn normalize_insert_value_for_column(
    column: &ExecutorColumn,
    value: PlanValue,
) -> Result<PlanValue> {
    match (&column.data_type, value) {
        (_, PlanValue::Null) => Ok(PlanValue::Null),
        (DataType::Int64, PlanValue::Integer(v)) => Ok(PlanValue::Integer(v)),
        // Floats are truncated toward zero when stored in INT columns.
        (DataType::Int64, PlanValue::Float(v)) => Ok(PlanValue::Integer(v as i64)),
        (DataType::Int64, other) => Err(Error::InvalidArgumentError(format!(
            "cannot insert {other:?} into INT column '{}'",
            column.name
        ))),
        // Booleans are normalized to 0/1 integers.
        (DataType::Boolean, PlanValue::Integer(v)) => {
            Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 }))
        }
        (DataType::Boolean, PlanValue::Float(v)) => {
            Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 }))
        }
        // Accepts true/t/1 and false/f/0, case-insensitively.
        (DataType::Boolean, PlanValue::String(s)) => {
            let normalized = s.trim().to_ascii_lowercase();
            let value = match normalized.as_str() {
                "true" | "t" | "1" => 1,
                "false" | "f" | "0" => 0,
                _ => {
                    return Err(Error::InvalidArgumentError(format!(
                        "cannot insert string '{}' into BOOLEAN column '{}'",
                        s, column.name
                    )));
                }
            };
            Ok(PlanValue::Integer(value))
        }
        (DataType::Boolean, PlanValue::Struct(_)) => Err(Error::InvalidArgumentError(format!(
            "cannot insert struct into BOOLEAN column '{}'",
            column.name
        ))),
        (DataType::Float64, PlanValue::Integer(v)) => Ok(PlanValue::Float(v as f64)),
        (DataType::Float64, PlanValue::Float(v)) => Ok(PlanValue::Float(v)),
        (DataType::Float64, other) => Err(Error::InvalidArgumentError(format!(
            "cannot insert {other:?} into DOUBLE column '{}'",
            column.name
        ))),
        // Numbers are stringified when targeting text columns.
        (DataType::Utf8, PlanValue::Integer(v)) => Ok(PlanValue::String(v.to_string())),
        (DataType::Utf8, PlanValue::Float(v)) => Ok(PlanValue::String(v.to_string())),
        (DataType::Utf8, PlanValue::String(s)) => Ok(PlanValue::String(s)),
        (DataType::Utf8, PlanValue::Struct(_)) => Err(Error::InvalidArgumentError(format!(
            "cannot insert struct into STRING column '{}'",
            column.name
        ))),
        // Integers targeting DATE are interpreted as days-since-epoch and
        // must fit in i32 (Date32's physical type).
        (DataType::Date32, PlanValue::Integer(days)) => {
            let casted = i32::try_from(days).map_err(|_| {
                Error::InvalidArgumentError(format!(
                    "integer literal out of range for DATE column '{}'",
                    column.name
                ))
            })?;
            Ok(PlanValue::Integer(casted as i64))
        }
        // Strings targeting DATE are parsed as YYYY-MM-DD.
        (DataType::Date32, PlanValue::String(text)) => {
            let days = parse_date32_literal(&text)?;
            Ok(PlanValue::Integer(days as i64))
        }
        (DataType::Date32, other) => Err(Error::InvalidArgumentError(format!(
            "cannot insert {other:?} into DATE column '{}'",
            column.name
        ))),
        (DataType::Struct(_), PlanValue::Struct(map)) => Ok(PlanValue::Struct(map)),
        (DataType::Struct(_), other) => Err(Error::InvalidArgumentError(format!(
            "expected struct value for struct column '{}', got {other:?}",
            column.name
        ))),
        (other_type, other_value) => Err(Error::InvalidArgumentError(format!(
            "unsupported Arrow data type {:?} for INSERT value {:?} in column '{}'",
            other_type, other_value, column.name
        ))),
    }
}
4924
/// Build an Arrow array of `dtype` from a column of `PlanValue`s, applying
/// the same lenient coercions as `normalize_insert_value_for_column`
/// (numeric widening/truncation, boolean string parsing, DATE literal
/// parsing). Struct columns are built recursively, one child array per field.
///
/// Returns `InvalidArgumentError` when a value cannot be represented in
/// `dtype` or when `dtype` itself is unsupported for INSERT.
pub fn build_array_for_column(dtype: &DataType, values: &[PlanValue]) -> Result<ArrayRef> {
    match dtype {
        DataType::Int64 => {
            let mut builder = Int64Builder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    PlanValue::Integer(v) => builder.append_value(*v),
                    // Floats truncate toward zero, mirroring the scalar path.
                    PlanValue::Float(v) => builder.append_value(*v as i64),
                    PlanValue::String(_) | PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert non-integer into INT column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Boolean => {
            let mut builder = BooleanBuilder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    // Non-zero numerics are treated as true.
                    PlanValue::Integer(v) => builder.append_value(*v != 0),
                    PlanValue::Float(v) => builder.append_value(*v != 0.0),
                    // Accepts true/t/1 and false/f/0, case-insensitively.
                    PlanValue::String(s) => {
                        let normalized = s.trim().to_ascii_lowercase();
                        match normalized.as_str() {
                            "true" | "t" | "1" => builder.append_value(true),
                            "false" | "f" | "0" => builder.append_value(false),
                            _ => {
                                return Err(Error::InvalidArgumentError(format!(
                                    "cannot insert string '{}' into BOOLEAN column",
                                    s
                                )));
                            }
                        }
                    }
                    PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert struct into BOOLEAN column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Float64 => {
            let mut builder = Float64Builder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    PlanValue::Integer(v) => builder.append_value(*v as f64),
                    PlanValue::Float(v) => builder.append_value(*v),
                    PlanValue::String(_) | PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert non-numeric into DOUBLE column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Utf8 => {
            let mut builder = StringBuilder::with_capacity(values.len(), values.len() * 8);
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    // Numbers are stringified when targeting text columns.
                    PlanValue::Integer(v) => builder.append_value(v.to_string()),
                    PlanValue::Float(v) => builder.append_value(v.to_string()),
                    PlanValue::String(s) => builder.append_value(s),
                    PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert struct into STRING column".into(),
                        ));
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Date32 => {
            let mut builder = Date32Builder::with_capacity(values.len());
            for value in values {
                match value {
                    PlanValue::Null => builder.append_null(),
                    // Integers are days-since-epoch and must fit in i32.
                    PlanValue::Integer(days) => {
                        let casted = i32::try_from(*days).map_err(|_| {
                            Error::InvalidArgumentError(
                                "integer literal out of range for DATE column".into(),
                            )
                        })?;
                        builder.append_value(casted);
                    }
                    PlanValue::Float(_) | PlanValue::Struct(_) => {
                        return Err(Error::InvalidArgumentError(
                            "cannot insert non-date value into DATE column".into(),
                        ));
                    }
                    // Strings are parsed as YYYY-MM-DD.
                    PlanValue::String(text) => {
                        let days = parse_date32_literal(text)?;
                        builder.append_value(days);
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Struct(fields) => {
            use arrow::array::StructArray;
            let mut field_arrays: Vec<(FieldRef, ArrayRef)> = Vec::with_capacity(fields.len());

            // Build one child array per struct field by extracting that
            // field's value from every row, then recursing.
            for field in fields.iter() {
                let field_name = field.name();
                let field_type = field.data_type();
                let mut field_values = Vec::with_capacity(values.len());

                for value in values {
                    match value {
                        PlanValue::Null => field_values.push(PlanValue::Null),
                        // Missing keys become NULL in the child array.
                        PlanValue::Struct(map) => {
                            let field_value =
                                map.get(field_name).cloned().unwrap_or(PlanValue::Null);
                            field_values.push(field_value);
                        }
                        _ => {
                            return Err(Error::InvalidArgumentError(format!(
                                "expected struct value for struct column, got {:?}",
                                value
                            )));
                        }
                    }
                }

                let field_array = build_array_for_column(field_type, &field_values)?;
                field_arrays.push((Arc::clone(field), field_array));
            }

            Ok(Arc::new(StructArray::from(field_arrays)))
        }
        other => Err(Error::InvalidArgumentError(format!(
            "unsupported Arrow data type for INSERT: {other:?}"
        ))),
    }
}
5068
5069fn parse_date32_literal(text: &str) -> Result<i32> {
5070 let mut parts = text.split('-');
5071 let year_str = parts
5072 .next()
5073 .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
5074 let month_str = parts
5075 .next()
5076 .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
5077 let day_str = parts
5078 .next()
5079 .ok_or_else(|| Error::InvalidArgumentError(format!("invalid DATE literal '{text}'")))?;
5080 if parts.next().is_some() {
5081 return Err(Error::InvalidArgumentError(format!(
5082 "invalid DATE literal '{text}'"
5083 )));
5084 }
5085
5086 let year = year_str.parse::<i32>().map_err(|_| {
5087 Error::InvalidArgumentError(format!("invalid year in DATE literal '{text}'"))
5088 })?;
5089 let month_num = month_str.parse::<u8>().map_err(|_| {
5090 Error::InvalidArgumentError(format!("invalid month in DATE literal '{text}'"))
5091 })?;
5092 let day = day_str.parse::<u8>().map_err(|_| {
5093 Error::InvalidArgumentError(format!("invalid day in DATE literal '{text}'"))
5094 })?;
5095
5096 let month = Month::try_from(month_num).map_err(|_| {
5097 Error::InvalidArgumentError(format!("invalid month in DATE literal '{text}'"))
5098 })?;
5099
5100 let date = Date::from_calendar_date(year, month, day).map_err(|err| {
5101 Error::InvalidArgumentError(format!("invalid DATE literal '{text}': {err}"))
5102 })?;
5103 let days = date.to_julian_day() - epoch_julian_day();
5104 Ok(days)
5105}
5106
/// Julian day number of the Unix epoch (1970-01-01), used as the Date32 origin.
fn epoch_julian_day() -> i32 {
    Date::from_calendar_date(1970, Month::January, 1)
        .expect("1970-01-01 is a valid date")
        .to_julian_day()
}
5112
5113fn full_table_scan_filter(field_id: FieldId) -> LlkvExpr<'static, FieldId> {
5116 LlkvExpr::Pred(Filter {
5117 field_id,
5118 op: Operator::Range {
5119 lower: Bound::Unbounded,
5120 upper: Bound::Unbounded,
5121 },
5122 })
5123}
5124
5125fn resolve_field_id_from_schema(schema: &ExecutorSchema, name: &str) -> Result<FieldId> {
5126 if name.eq_ignore_ascii_case(ROW_ID_COLUMN_NAME) {
5127 return Ok(ROW_ID_FIELD_ID);
5128 }
5129
5130 schema
5131 .resolve(name)
5132 .map(|column| column.field_id)
5133 .ok_or_else(|| {
5134 Error::InvalidArgumentError(format!(
5135 "Binder Error: does not have a column named '{name}'"
5136 ))
5137 })
5138}
5139
5140fn translate_predicate(
5141 expr: LlkvExpr<'static, String>,
5142 schema: &ExecutorSchema,
5143) -> Result<LlkvExpr<'static, FieldId>> {
5144 match expr {
5145 LlkvExpr::And(list) => {
5146 let mut converted = Vec::with_capacity(list.len());
5147 for item in list {
5148 converted.push(translate_predicate(item, schema)?);
5149 }
5150 Ok(LlkvExpr::And(converted))
5151 }
5152 LlkvExpr::Or(list) => {
5153 let mut converted = Vec::with_capacity(list.len());
5154 for item in list {
5155 converted.push(translate_predicate(item, schema)?);
5156 }
5157 Ok(LlkvExpr::Or(converted))
5158 }
5159 LlkvExpr::Not(inner) => Ok(LlkvExpr::Not(Box::new(translate_predicate(
5160 *inner, schema,
5161 )?))),
5162 LlkvExpr::Pred(Filter { field_id, op }) => {
5163 let resolved = resolve_field_id_from_schema(schema, &field_id)?;
5164 Ok(LlkvExpr::Pred(Filter {
5165 field_id: resolved,
5166 op,
5167 }))
5168 }
5169 LlkvExpr::Compare { left, op, right } => {
5170 let left = translate_scalar(&left, schema)?;
5171 let right = translate_scalar(&right, schema)?;
5172 Ok(LlkvExpr::Compare { left, op, right })
5173 }
5174 }
5175}
5176
5177fn translate_scalar(
5178 expr: &ScalarExpr<String>,
5179 schema: &ExecutorSchema,
5180) -> Result<ScalarExpr<FieldId>> {
5181 match expr {
5182 ScalarExpr::Column(name) => {
5183 let field_id = resolve_field_id_from_schema(schema, name)?;
5184 Ok(ScalarExpr::column(field_id))
5185 }
5186 ScalarExpr::Literal(lit) => Ok(ScalarExpr::Literal(lit.clone())),
5187 ScalarExpr::Binary { left, op, right } => {
5188 let left_expr = translate_scalar(left, schema)?;
5189 let right_expr = translate_scalar(right, schema)?;
5190 Ok(ScalarExpr::Binary {
5191 left: Box::new(left_expr),
5192 op: *op,
5193 right: Box::new(right_expr),
5194 })
5195 }
5196 ScalarExpr::Aggregate(agg) => {
5197 use llkv_expr::expr::AggregateCall;
5199 let translated_agg = match agg {
5200 AggregateCall::CountStar => AggregateCall::CountStar,
5201 AggregateCall::Count(name) => {
5202 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5203 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5204 })?;
5205 AggregateCall::Count(field_id)
5206 }
5207 AggregateCall::Sum(name) => {
5208 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5209 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5210 })?;
5211 AggregateCall::Sum(field_id)
5212 }
5213 AggregateCall::Min(name) => {
5214 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5215 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5216 })?;
5217 AggregateCall::Min(field_id)
5218 }
5219 AggregateCall::Max(name) => {
5220 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5221 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5222 })?;
5223 AggregateCall::Max(field_id)
5224 }
5225 AggregateCall::CountNulls(name) => {
5226 let field_id = resolve_field_id_from_schema(schema, name).map_err(|_| {
5227 Error::InvalidArgumentError(format!("unknown column '{name}' in aggregate"))
5228 })?;
5229 AggregateCall::CountNulls(field_id)
5230 }
5231 };
5232 Ok(ScalarExpr::Aggregate(translated_agg))
5233 }
5234 ScalarExpr::GetField { base, field_name } => {
5235 let base_expr = translate_scalar(base, schema)?;
5236 Ok(ScalarExpr::GetField {
5237 base: Box::new(base_expr),
5238 field_name: field_name.clone(),
5239 })
5240 }
5241 }
5242}
5243
5244fn plan_value_from_sql_expr(expr: &SqlExpr) -> Result<PlanValue> {
5245 match expr {
5246 SqlExpr::Value(value) => plan_value_from_sql_value(value),
5247 SqlExpr::UnaryOp {
5248 op: UnaryOperator::Minus,
5249 expr,
5250 } => match plan_value_from_sql_expr(expr)? {
5251 PlanValue::Integer(v) => Ok(PlanValue::Integer(-v)),
5252 PlanValue::Float(v) => Ok(PlanValue::Float(-v)),
5253 PlanValue::Null | PlanValue::String(_) | PlanValue::Struct(_) => Err(
5254 Error::InvalidArgumentError("cannot negate non-numeric literal".into()),
5255 ),
5256 },
5257 SqlExpr::UnaryOp {
5258 op: UnaryOperator::Plus,
5259 expr,
5260 } => plan_value_from_sql_expr(expr),
5261 SqlExpr::Nested(inner) => plan_value_from_sql_expr(inner),
5262 SqlExpr::Dictionary(fields) => {
5263 let mut map = std::collections::HashMap::new();
5264 for field in fields {
5265 let key = field.key.value.clone();
5266 let value = plan_value_from_sql_expr(&field.value)?;
5267 map.insert(key, value);
5268 }
5269 Ok(PlanValue::Struct(map))
5270 }
5271 other => Err(Error::InvalidArgumentError(format!(
5272 "unsupported literal expression: {other:?}"
5273 ))),
5274 }
5275}
5276
5277fn plan_value_from_sql_value(value: &ValueWithSpan) -> Result<PlanValue> {
5278 match &value.value {
5279 Value::Null => Ok(PlanValue::Null),
5280 Value::Number(text, _) => {
5281 if text.contains(['.', 'e', 'E']) {
5282 let parsed = text.parse::<f64>().map_err(|err| {
5283 Error::InvalidArgumentError(format!("invalid float literal: {err}"))
5284 })?;
5285 Ok(PlanValue::Float(parsed))
5286 } else {
5287 let parsed = text.parse::<i64>().map_err(|err| {
5288 Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
5289 })?;
5290 Ok(PlanValue::Integer(parsed))
5291 }
5292 }
5293 Value::Boolean(_) => Err(Error::InvalidArgumentError(
5294 "BOOLEAN literals are not supported yet".into(),
5295 )),
5296 other => {
5297 if let Some(text) = other.clone().into_string() {
5298 Ok(PlanValue::String(text))
5299 } else {
5300 Err(Error::InvalidArgumentError(format!(
5301 "unsupported literal: {other:?}"
5302 )))
5303 }
5304 }
5305 }
5306}
5307
5308fn group_by_is_empty(expr: &GroupByExpr) -> bool {
5309 matches!(
5310 expr,
5311 GroupByExpr::Expressions(exprs, modifiers)
5312 if exprs.is_empty() && modifiers.is_empty()
5313 )
5314}
5315
/// Materialized result rows produced by a `SELECT ... FROM range(...)` query.
#[derive(Clone)]
pub struct RuntimeRangeSelectRows {
    /// One entry per generated row, each holding one value per projection.
    rows: Vec<Vec<PlanValue>>,
}
5320
impl RuntimeRangeSelectRows {
    /// Consume the result, yielding the raw row values.
    pub fn into_rows(self) -> Vec<Vec<PlanValue>> {
        self.rows
    }
}
5326
/// What a single projection of a range() SELECT produces per row: either the
/// generated range value itself, or a constant literal repeated on every row.
#[derive(Clone)]
enum RangeProjection {
    Column,
    Literal(PlanValue),
}
5332
/// Parsed description of a `range(start[, end])` table function reference.
#[derive(Clone)]
pub struct RuntimeRangeSpec {
    /// First generated value (inclusive).
    start: i64,
    /// One past the last generated value; retained for completeness.
    #[allow(dead_code)] end: i64,
    /// Number of rows to generate (`end - start`).
    row_count: usize,
    /// Lowercased column name (from the alias column list, default "range").
    column_name_lower: String,
    /// Lowercased table alias, if one was given.
    table_alias_lower: Option<String>,
}
5342
5343impl RuntimeRangeSpec {
5344 fn matches_identifier(&self, ident: &str) -> bool {
5345 let lower = ident.to_ascii_lowercase();
5346 lower == self.column_name_lower || lower == "range"
5347 }
5348
5349 fn matches_table_alias(&self, ident: &str) -> bool {
5350 let lower = ident.to_ascii_lowercase();
5351 match &self.table_alias_lower {
5352 Some(alias) => lower == *alias,
5353 None => lower == "range",
5354 }
5355 }
5356
5357 fn matches_object_name(&self, name: &ObjectName) -> bool {
5358 if name.0.len() != 1 {
5359 return false;
5360 }
5361 match &name.0[0] {
5362 ObjectNamePart::Identifier(ident) => self.matches_table_alias(&ident.value),
5363 _ => false,
5364 }
5365 }
5366}
5367
/// If `select` reads from a `range(...)` table function, synthesize its result
/// rows directly; returns `Ok(None)` when the query is not a range() SELECT.
///
/// Only plain projections are supported: WHERE, GROUP BY, HAVING, windows,
/// DISTINCT, TOP, INTO, PREWHERE, lateral views and value-table mode are all
/// rejected with `InvalidArgumentError`.
pub fn extract_rows_from_range(select: &Select) -> Result<Option<RuntimeRangeSelectRows>> {
    let spec = match parse_range_spec(select)? {
        Some(spec) => spec,
        None => return Ok(None),
    };

    if select.selection.is_some() {
        return Err(Error::InvalidArgumentError(
            "WHERE clauses are not supported for range() SELECT statements".into(),
        ));
    }
    // Reject every advanced clause this simple generator cannot honor.
    if select.having.is_some()
        || !select.named_window.is_empty()
        || select.qualify.is_some()
        || select.distinct.is_some()
        || select.top.is_some()
        || select.into.is_some()
        || select.prewhere.is_some()
        || !select.lateral_views.is_empty()
        || select.value_table_mode.is_some()
        || !group_by_is_empty(&select.group_by)
    {
        return Err(Error::InvalidArgumentError(
            "advanced SELECT clauses are not supported for range() SELECT statements".into(),
        ));
    }

    // Translate each SELECT item into either "the range value" or a constant.
    let mut projections: Vec<RangeProjection> = Vec::with_capacity(select.projection.len());

    // An empty projection list behaves like `SELECT *`.
    if select.projection.is_empty() {
        projections.push(RangeProjection::Column);
    } else {
        for item in &select.projection {
            let projection = match item {
                SelectItem::Wildcard(_) => RangeProjection::Column,
                SelectItem::QualifiedWildcard(kind, _) => match kind {
                    SelectItemQualifiedWildcardKind::ObjectName(object_name) => {
                        // `alias.*` is only valid when it names the range source.
                        if spec.matches_object_name(object_name) {
                            RangeProjection::Column
                        } else {
                            return Err(Error::InvalidArgumentError(
                                "qualified wildcard must reference the range() source".into(),
                            ));
                        }
                    }
                    SelectItemQualifiedWildcardKind::Expr(_) => {
                        return Err(Error::InvalidArgumentError(
                            "expression-qualified wildcards are not supported for range() SELECT statements".into(),
                        ));
                    }
                },
                SelectItem::UnnamedExpr(expr) => build_range_projection_expr(expr, &spec)?,
                // Aliases only affect output naming, not the value produced.
                SelectItem::ExprWithAlias { expr, .. } => build_range_projection_expr(expr, &spec)?,
            };
            projections.push(projection);
        }
    }

    // Generate row_count rows; the range value increments from start.
    let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(spec.row_count);
    for idx in 0..spec.row_count {
        let mut row: Vec<PlanValue> = Vec::with_capacity(projections.len());
        let value = spec.start + (idx as i64);
        for projection in &projections {
            match projection {
                RangeProjection::Column => row.push(PlanValue::Integer(value)),
                RangeProjection::Literal(value) => row.push(value.clone()),
            }
        }
        rows.push(row);
    }

    Ok(Some(RuntimeRangeSelectRows { rows }))
}
5442
5443fn build_range_projection_expr(expr: &SqlExpr, spec: &RuntimeRangeSpec) -> Result<RangeProjection> {
5444 match expr {
5445 SqlExpr::Identifier(ident) => {
5446 if spec.matches_identifier(&ident.value) {
5447 Ok(RangeProjection::Column)
5448 } else {
5449 Err(Error::InvalidArgumentError(format!(
5450 "unknown column '{}' in range() SELECT",
5451 ident.value
5452 )))
5453 }
5454 }
5455 SqlExpr::CompoundIdentifier(parts) => {
5456 if parts.len() == 2
5457 && spec.matches_table_alias(&parts[0].value)
5458 && spec.matches_identifier(&parts[1].value)
5459 {
5460 Ok(RangeProjection::Column)
5461 } else {
5462 Err(Error::InvalidArgumentError(
5463 "compound identifiers must reference the range() source".into(),
5464 ))
5465 }
5466 }
5467 SqlExpr::Wildcard(_) | SqlExpr::QualifiedWildcard(_, _) => unreachable!(),
5468 other => Ok(RangeProjection::Literal(plan_value_from_sql_expr(other)?)),
5469 }
5470}
5471
/// Detect a `FROM range(...)` source in `select` and parse its arguments.
///
/// Returns `Ok(None)` when the FROM clause is not a single range() reference;
/// JOINs, WITH ORDINALITY, and SETTINGS are rejected outright. Both the
/// `TableFactor::Function` and `TableFactor::Table`-with-args parse forms of
/// a table function are recognized.
fn parse_range_spec(select: &Select) -> Result<Option<RuntimeRangeSpec>> {
    if select.from.len() != 1 {
        return Ok(None);
    }
    let item = &select.from[0];
    if !item.joins.is_empty() {
        return Err(Error::InvalidArgumentError(
            "JOIN clauses are not supported for range() SELECT statements".into(),
        ));
    }

    match &item.relation {
        TableFactor::Function {
            lateral,
            name,
            args,
            alias,
        } => {
            if *lateral {
                return Err(Error::InvalidArgumentError(
                    "LATERAL range() is not supported".into(),
                ));
            }
            parse_range_spec_from_args(name, args, alias)
        }
        // Some dialects parse `range(...)` as a table with arguments.
        TableFactor::Table {
            name,
            alias,
            args: Some(table_args),
            with_ordinality,
            ..
        } => {
            if *with_ordinality {
                return Err(Error::InvalidArgumentError(
                    "WITH ORDINALITY is not supported for range()".into(),
                ));
            }
            if table_args.settings.is_some() {
                return Err(Error::InvalidArgumentError(
                    "range() SETTINGS clause is not supported".into(),
                ));
            }
            parse_range_spec_from_args(name, &table_args.args, alias)
        }
        _ => Ok(None),
    }
}
5519
/// Parse the argument list of a `range(...)` call into a `RuntimeRangeSpec`.
///
/// Returns `Ok(None)` when `name` is not the single-part identifier `range`.
/// Accepts `range(count)` (must be non-negative) or `range(start, end)`
/// (requires `end >= start`); arguments must be integer literals. The alias,
/// when present, supplies the output column name and table alias.
fn parse_range_spec_from_args(
    name: &ObjectName,
    args: &[FunctionArg],
    alias: &Option<TableAlias>,
) -> Result<Option<RuntimeRangeSpec>> {
    if name.0.len() != 1 {
        return Ok(None);
    }
    let func_name = match &name.0[0] {
        ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
        _ => return Ok(None),
    };
    if func_name != "range" {
        return Ok(None);
    }

    if args.is_empty() || args.len() > 2 {
        return Err(Error::InvalidArgumentError(
            "range() requires one or two arguments".into(),
        ));
    }

    // Shared extraction for either argument: must be an unnamed integer
    // literal expression (wildcards and named arguments are rejected).
    let extract_int = |arg: &FunctionArg| -> Result<i64> {
        let arg_expr = match arg {
            FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
            FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_))
            | FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
                return Err(Error::InvalidArgumentError(
                    "range() argument must be an integer literal".into(),
                ));
            }
            FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
                return Err(Error::InvalidArgumentError(
                    "named arguments are not supported for range()".into(),
                ));
            }
        };

        let value = plan_value_from_sql_expr(arg_expr)?;
        match value {
            PlanValue::Integer(v) => Ok(v),
            _ => Err(Error::InvalidArgumentError(
                "range() argument must be an integer literal".into(),
            )),
        }
    };

    // One argument: range(count) == range(0, count).
    let (start, end, row_count) = if args.len() == 1 {
        let count = extract_int(&args[0])?;
        if count < 0 {
            return Err(Error::InvalidArgumentError(
                "range() argument must be non-negative".into(),
            ));
        }
        (0, count, count as usize)
    } else {
        let start = extract_int(&args[0])?;
        let end = extract_int(&args[1])?;
        if end < start {
            return Err(Error::InvalidArgumentError(
                "range() end must be >= start".into(),
            ));
        }
        let row_count = (end - start) as usize;
        (start, end, row_count)
    };

    // Alias column list names the generated column; default is "range".
    let column_name_lower = alias
        .as_ref()
        .and_then(|a| {
            a.columns
                .first()
                .map(|col| col.name.value.to_ascii_lowercase())
        })
        .unwrap_or_else(|| "range".to_string());
    let table_alias_lower = alias.as_ref().map(|a| a.name.value.to_ascii_lowercase());

    Ok(Some(RuntimeRangeSpec {
        start,
        end,
        row_count,
        column_name_lower,
        table_alias_lower,
    }))
}
5608
/// Fluent builder that accumulates a `CreateTablePlan` and executes it against
/// a borrowed runtime context via `finish`.
pub struct RuntimeCreateTableBuilder<'ctx, P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Context the CREATE TABLE will run against.
    ctx: &'ctx RuntimeContext<P>,
    /// Plan being assembled.
    plan: CreateTablePlan,
}
5616
5617impl<'ctx, P> RuntimeCreateTableBuilder<'ctx, P>
5618where
5619 P: Pager<Blob = EntryHandle> + Send + Sync,
5620{
5621 pub fn if_not_exists(mut self) -> Self {
5622 self.plan.if_not_exists = true;
5623 self
5624 }
5625
5626 pub fn or_replace(mut self) -> Self {
5627 self.plan.or_replace = true;
5628 self
5629 }
5630
5631 pub fn with_column(mut self, name: impl Into<String>, data_type: DataType) -> Self {
5632 self.plan
5633 .columns
5634 .push(ColumnSpec::new(name.into(), data_type, true));
5635 self
5636 }
5637
5638 pub fn with_not_null_column(mut self, name: impl Into<String>, data_type: DataType) -> Self {
5639 self.plan
5640 .columns
5641 .push(ColumnSpec::new(name.into(), data_type, false));
5642 self
5643 }
5644
5645 pub fn with_column_spec(mut self, spec: ColumnSpec) -> Self {
5646 self.plan.columns.push(spec);
5647 self
5648 }
5649
5650 pub fn finish(self) -> Result<RuntimeStatementResult<P>> {
5651 self.ctx.execute_create_table(self.plan)
5652 }
5653}
5654
/// An ordered set of `(column name, value)` pairs used to build insert rows.
///
/// Column order is the insertion order of `set`/`with` calls; setting an
/// existing column overwrites its value in place without reordering.
#[derive(Clone, Debug, Default)]
pub struct RuntimeRow {
    // Pairs kept in a Vec (not a map) to preserve caller-specified column order.
    values: Vec<(String, PlanValue)>,
}
5659
5660impl RuntimeRow {
5661 pub fn new() -> Self {
5662 Self { values: Vec::new() }
5663 }
5664
5665 pub fn with(mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> Self {
5666 self.set(name, value);
5667 self
5668 }
5669
5670 pub fn set(&mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> &mut Self {
5671 let name = name.into();
5672 let value = value.into();
5673 if let Some((_, existing)) = self.values.iter_mut().find(|(n, _)| *n == name) {
5674 *existing = value;
5675 } else {
5676 self.values.push((name, value));
5677 }
5678 self
5679 }
5680
5681 fn columns(&self) -> Vec<String> {
5682 self.values.iter().map(|(n, _)| n.clone()).collect()
5683 }
5684
5685 fn values_for_columns(&self, columns: &[String]) -> Result<Vec<PlanValue>> {
5686 let mut out = Vec::with_capacity(columns.len());
5687 for column in columns {
5688 let value = self
5689 .values
5690 .iter()
5691 .find(|(name, _)| name == column)
5692 .ok_or_else(|| {
5693 Error::InvalidArgumentError(format!(
5694 "insert row missing value for column '{}'",
5695 column
5696 ))
5697 })?;
5698 out.push(value.1.clone());
5699 }
5700 Ok(out)
5701 }
5702}
5703
5704pub fn row() -> RuntimeRow {
5705 RuntimeRow::new()
5706}
5707
/// Normalized form of one insert row, produced by [`IntoInsertRow`].
#[doc(hidden)]
pub enum RuntimeInsertRowKind {
    /// Row whose values are addressed by explicit column names.
    Named {
        columns: Vec<String>,
        values: Vec<PlanValue>,
    },
    /// Row whose values map positionally onto the table's schema columns.
    Positional(Vec<PlanValue>),
}
5716
/// Conversion of user-supplied row representations (builder rows, `Vec`s,
/// arrays, tuples) into a [`RuntimeInsertRowKind`] accepted by
/// `RuntimeTableHandle::insert_rows`.
pub trait IntoInsertRow {
    /// Consume `self` and produce a normalized insert row, or an
    /// `InvalidArgumentError` when the row is empty/unusable.
    fn into_insert_row(self) -> Result<RuntimeInsertRowKind>;
}
5720
5721impl IntoInsertRow for RuntimeRow {
5722 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
5723 let row = self;
5724 if row.values.is_empty() {
5725 return Err(Error::InvalidArgumentError(
5726 "insert requires at least one column".into(),
5727 ));
5728 }
5729 let columns = row.columns();
5730 let values = row.values_for_columns(&columns)?;
5731 Ok(RuntimeInsertRowKind::Named { columns, values })
5732 }
5733}
5734
5735impl<T> IntoInsertRow for Vec<T>
5740where
5741 T: Into<PlanValue>,
5742{
5743 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
5744 if self.is_empty() {
5745 return Err(Error::InvalidArgumentError(
5746 "insert requires at least one column".into(),
5747 ));
5748 }
5749 Ok(RuntimeInsertRowKind::Positional(
5750 self.into_iter().map(Into::into).collect(),
5751 ))
5752 }
5753}
5754
5755impl<T, const N: usize> IntoInsertRow for [T; N]
5756where
5757 T: Into<PlanValue>,
5758{
5759 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
5760 if N == 0 {
5761 return Err(Error::InvalidArgumentError(
5762 "insert requires at least one column".into(),
5763 ));
5764 }
5765 Ok(RuntimeInsertRowKind::Positional(
5766 self.into_iter().map(Into::into).collect(),
5767 ))
5768 }
5769}
5770
/// Implements [`IntoInsertRow`] for a tuple type: each element converts into a
/// `PlanValue` and the tuple becomes a positional insert row. Tuples are
/// non-empty by construction, so no empty-row check is needed here.
macro_rules! impl_into_insert_row_tuple {
    ($($type:ident => $value:ident),+) => {
        impl<$($type,)+> IntoInsertRow for ($($type,)+)
        where
            $($type: Into<PlanValue>,)+
        {
            fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
                let ($($value,)+) = self;
                Ok(RuntimeInsertRowKind::Positional(vec![$($value.into(),)+]))
            }
        }
    };
}

// Cover tuple arities 1 through 8; wider rows should use a Vec, an array, or
// a `RuntimeRow` instead.
impl_into_insert_row_tuple!(T1 => v1);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5, T6 => v6);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7
);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7,
    T8 => v8
);
5810
/// Handle to an existing table in a [`RuntimeContext`], offering scan and
/// insert entry points. Constructed via [`RuntimeTableHandle::new`], which
/// verifies the table exists.
pub struct RuntimeTableHandle<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Shared runtime that owns the underlying table.
    context: Arc<RuntimeContext<P>>,
    // User-facing table name as originally written (case preserved).
    display_name: String,
    // Canonicalized (lowercased/qualified) name used for catalog lookups.
    // NOTE(review): the leading underscore suggests "unused", but methods on
    // the handle do read this field — consider renaming.
    _canonical_name: String,
}
5819
impl<P> RuntimeTableHandle<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    /// Resolve `name` against the context's catalog and return a handle.
    ///
    /// # Errors
    /// Fails if the name cannot be canonicalized or the table does not exist.
    pub fn new(context: Arc<RuntimeContext<P>>, name: &str) -> Result<Self> {
        let (display_name, canonical_name) = canonical_table_name(name)?;
        // Eagerly verify the table exists so later methods can assume it.
        context.lookup_table(&canonical_name)?;
        Ok(Self {
            context,
            display_name,
            _canonical_name: canonical_name,
        })
    }

    /// Start a lazy scan over the whole table.
    pub fn lazy(&self) -> Result<RuntimeLazyFrame<P>> {
        RuntimeLazyFrame::scan(Arc::clone(&self.context), &self.display_name)
    }

    /// Insert a batch of rows, each convertible via [`IntoInsertRow`].
    ///
    /// All rows must use the same style: either all named (same column set,
    /// in the same order) or all positional (one value per schema column).
    /// Rows are validated and normalized here, then forwarded to
    /// [`RuntimeTableHandle::insert_row_batch`].
    ///
    /// # Errors
    /// - empty iterator, mixed named/positional rows, duplicate or mismatched
    ///   column lists, or a row whose value count disagrees with its column
    ///   list (named) or the table schema (positional).
    pub fn insert_rows<R>(
        &self,
        rows: impl IntoIterator<Item = R>,
    ) -> Result<RuntimeStatementResult<P>>
    where
        R: IntoInsertRow,
    {
        // Which style the FIRST row established; later rows must match it.
        enum InsertMode {
            Named,
            Positional,
        }

        let table = self.context.lookup_table(&self._canonical_name)?;
        let schema = table.schema.as_ref();
        let schema_column_names: Vec<String> =
            schema.columns.iter().map(|col| col.name.clone()).collect();
        let mut normalized_rows: Vec<Vec<PlanValue>> = Vec::new();
        let mut mode: Option<InsertMode> = None;
        // Column list in effect: the first named row's columns, or the schema
        // columns for positional inserts. Set when the first row is seen.
        let mut column_names: Option<Vec<String>> = None;
        let mut row_count = 0usize;

        for row in rows.into_iter() {
            row_count += 1;
            match row.into_insert_row()? {
                RuntimeInsertRowKind::Named { columns, values } => {
                    if let Some(existing) = &mode {
                        if !matches!(existing, InsertMode::Named) {
                            return Err(Error::InvalidArgumentError(
                                "cannot mix positional and named insert rows".into(),
                            ));
                        }
                    } else {
                        // First row: fix the mode and the column list, and
                        // reject duplicate column names up front.
                        mode = Some(InsertMode::Named);
                        let mut seen =
                            FxHashSet::with_capacity_and_hasher(columns.len(), Default::default());
                        for column in &columns {
                            if !seen.insert(column.clone()) {
                                return Err(Error::InvalidArgumentError(format!(
                                    "duplicate column '{}' in insert row",
                                    column
                                )));
                            }
                        }
                        column_names = Some(columns.clone());
                    }

                    // Every named row must repeat the exact column list (same
                    // names, same order) established by the first row.
                    let expected = column_names
                        .as_ref()
                        .expect("column names must be initialized for named insert");
                    if columns != *expected {
                        return Err(Error::InvalidArgumentError(
                            "insert rows must specify the same columns".into(),
                        ));
                    }
                    if values.len() != expected.len() {
                        return Err(Error::InvalidArgumentError(format!(
                            "insert row expected {} values, found {}",
                            expected.len(),
                            values.len()
                        )));
                    }
                    normalized_rows.push(values);
                }
                RuntimeInsertRowKind::Positional(values) => {
                    if let Some(existing) = &mode {
                        if !matches!(existing, InsertMode::Positional) {
                            return Err(Error::InvalidArgumentError(
                                "cannot mix positional and named insert rows".into(),
                            ));
                        }
                    } else {
                        // First row: positional inserts target every schema
                        // column in schema order.
                        mode = Some(InsertMode::Positional);
                        column_names = Some(schema_column_names.clone());
                    }

                    if values.len() != schema.columns.len() {
                        return Err(Error::InvalidArgumentError(format!(
                            "insert row expected {} values, found {}",
                            schema.columns.len(),
                            values.len()
                        )));
                    }
                    normalized_rows.push(values);
                }
            }
        }

        if row_count == 0 {
            return Err(Error::InvalidArgumentError(
                "insert requires at least one row".into(),
            ));
        }

        // `column_names` is always Some when row_count > 0; the fallback is
        // defensive and uses the full schema column list.
        let columns = column_names.unwrap_or_else(|| schema_column_names.clone());
        self.insert_row_batch(RowBatch {
            columns,
            rows: normalized_rows,
        })
    }

    /// Insert a pre-assembled [`RowBatch`] after validating its shape
    /// (non-empty rows/columns, every row as wide as the column list), then
    /// execute it as an `InsertPlan` on the context.
    pub fn insert_row_batch(&self, batch: RowBatch) -> Result<RuntimeStatementResult<P>> {
        if batch.rows.is_empty() {
            return Err(Error::InvalidArgumentError(
                "insert requires at least one row".into(),
            ));
        }
        if batch.columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "insert requires at least one column".into(),
            ));
        }
        for row in &batch.rows {
            if row.len() != batch.columns.len() {
                return Err(Error::InvalidArgumentError(
                    "insert rows must have values for every column".into(),
                ));
            }
        }

        let plan = InsertPlan {
            table: self.display_name.clone(),
            columns: batch.columns,
            source: InsertSource::Rows(batch.rows),
        };
        self.context.insert(plan)
    }

    /// Insert Arrow [`RecordBatch`]es directly; column mapping is resolved by
    /// the context from the batch schemas (hence the empty `columns` list).
    pub fn insert_batches(&self, batches: Vec<RecordBatch>) -> Result<RuntimeStatementResult<P>> {
        let plan = InsertPlan {
            table: self.display_name.clone(),
            columns: Vec::new(),
            source: InsertSource::Batches(batches),
        };
        self.context.insert(plan)
    }

    /// Materialize a lazy frame and insert its rows into this table.
    pub fn insert_lazy(&self, frame: RuntimeLazyFrame<P>) -> Result<RuntimeStatementResult<P>> {
        let RowBatch { columns, rows } = frame.collect_rows()?;
        self.insert_row_batch(RowBatch { columns, rows })
    }

    /// The table's user-facing (display) name.
    pub fn name(&self) -> &str {
        &self.display_name
    }
}
5983
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use llkv_storage::pager::MemPager;
    use std::sync::Arc;

    /// End-to-end smoke test: create a two-column table on an in-memory
    /// pager, insert two positional tuple rows, and read them back via a
    /// lazy scan.
    #[test]
    fn create_insert_select_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let context = Arc::new(RuntimeContext::new(pager));

        let table = context
            .create_table(
                "people",
                [
                    ("id", DataType::Int64, NotNull),
                    ("name", DataType::Utf8, Nullable),
                ],
            )
            .expect("create table");
        table
            .insert_rows([(1_i64, "alice"), (2_i64, "bob")])
            .expect("insert rows");

        let execution = table.lazy().expect("lazy scan");
        let select = execution.collect().expect("build select execution");
        let batches = select.collect().expect("collect batches");
        // Both rows fit in a single output batch; column 1 is the Utf8 name.
        assert_eq!(batches.len(), 1);
        let column = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(column.len(), 2);
    }

    /// COUNT of NULLs: inserting two NULLs and one integer must yield a
    /// null-count aggregate of 2.
    #[test]
    fn aggregate_count_nulls() {
        let pager = Arc::new(MemPager::default());
        let context = Arc::new(RuntimeContext::new(pager));

        let table = context
            .create_table("ints", [("i", DataType::Int64)])
            .expect("create table");
        // Single-element tuples keep the positional-insert path.
        table
            .insert_rows([
                (PlanValue::Null,),
                (PlanValue::Integer(1),),
                (PlanValue::Null,),
            ])
            .expect("insert rows");

        let plan =
            SelectPlan::new("ints").with_aggregates(vec![AggregateExpr::count_nulls("i", "nulls")]);
        let execution = context.execute_select(plan).expect("select");
        let batches = execution.collect().expect("collect batches");
        let column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int column");
        assert_eq!(column.value(0), 2);
    }
}