1use arrow::array::{Array, UInt64Array};
13use arrow::datatypes::{DataType, IntervalUnit};
14use croaring::Treemap;
15use llkv_column_map::store::GatherNullPolicy;
16use llkv_compute::date::parse_date32_literal;
17use llkv_compute::scalar::decimal::{
18 align_decimal_to_scale, decimal_truthy, truncate_decimal_to_i64,
19};
20use llkv_executor::{ExecutorColumn, ExecutorTable, translation};
21use llkv_plan::PlanValue;
22use llkv_result::{Error, Result};
23use llkv_storage::pager::Pager;
24use llkv_table::{FieldId, RowStream};
25use llkv_transaction::{TransactionSnapshot, TxnId, filter_row_ids_for_snapshot};
26use llkv_types::LogicalFieldId;
27use simd_r_drive_entry_handle::EntryHandle;
28use std::sync::Arc;
29
30use crate::TXN_ID_AUTO_COMMIT;
31use crate::TXN_ID_NONE;
32
33use super::RuntimeContext;
34
35impl<P> RuntimeContext<P>
36where
37 P: Pager<Blob = EntryHandle> + Send + Sync,
38{
    /// Coerces a literal `PlanValue` so it can be stored in `column`.
    ///
    /// Applies the implicit conversions the planner permits (integer → float,
    /// decimal → string, string → date, etc.) and rejects nonsensical
    /// assignments with `Error::InvalidArgumentError` keyed to the column's
    /// Arrow `DataType`.
    ///
    /// NOTE(review): booleans are represented as `PlanValue::Integer(0|1)`
    /// throughout this function — there appears to be no dedicated boolean
    /// plan value; confirm against the `PlanValue` definition.
    pub(super) fn coerce_plan_value_for_column(
        &self,
        value: PlanValue,
        column: &ExecutorColumn,
    ) -> Result<PlanValue> {
        match value {
            // NULL is storable in any column type.
            PlanValue::Null => Ok(PlanValue::Null),
            PlanValue::Decimal(decimal) => match &column.data_type {
                DataType::Decimal128(precision, scale) => {
                    // Rescale the literal to the column's declared
                    // precision/scale; errors if it cannot be represented.
                    let aligned = align_decimal_to_scale(decimal, *precision, *scale).map_err(
                        |err| {
                            Error::InvalidArgumentError(format!(
                                "decimal literal {} incompatible with DECIMAL({}, {}) column '{}': {err}",
                                decimal, precision, scale, column.name
                            ))
                        },
                    )?;
                    Ok(PlanValue::Decimal(aligned))
                }
                DataType::Int64 => {
                    // Truncate the decimal to an i64; overflow is an error.
                    let coerced = truncate_decimal_to_i64(decimal).map_err(|err| {
                        Error::InvalidArgumentError(format!(
                            "decimal literal {} incompatible with INT column '{}': {err}",
                            decimal, column.name
                        ))
                    })?;
                    Ok(PlanValue::Integer(coerced))
                }
                DataType::Float64 => Ok(PlanValue::Float(decimal.to_f64())),
                // Booleans are stored as 0/1 integers.
                DataType::Boolean => Ok(PlanValue::Integer(if decimal_truthy(decimal) {
                    1
                } else {
                    0
                })),
                DataType::Utf8 => Ok(PlanValue::String(decimal.to_string())),
                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign decimal literal to DATE column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign decimal literal to STRUCT column '{}'",
                    column.name
                ))),
                other => Err(Error::InvalidArgumentError(format!(
                    "unsupported target type {:?} for DECIMAL literal in column '{}'",
                    other, column.name
                ))),
            },
            PlanValue::Integer(v) => match &column.data_type {
                DataType::Int64 => Ok(PlanValue::Integer(v)),
                DataType::Float64 => Ok(PlanValue::Float(v as f64)),
                DataType::Boolean => Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 })),
                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
                DataType::Date32 => {
                    // DATE is stored as i32 days; out-of-range integers are
                    // rejected rather than wrapped.
                    let casted = i32::try_from(v).map_err(|_| {
                        Error::InvalidArgumentError(format!(
                            "integer literal out of range for DATE column '{}'",
                            column.name
                        ))
                    })?;
                    Ok(PlanValue::Date32(casted))
                }
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign integer to STRUCT column '{}'",
                    column.name
                ))),
                // Remaining target types pass the integer through unchanged;
                // presumably later layers handle any residual conversion.
                _ => Ok(PlanValue::Integer(v)),
            },
            PlanValue::Float(v) => match &column.data_type {
                // `as i64` on a float saturates at the i64 bounds and
                // truncates toward zero (Rust cast semantics).
                DataType::Int64 => Ok(PlanValue::Integer(v as i64)),
                DataType::Float64 => Ok(PlanValue::Float(v)),
                DataType::Boolean => Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 })),
                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign floating-point value to DATE column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign floating-point value to STRUCT column '{}'",
                    column.name
                ))),
                _ => Ok(PlanValue::Float(v)),
            },
            PlanValue::String(s) => match &column.data_type {
                DataType::Boolean => {
                    // Accept common textual boolean spellings,
                    // case-insensitively and ignoring surrounding whitespace.
                    let normalized = s.trim().to_ascii_lowercase();
                    match normalized.as_str() {
                        "true" | "t" | "1" => Ok(PlanValue::Integer(1)),
                        "false" | "f" | "0" => Ok(PlanValue::Integer(0)),
                        _ => Err(Error::InvalidArgumentError(format!(
                            "cannot assign string '{}' to BOOLEAN column '{}'",
                            s, column.name
                        ))),
                    }
                }
                DataType::Utf8 => Ok(PlanValue::String(s)),
                DataType::Date32 => {
                    // Parse the string literal into days-since-epoch.
                    let days = parse_date32_literal(&s)?;
                    Ok(PlanValue::Date32(days))
                }
                // Strings never coerce implicitly into numeric columns.
                DataType::Int64 | DataType::Float64 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign string '{}' to numeric column '{}'",
                    s, column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign string to STRUCT column '{}'",
                    column.name
                ))),
                _ => Ok(PlanValue::String(s)),
            },
            PlanValue::Struct(map) => match &column.data_type {
                // Struct values only fit struct columns.
                DataType::Struct(_) => Ok(PlanValue::Struct(map)),
                _ => Err(Error::InvalidArgumentError(format!(
                    "cannot assign struct value to column '{}'",
                    column.name
                ))),
            },
            PlanValue::Interval(interval) => match &column.data_type {
                // Only the MonthDayNano interval representation is accepted.
                DataType::Interval(IntervalUnit::MonthDayNano) => Ok(PlanValue::Interval(interval)),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign INTERVAL literal to STRUCT column '{}'",
                    column.name
                ))),
                other => Err(Error::InvalidArgumentError(format!(
                    "unsupported target type {:?} for INTERVAL literal in column '{}'",
                    other, column.name
                ))),
            },
            PlanValue::Date32(days) => match &column.data_type {
                DataType::Date32 => Ok(PlanValue::Date32(days)),
                // Day counts widen losslessly to integer/float.
                DataType::Int64 => Ok(PlanValue::Integer(i64::from(days))),
                DataType::Float64 => Ok(PlanValue::Float(days as f64)),
                DataType::Utf8 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign DATE literal to TEXT column '{}'",
                    column.name
                ))),
                DataType::Boolean => Err(Error::InvalidArgumentError(format!(
                    "cannot assign DATE literal to BOOLEAN column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign DATE literal to STRUCT column '{}'",
                    column.name
                ))),
                other => Err(Error::InvalidArgumentError(format!(
                    "unsupported target type {:?} for DATE literal in column '{}'",
                    other, column.name
                ))),
            },
        }
    }
