// llkv_runtime/runtime_context/utils.rs

1//! General utility methods for RuntimeContext operations.
2//!
3//! This module contains shared helper functions used across multiple DML and DDL operations:
4//! - Type coercion for INSERT/UPDATE value normalization
5//! - Column scanning for INDEX creation and constraint validation
6//! - Multi-column scanning for UNIQUE/PRIMARY KEY/FOREIGN KEY constraints
7//! - Row collection for FK validation
8//! - MVCC visibility filtering for transaction isolation
9//! - Transaction state tracking
10//! - Table cache management
11
12use arrow::array::{Array, UInt64Array};
13use arrow::datatypes::DataType;
14use llkv_column_map::store::GatherNullPolicy;
15use llkv_column_map::types::LogicalFieldId;
16use llkv_executor::utils::parse_date32_literal;
17use llkv_executor::{ExecutorColumn, ExecutorTable, translation};
18use llkv_plan::PlanValue;
19use llkv_result::{Error, Result};
20use llkv_storage::pager::Pager;
21use llkv_table::{FieldId, RowId};
22use llkv_transaction::{TransactionSnapshot, TxnId, filter_row_ids_for_snapshot};
23use simd_r_drive_entry_handle::EntryHandle;
24use std::sync::Arc;
25
26use crate::TXN_ID_AUTO_COMMIT;
27use crate::TXN_ID_NONE;
28
29use super::RuntimeContext;
30
31impl<P> RuntimeContext<P>
32where
33    P: Pager<Blob = EntryHandle> + Send + Sync,
34{
35    /// Coerce a PlanValue to match the expected column type.
36    ///
37    /// Used during INSERT and UPDATE operations to ensure type compatibility.
38    pub(super) fn coerce_plan_value_for_column(
39        &self,
40        value: PlanValue,
41        column: &ExecutorColumn,
42    ) -> Result<PlanValue> {
43        match value {
44            PlanValue::Null => Ok(PlanValue::Null),
45            PlanValue::Integer(v) => match &column.data_type {
46                DataType::Int64 => Ok(PlanValue::Integer(v)),
47                DataType::Float64 => Ok(PlanValue::Float(v as f64)),
48                DataType::Boolean => Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 })),
49                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
50                DataType::Date32 => {
51                    let casted = i32::try_from(v).map_err(|_| {
52                        Error::InvalidArgumentError(format!(
53                            "integer literal out of range for DATE column '{}'",
54                            column.name
55                        ))
56                    })?;
57                    Ok(PlanValue::Integer(casted as i64))
58                }
59                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
60                    "cannot assign integer to STRUCT column '{}'",
61                    column.name
62                ))),
63                _ => Ok(PlanValue::Integer(v)),
64            },
65            PlanValue::Float(v) => match &column.data_type {
66                DataType::Int64 => Ok(PlanValue::Integer(v as i64)),
67                DataType::Float64 => Ok(PlanValue::Float(v)),
68                DataType::Boolean => Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 })),
69                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
70                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
71                    "cannot assign floating-point value to DATE column '{}'",
72                    column.name
73                ))),
74                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
75                    "cannot assign floating-point value to STRUCT column '{}'",
76                    column.name
77                ))),
78                _ => Ok(PlanValue::Float(v)),
79            },
80            PlanValue::String(s) => match &column.data_type {
81                DataType::Boolean => {
82                    let normalized = s.trim().to_ascii_lowercase();
83                    match normalized.as_str() {
84                        "true" | "t" | "1" => Ok(PlanValue::Integer(1)),
85                        "false" | "f" | "0" => Ok(PlanValue::Integer(0)),
86                        _ => Err(Error::InvalidArgumentError(format!(
87                            "cannot assign string '{}' to BOOLEAN column '{}'",
88                            s, column.name
89                        ))),
90                    }
91                }
92                DataType::Utf8 => Ok(PlanValue::String(s)),
93                DataType::Date32 => {
94                    let days = parse_date32_literal(&s)?;
95                    Ok(PlanValue::Integer(days as i64))
96                }
97                DataType::Int64 | DataType::Float64 => Err(Error::InvalidArgumentError(format!(
98                    "cannot assign string '{}' to numeric column '{}'",
99                    s, column.name
100                ))),
101                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
102                    "cannot assign string to STRUCT column '{}'",
103                    column.name
104                ))),
105                _ => Ok(PlanValue::String(s)),
106            },
107            PlanValue::Struct(map) => match &column.data_type {
108                DataType::Struct(_) => Ok(PlanValue::Struct(map)),
109                _ => Err(Error::InvalidArgumentError(format!(
110                    "cannot assign struct value to column '{}'",
111                    column.name
112                ))),
113            },
114        }
115    }
116
    /// Scan a single column and materialize values into memory.
    ///
    /// Used during CREATE INDEX operations and constraint validation.
    ///
    /// Returns the values of `field_id` for every row visible under
    /// `snapshot` (nulls included). A missing column or table
    /// (`Error::NotFound`) yields an empty vector rather than an error.
    ///
    /// NOTE: Current implementation buffers the entire result set; convert to a
    /// streaming iterator once executor-side consumers support incremental consumption.
    pub(super) fn scan_column_values(
        &self,
        table: &ExecutorTable<P>,
        field_id: FieldId,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<PlanValue>> {
        let table_id = table.table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        // Create a filter that matches all rows (unbounded range)
        let match_all_filter = Filter {
            field_id,
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        // Get all matching row_ids first
        let row_ids = match table.table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            // Absent column/table: treat as "no rows" rather than an error.
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Apply MVCC filtering manually using filter_row_ids_for_snapshot
        let row_ids = filter_row_ids_for_snapshot(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Gather the column values for visible rows
        let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
        let row_count = row_ids.len();
        let mut stream = match table.table.stream_columns(
            vec![logical_field_id],
            row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // TODO: Don't buffer all values; make this streamable
        // NOTE: Values are accumulated eagerly; revisit when `llkv-plan` supports
        // incremental parameter binding.
        let mut values = Vec::with_capacity(row_count);
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() == 0 {
                continue;
            }
            let array = batch.column(0);
            for row_idx in 0..batch.num_rows() {
                // NOTE(review): conversion failures are silently skipped here,
                // so `values` may end up shorter than `row_count`. Sibling
                // helpers either propagate the error
                // (`collect_row_values_for_ids`) or substitute Null
                // (`scan_multi_column_values`) — confirm the drop is intended.
                if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
                    values.push(value);
                }
            }
        }

        Ok(values)
    }
194
    /// Scan a set of columns and materialize rows into memory.
    ///
    /// Used during constraint validation (multi-column UNIQUE, PRIMARY KEY)
    /// and CREATE INDEX operations.
    ///
    /// Returns one row per visible row-id, with values in `field_ids` order.
    /// Values that cannot be converted to a `PlanValue` are recorded as
    /// `PlanValue::Null` so every row keeps the same arity.
    ///
    /// NOTE: Similar to [`Self::scan_column_values`], this buffers eagerly pending
    /// enhancements to the executor pipeline.
    pub(super) fn scan_multi_column_values(
        &self,
        table: &ExecutorTable<P>,
        field_ids: &[FieldId],
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<Vec<PlanValue>>> {
        if field_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        // Match every row by filtering the first requested column over an
        // unbounded range.
        let match_all_filter = Filter {
            field_id: field_ids[0],
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        let row_ids = match table.table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            // Absent column/table: treat as "no rows" rather than an error.
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Keep only rows visible to this transaction snapshot (MVCC).
        let row_ids = filter_row_ids_for_snapshot(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let logical_field_ids: Vec<_> = field_ids
            .iter()
            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
            .collect();

        let total_rows = row_ids.len();
        let mut stream = match table.table.stream_columns(
            logical_field_ids,
            row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Pre-size the output: chunk offsets below index directly into `rows`.
        let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() == 0 {
                continue;
            }

            // `row_offset` maps this chunk's local row indices back to the
            // global position within the requested row-id list.
            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < rows.len(),
                        "stream chunk produced out-of-bounds row index"
                    );
                    if let Some(row) = rows.get_mut(target_index) {
                        // Degrade unconvertible values to Null so row arity
                        // stays aligned with `field_ids`.
                        match llkv_plan::plan_value_from_array(array, local_idx) {
                            Ok(value) => row.push(value),
                            Err(_) => row.push(PlanValue::Null),
                        }
                    }
                }
            }
        }

        Ok(rows)
    }
287
288    /// Collect row values for specific row IDs and field IDs.
289    ///
290    /// Used during foreign key validation to gather referenced/referencing values.
291    pub(super) fn collect_row_values_for_ids(
292        &self,
293        table: &ExecutorTable<P>,
294        row_ids: &[RowId],
295        field_ids: &[FieldId],
296    ) -> Result<Vec<Vec<PlanValue>>> {
297        if row_ids.is_empty() || field_ids.is_empty() {
298            return Ok(Vec::new());
299        }
300
301        let table_id = table.table.table_id();
302        let logical_field_ids: Vec<LogicalFieldId> = field_ids
303            .iter()
304            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
305            .collect();
306
307        let mut stream = match table.table.stream_columns(
308            logical_field_ids.clone(),
309            row_ids.to_vec(),
310            GatherNullPolicy::IncludeNulls,
311        ) {
312            Ok(stream) => stream,
313            Err(Error::NotFound) => return Ok(Vec::new()),
314            Err(e) => return Err(e),
315        };
316
317        let mut rows = vec![Vec::with_capacity(field_ids.len()); row_ids.len()];
318        while let Some(chunk) = stream.next_batch()? {
319            let batch = chunk.batch();
320            let base = chunk.row_offset();
321            let local_len = batch.num_rows();
322            for col_idx in 0..batch.num_columns() {
323                let array = batch.column(col_idx);
324                for local_idx in 0..local_len {
325                    let target_index = base + local_idx;
326                    if let Some(row) = rows.get_mut(target_index) {
327                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
328                        row.push(value);
329                    }
330                }
331            }
332        }
333
334        Ok(rows)
335    }
336
337    /// Filter row IDs to only include those visible in the given snapshot.
338    ///
339    /// Used across all DML operations to ensure MVCC transaction isolation.
340    pub(super) fn filter_visible_row_ids(
341        &self,
342        table: &ExecutorTable<P>,
343        row_ids: Vec<RowId>,
344        snapshot: TransactionSnapshot,
345    ) -> Result<Vec<RowId>> {
346        filter_row_ids_for_snapshot(table.table.as_ref(), row_ids, &self.txn_manager, snapshot)
347    }
348
349    /// Record that a transaction has inserted rows into a table.
350    ///
351    /// Used for deferred PRIMARY KEY validation at commit time.
352    pub(super) fn record_table_with_new_rows(&self, txn_id: TxnId, canonical_name: String) {
353        if txn_id == TXN_ID_AUTO_COMMIT {
354            return;
355        }
356
357        let mut guard = self.txn_tables_with_new_rows.write().unwrap();
358        guard.entry(txn_id).or_default().insert(canonical_name);
359    }
360
    /// Collect all rows created by a specific transaction.
    ///
    /// Used for PRIMARY KEY validation at commit time.
    ///
    /// Scans the whole table and keeps rows whose MVCC `created_by` column
    /// equals `txn_id` and that have not been deleted. Returned rows contain
    /// the user columns in schema order. Auto-commit transactions return an
    /// empty set (their rows are validated inline at insert time).
    pub(super) fn collect_rows_created_by_txn(
        &self,
        table: &ExecutorTable<P>,
        txn_id: TxnId,
    ) -> Result<Vec<Vec<PlanValue>>> {
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(Vec::new());
        }

        if table.schema.columns.is_empty() {
            return Ok(Vec::new());
        }

        let Some(first_field_id) = table.schema.first_field_id() else {
            return Ok(Vec::new());
        };
        let filter_expr = translation::expression::full_table_scan_filter(first_field_id);

        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Request the MVCC bookkeeping columns first (batch indices 0 and 1),
        // then every user column; the per-row loop below relies on this layout.
        let table_id = table.table.table_id();
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len() + 2);
        logical_fields.push(LogicalFieldId::for_mvcc_created_by(table_id));
        logical_fields.push(LogicalFieldId::for_mvcc_deleted_by(table_id));
        for column in &table.schema.columns {
            logical_fields.push(LogicalFieldId::for_user(table_id, column.field_id));
        }

        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let mut stream = table.table.stream_columns(
            Arc::clone(&logical_fields),
            row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut rows = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            // Defensive: skip chunks that do not carry the full column set.
            if batch.num_columns() < table.schema.columns.len() + 2 {
                continue;
            }

            let created_col = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing created_by column in MVCC data".into()))?;
            let deleted_col = batch
                .column(1)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing deleted_by column in MVCC data".into()))?;

            for row_idx in 0..batch.num_rows() {
                // A null created_by marker is treated as an auto-commit write.
                let created_by = if created_col.is_null(row_idx) {
                    TXN_ID_AUTO_COMMIT
                } else {
                    created_col.value(row_idx)
                };
                // Keep only rows written by the transaction being validated.
                if created_by != txn_id {
                    continue;
                }

                // A null deleted_by marker means the row is still live.
                let deleted_by = if deleted_col.is_null(row_idx) {
                    TXN_ID_NONE
                } else {
                    deleted_col.value(row_idx)
                };
                // Skip rows that have since been deleted (by any transaction).
                if deleted_by != TXN_ID_NONE {
                    continue;
                }

                // User columns start at batch index 2, after the MVCC pair.
                let mut row_values = Vec::with_capacity(table.schema.columns.len());
                for col_idx in 0..table.schema.columns.len() {
                    let array = batch.column(col_idx + 2);
                    let value = llkv_plan::plan_value_from_array(array, row_idx)?;
                    row_values.push(value);
                }
                rows.push(row_values);
            }
        }

        Ok(rows)
    }
