llkv_runtime/runtime_context/
utils.rs

1//! General utility methods for RuntimeContext operations.
2//!
3//! This module contains shared helper functions used across multiple DML and DDL operations:
4//! - Type coercion for INSERT/UPDATE value normalization
5//! - Column scanning for INDEX creation and constraint validation
6//! - Multi-column scanning for UNIQUE/PRIMARY KEY/FOREIGN KEY constraints
7//! - Row collection for FK validation
8//! - MVCC visibility filtering for transaction isolation
9//! - Transaction state tracking
10//! - Table cache management
11
12use arrow::array::{Array, UInt64Array};
13use arrow::datatypes::DataType;
14use llkv_column_map::store::GatherNullPolicy;
15use llkv_column_map::types::LogicalFieldId;
16use llkv_executor::utils::parse_date32_literal;
17use llkv_executor::{ExecutorColumn, ExecutorTable, translation};
18use llkv_plan::PlanValue;
19use llkv_result::{Error, Result};
20use llkv_storage::pager::Pager;
21use llkv_table::{FieldId, RowId};
22use llkv_transaction::{TransactionSnapshot, TxnId, filter_row_ids_for_snapshot};
23use simd_r_drive_entry_handle::EntryHandle;
24use std::sync::Arc;
25
26use crate::TXN_ID_AUTO_COMMIT;
27use crate::TXN_ID_NONE;
28
29use super::RuntimeContext;
30
31impl<P> RuntimeContext<P>
32where
33    P: Pager<Blob = EntryHandle> + Send + Sync,
34{
35    /// Coerce a PlanValue to match the expected column type.
36    ///
37    /// Used during INSERT and UPDATE operations to ensure type compatibility.
38    pub(super) fn coerce_plan_value_for_column(
39        &self,
40        value: PlanValue,
41        column: &ExecutorColumn,
42    ) -> Result<PlanValue> {
43        match value {
44            PlanValue::Null => Ok(PlanValue::Null),
45            PlanValue::Integer(v) => match &column.data_type {
46                DataType::Int64 => Ok(PlanValue::Integer(v)),
47                DataType::Float64 => Ok(PlanValue::Float(v as f64)),
48                DataType::Boolean => Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 })),
49                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
50                DataType::Date32 => {
51                    let casted = i32::try_from(v).map_err(|_| {
52                        Error::InvalidArgumentError(format!(
53                            "integer literal out of range for DATE column '{}'",
54                            column.name
55                        ))
56                    })?;
57                    Ok(PlanValue::Integer(casted as i64))
58                }
59                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
60                    "cannot assign integer to STRUCT column '{}'",
61                    column.name
62                ))),
63                _ => Ok(PlanValue::Integer(v)),
64            },
65            PlanValue::Float(v) => match &column.data_type {
66                DataType::Int64 => Ok(PlanValue::Integer(v as i64)),
67                DataType::Float64 => Ok(PlanValue::Float(v)),
68                DataType::Boolean => Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 })),
69                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
70                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
71                    "cannot assign floating-point value to DATE column '{}'",
72                    column.name
73                ))),
74                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
75                    "cannot assign floating-point value to STRUCT column '{}'",
76                    column.name
77                ))),
78                _ => Ok(PlanValue::Float(v)),
79            },
80            PlanValue::String(s) => match &column.data_type {
81                DataType::Boolean => {
82                    let normalized = s.trim().to_ascii_lowercase();
83                    match normalized.as_str() {
84                        "true" | "t" | "1" => Ok(PlanValue::Integer(1)),
85                        "false" | "f" | "0" => Ok(PlanValue::Integer(0)),
86                        _ => Err(Error::InvalidArgumentError(format!(
87                            "cannot assign string '{}' to BOOLEAN column '{}'",
88                            s, column.name
89                        ))),
90                    }
91                }
92                DataType::Utf8 => Ok(PlanValue::String(s)),
93                DataType::Date32 => {
94                    let days = parse_date32_literal(&s)?;
95                    Ok(PlanValue::Integer(days as i64))
96                }
97                DataType::Int64 | DataType::Float64 => Err(Error::InvalidArgumentError(format!(
98                    "cannot assign string '{}' to numeric column '{}'",
99                    s, column.name
100                ))),
101                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
102                    "cannot assign string to STRUCT column '{}'",
103                    column.name
104                ))),
105                _ => Ok(PlanValue::String(s)),
106            },
107            PlanValue::Struct(map) => match &column.data_type {
108                DataType::Struct(_) => Ok(PlanValue::Struct(map)),
109                _ => Err(Error::InvalidArgumentError(format!(
110                    "cannot assign struct value to column '{}'",
111                    column.name
112                ))),
113            },
114        }
115    }
116
    /// Scan a single column and materialize values into memory.
    ///
    /// Used during CREATE INDEX operations and constraint validation.
    ///
    /// Visibility is enforced via [`filter_row_ids_for_snapshot`], so only rows
    /// visible under `snapshot` contribute values.
    ///
    /// NOTE: rows whose value fails `plan_value_from_array` conversion are
    /// silently skipped (see the `if let Ok` below), so the returned vector can
    /// be shorter than the number of visible rows.
    ///
    /// NOTE: Current implementation buffers the entire result set; convert to a
    /// streaming iterator once executor-side consumers support incremental consumption.
    pub(super) fn scan_column_values(
        &self,
        table: &ExecutorTable<P>,
        field_id: FieldId,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<PlanValue>> {
        let table_id = table.table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        // Create a filter that matches all rows (unbounded range)
        let match_all_filter = Filter {
            field_id,
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        // Get all matching row_ids first. NotFound means the column has no
        // stored data yet, which we treat as an empty result.
        let row_ids = match table.table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Apply MVCC filtering manually using filter_row_ids_for_snapshot
        let row_ids = filter_row_ids_for_snapshot(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Gather the column values for visible rows
        let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
        let row_count = row_ids.len();
        let mut stream = match table.table.stream_columns(
            vec![logical_field_id],
            row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // TODO: Don't buffer all values; make this streamable
        // NOTE: Values are accumulated eagerly; revisit when `llkv-plan` supports
        // incremental parameter binding.
        let mut values = Vec::with_capacity(row_count);
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() == 0 {
                continue;
            }
            let array = batch.column(0);
            for row_idx in 0..batch.num_rows() {
                // Conversion failures are dropped rather than propagated; this
                // differs from scan_multi_column_values, which records NULLs.
                if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
                    values.push(value);
                }
            }
        }

        Ok(values)
    }
194
    /// Scan a set of columns and materialize rows into memory.
    ///
    /// Used during constraint validation (multi-column UNIQUE, PRIMARY KEY)
    /// and CREATE INDEX operations.
    ///
    /// Returns one inner `Vec` per visible row, with values in `field_ids`
    /// order. Values that fail conversion are recorded as [`PlanValue::Null`]
    /// rather than aborting the scan.
    ///
    /// NOTE: Similar to [`Self::scan_column_values`], this buffers eagerly pending
    /// enhancements to the executor pipeline.
    pub(super) fn scan_multi_column_values(
        &self,
        table: &ExecutorTable<P>,
        field_ids: &[FieldId],
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<Vec<PlanValue>>> {
        if field_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        // Match-all predicate anchored on the first requested column; it is
        // only used to enumerate candidate row ids for the scan.
        let match_all_filter = Filter {
            field_id: field_ids[0],
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        // NotFound means the column has no stored data yet — treat as empty.
        let row_ids = match table.table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Drop rows that are not visible under the caller's MVCC snapshot.
        let row_ids = filter_row_ids_for_snapshot(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let logical_field_ids: Vec<_> = field_ids
            .iter()
            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
            .collect();

        let total_rows = row_ids.len();
        let mut stream = match table.table.stream_columns(
            logical_field_ids,
            row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Pre-size one row per visible row id; chunks fill them by offset.
        let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() == 0 {
                continue;
            }

            // `row_offset` is this chunk's starting position within the
            // overall gathered row order, used to index back into `rows`.
            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < rows.len(),
                        "stream chunk produced out-of-bounds row index"
                    );
                    if let Some(row) = rows.get_mut(target_index) {
                        match llkv_plan::plan_value_from_array(array, local_idx) {
                            Ok(value) => row.push(value),
                            // Degrade conversion failures to NULL so one bad
                            // value does not abort constraint validation.
                            Err(_) => row.push(PlanValue::Null),
                        }
                    }
                }
            }
        }

        Ok(rows)
    }
287
    /// Scan multi-column values with FK-aware visibility semantics.
    ///
    /// This is similar to [`Self::scan_multi_column_values`], but uses FK-specific
    /// MVCC visibility rules where rows deleted by the current transaction are treated
    /// as still visible (matching SQL standard FK constraint checking behavior).
    ///
    /// Returns one inner `Vec` per visible row in `field_ids` order; conversion
    /// failures are recorded as [`PlanValue::Null`].
    pub(super) fn scan_multi_column_values_for_fk_check(
        &self,
        table: &ExecutorTable<P>,
        field_ids: &[FieldId],
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<Vec<PlanValue>>> {
        if field_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        // Match-all predicate anchored on the first requested column; it is
        // only used to enumerate candidate row ids for the scan.
        let match_all_filter = Filter {
            field_id: field_ids[0],
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        // NotFound means the column has no stored data yet — treat as empty.
        let row_ids = match table.table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Use FK-specific filtering that treats deleted rows as still visible
        let row_ids = llkv_transaction::filter_row_ids_for_fk_check(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let logical_field_ids: Vec<_> = field_ids
            .iter()
            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
            .collect();

        let total_rows = row_ids.len();
        let mut stream = match table.table.stream_columns(
            logical_field_ids,
            row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Pre-size one row per visible row id; chunks fill them by offset.
        let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() == 0 {
                continue;
            }

            // `row_offset` is this chunk's starting position within the
            // overall gathered row order, used to index back into `rows`.
            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < rows.len(),
                        "stream chunk produced out-of-bounds row index"
                    );
                    if let Some(row) = rows.get_mut(target_index) {
                        match llkv_plan::plan_value_from_array(array, local_idx) {
                            Ok(value) => row.push(value),
                            // Degrade conversion failures to NULL so one bad
                            // value does not abort FK validation.
                            Err(_) => row.push(PlanValue::Null),
                        }
                    }
                }
            }
        }

        Ok(rows)
    }
379
380    /// Collect row values for specific row IDs and field IDs.
381    ///
382    /// Used during foreign key validation to gather referenced/referencing values.
383    pub(super) fn collect_row_values_for_ids(
384        &self,
385        table: &ExecutorTable<P>,
386        row_ids: &[RowId],
387        field_ids: &[FieldId],
388    ) -> Result<Vec<Vec<PlanValue>>> {
389        if row_ids.is_empty() || field_ids.is_empty() {
390            return Ok(Vec::new());
391        }
392
393        let table_id = table.table.table_id();
394        let logical_field_ids: Vec<LogicalFieldId> = field_ids
395            .iter()
396            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
397            .collect();
398
399        let mut stream = match table.table.stream_columns(
400            logical_field_ids.clone(),
401            row_ids.to_vec(),
402            GatherNullPolicy::IncludeNulls,
403        ) {
404            Ok(stream) => stream,
405            Err(Error::NotFound) => return Ok(Vec::new()),
406            Err(e) => return Err(e),
407        };
408
409        let mut rows = vec![Vec::with_capacity(field_ids.len()); row_ids.len()];
410        while let Some(chunk) = stream.next_batch()? {
411            let batch = chunk.batch();
412            let base = chunk.row_offset();
413            let local_len = batch.num_rows();
414            for col_idx in 0..batch.num_columns() {
415                let array = batch.column(col_idx);
416                for local_idx in 0..local_len {
417                    let target_index = base + local_idx;
418                    if let Some(row) = rows.get_mut(target_index) {
419                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
420                        row.push(value);
421                    }
422                }
423            }
424        }
425
426        Ok(rows)
427    }
428
    /// Filter row IDs to only include those visible in the given snapshot.
    ///
    /// Used across all DML operations to ensure MVCC transaction isolation.
    ///
    /// Thin wrapper over [`filter_row_ids_for_snapshot`] that supplies this
    /// context's transaction manager; rows not visible under `snapshot` are
    /// removed from the returned vector.
    pub(super) fn filter_visible_row_ids(
        &self,
        table: &ExecutorTable<P>,
        row_ids: Vec<RowId>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RowId>> {
        filter_row_ids_for_snapshot(table.table.as_ref(), row_ids, &self.txn_manager, snapshot)
    }
440
441    /// Record that a transaction has inserted rows into a table.
442    ///
443    /// Used for deferred PRIMARY KEY validation at commit time.
444    pub(super) fn record_table_with_new_rows(&self, txn_id: TxnId, canonical_name: String) {
445        if txn_id == TXN_ID_AUTO_COMMIT {
446            return;
447        }
448
449        let mut guard = self.txn_tables_with_new_rows.write().unwrap();
450        guard.entry(txn_id).or_default().insert(canonical_name);
451    }
452
    /// Collect all rows created by a specific transaction.
    ///
    /// Used for PRIMARY KEY validation at commit time.
    ///
    /// Streams the MVCC bookkeeping columns alongside every user column and
    /// keeps only rows whose `created_by` equals `txn_id` and whose
    /// `deleted_by` is unset. Returns one `Vec` of values per surviving row,
    /// in schema column order.
    pub(super) fn collect_rows_created_by_txn(
        &self,
        table: &ExecutorTable<P>,
        txn_id: TxnId,
    ) -> Result<Vec<Vec<PlanValue>>> {
        // Auto-commit writes are validated inline; nothing to collect.
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(Vec::new());
        }

        if table.schema.columns.is_empty() {
            return Ok(Vec::new());
        }

        // Any column works as the scan anchor; use the first one.
        let Some(first_field_id) = table.schema.first_field_id() else {
            return Ok(Vec::new());
        };
        let filter_expr = translation::expression::full_table_scan_filter(first_field_id);

        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Gather order: created_by (index 0), deleted_by (index 1), then the
        // user columns in schema order — the loop below relies on this layout.
        let table_id = table.table.table_id();
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len() + 2);
        logical_fields.push(LogicalFieldId::for_mvcc_created_by(table_id));
        logical_fields.push(LogicalFieldId::for_mvcc_deleted_by(table_id));
        for column in &table.schema.columns {
            logical_fields.push(LogicalFieldId::for_user(table_id, column.field_id));
        }

        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let mut stream = table.table.stream_columns(
            Arc::clone(&logical_fields),
            row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut rows = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            // Skip chunks that do not carry the full column complement.
            if batch.num_columns() < table.schema.columns.len() + 2 {
                continue;
            }

            let created_col = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing created_by column in MVCC data".into()))?;
            let deleted_col = batch
                .column(1)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing deleted_by column in MVCC data".into()))?;

            for row_idx in 0..batch.num_rows() {
                // A null created_by is treated as an auto-commit write.
                let created_by = if created_col.is_null(row_idx) {
                    TXN_ID_AUTO_COMMIT
                } else {
                    created_col.value(row_idx)
                };
                if created_by != txn_id {
                    continue;
                }

                // A null deleted_by means the row is still live.
                let deleted_by = if deleted_col.is_null(row_idx) {
                    TXN_ID_NONE
                } else {
                    deleted_col.value(row_idx)
                };
                // Skip rows deleted by any transaction, including this one.
                if deleted_by != TXN_ID_NONE {
                    continue;
                }

                // User columns start at batch index 2 (after the MVCC pair).
                let mut row_values = Vec::with_capacity(table.schema.columns.len());
                for col_idx in 0..table.schema.columns.len() {
                    let array = batch.column(col_idx + 2);
                    let value = llkv_plan::plan_value_from_array(array, row_idx)?;
                    row_values.push(value);
                }
                rows.push(row_values);
            }
        }

        Ok(rows)
    }
544
    /// Validate primary key constraints for rows inserted by a transaction before commit.
    ///
    /// Walks every table recorded by [`Self::record_table_with_new_rows`] for
    /// `txn_id`, collects the rows that transaction created, and hands them to
    /// the constraint service for PK uniqueness checks. Returns the first
    /// violation as an error; tables dropped in the meantime are skipped.
    pub(crate) fn validate_primary_keys_for_commit(&self, txn_id: TxnId) -> Result<()> {
        // Auto-commit statements validate inline; nothing deferred to check.
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(());
        }

        // Clone the pending set so the read lock is released before the
        // (potentially expensive) validation work below.
        let pending_tables = {
            let guard = self.txn_tables_with_new_rows.read().unwrap();
            guard.get(&txn_id).cloned()
        };

        let Some(tables) = pending_tables else {
            return Ok(());
        };

        for canonical_name in tables {
            let table = match self.lookup_table(&canonical_name) {
                Ok(table) => table,
                // Table was dropped since the rows were recorded — skip it.
                Err(Error::NotFound) => continue,
                Err(err) => return Err(err),
            };

            let constraint_ctx = self.build_table_constraint_context(table.as_ref())?;
            // Tables without a primary key need no commit-time check.
            let Some(primary_key) = constraint_ctx.primary_key.as_ref() else {
                continue;
            };

            let new_rows = self.collect_rows_created_by_txn(table.as_ref(), txn_id)?;
            if new_rows.is_empty() {
                continue;
            }

            // Rows come back in schema order, so the column order is identity.
            let column_order: Vec<usize> = (0..table.schema.columns.len()).collect();
            let table_for_fetch = Arc::clone(&table);
            let snapshot = self.default_snapshot();

            self.constraint_service.validate_primary_key_rows(
                &constraint_ctx.schema_field_ids,
                primary_key,
                &column_order,
                &new_rows,
                // Callback lets the service lazily fetch existing key values
                // for comparison against the new rows.
                |field_ids| {
                    self.scan_multi_column_values(table_for_fetch.as_ref(), field_ids, snapshot)
                },
            )?;
        }

        Ok(())
    }
594
595    /// Clear any per-transaction bookkeeping maintained by the runtime context.
596    pub(crate) fn clear_transaction_state(&self, txn_id: TxnId) {
597        if txn_id == TXN_ID_AUTO_COMMIT {
598            return;
599        }
600
601        let mut guard = self.txn_tables_with_new_rows.write().unwrap();
602        guard.remove(&txn_id);
603    }
604
605    /// Remove a table from the executor cache.
606    ///
607    /// Forces the table to be reloaded from metadata on next access.
608    /// Used after schema changes (ALTER TABLE, DROP COLUMN, etc.).
609    pub(super) fn remove_table_entry(&self, canonical_name: &str) {
610        let mut tables = self.tables.write().unwrap();
611        if tables.remove(canonical_name).is_some() {
612            tracing::trace!(
613                "remove_table_entry: removed table '{}' from context cache",
614                canonical_name
615            );
616        }
617    }
618
619    /// Check if a table is marked as dropped.
620    pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
621        self.dropped_tables.read().unwrap().contains(canonical_name)
622    }
623}