193
194 pub(super) fn scan_column_values(
201 &self,
202 table: &ExecutorTable<P>,
203 field_id: FieldId,
204 snapshot: TransactionSnapshot,
205 ) -> Result<Vec<PlanValue>> {
206 let table_id = table.table_id();
207 use llkv_expr::{Expr, Filter, Operator};
208 use std::ops::Bound;
209
210 let match_all_filter = Filter {
212 field_id,
213 op: Operator::Range {
214 lower: Bound::Unbounded,
215 upper: Bound::Unbounded,
216 },
217 };
218 let filter_expr = Expr::Pred(match_all_filter);
219
220 let row_ids = match table.filter_row_ids(&filter_expr) {
222 Ok(ids) => ids,
223 Err(Error::NotFound) => return Ok(Vec::new()),
224 Err(e) => return Err(e),
225 };
226
227 let row_ids = filter_row_ids_for_snapshot(
229 table.table.as_ref(),
230 row_ids,
231 &self.txn_manager,
232 snapshot,
233 )?;
234
235 if row_ids.is_empty() {
236 return Ok(Vec::new());
237 }
238
239 let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
241 let row_count = row_ids.cardinality() as usize;
242 let mut stream = match table.stream_columns(
243 vec![logical_field_id],
244 &row_ids,
245 GatherNullPolicy::IncludeNulls,
246 ) {
247 Ok(stream) => stream,
248 Err(Error::NotFound) => return Ok(Vec::new()),
249 Err(e) => return Err(e),
250 };
251
252 let mut values = Vec::with_capacity(row_count);
256 while let Some(chunk) = stream.next_chunk()? {
257 let batch = chunk.record_batch();
258 if batch.num_columns() == 0 {
259 continue;
260 }
261 let array = batch.column(0);
262 for row_idx in 0..batch.num_rows() {
263 if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
264 values.push(value);
265 }
266 }
267 }
268
269 Ok(values)
270 }
271
272 pub(super) fn scan_multi_column_values(
280 &self,
281 table: &ExecutorTable<P>,
282 field_ids: &[FieldId],
283 snapshot: TransactionSnapshot,
284 ) -> Result<Vec<Vec<PlanValue>>> {
285 if field_ids.is_empty() {
286 return Ok(Vec::new());
287 }
288
289 let table_id = table.table_id();
290 use llkv_expr::{Expr, Filter, Operator};
291 use std::ops::Bound;
292
293 let match_all_filter = Filter {
294 field_id: field_ids[0],
295 op: Operator::Range {
296 lower: Bound::Unbounded,
297 upper: Bound::Unbounded,
298 },
299 };
300 let filter_expr = Expr::Pred(match_all_filter);
301
302 let row_ids = match table.filter_row_ids(&filter_expr) {
303 Ok(ids) => ids,
304 Err(Error::NotFound) => return Ok(Vec::new()),
305 Err(e) => return Err(e),
306 };
307
308 let row_ids = filter_row_ids_for_snapshot(
309 table.table.as_ref(),
310 row_ids,
311 &self.txn_manager,
312 snapshot,
313 )?;
314
315 if row_ids.is_empty() {
316 return Ok(Vec::new());
317 }
318
319 let logical_field_ids: Vec<_> = field_ids
320 .iter()
321 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
322 .collect();
323
324 let total_rows = row_ids.cardinality() as usize;
325 let mut stream =
326 match table.stream_columns(logical_field_ids, &row_ids, GatherNullPolicy::IncludeNulls)
327 {
328 Ok(stream) => stream,
329 Err(Error::NotFound) => return Ok(Vec::new()),
330 Err(e) => return Err(e),
331 };
332
333 let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
334 let mut emitted = 0usize;
335 while let Some(chunk) = stream.next_chunk()? {
336 let batch = chunk.record_batch();
337 if batch.num_columns() == 0 {
338 continue;
339 }
340
341 let base = emitted;
342 let local_len = batch.num_rows();
343 for col_idx in 0..batch.num_columns() {
344 let array = batch.column(col_idx);
345 for local_idx in 0..local_len {
346 let target_index = base + local_idx;
347 debug_assert!(
348 target_index < rows.len(),
349 "stream chunk produced out-of-bounds row index"
350 );
351 if let Some(row) = rows.get_mut(target_index) {
352 match llkv_plan::plan_value_from_array(array, local_idx) {
353 Ok(value) => row.push(value),
354 Err(_) => row.push(PlanValue::Null),
355 }
356 }
357 }
358 }
359 emitted += local_len;
360 }
361
362 Ok(rows)
363 }
364
365 pub(super) fn scan_multi_column_values_for_fk_check(
371 &self,
372 table: &ExecutorTable<P>,
373 field_ids: &[FieldId],
374 snapshot: TransactionSnapshot,
375 ) -> Result<Vec<Vec<PlanValue>>> {
376 if field_ids.is_empty() {
377 return Ok(Vec::new());
378 }
379
380 let table_id = table.table_id();
381 use llkv_expr::{Expr, Filter, Operator};
382 use std::ops::Bound;
383
384 let match_all_filter = Filter {
385 field_id: field_ids[0],
386 op: Operator::Range {
387 lower: Bound::Unbounded,
388 upper: Bound::Unbounded,
389 },
390 };
391 let filter_expr = Expr::Pred(match_all_filter);
392
393 let row_ids = match table.filter_row_ids(&filter_expr) {
394 Ok(ids) => ids,
395 Err(Error::NotFound) => return Ok(Vec::new()),
396 Err(e) => return Err(e),
397 };
398
399 let row_ids = llkv_transaction::filter_row_ids_for_fk_check(
401 table.table.as_ref(),
402 row_ids,
403 &self.txn_manager,
404 snapshot,
405 )?;
406
407 if row_ids.is_empty() {
408 return Ok(Vec::new());
409 }
410
411 let logical_field_ids: Vec<_> = field_ids
412 .iter()
413 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
414 .collect();
415
416 let total_rows = row_ids.cardinality() as usize;
417 let mut stream =
418 match table.stream_columns(logical_field_ids, &row_ids, GatherNullPolicy::IncludeNulls)
419 {
420 Ok(stream) => stream,
421 Err(Error::NotFound) => return Ok(Vec::new()),
422 Err(e) => return Err(e),
423 };
424
425 let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
426 let mut emitted = 0usize;
427 while let Some(chunk) = stream.next_chunk()? {
428 let batch = chunk.record_batch();
429 if batch.num_columns() == 0 {
430 continue;
431 }
432
433 let base = emitted;
434 let local_len = batch.num_rows();
435 for col_idx in 0..batch.num_columns() {
436 let array = batch.column(col_idx);
437 for local_idx in 0..local_len {
438 let target_index = base + local_idx;
439 debug_assert!(
440 target_index < rows.len(),
441 "stream chunk produced out-of-bounds row index"
442 );
443 if let Some(row) = rows.get_mut(target_index) {
444 match llkv_plan::plan_value_from_array(array, local_idx) {
445 Ok(value) => row.push(value),
446 Err(_) => row.push(PlanValue::Null),
447 }
448 }
449 }
450 }
451 emitted += local_len;
452 }
453
454 Ok(rows)
455 }
456
457 pub(super) fn collect_row_values_for_ids(
461 &self,
462 table: &ExecutorTable<P>,
463 row_ids: &Treemap,
464 field_ids: &[FieldId],
465 ) -> Result<Vec<Vec<PlanValue>>> {
466 if row_ids.is_empty() || field_ids.is_empty() {
467 return Ok(Vec::new());
468 }
469
470 let table_id = table.table_id();
471 let logical_field_ids: Vec<LogicalFieldId> = field_ids
472 .iter()
473 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
474 .collect();
475
476 let mut stream = match table.stream_columns(
477 logical_field_ids.clone(),
478 row_ids,
479 GatherNullPolicy::IncludeNulls,
480 ) {
481 Ok(stream) => stream,
482 Err(Error::NotFound) => return Ok(Vec::new()),
483 Err(e) => return Err(e),
484 };
485
486 let mut rows = vec![Vec::with_capacity(field_ids.len()); row_ids.cardinality() as usize];
487 let mut emitted = 0usize;
488 while let Some(chunk) = stream.next_chunk()? {
489 let batch = chunk.record_batch();
490 let base = emitted;
491 let local_len = batch.num_rows();
492 for col_idx in 0..batch.num_columns() {
493 let array = batch.column(col_idx);
494 for local_idx in 0..local_len {
495 let target_index = base + local_idx;
496 if let Some(row) = rows.get_mut(target_index) {
497 let value = llkv_plan::plan_value_from_array(array, local_idx)?;
498 row.push(value);
499 }
500 }
501 }
502 emitted += local_len;
503 }
504
505 Ok(rows)
506 }
507
508 pub(super) fn filter_visible_row_ids(
512 &self,
513 table: &ExecutorTable<P>,
514 row_ids: Treemap,
515 snapshot: TransactionSnapshot,
516 ) -> Result<Treemap> {
517 filter_row_ids_for_snapshot(table.table.as_ref(), row_ids, &self.txn_manager, snapshot)
518 }
519
520 pub(super) fn record_table_with_new_rows(&self, txn_id: TxnId, canonical_name: String) {
524 if txn_id == TXN_ID_AUTO_COMMIT {
525 return;
526 }
527
528 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
529 guard.entry(txn_id).or_default().insert(canonical_name);
530 }
531
    /// Collects the full row values for every row that transaction `txn_id`
    /// created and has not itself deleted.
    ///
    /// Streams the MVCC `created_by`/`deleted_by` columns alongside all user
    /// columns and keeps rows where `created_by == txn_id` and `deleted_by`
    /// is still `TXN_ID_NONE`. Used by commit-time primary-key validation.
    ///
    /// Returns an empty vector for auto-commit transactions, empty schemas,
    /// or tables with no matching rows.
    pub(super) fn collect_rows_created_by_txn(
        &self,
        table: &ExecutorTable<P>,
        txn_id: TxnId,
    ) -> Result<Vec<Vec<PlanValue>>> {
        // Auto-commit rows are not tracked per transaction.
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(Vec::new());
        }

        if table.schema.columns.is_empty() {
            return Ok(Vec::new());
        }

        let Some(first_field_id) = table.schema.first_field_id() else {
            return Ok(Vec::new());
        };
        let filter_expr = translation::expression::full_table_scan_filter(first_field_id);

        let row_ids = table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table_id();
        // Column layout: [created_by, deleted_by, user columns...]; the +2
        // accounts for the two MVCC bookkeeping columns.
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len() + 2);
        logical_fields.push(LogicalFieldId::for_mvcc_created_by(table_id));
        logical_fields.push(LogicalFieldId::for_mvcc_deleted_by(table_id));
        for column in &table.schema.columns {
            logical_fields.push(LogicalFieldId::for_user(table_id, column.field_id));
        }

        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let mut stream = table.stream_columns(
            Arc::clone(&logical_fields),
            row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut rows = Vec::new();
        while let Some(chunk) = stream.next_chunk()? {
            let batch = chunk.record_batch();
            // Skip chunks that lack the expected MVCC + user column layout.
            if batch.num_columns() < table.schema.columns.len() + 2 {
                continue;
            }

            let created_col = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing created_by column in MVCC data".into()))?;
            let deleted_col = batch
                .column(1)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing deleted_by column in MVCC data".into()))?;

            for row_idx in 0..batch.num_rows() {
                // A null created_by slot is treated as auto-commit provenance.
                let created_by = if created_col.is_null(row_idx) {
                    TXN_ID_AUTO_COMMIT
                } else {
                    created_col.value(row_idx)
                };
                if created_by != txn_id {
                    continue;
                }

                // Skip rows this transaction created but later deleted.
                let deleted_by = if deleted_col.is_null(row_idx) {
                    TXN_ID_NONE
                } else {
                    deleted_col.value(row_idx)
                };
                if deleted_by != TXN_ID_NONE {
                    continue;
                }

                let mut row_values = Vec::with_capacity(table.schema.columns.len());
                for col_idx in 0..table.schema.columns.len() {
                    // Offset by 2 to skip the MVCC bookkeeping columns.
                    let array = batch.column(col_idx + 2);
                    let value = llkv_plan::plan_value_from_array(array, row_idx)?;
                    row_values.push(value);
                }
                rows.push(row_values);
            }
        }

        Ok(rows)
    }
