1use arrow::array::{Array, UInt64Array};
13use arrow::datatypes::{DataType, IntervalUnit};
14use croaring::Treemap;
15use llkv_column_map::store::GatherNullPolicy;
16use llkv_compute::date::parse_date32_literal;
17use llkv_compute::scalar::decimal::{
18 align_decimal_to_scale, decimal_truthy, truncate_decimal_to_i64,
19};
20use llkv_executor::{ExecutorColumn, ExecutorTable, translation};
21use llkv_plan::PlanValue;
22use llkv_result::{Error, Result};
23use llkv_storage::pager::Pager;
24use llkv_table::FieldId;
25use llkv_transaction::{TransactionSnapshot, TxnId, filter_row_ids_for_snapshot};
26use llkv_types::LogicalFieldId;
27use simd_r_drive_entry_handle::EntryHandle;
28use std::sync::Arc;
29
30use crate::TXN_ID_AUTO_COMMIT;
31use crate::TXN_ID_NONE;
32
33use super::RuntimeContext;
34
35impl<P> RuntimeContext<P>
36where
37 P: Pager<Blob = EntryHandle> + Send + Sync,
38{
    /// Coerce a literal `PlanValue` so it can be stored in `column`.
    ///
    /// Applies SQL-style implicit casts between the literal's variant and the
    /// column's Arrow [`DataType`]: numeric widening/narrowing, boolean
    /// truthiness, string parsing for booleans/dates, and decimal scale
    /// alignment. Incompatible combinations (e.g. string into a numeric
    /// column) produce `Error::InvalidArgumentError`.
    ///
    /// Note: boolean results are encoded as `PlanValue::Integer(0 | 1)`;
    /// there is no dedicated boolean plan-value variant in this code path.
    pub(super) fn coerce_plan_value_for_column(
        &self,
        value: PlanValue,
        column: &ExecutorColumn,
    ) -> Result<PlanValue> {
        match value {
            // NULL is accepted by every column type and stored as-is.
            PlanValue::Null => Ok(PlanValue::Null),
            PlanValue::Decimal(decimal) => match &column.data_type {
                DataType::Decimal128(precision, scale) => {
                    // Rescale to the column's declared precision/scale;
                    // the helper errors out if the value cannot be
                    // represented without loss.
                    let aligned = align_decimal_to_scale(decimal, *precision, *scale).map_err(
                        |err| {
                            Error::InvalidArgumentError(format!(
                                "decimal literal {} incompatible with DECIMAL({}, {}) column '{}': {err}",
                                decimal, precision, scale, column.name
                            ))
                        },
                    )?;
                    Ok(PlanValue::Decimal(aligned))
                }
                DataType::Int64 => {
                    // Drops the fractional part; errors when the integral
                    // part does not fit in i64.
                    let coerced = truncate_decimal_to_i64(decimal).map_err(|err| {
                        Error::InvalidArgumentError(format!(
                            "decimal literal {} incompatible with INT column '{}': {err}",
                            decimal, column.name
                        ))
                    })?;
                    Ok(PlanValue::Integer(coerced))
                }
                DataType::Float64 => Ok(PlanValue::Float(decimal.to_f64())),
                // Decimal truthiness is delegated to `decimal_truthy`.
                DataType::Boolean => Ok(PlanValue::Integer(if decimal_truthy(decimal) {
                    1
                } else {
                    0
                })),
                DataType::Utf8 => Ok(PlanValue::String(decimal.to_string())),
                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign decimal literal to DATE column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign decimal literal to STRUCT column '{}'",
                    column.name
                ))),
                other => Err(Error::InvalidArgumentError(format!(
                    "unsupported target type {:?} for DECIMAL literal in column '{}'",
                    other, column.name
                ))),
            },
            PlanValue::Integer(v) => match &column.data_type {
                DataType::Int64 => Ok(PlanValue::Integer(v)),
                DataType::Float64 => Ok(PlanValue::Float(v as f64)),
                DataType::Boolean => Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 })),
                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
                DataType::Date32 => {
                    // Date32 is backed by i32 (Arrow: days since the Unix
                    // epoch); reject integers outside the i32 range.
                    let casted = i32::try_from(v).map_err(|_| {
                        Error::InvalidArgumentError(format!(
                            "integer literal out of range for DATE column '{}'",
                            column.name
                        ))
                    })?;
                    Ok(PlanValue::Date32(casted))
                }
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign integer to STRUCT column '{}'",
                    column.name
                ))),
                // Other target types accept the integer unchanged.
                _ => Ok(PlanValue::Integer(v)),
            },
            PlanValue::Float(v) => match &column.data_type {
                // Rust `as i64` on f64 saturates at i64::MIN/MAX and maps
                // NaN to 0 — intentional lossy narrowing.
                DataType::Int64 => Ok(PlanValue::Integer(v as i64)),
                DataType::Float64 => Ok(PlanValue::Float(v)),
                DataType::Boolean => Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 })),
                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign floating-point value to DATE column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign floating-point value to STRUCT column '{}'",
                    column.name
                ))),
                _ => Ok(PlanValue::Float(v)),
            },
            PlanValue::String(s) => match &column.data_type {
                DataType::Boolean => {
                    // Accept common SQL boolean spellings, case-insensitive
                    // and whitespace-tolerant.
                    let normalized = s.trim().to_ascii_lowercase();
                    match normalized.as_str() {
                        "true" | "t" | "1" => Ok(PlanValue::Integer(1)),
                        "false" | "f" | "0" => Ok(PlanValue::Integer(0)),
                        _ => Err(Error::InvalidArgumentError(format!(
                            "cannot assign string '{}' to BOOLEAN column '{}'",
                            s, column.name
                        ))),
                    }
                }
                DataType::Utf8 => Ok(PlanValue::String(s)),
                DataType::Date32 => {
                    // Date-literal parsing (format rules live in
                    // `parse_date32_literal`); yields epoch-day count.
                    let days = parse_date32_literal(&s)?;
                    Ok(PlanValue::Date32(days))
                }
                // No implicit string -> number cast is offered.
                DataType::Int64 | DataType::Float64 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign string '{}' to numeric column '{}'",
                    s, column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign string to STRUCT column '{}'",
                    column.name
                ))),
                _ => Ok(PlanValue::String(s)),
            },
            // Struct values only fit struct columns; per-field validation,
            // if any, happens elsewhere.
            PlanValue::Struct(map) => match &column.data_type {
                DataType::Struct(_) => Ok(PlanValue::Struct(map)),
                _ => Err(Error::InvalidArgumentError(format!(
                    "cannot assign struct value to column '{}'",
                    column.name
                ))),
            },
            PlanValue::Interval(interval) => match &column.data_type {
                // Only the MonthDayNano interval layout is supported.
                DataType::Interval(IntervalUnit::MonthDayNano) => Ok(PlanValue::Interval(interval)),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign INTERVAL literal to STRUCT column '{}'",
                    column.name
                ))),
                other => Err(Error::InvalidArgumentError(format!(
                    "unsupported target type {:?} for INTERVAL literal in column '{}'",
                    other, column.name
                ))),
            },
            PlanValue::Date32(days) => match &column.data_type {
                DataType::Date32 => Ok(PlanValue::Date32(days)),
                // Dates widen losslessly to their epoch-day count.
                DataType::Int64 => Ok(PlanValue::Integer(i64::from(days))),
                DataType::Float64 => Ok(PlanValue::Float(days as f64)),
                DataType::Utf8 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign DATE literal to TEXT column '{}'",
                    column.name
                ))),
                DataType::Boolean => Err(Error::InvalidArgumentError(format!(
                    "cannot assign DATE literal to BOOLEAN column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign DATE literal to STRUCT column '{}'",
                    column.name
                ))),
                other => Err(Error::InvalidArgumentError(format!(
                    "unsupported target type {:?} for DATE literal in column '{}'",
                    other, column.name
                ))),
            },
        }
    }
