llkv_runtime/runtime_context/
utils.rs1use arrow::array::{Array, UInt64Array};
13use arrow::datatypes::DataType;
14use llkv_column_map::store::GatherNullPolicy;
15use llkv_column_map::types::LogicalFieldId;
16use llkv_executor::utils::parse_date32_literal;
17use llkv_executor::{ExecutorColumn, ExecutorTable, translation};
18use llkv_plan::PlanValue;
19use llkv_result::{Error, Result};
20use llkv_storage::pager::Pager;
21use llkv_table::{FieldId, RowId};
22use llkv_transaction::{TransactionSnapshot, TxnId, filter_row_ids_for_snapshot};
23use simd_r_drive_entry_handle::EntryHandle;
24use std::sync::Arc;
25
26use crate::TXN_ID_AUTO_COMMIT;
27use crate::TXN_ID_NONE;
28
29use super::RuntimeContext;
30
31impl<P> RuntimeContext<P>
32where
33 P: Pager<Blob = EntryHandle> + Send + Sync,
34{
35 pub(super) fn coerce_plan_value_for_column(
39 &self,
40 value: PlanValue,
41 column: &ExecutorColumn,
42 ) -> Result<PlanValue> {
43 match value {
44 PlanValue::Null => Ok(PlanValue::Null),
45 PlanValue::Integer(v) => match &column.data_type {
46 DataType::Int64 => Ok(PlanValue::Integer(v)),
47 DataType::Float64 => Ok(PlanValue::Float(v as f64)),
48 DataType::Boolean => Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 })),
49 DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
50 DataType::Date32 => {
51 let casted = i32::try_from(v).map_err(|_| {
52 Error::InvalidArgumentError(format!(
53 "integer literal out of range for DATE column '{}'",
54 column.name
55 ))
56 })?;
57 Ok(PlanValue::Integer(casted as i64))
58 }
59 DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
60 "cannot assign integer to STRUCT column '{}'",
61 column.name
62 ))),
63 _ => Ok(PlanValue::Integer(v)),
64 },
65 PlanValue::Float(v) => match &column.data_type {
66 DataType::Int64 => Ok(PlanValue::Integer(v as i64)),
67 DataType::Float64 => Ok(PlanValue::Float(v)),
68 DataType::Boolean => Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 })),
69 DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
70 DataType::Date32 => Err(Error::InvalidArgumentError(format!(
71 "cannot assign floating-point value to DATE column '{}'",
72 column.name
73 ))),
74 DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
75 "cannot assign floating-point value to STRUCT column '{}'",
76 column.name
77 ))),
78 _ => Ok(PlanValue::Float(v)),
79 },
80 PlanValue::String(s) => match &column.data_type {
81 DataType::Boolean => {
82 let normalized = s.trim().to_ascii_lowercase();
83 match normalized.as_str() {
84 "true" | "t" | "1" => Ok(PlanValue::Integer(1)),
85 "false" | "f" | "0" => Ok(PlanValue::Integer(0)),
86 _ => Err(Error::InvalidArgumentError(format!(
87 "cannot assign string '{}' to BOOLEAN column '{}'",
88 s, column.name
89 ))),
90 }
91 }
92 DataType::Utf8 => Ok(PlanValue::String(s)),
93 DataType::Date32 => {
94 let days = parse_date32_literal(&s)?;
95 Ok(PlanValue::Integer(days as i64))
96 }
97 DataType::Int64 | DataType::Float64 => Err(Error::InvalidArgumentError(format!(
98 "cannot assign string '{}' to numeric column '{}'",
99 s, column.name
100 ))),
101 DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
102 "cannot assign string to STRUCT column '{}'",
103 column.name
104 ))),
105 _ => Ok(PlanValue::String(s)),
106 },
107 PlanValue::Struct(map) => match &column.data_type {
108 DataType::Struct(_) => Ok(PlanValue::Struct(map)),
109 _ => Err(Error::InvalidArgumentError(format!(
110 "cannot assign struct value to column '{}'",
111 column.name
112 ))),
113 },
114 }
115 }
116
117 pub(super) fn scan_column_values(
124 &self,
125 table: &ExecutorTable<P>,
126 field_id: FieldId,
127 snapshot: TransactionSnapshot,
128 ) -> Result<Vec<PlanValue>> {
129 let table_id = table.table.table_id();
130 use llkv_expr::{Expr, Filter, Operator};
131 use std::ops::Bound;
132
133 let match_all_filter = Filter {
135 field_id,
136 op: Operator::Range {
137 lower: Bound::Unbounded,
138 upper: Bound::Unbounded,
139 },
140 };
141 let filter_expr = Expr::Pred(match_all_filter);
142
143 let row_ids = match table.table.filter_row_ids(&filter_expr) {
145 Ok(ids) => ids,
146 Err(Error::NotFound) => return Ok(Vec::new()),
147 Err(e) => return Err(e),
148 };
149
150 let row_ids = filter_row_ids_for_snapshot(
152 table.table.as_ref(),
153 row_ids,
154 &self.txn_manager,
155 snapshot,
156 )?;
157
158 if row_ids.is_empty() {
159 return Ok(Vec::new());
160 }
161
162 let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
164 let row_count = row_ids.len();
165 let mut stream = match table.table.stream_columns(
166 vec![logical_field_id],
167 row_ids,
168 GatherNullPolicy::IncludeNulls,
169 ) {
170 Ok(stream) => stream,
171 Err(Error::NotFound) => return Ok(Vec::new()),
172 Err(e) => return Err(e),
173 };
174
175 let mut values = Vec::with_capacity(row_count);
179 while let Some(chunk) = stream.next_batch()? {
180 let batch = chunk.batch();
181 if batch.num_columns() == 0 {
182 continue;
183 }
184 let array = batch.column(0);
185 for row_idx in 0..batch.num_rows() {
186 if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
187 values.push(value);
188 }
189 }
190 }
191
192 Ok(values)
193 }
194
195 pub(super) fn scan_multi_column_values(
203 &self,
204 table: &ExecutorTable<P>,
205 field_ids: &[FieldId],
206 snapshot: TransactionSnapshot,
207 ) -> Result<Vec<Vec<PlanValue>>> {
208 if field_ids.is_empty() {
209 return Ok(Vec::new());
210 }
211
212 let table_id = table.table.table_id();
213 use llkv_expr::{Expr, Filter, Operator};
214 use std::ops::Bound;
215
216 let match_all_filter = Filter {
217 field_id: field_ids[0],
218 op: Operator::Range {
219 lower: Bound::Unbounded,
220 upper: Bound::Unbounded,
221 },
222 };
223 let filter_expr = Expr::Pred(match_all_filter);
224
225 let row_ids = match table.table.filter_row_ids(&filter_expr) {
226 Ok(ids) => ids,
227 Err(Error::NotFound) => return Ok(Vec::new()),
228 Err(e) => return Err(e),
229 };
230
231 let row_ids = filter_row_ids_for_snapshot(
232 table.table.as_ref(),
233 row_ids,
234 &self.txn_manager,
235 snapshot,
236 )?;
237
238 if row_ids.is_empty() {
239 return Ok(Vec::new());
240 }
241
242 let logical_field_ids: Vec<_> = field_ids
243 .iter()
244 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
245 .collect();
246
247 let total_rows = row_ids.len();
248 let mut stream = match table.table.stream_columns(
249 logical_field_ids,
250 row_ids,
251 GatherNullPolicy::IncludeNulls,
252 ) {
253 Ok(stream) => stream,
254 Err(Error::NotFound) => return Ok(Vec::new()),
255 Err(e) => return Err(e),
256 };
257
258 let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
259 while let Some(chunk) = stream.next_batch()? {
260 let batch = chunk.batch();
261 if batch.num_columns() == 0 {
262 continue;
263 }
264
265 let base = chunk.row_offset();
266 let local_len = batch.num_rows();
267 for col_idx in 0..batch.num_columns() {
268 let array = batch.column(col_idx);
269 for local_idx in 0..local_len {
270 let target_index = base + local_idx;
271 debug_assert!(
272 target_index < rows.len(),
273 "stream chunk produced out-of-bounds row index"
274 );
275 if let Some(row) = rows.get_mut(target_index) {
276 match llkv_plan::plan_value_from_array(array, local_idx) {
277 Ok(value) => row.push(value),
278 Err(_) => row.push(PlanValue::Null),
279 }
280 }
281 }
282 }
283 }
284
285 Ok(rows)
286 }
287
288 pub(super) fn collect_row_values_for_ids(
292 &self,
293 table: &ExecutorTable<P>,
294 row_ids: &[RowId],
295 field_ids: &[FieldId],
296 ) -> Result<Vec<Vec<PlanValue>>> {
297 if row_ids.is_empty() || field_ids.is_empty() {
298 return Ok(Vec::new());
299 }
300
301 let table_id = table.table.table_id();
302 let logical_field_ids: Vec<LogicalFieldId> = field_ids
303 .iter()
304 .map(|&fid| LogicalFieldId::for_user(table_id, fid))
305 .collect();
306
307 let mut stream = match table.table.stream_columns(
308 logical_field_ids.clone(),
309 row_ids.to_vec(),
310 GatherNullPolicy::IncludeNulls,
311 ) {
312 Ok(stream) => stream,
313 Err(Error::NotFound) => return Ok(Vec::new()),
314 Err(e) => return Err(e),
315 };
316
317 let mut rows = vec![Vec::with_capacity(field_ids.len()); row_ids.len()];
318 while let Some(chunk) = stream.next_batch()? {
319 let batch = chunk.batch();
320 let base = chunk.row_offset();
321 let local_len = batch.num_rows();
322 for col_idx in 0..batch.num_columns() {
323 let array = batch.column(col_idx);
324 for local_idx in 0..local_len {
325 let target_index = base + local_idx;
326 if let Some(row) = rows.get_mut(target_index) {
327 let value = llkv_plan::plan_value_from_array(array, local_idx)?;
328 row.push(value);
329 }
330 }
331 }
332 }
333
334 Ok(rows)
335 }
336
337 pub(super) fn filter_visible_row_ids(
341 &self,
342 table: &ExecutorTable<P>,
343 row_ids: Vec<RowId>,
344 snapshot: TransactionSnapshot,
345 ) -> Result<Vec<RowId>> {
346 filter_row_ids_for_snapshot(table.table.as_ref(), row_ids, &self.txn_manager, snapshot)
347 }
348
349 pub(super) fn record_table_with_new_rows(&self, txn_id: TxnId, canonical_name: String) {
353 if txn_id == TXN_ID_AUTO_COMMIT {
354 return;
355 }
356
357 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
358 guard.entry(txn_id).or_default().insert(canonical_name);
359 }
360
361 pub(super) fn collect_rows_created_by_txn(
365 &self,
366 table: &ExecutorTable<P>,
367 txn_id: TxnId,
368 ) -> Result<Vec<Vec<PlanValue>>> {
369 if txn_id == TXN_ID_AUTO_COMMIT {
370 return Ok(Vec::new());
371 }
372
373 if table.schema.columns.is_empty() {
374 return Ok(Vec::new());
375 }
376
377 let Some(first_field_id) = table.schema.first_field_id() else {
378 return Ok(Vec::new());
379 };
380 let filter_expr = translation::expression::full_table_scan_filter(first_field_id);
381
382 let row_ids = table.table.filter_row_ids(&filter_expr)?;
383 if row_ids.is_empty() {
384 return Ok(Vec::new());
385 }
386
387 let table_id = table.table.table_id();
388 let mut logical_fields: Vec<LogicalFieldId> =
389 Vec::with_capacity(table.schema.columns.len() + 2);
390 logical_fields.push(LogicalFieldId::for_mvcc_created_by(table_id));
391 logical_fields.push(LogicalFieldId::for_mvcc_deleted_by(table_id));
392 for column in &table.schema.columns {
393 logical_fields.push(LogicalFieldId::for_user(table_id, column.field_id));
394 }
395
396 let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
397 let mut stream = table.table.stream_columns(
398 Arc::clone(&logical_fields),
399 row_ids,
400 GatherNullPolicy::IncludeNulls,
401 )?;
402
403 let mut rows = Vec::new();
404 while let Some(chunk) = stream.next_batch()? {
405 let batch = chunk.batch();
406 if batch.num_columns() < table.schema.columns.len() + 2 {
407 continue;
408 }
409
410 let created_col = batch
411 .column(0)
412 .as_any()
413 .downcast_ref::<UInt64Array>()
414 .ok_or_else(|| Error::Internal("missing created_by column in MVCC data".into()))?;
415 let deleted_col = batch
416 .column(1)
417 .as_any()
418 .downcast_ref::<UInt64Array>()
419 .ok_or_else(|| Error::Internal("missing deleted_by column in MVCC data".into()))?;
420
421 for row_idx in 0..batch.num_rows() {
422 let created_by = if created_col.is_null(row_idx) {
423 TXN_ID_AUTO_COMMIT
424 } else {
425 created_col.value(row_idx)
426 };
427 if created_by != txn_id {
428 continue;
429 }
430
431 let deleted_by = if deleted_col.is_null(row_idx) {
432 TXN_ID_NONE
433 } else {
434 deleted_col.value(row_idx)
435 };
436 if deleted_by != TXN_ID_NONE {
437 continue;
438 }
439
440 let mut row_values = Vec::with_capacity(table.schema.columns.len());
441 for col_idx in 0..table.schema.columns.len() {
442 let array = batch.column(col_idx + 2);
443 let value = llkv_plan::plan_value_from_array(array, row_idx)?;
444 row_values.push(value);
445 }
446 rows.push(row_values);
447 }
448 }
449
450 Ok(rows)
451 }
452
453 pub(crate) fn validate_primary_keys_for_commit(&self, txn_id: TxnId) -> Result<()> {
455 if txn_id == TXN_ID_AUTO_COMMIT {
456 return Ok(());
457 }
458
459 let pending_tables = {
460 let guard = self.txn_tables_with_new_rows.read().unwrap();
461 guard.get(&txn_id).cloned()
462 };
463
464 let Some(tables) = pending_tables else {
465 return Ok(());
466 };
467
468 for canonical_name in tables {
469 let table = match self.lookup_table(&canonical_name) {
470 Ok(table) => table,
471 Err(Error::NotFound) => continue,
472 Err(err) => return Err(err),
473 };
474
475 let constraint_ctx = self.build_table_constraint_context(table.as_ref())?;
476 let Some(primary_key) = constraint_ctx.primary_key.as_ref() else {
477 continue;
478 };
479
480 let new_rows = self.collect_rows_created_by_txn(table.as_ref(), txn_id)?;
481 if new_rows.is_empty() {
482 continue;
483 }
484
485 let column_order: Vec<usize> = (0..table.schema.columns.len()).collect();
486 let table_for_fetch = Arc::clone(&table);
487 let snapshot = self.default_snapshot();
488
489 self.constraint_service.validate_primary_key_rows(
490 &constraint_ctx.schema_field_ids,
491 primary_key,
492 &column_order,
493 &new_rows,
494 |field_ids| {
495 self.scan_multi_column_values(table_for_fetch.as_ref(), field_ids, snapshot)
496 },
497 )?;
498 }
499
500 Ok(())
501 }
502
503 pub(crate) fn clear_transaction_state(&self, txn_id: TxnId) {
505 if txn_id == TXN_ID_AUTO_COMMIT {
506 return;
507 }
508
509 let mut guard = self.txn_tables_with_new_rows.write().unwrap();
510 guard.remove(&txn_id);
511 }
512
513 pub(super) fn remove_table_entry(&self, canonical_name: &str) {
518 let mut tables = self.tables.write().unwrap();
519 if tables.remove(canonical_name).is_some() {
520 tracing::trace!(
521 "remove_table_entry: removed table '{}' from context cache",
522 canonical_name
523 );
524 }
525 }
526
527 pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
529 self.dropped_tables.read().unwrap().contains(canonical_name)
530 }
531}