452
453    /// Validate primary key constraints for rows inserted by a transaction before commit.
454    pub(crate) fn validate_primary_keys_for_commit(&self, txn_id: TxnId) -> Result<()> {
455        if txn_id == TXN_ID_AUTO_COMMIT {
456            return Ok(());
457        }
458
459        let pending_tables = {
460            let guard = self.txn_tables_with_new_rows.read().unwrap();
461            guard.get(&txn_id).cloned()
462        };
463
464        let Some(tables) = pending_tables else {
465            return Ok(());
466        };
467
468        for canonical_name in tables {
469            let table = match self.lookup_table(&canonical_name) {
470                Ok(table) => table,
471                Err(Error::NotFound) => continue,
472                Err(err) => return Err(err),
473            };
474
475            let constraint_ctx = self.build_table_constraint_context(table.as_ref())?;
476            let Some(primary_key) = constraint_ctx.primary_key.as_ref() else {
477                continue;
478            };
479
480            let new_rows = self.collect_rows_created_by_txn(table.as_ref(), txn_id)?;
481            if new_rows.is_empty() {
482                continue;
483            }
484
485            let column_order: Vec<usize> = (0..table.schema.columns.len()).collect();
486            let table_for_fetch = Arc::clone(&table);
487            let snapshot = self.default_snapshot();
488
489            self.constraint_service.validate_primary_key_rows(
490                &constraint_ctx.schema_field_ids,
491                primary_key,
492                &column_order,
493                &new_rows,
494                |field_ids| {
495                    self.scan_multi_column_values(table_for_fetch.as_ref(), field_ids, snapshot)
496                },
497            )?;
498        }
499
500        Ok(())
501    }
502
503    /// Clear any per-transaction bookkeeping maintained by the runtime context.
504    pub(crate) fn clear_transaction_state(&self, txn_id: TxnId) {
505        if txn_id == TXN_ID_AUTO_COMMIT {
506            return;
507        }
508
509        let mut guard = self.txn_tables_with_new_rows.write().unwrap();
510        guard.remove(&txn_id);
511    }
512
513    /// Remove a table from the executor cache.
514    ///
515    /// Forces the table to be reloaded from metadata on next access.
516    /// Used after schema changes (ALTER TABLE, DROP COLUMN, etc.).
517    pub(super) fn remove_table_entry(&self, canonical_name: &str) {
518        let mut tables = self.tables.write().unwrap();
519        if tables.remove(canonical_name).is_some() {
520            tracing::trace!(
521                "remove_table_entry: removed table '{}' from context cache",
522                canonical_name
523            );
524        }
525    }
526
527    /// Check if a table is marked as dropped.
528    pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
529        self.dropped_tables.read().unwrap().contains(canonical_name)
530    }
531}