193
194 pub(super) fn scan_column_values(
201 &self,
202 table: &ExecutorTable<P>,
203 field_id: FieldId,
204 snapshot: TransactionSnapshot,
205 ) -> Result<Vec<PlanValue>> {
206 let table_id = table.table.table_id();
207 use llkv_expr::{Expr, Filter, Operator};
208 use std::ops::Bound;
209
210 let match_all_filter = Filter {
212 field_id,
213 op: Operator::Range {
214 lower: Bound::Unbounded,
215 upper: Bound::Unbounded,
216 },
217 };
218 let filter_expr = Expr::Pred(match_all_filter);
219
220 let row_ids = match table.table.filter_row_ids(&filter_expr) {
222 Ok(ids) => ids,
223 Err(Error::NotFound) => return Ok(Vec::new()),
224 Err(e) => return Err(e),
225 };
226
227 let row_ids = filter_row_ids_for_snapshot(
229 table.table.as_ref(),
230 row_ids,
231 &self.txn_manager,
232 snapshot,
233 )?;
234
235 if row_ids.is_empty() {
236 return Ok(Vec::new());
237 }
238
239 let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
241 let row_count = row_ids.cardinality() as usize;
242 let mut stream = match table.table.stream_columns(
243 vec![logical_field_id],
244 &row_ids,
245 GatherNullPolicy::IncludeNulls,
246 ) {
247 Ok(stream) => stream,
248 Err(Error::NotFound) => return Ok(Vec::new()),
249 Err(e) => return Err(e),
250 };
251
252 let mut values = Vec::with_capacity(row_count);
256 while let Some(chunk) = stream.next_batch()? {
257 let batch = chunk.batch();
258 if batch.num_columns() == 0 {
259 continue;
260 }
261 let array = batch.column(0);
262 for row_idx in 0..batch.num_rows() {
263 if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
264 values.push(value);
265 }
266 }
267 }
268
269 Ok(values)
270 }
271
272 pub(super) fn scan_multi_column_values(
280 &self,
281 table: &ExecutorTable<P>,
282 field_ids: &[FieldId],
283 snapshot: TransactionSnapshot,
284 ) -> Result<Vec<Vec<PlanValue>>> {
285 if field_ids.is_empty() {
286 return Ok(Vec::new());
287 }
288
289 let table_id = table.table.table_id();
290 use llkv_expr::{Expr, Filter, Operator};
291 use std::ops::Bound;
292
293 let match_all_filter = Filter {
294 field_id: field_ids[0],
295 op: Operator::Range {
296 lower: Bound::Unbounded,
297 upper: Bound::Unbounded,
298 },
299 };
300 let filter_expr = Expr::Pred(match_all_filter);
301
302 let row_ids = match table.table.filter_row_ids(&filter_expr) {
303 Ok(ids) => ids,
304 Err(Error::NotFound) => return Ok(Vec::new()),
305 Err(e) => return Err(e),
306 };
307
308 let row_ids = filter_row_ids_for_snapshot(
309 table.table.as_ref(),
310 row_ids,
311 &self.txn_manager,
312 snapshot,
313 )?;
314
315 if row_ids.is_empty() {
316 return Ok(Vec::new());
317 }
318
319 let logical_field_ids: Vec<_> = field_ids
320 .iter()
321 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
322 .collect();
323
324 let total_rows = row_ids.cardinality() as usize;
325 let mut stream = match table.table.stream_columns(
326 logical_field_ids,
327 &row_ids,
328 GatherNullPolicy::IncludeNulls,
329 ) {
330 Ok(stream) => stream,
331 Err(Error::NotFound) => return Ok(Vec::new()),
332 Err(e) => return Err(e),
333 };
334
335 let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
336 while let Some(chunk) = stream.next_batch()? {
337 let batch = chunk.batch();
338 if batch.num_columns() == 0 {
339 continue;
340 }
341
342 let base = chunk.row_offset();
343 let local_len = batch.num_rows();
344 for col_idx in 0..batch.num_columns() {
345 let array = batch.column(col_idx);
346 for local_idx in 0..local_len {
347 let target_index = base + local_idx;
348 debug_assert!(
349 target_index < rows.len(),
350 "stream chunk produced out-of-bounds row index"
351 );
352 if let Some(row) = rows.get_mut(target_index) {
353 match llkv_plan::plan_value_from_array(array, local_idx) {
354 Ok(value) => row.push(value),
355 Err(_) => row.push(PlanValue::Null),
356 }
357 }
358 }
359 }
360 }
361
362 Ok(rows)
363 }
364
365 pub(super) fn scan_multi_column_values_for_fk_check(
371 &self,
372 table: &ExecutorTable<P>,
373 field_ids: &[FieldId],
374 snapshot: TransactionSnapshot,
375 ) -> Result<Vec<Vec<PlanValue>>> {
376 if field_ids.is_empty() {
377 return Ok(Vec::new());
378 }
379
380 let table_id = table.table.table_id();
381 use llkv_expr::{Expr, Filter, Operator};
382 use std::ops::Bound;
383
384 let match_all_filter = Filter {
385 field_id: field_ids[0],
386 op: Operator::Range {
387 lower: Bound::Unbounded,
388 upper: Bound::Unbounded,
389 },
390 };
391 let filter_expr = Expr::Pred(match_all_filter);
392
393 let row_ids = match table.table.filter_row_ids(&filter_expr) {
394 Ok(ids) => ids,
395 Err(Error::NotFound) => return Ok(Vec::new()),
396 Err(e) => return Err(e),
397 };
398
399 let row_ids = llkv_transaction::filter_row_ids_for_fk_check(
401 table.table.as_ref(),
402 row_ids,
403 &self.txn_manager,
404 snapshot,
405 )?;
406
407 if row_ids.is_empty() {
408 return Ok(Vec::new());
409 }
410
411 let logical_field_ids: Vec<_> = field_ids
412 .iter()
413 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
414 .collect();
415
416 let total_rows = row_ids.cardinality() as usize;
417 let mut stream = match table.table.stream_columns(
418 logical_field_ids,
419 &row_ids,
420 GatherNullPolicy::IncludeNulls,
421 ) {
422 Ok(stream) => stream,
423 Err(Error::NotFound) => return Ok(Vec::new()),
424 Err(e) => return Err(e),
425 };
426
427 let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
428 while let Some(chunk) = stream.next_batch()? {
429 let batch = chunk.batch();
430 if batch.num_columns() == 0 {
431 continue;
432 }
433
434 let base = chunk.row_offset();
435 let local_len = batch.num_rows();
436 for col_idx in 0..batch.num_columns() {
437 let array = batch.column(col_idx);
438 for local_idx in 0..local_len {
439 let target_index = base + local_idx;
440 debug_assert!(
441 target_index < rows.len(),
442 "stream chunk produced out-of-bounds row index"
443 );
444 if let Some(row) = rows.get_mut(target_index) {
445 match llkv_plan::plan_value_from_array(array, local_idx) {
446 Ok(value) => row.push(value),
447 Err(_) => row.push(PlanValue::Null),
448 }
449 }
450 }
451 }
452 }
453
454 Ok(rows)
455 }
456
457 pub(super) fn collect_row_values_for_ids(
461 &self,
462 table: &ExecutorTable<P>,
463 row_ids: &Treemap,
464 field_ids: &[FieldId],
465 ) -> Result<Vec<Vec<PlanValue>>> {
466 if row_ids.is_empty() || field_ids.is_empty() {
467 return Ok(Vec::new());
468 }
469
470 let table_id = table.table.table_id();
471 let logical_field_ids: Vec<LogicalFieldId> = field_ids
472 .iter()
473 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
474 .collect();
475
476 let mut stream = match table.table.stream_columns(
477 logical_field_ids.clone(),
478 row_ids,
479 GatherNullPolicy::IncludeNulls,
480 ) {
481 Ok(stream) => stream,
482 Err(Error::NotFound) => return Ok(Vec::new()),
483 Err(e) => return Err(e),
484 };
485
486 let mut rows = vec![Vec::with_capacity(field_ids.len()); row_ids.cardinality() as usize];
487 while let Some(chunk) = stream.next_batch()? {
488 let batch = chunk.batch();
489 let base = chunk.row_offset();
490 let local_len = batch.num_rows();
491 for col_idx in 0..batch.num_columns() {
492 let array = batch.column(col_idx);
493 for local_idx in 0..local_len {
494 let target_index = base + local_idx;
495 if let Some(row) = rows.get_mut(target_index) {
496 let value = llkv_plan::plan_value_from_array(array, local_idx)?;
497 row.push(value);
498 }
499 }
500 }
501 }
502
503 Ok(rows)
504 }
505
506 pub(super) fn filter_visible_row_ids(
510 &self,
511 table: &ExecutorTable<P>,
512 row_ids: Treemap,
513 snapshot: TransactionSnapshot,
514 ) -> Result<Treemap> {
515 filter_row_ids_for_snapshot(table.table.as_ref(), row_ids, &self.txn_manager, snapshot)
516 }
517
518 pub(super) fn record_table_with_new_rows(&self, txn_id: TxnId, canonical_name: String) {
522 if txn_id == TXN_ID_AUTO_COMMIT {
523 return;
524 }
525
526 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
527 guard.entry(txn_id).or_default().insert(canonical_name);
528 }
529
    /// Collect the column values of every row that was inserted by `txn_id`
    /// and has not since been marked deleted.
    ///
    /// Scans the whole table and inspects the MVCC `created_by`/`deleted_by`
    /// metadata columns to pick out the transaction's own live rows. Rows are
    /// returned in schema column order. Auto-commit transactions and empty
    /// schemas/tables always yield an empty result.
    pub(super) fn collect_rows_created_by_txn(
        &self,
        table: &ExecutorTable<P>,
        txn_id: TxnId,
    ) -> Result<Vec<Vec<PlanValue>>> {
        // Auto-commit rows are never tracked per-transaction.
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(Vec::new());
        }

        if table.schema.columns.is_empty() {
            return Ok(Vec::new());
        }

        // A match-all filter needs some anchor column; use the first one.
        let Some(first_field_id) = table.schema.first_field_id() else {
            return Ok(Vec::new());
        };
        let filter_expr = translation::expression::full_table_scan_filter(first_field_id);

        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();
        // Field layout for the stream: MVCC metadata occupies slots 0 and 1,
        // user columns follow starting at offset 2. The per-row loop below
        // relies on this ordering.
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len() + 2);
        logical_fields.push(LogicalFieldId::for_mvcc_created_by(table_id));
        logical_fields.push(LogicalFieldId::for_mvcc_deleted_by(table_id));
        for column in &table.schema.columns {
            logical_fields.push(LogicalFieldId::for_user(table_id, column.field_id));
        }

        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let mut stream = table.table.stream_columns(
            Arc::clone(&logical_fields),
            row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut rows = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            // Skip batches that do not carry the full MVCC + user layout.
            if batch.num_columns() < table.schema.columns.len() + 2 {
                continue;
            }

            let created_col = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing created_by column in MVCC data".into()))?;
            let deleted_col = batch
                .column(1)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing deleted_by column in MVCC data".into()))?;

            for row_idx in 0..batch.num_rows() {
                // NULL created_by is treated as written by auto-commit.
                let created_by = if created_col.is_null(row_idx) {
                    TXN_ID_AUTO_COMMIT
                } else {
                    created_col.value(row_idx)
                };
                // Only rows created by this specific transaction qualify.
                if created_by != txn_id {
                    continue;
                }

                // NULL deleted_by means "never deleted" (TXN_ID_NONE).
                let deleted_by = if deleted_col.is_null(row_idx) {
                    TXN_ID_NONE
                } else {
                    deleted_col.value(row_idx)
                };
                // Skip rows that have been deleted by any transaction.
                if deleted_by != TXN_ID_NONE {
                    continue;
                }

                // User columns start at batch offset 2 (after MVCC metadata).
                let mut row_values = Vec::with_capacity(table.schema.columns.len());
                for col_idx in 0..table.schema.columns.len() {
                    let array = batch.column(col_idx + 2);
                    let value = llkv_plan::plan_value_from_array(array, row_idx)?;
                    row_values.push(value);
                }
                rows.push(row_values);
            }
        }

        Ok(rows)
    }
