1use arrow::array::{Array, UInt64Array};
13use arrow::datatypes::DataType;
14use llkv_column_map::store::GatherNullPolicy;
15use llkv_column_map::types::LogicalFieldId;
16use llkv_executor::utils::parse_date32_literal;
17use llkv_executor::{ExecutorColumn, ExecutorTable, translation};
18use llkv_plan::PlanValue;
19use llkv_result::{Error, Result};
20use llkv_storage::pager::Pager;
21use llkv_table::{FieldId, RowId};
22use llkv_transaction::{TransactionSnapshot, TxnId, filter_row_ids_for_snapshot};
23use simd_r_drive_entry_handle::EntryHandle;
24use std::sync::Arc;
25
26use crate::TXN_ID_AUTO_COMMIT;
27use crate::TXN_ID_NONE;
28
29use super::RuntimeContext;
30
31impl<P> RuntimeContext<P>
32where
33 P: Pager<Blob = EntryHandle> + Send + Sync,
34{
35 pub(super) fn coerce_plan_value_for_column(
39 &self,
40 value: PlanValue,
41 column: &ExecutorColumn,
42 ) -> Result<PlanValue> {
43 match value {
44 PlanValue::Null => Ok(PlanValue::Null),
45 PlanValue::Integer(v) => match &column.data_type {
46 DataType::Int64 => Ok(PlanValue::Integer(v)),
47 DataType::Float64 => Ok(PlanValue::Float(v as f64)),
48 DataType::Boolean => Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 })),
49 DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
50 DataType::Date32 => {
51 let casted = i32::try_from(v).map_err(|_| {
52 Error::InvalidArgumentError(format!(
53 "integer literal out of range for DATE column '{}'",
54 column.name
55 ))
56 })?;
57 Ok(PlanValue::Integer(casted as i64))
58 }
59 DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
60 "cannot assign integer to STRUCT column '{}'",
61 column.name
62 ))),
63 _ => Ok(PlanValue::Integer(v)),
64 },
65 PlanValue::Float(v) => match &column.data_type {
66 DataType::Int64 => Ok(PlanValue::Integer(v as i64)),
67 DataType::Float64 => Ok(PlanValue::Float(v)),
68 DataType::Boolean => Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 })),
69 DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
70 DataType::Date32 => Err(Error::InvalidArgumentError(format!(
71 "cannot assign floating-point value to DATE column '{}'",
72 column.name
73 ))),
74 DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
75 "cannot assign floating-point value to STRUCT column '{}'",
76 column.name
77 ))),
78 _ => Ok(PlanValue::Float(v)),
79 },
80 PlanValue::String(s) => match &column.data_type {
81 DataType::Boolean => {
82 let normalized = s.trim().to_ascii_lowercase();
83 match normalized.as_str() {
84 "true" | "t" | "1" => Ok(PlanValue::Integer(1)),
85 "false" | "f" | "0" => Ok(PlanValue::Integer(0)),
86 _ => Err(Error::InvalidArgumentError(format!(
87 "cannot assign string '{}' to BOOLEAN column '{}'",
88 s, column.name
89 ))),
90 }
91 }
92 DataType::Utf8 => Ok(PlanValue::String(s)),
93 DataType::Date32 => {
94 let days = parse_date32_literal(&s)?;
95 Ok(PlanValue::Integer(days as i64))
96 }
97 DataType::Int64 | DataType::Float64 => Err(Error::InvalidArgumentError(format!(
98 "cannot assign string '{}' to numeric column '{}'",
99 s, column.name
100 ))),
101 DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
102 "cannot assign string to STRUCT column '{}'",
103 column.name
104 ))),
105 _ => Ok(PlanValue::String(s)),
106 },
107 PlanValue::Struct(map) => match &column.data_type {
108 DataType::Struct(_) => Ok(PlanValue::Struct(map)),
109 _ => Err(Error::InvalidArgumentError(format!(
110 "cannot assign struct value to column '{}'",
111 column.name
112 ))),
113 },
114 }
115 }
116
117 pub(super) fn scan_column_values(
124 &self,
125 table: &ExecutorTable<P>,
126 field_id: FieldId,
127 snapshot: TransactionSnapshot,
128 ) -> Result<Vec<PlanValue>> {
129 let table_id = table.table.table_id();
130 use llkv_expr::{Expr, Filter, Operator};
131 use std::ops::Bound;
132
133 let match_all_filter = Filter {
135 field_id,
136 op: Operator::Range {
137 lower: Bound::Unbounded,
138 upper: Bound::Unbounded,
139 },
140 };
141 let filter_expr = Expr::Pred(match_all_filter);
142
143 let row_ids = match table.table.filter_row_ids(&filter_expr) {
145 Ok(ids) => ids,
146 Err(Error::NotFound) => return Ok(Vec::new()),
147 Err(e) => return Err(e),
148 };
149
150 let row_ids = filter_row_ids_for_snapshot(
152 table.table.as_ref(),
153 row_ids,
154 &self.txn_manager,
155 snapshot,
156 )?;
157
158 if row_ids.is_empty() {
159 return Ok(Vec::new());
160 }
161
162 let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
164 let row_count = row_ids.len();
165 let mut stream = match table.table.stream_columns(
166 vec![logical_field_id],
167 row_ids,
168 GatherNullPolicy::IncludeNulls,
169 ) {
170 Ok(stream) => stream,
171 Err(Error::NotFound) => return Ok(Vec::new()),
172 Err(e) => return Err(e),
173 };
174
175 let mut values = Vec::with_capacity(row_count);
179 while let Some(chunk) = stream.next_batch()? {
180 let batch = chunk.batch();
181 if batch.num_columns() == 0 {
182 continue;
183 }
184 let array = batch.column(0);
185 for row_idx in 0..batch.num_rows() {
186 if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
187 values.push(value);
188 }
189 }
190 }
191
192 Ok(values)
193 }
194
195 pub(super) fn scan_multi_column_values(
203 &self,
204 table: &ExecutorTable<P>,
205 field_ids: &[FieldId],
206 snapshot: TransactionSnapshot,
207 ) -> Result<Vec<Vec<PlanValue>>> {
208 if field_ids.is_empty() {
209 return Ok(Vec::new());
210 }
211
212 let table_id = table.table.table_id();
213 use llkv_expr::{Expr, Filter, Operator};
214 use std::ops::Bound;
215
216 let match_all_filter = Filter {
217 field_id: field_ids[0],
218 op: Operator::Range {
219 lower: Bound::Unbounded,
220 upper: Bound::Unbounded,
221 },
222 };
223 let filter_expr = Expr::Pred(match_all_filter);
224
225 let row_ids = match table.table.filter_row_ids(&filter_expr) {
226 Ok(ids) => ids,
227 Err(Error::NotFound) => return Ok(Vec::new()),
228 Err(e) => return Err(e),
229 };
230
231 let row_ids = filter_row_ids_for_snapshot(
232 table.table.as_ref(),
233 row_ids,
234 &self.txn_manager,
235 snapshot,
236 )?;
237
238 if row_ids.is_empty() {
239 return Ok(Vec::new());
240 }
241
242 let logical_field_ids: Vec<_> = field_ids
243 .iter()
244 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
245 .collect();
246
247 let total_rows = row_ids.len();
248 let mut stream = match table.table.stream_columns(
249 logical_field_ids,
250 row_ids,
251 GatherNullPolicy::IncludeNulls,
252 ) {
253 Ok(stream) => stream,
254 Err(Error::NotFound) => return Ok(Vec::new()),
255 Err(e) => return Err(e),
256 };
257
258 let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
259 while let Some(chunk) = stream.next_batch()? {
260 let batch = chunk.batch();
261 if batch.num_columns() == 0 {
262 continue;
263 }
264
265 let base = chunk.row_offset();
266 let local_len = batch.num_rows();
267 for col_idx in 0..batch.num_columns() {
268 let array = batch.column(col_idx);
269 for local_idx in 0..local_len {
270 let target_index = base + local_idx;
271 debug_assert!(
272 target_index < rows.len(),
273 "stream chunk produced out-of-bounds row index"
274 );
275 if let Some(row) = rows.get_mut(target_index) {
276 match llkv_plan::plan_value_from_array(array, local_idx) {
277 Ok(value) => row.push(value),
278 Err(_) => row.push(PlanValue::Null),
279 }
280 }
281 }
282 }
283 }
284
285 Ok(rows)
286 }
287
288 pub(super) fn scan_multi_column_values_for_fk_check(
294 &self,
295 table: &ExecutorTable<P>,
296 field_ids: &[FieldId],
297 snapshot: TransactionSnapshot,
298 ) -> Result<Vec<Vec<PlanValue>>> {
299 if field_ids.is_empty() {
300 return Ok(Vec::new());
301 }
302
303 let table_id = table.table.table_id();
304 use llkv_expr::{Expr, Filter, Operator};
305 use std::ops::Bound;
306
307 let match_all_filter = Filter {
308 field_id: field_ids[0],
309 op: Operator::Range {
310 lower: Bound::Unbounded,
311 upper: Bound::Unbounded,
312 },
313 };
314 let filter_expr = Expr::Pred(match_all_filter);
315
316 let row_ids = match table.table.filter_row_ids(&filter_expr) {
317 Ok(ids) => ids,
318 Err(Error::NotFound) => return Ok(Vec::new()),
319 Err(e) => return Err(e),
320 };
321
322 let row_ids = llkv_transaction::filter_row_ids_for_fk_check(
324 table.table.as_ref(),
325 row_ids,
326 &self.txn_manager,
327 snapshot,
328 )?;
329
330 if row_ids.is_empty() {
331 return Ok(Vec::new());
332 }
333
334 let logical_field_ids: Vec<_> = field_ids
335 .iter()
336 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
337 .collect();
338
339 let total_rows = row_ids.len();
340 let mut stream = match table.table.stream_columns(
341 logical_field_ids,
342 row_ids,
343 GatherNullPolicy::IncludeNulls,
344 ) {
345 Ok(stream) => stream,
346 Err(Error::NotFound) => return Ok(Vec::new()),
347 Err(e) => return Err(e),
348 };
349
350 let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
351 while let Some(chunk) = stream.next_batch()? {
352 let batch = chunk.batch();
353 if batch.num_columns() == 0 {
354 continue;
355 }
356
357 let base = chunk.row_offset();
358 let local_len = batch.num_rows();
359 for col_idx in 0..batch.num_columns() {
360 let array = batch.column(col_idx);
361 for local_idx in 0..local_len {
362 let target_index = base + local_idx;
363 debug_assert!(
364 target_index < rows.len(),
365 "stream chunk produced out-of-bounds row index"
366 );
367 if let Some(row) = rows.get_mut(target_index) {
368 match llkv_plan::plan_value_from_array(array, local_idx) {
369 Ok(value) => row.push(value),
370 Err(_) => row.push(PlanValue::Null),
371 }
372 }
373 }
374 }
375 }
376
377 Ok(rows)
378 }
379
380 pub(super) fn collect_row_values_for_ids(
384 &self,
385 table: &ExecutorTable<P>,
386 row_ids: &[RowId],
387 field_ids: &[FieldId],
388 ) -> Result<Vec<Vec<PlanValue>>> {
389 if row_ids.is_empty() || field_ids.is_empty() {
390 return Ok(Vec::new());
391 }
392
393 let table_id = table.table.table_id();
394 let logical_field_ids: Vec<LogicalFieldId> = field_ids
395 .iter()
396 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
397 .collect();
398
399 let mut stream = match table.table.stream_columns(
400 logical_field_ids.clone(),
401 row_ids.to_vec(),
402 GatherNullPolicy::IncludeNulls,
403 ) {
404 Ok(stream) => stream,
405 Err(Error::NotFound) => return Ok(Vec::new()),
406 Err(e) => return Err(e),
407 };
408
409 let mut rows = vec![Vec::with_capacity(field_ids.len()); row_ids.len()];
410 while let Some(chunk) = stream.next_batch()? {
411 let batch = chunk.batch();
412 let base = chunk.row_offset();
413 let local_len = batch.num_rows();
414 for col_idx in 0..batch.num_columns() {
415 let array = batch.column(col_idx);
416 for local_idx in 0..local_len {
417 let target_index = base + local_idx;
418 if let Some(row) = rows.get_mut(target_index) {
419 let value = llkv_plan::plan_value_from_array(array, local_idx)?;
420 row.push(value);
421 }
422 }
423 }
424 }
425
426 Ok(rows)
427 }
428
429 pub(super) fn filter_visible_row_ids(
433 &self,
434 table: &ExecutorTable<P>,
435 row_ids: Vec<RowId>,
436 snapshot: TransactionSnapshot,
437 ) -> Result<Vec<RowId>> {
438 filter_row_ids_for_snapshot(table.table.as_ref(), row_ids, &self.txn_manager, snapshot)
439 }
440
441 pub(super) fn record_table_with_new_rows(&self, txn_id: TxnId, canonical_name: String) {
445 if txn_id == TXN_ID_AUTO_COMMIT {
446 return;
447 }
448
449 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
450 guard.entry(txn_id).or_default().insert(canonical_name);
451 }
452
453 pub(super) fn collect_rows_created_by_txn(
457 &self,
458 table: &ExecutorTable<P>,
459 txn_id: TxnId,
460 ) -> Result<Vec<Vec<PlanValue>>> {
461 if txn_id == TXN_ID_AUTO_COMMIT {
462 return Ok(Vec::new());
463 }
464
465 if table.schema.columns.is_empty() {
466 return Ok(Vec::new());
467 }
468
469 let Some(first_field_id) = table.schema.first_field_id() else {
470 return Ok(Vec::new());
471 };
472 let filter_expr = translation::expression::full_table_scan_filter(first_field_id);
473
474 let row_ids = table.table.filter_row_ids(&filter_expr)?;
475 if row_ids.is_empty() {
476 return Ok(Vec::new());
477 }
478
479 let table_id = table.table.table_id();
480 let mut logical_fields: Vec<LogicalFieldId> =
481 Vec::with_capacity(table.schema.columns.len() + 2);
482 logical_fields.push(LogicalFieldId::for_mvcc_created_by(table_id));
483 logical_fields.push(LogicalFieldId::for_mvcc_deleted_by(table_id));
484 for column in &table.schema.columns {
485 logical_fields.push(LogicalFieldId::for_user(table_id, column.field_id));
486 }
487
488 let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
489 let mut stream = table.table.stream_columns(
490 Arc::clone(&logical_fields),
491 row_ids,
492 GatherNullPolicy::IncludeNulls,
493 )?;
494
495 let mut rows = Vec::new();
496 while let Some(chunk) = stream.next_batch()? {
497 let batch = chunk.batch();
498 if batch.num_columns() < table.schema.columns.len() + 2 {
499 continue;
500 }
501
502 let created_col = batch
503 .column(0)
504 .as_any()
505 .downcast_ref::<UInt64Array>()
506 .ok_or_else(|| Error::Internal("missing created_by column in MVCC data".into()))?;
507 let deleted_col = batch
508 .column(1)
509 .as_any()
510 .downcast_ref::<UInt64Array>()
511 .ok_or_else(|| Error::Internal("missing deleted_by column in MVCC data".into()))?;
512
513 for row_idx in 0..batch.num_rows() {
514 let created_by = if created_col.is_null(row_idx) {
515 TXN_ID_AUTO_COMMIT
516 } else {
517 created_col.value(row_idx)
518 };
519 if created_by != txn_id {
520 continue;
521 }
522
523 let deleted_by = if deleted_col.is_null(row_idx) {
524 TXN_ID_NONE
525 } else {
526 deleted_col.value(row_idx)
527 };
528 if deleted_by != TXN_ID_NONE {
529 continue;
530 }
531
532 let mut row_values = Vec::with_capacity(table.schema.columns.len());
533 for col_idx in 0..table.schema.columns.len() {
534 let array = batch.column(col_idx + 2);
535 let value = llkv_plan::plan_value_from_array(array, row_idx)?;
536 row_values.push(value);
537 }
538 rows.push(row_values);
539 }
540 }
541
542 Ok(rows)
543 }
544
545 pub(crate) fn validate_primary_keys_for_commit(&self, txn_id: TxnId) -> Result<()> {
547 if txn_id == TXN_ID_AUTO_COMMIT {
548 return Ok(());
549 }
550
551 let pending_tables = {
552 let guard = self.txn_tables_with_new_rows.read().unwrap();
553 guard.get(&txn_id).cloned()
554 };
555
556 let Some(tables) = pending_tables else {
557 return Ok(());
558 };
559
560 for canonical_name in tables {
561 let table = match self.lookup_table(&canonical_name) {
562 Ok(table) => table,
563 Err(Error::NotFound) => continue,
564 Err(err) => return Err(err),
565 };
566
567 let constraint_ctx = self.build_table_constraint_context(table.as_ref())?;
568 let Some(primary_key) = constraint_ctx.primary_key.as_ref() else {
569 continue;
570 };
571
572 let new_rows = self.collect_rows_created_by_txn(table.as_ref(), txn_id)?;
573 if new_rows.is_empty() {
574 continue;
575 }
576
577 let column_order: Vec<usize> = (0..table.schema.columns.len()).collect();
578 let table_for_fetch = Arc::clone(&table);
579 let snapshot = self.default_snapshot();
580
581 self.constraint_service.validate_primary_key_rows(
582 &constraint_ctx.schema_field_ids,
583 primary_key,
584 &column_order,
585 &new_rows,
586 |field_ids| {
587 self.scan_multi_column_values(table_for_fetch.as_ref(), field_ids, snapshot)
588 },
589 )?;
590 }
591
592 Ok(())
593 }
594
595 pub(crate) fn clear_transaction_state(&self, txn_id: TxnId) {
597 if txn_id == TXN_ID_AUTO_COMMIT {
598 return;
599 }
600
601 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
602 guard.remove(&txn_id);
603 }
604
605 pub(super) fn remove_table_entry(&self, canonical_name: &str) {
610 let mut tables = self.tables.write().unwrap();
611 if tables.remove(canonical_name).is_some() {
612 tracing::trace!(
613 "remove_table_entry: removed table '{}' from context cache",
614 canonical_name
615 );
616 }
617 }
618
619 pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
621 self.dropped_tables.read().unwrap().contains(canonical_name)
622 }
623}