623
    /// Validates primary-key uniqueness for every table that received new
    /// rows inside transaction `txn_id`, just before commit.
    ///
    /// The table set comes from the `txn_tables_with_new_rows` registry
    /// populated by `record_table_with_new_rows`. Tables dropped during the
    /// transaction (`NotFound`) or lacking a primary key are skipped.
    ///
    /// # Errors
    /// Returns the first constraint violation or storage error encountered.
    pub(crate) fn validate_primary_keys_for_commit(&self, txn_id: TxnId) -> Result<()> {
        // Auto-commit statements validate primary keys inline.
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(());
        }

        // Clone the pending set out so the read lock is released before the
        // (potentially long) scans below.
        let pending_tables = {
            let guard = self.txn_tables_with_new_rows.read().unwrap();
            guard.get(&txn_id).cloned()
        };

        let Some(tables) = pending_tables else {
            return Ok(());
        };

        for canonical_name in tables {
            let table = match self.lookup_table(&canonical_name) {
                Ok(table) => table,
                // Table dropped during the transaction: nothing to validate.
                Err(Error::NotFound) => continue,
                Err(err) => return Err(err),
            };

            let constraint_ctx = self.build_table_constraint_context(table.as_ref())?;
            let Some(primary_key) = constraint_ctx.primary_key.as_ref() else {
                continue;
            };

            let new_rows = self.collect_rows_created_by_txn(table.as_ref(), txn_id)?;
            if new_rows.is_empty() {
                continue;
            }

            // Rows come back in schema column order, so the column order map
            // is the identity permutation.
            let column_order: Vec<usize> = (0..table.schema.columns.len()).collect();
            let table_for_fetch = Arc::clone(&table);
            let snapshot = self.default_snapshot();

            self.constraint_service.validate_primary_key_rows(
                &constraint_ctx.schema_field_ids,
                primary_key,
                &column_order,
                &new_rows,
                // Callback lets the validator lazily fetch existing column
                // values when checking for duplicates.
                |field_ids| {
                    self.scan_multi_column_values(table_for_fetch.as_ref(), field_ids, snapshot)
                },
            )?;
        }

        Ok(())
    }
673
674 pub(crate) fn clear_transaction_state(&self, txn_id: TxnId) {
676 if txn_id == TXN_ID_AUTO_COMMIT {
677 return;
678 }
679
680 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
681 guard.remove(&txn_id);
682 }
683
684 pub(super) fn remove_table_entry(&self, canonical_name: &str) {
689 let mut tables = self.tables.write().unwrap();
690 if tables.remove(canonical_name).is_some() {
691 tracing::trace!(
692 "remove_table_entry: removed table '{}' from context cache",
693 canonical_name
694 );
695 }
696 }
697
698 pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
700 self.dropped_tables.read().unwrap().contains(canonical_name)
701 }
702}