621
    /// Re-validate primary-key uniqueness for every table `txn_id` inserted
    /// rows into, prior to committing the transaction.
    ///
    /// Tables are discovered via the bookkeeping written by
    /// `record_table_with_new_rows`. Auto-commit transactions are skipped.
    /// Returns the first lookup or constraint error encountered.
    pub(crate) fn validate_primary_keys_for_commit(&self, txn_id: TxnId) -> Result<()> {
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(());
        }

        // Clone the pending-table set out of the lock so the read guard is
        // released before any table lookups or scans run below.
        let pending_tables = {
            let guard = self.txn_tables_with_new_rows.read().unwrap();
            guard.get(&txn_id).cloned()
        };

        let Some(tables) = pending_tables else {
            return Ok(());
        };

        for canonical_name in tables {
            // A table dropped since the inserts happened needs no validation.
            let table = match self.lookup_table(&canonical_name) {
                Ok(table) => table,
                Err(Error::NotFound) => continue,
                Err(err) => return Err(err),
            };

            let constraint_ctx = self.build_table_constraint_context(table.as_ref())?;
            // Nothing to check for tables without a primary key.
            let Some(primary_key) = constraint_ctx.primary_key.as_ref() else {
                continue;
            };

            let new_rows = self.collect_rows_created_by_txn(table.as_ref(), txn_id)?;
            if new_rows.is_empty() {
                continue;
            }

            // Rows from `collect_rows_created_by_txn` come back in schema
            // column order, so the column mapping is the identity.
            let column_order: Vec<usize> = (0..table.schema.columns.len()).collect();
            let table_for_fetch = Arc::clone(&table);
            let snapshot = self.default_snapshot();

            // The closure lets the constraint service fetch existing column
            // values on demand, only for the field ids it asks for.
            self.constraint_service.validate_primary_key_rows(
                &constraint_ctx.schema_field_ids,
                primary_key,
                &column_order,
                &new_rows,
                |field_ids| {
                    self.scan_multi_column_values(table_for_fetch.as_ref(), field_ids, snapshot)
                },
            )?;
        }

        Ok(())
    }
671
672 pub(crate) fn clear_transaction_state(&self, txn_id: TxnId) {
674 if txn_id == TXN_ID_AUTO_COMMIT {
675 return;
676 }
677
678 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
679 guard.remove(&txn_id);
680 }
681
682 pub(super) fn remove_table_entry(&self, canonical_name: &str) {
687 let mut tables = self.tables.write().unwrap();
688 if tables.remove(canonical_name).is_some() {
689 tracing::trace!(
690 "remove_table_entry: removed table '{}' from context cache",
691 canonical_name
692 );
693 }
694 }
695
696 pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
698 self.dropped_tables.read().unwrap().contains(canonical_name)
699 }
700}