llkv_runtime/runtime_context/utils.rs

1//! General utility methods for RuntimeContext operations.
2//!
3//! This module contains shared helper functions used across multiple DML and DDL operations:
4//! - Type coercion for INSERT/UPDATE value normalization
5//! - Column scanning for INDEX creation and constraint validation
6//! - Multi-column scanning for UNIQUE/PRIMARY KEY/FOREIGN KEY constraints
7//! - Row collection for FK validation
8//! - MVCC visibility filtering for transaction isolation
9//! - Transaction state tracking
10//! - Table cache management
11
12use arrow::array::{Array, UInt64Array};
13use arrow::datatypes::{DataType, IntervalUnit};
14use croaring::Treemap;
15use llkv_column_map::store::GatherNullPolicy;
16use llkv_compute::date::parse_date32_literal;
17use llkv_compute::scalar::decimal::{
18    align_decimal_to_scale, decimal_truthy, truncate_decimal_to_i64,
19};
20use llkv_executor::{ExecutorColumn, ExecutorTable, translation};
21use llkv_plan::PlanValue;
22use llkv_result::{Error, Result};
23use llkv_storage::pager::Pager;
24use llkv_table::{FieldId, RowStream};
25use llkv_transaction::{TransactionSnapshot, TxnId, filter_row_ids_for_snapshot};
26use llkv_types::LogicalFieldId;
27use simd_r_drive_entry_handle::EntryHandle;
28use std::sync::Arc;
29
30use crate::TXN_ID_AUTO_COMMIT;
31use crate::TXN_ID_NONE;
32
33use super::RuntimeContext;
34
impl<P> RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Coerce a PlanValue to match the expected column type.
    ///
    /// Used during INSERT and UPDATE operations to ensure type compatibility.
    ///
    /// Dispatches first on the literal's variant, then on the column's Arrow
    /// [`DataType`]:
    /// - `Null` passes through unchanged for every column type.
    /// - Decimal literals are re-scaled for `Decimal128` targets, truncated to
    ///   `Int64`, converted for `Float64`/`Utf8`, and mapped to 0/1 for
    ///   `Boolean`.
    /// - Boolean results are always encoded as `PlanValue::Integer(0 | 1)`.
    /// - String-to-`Date32` conversion delegates to [`parse_date32_literal`].
    /// - The catch-all arms for integer/float/string literals pass the value
    ///   through unchanged for target types not matched explicitly.
    ///
    /// # Errors
    /// Returns [`Error::InvalidArgumentError`] when the literal cannot be
    /// represented in the column's type (e.g. decimal into DATE, string into a
    /// numeric column, out-of-range integer into DATE, anything into STRUCT).
    pub(super) fn coerce_plan_value_for_column(
        &self,
        value: PlanValue,
        column: &ExecutorColumn,
    ) -> Result<PlanValue> {
        match value {
            PlanValue::Null => Ok(PlanValue::Null),
            PlanValue::Decimal(decimal) => match &column.data_type {
                DataType::Decimal128(precision, scale) => {
                    // Re-scale to the column's declared precision/scale; fails
                    // if the value cannot fit without loss beyond the scale.
                    let aligned = align_decimal_to_scale(decimal, *precision, *scale).map_err(
                        |err| {
                            Error::InvalidArgumentError(format!(
                                "decimal literal {} incompatible with DECIMAL({}, {}) column '{}': {err}",
                                decimal, precision, scale, column.name
                            ))
                        },
                    )?;
                    Ok(PlanValue::Decimal(aligned))
                }
                DataType::Int64 => {
                    // Truncates toward the integer part; errors on overflow.
                    let coerced = truncate_decimal_to_i64(decimal).map_err(|err| {
                        Error::InvalidArgumentError(format!(
                            "decimal literal {} incompatible with INT column '{}': {err}",
                            decimal, column.name
                        ))
                    })?;
                    Ok(PlanValue::Integer(coerced))
                }
                DataType::Float64 => Ok(PlanValue::Float(decimal.to_f64())),
                DataType::Boolean => Ok(PlanValue::Integer(if decimal_truthy(decimal) {
                    1
                } else {
                    0
                })),
                DataType::Utf8 => Ok(PlanValue::String(decimal.to_string())),
                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign decimal literal to DATE column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign decimal literal to STRUCT column '{}'",
                    column.name
                ))),
                other => Err(Error::InvalidArgumentError(format!(
                    "unsupported target type {:?} for DECIMAL literal in column '{}'",
                    other, column.name
                ))),
            },
            PlanValue::Integer(v) => match &column.data_type {
                DataType::Int64 => Ok(PlanValue::Integer(v)),
                DataType::Float64 => Ok(PlanValue::Float(v as f64)),
                DataType::Boolean => Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 })),
                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
                DataType::Date32 => {
                    // Date32 stores days-since-epoch as i32; reject values
                    // outside the i32 range rather than silently truncating.
                    let casted = i32::try_from(v).map_err(|_| {
                        Error::InvalidArgumentError(format!(
                            "integer literal out of range for DATE column '{}'",
                            column.name
                        ))
                    })?;
                    Ok(PlanValue::Date32(casted))
                }
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign integer to STRUCT column '{}'",
                    column.name
                ))),
                // Other target types accept the integer literal as-is.
                _ => Ok(PlanValue::Integer(v)),
            },
            PlanValue::Float(v) => match &column.data_type {
                // NOTE: `as i64` truncates toward zero (saturating on
                // overflow); this is the established coercion for FLOAT → INT.
                DataType::Int64 => Ok(PlanValue::Integer(v as i64)),
                DataType::Float64 => Ok(PlanValue::Float(v)),
                DataType::Boolean => Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 })),
                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign floating-point value to DATE column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign floating-point value to STRUCT column '{}'",
                    column.name
                ))),
                _ => Ok(PlanValue::Float(v)),
            },
            PlanValue::String(s) => match &column.data_type {
                DataType::Boolean => {
                    // Accept the common SQL boolean spellings, case-insensitive.
                    let normalized = s.trim().to_ascii_lowercase();
                    match normalized.as_str() {
                        "true" | "t" | "1" => Ok(PlanValue::Integer(1)),
                        "false" | "f" | "0" => Ok(PlanValue::Integer(0)),
                        _ => Err(Error::InvalidArgumentError(format!(
                            "cannot assign string '{}' to BOOLEAN column '{}'",
                            s, column.name
                        ))),
                    }
                }
                DataType::Utf8 => Ok(PlanValue::String(s)),
                DataType::Date32 => {
                    let days = parse_date32_literal(&s)?;
                    Ok(PlanValue::Date32(days))
                }
                DataType::Int64 | DataType::Float64 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign string '{}' to numeric column '{}'",
                    s, column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign string to STRUCT column '{}'",
                    column.name
                ))),
                _ => Ok(PlanValue::String(s)),
            },
            PlanValue::Struct(map) => match &column.data_type {
                // Field-level validation of the struct payload happens
                // downstream; here we only check the outer type.
                DataType::Struct(_) => Ok(PlanValue::Struct(map)),
                _ => Err(Error::InvalidArgumentError(format!(
                    "cannot assign struct value to column '{}'",
                    column.name
                ))),
            },
            PlanValue::Interval(interval) => match &column.data_type {
                // Only the MonthDayNano interval representation is supported.
                DataType::Interval(IntervalUnit::MonthDayNano) => Ok(PlanValue::Interval(interval)),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign INTERVAL literal to STRUCT column '{}'",
                    column.name
                ))),
                other => Err(Error::InvalidArgumentError(format!(
                    "unsupported target type {:?} for INTERVAL literal in column '{}'",
                    other, column.name
                ))),
            },
            PlanValue::Date32(days) => match &column.data_type {
                DataType::Date32 => Ok(PlanValue::Date32(days)),
                // Date widens losslessly to its day count as INT/FLOAT.
                DataType::Int64 => Ok(PlanValue::Integer(i64::from(days))),
                DataType::Float64 => Ok(PlanValue::Float(days as f64)),
                DataType::Utf8 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign DATE literal to TEXT column '{}'",
                    column.name
                ))),
                DataType::Boolean => Err(Error::InvalidArgumentError(format!(
                    "cannot assign DATE literal to BOOLEAN column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign DATE literal to STRUCT column '{}'",
                    column.name
                ))),
                other => Err(Error::InvalidArgumentError(format!(
                    "unsupported target type {:?} for DATE literal in column '{}'",
                    other, column.name
                ))),
            },
        }
    }
193
194    /// Scan a single column and materialize values into memory.
195    ///
196    /// Used during CREATE INDEX operations and constraint validation.
197    ///
198    /// NOTE: Current implementation buffers the entire result set; convert to a
199    /// streaming iterator once executor-side consumers support incremental consumption.
200    pub(super) fn scan_column_values(
201        &self,
202        table: &ExecutorTable<P>,
203        field_id: FieldId,
204        snapshot: TransactionSnapshot,
205    ) -> Result<Vec<PlanValue>> {
206        let table_id = table.table_id();
207        use llkv_expr::{Expr, Filter, Operator};
208        use std::ops::Bound;
209
210        // Create a filter that matches all rows (unbounded range)
211        let match_all_filter = Filter {
212            field_id,
213            op: Operator::Range {
214                lower: Bound::Unbounded,
215                upper: Bound::Unbounded,
216            },
217        };
218        let filter_expr = Expr::Pred(match_all_filter);
219
220        // Get all matching row_ids first
221        let row_ids = match table.filter_row_ids(&filter_expr) {
222            Ok(ids) => ids,
223            Err(Error::NotFound) => return Ok(Vec::new()),
224            Err(e) => return Err(e),
225        };
226
227        // Apply MVCC filtering manually using filter_row_ids_for_snapshot
228        let row_ids = filter_row_ids_for_snapshot(
229            table.table.as_ref(),
230            row_ids,
231            &self.txn_manager,
232            snapshot,
233        )?;
234
235        if row_ids.is_empty() {
236            return Ok(Vec::new());
237        }
238
239        // Gather the column values for visible rows
240        let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
241        let row_count = row_ids.cardinality() as usize;
242        let mut stream = match table.stream_columns(
243            vec![logical_field_id],
244            &row_ids,
245            GatherNullPolicy::IncludeNulls,
246        ) {
247            Ok(stream) => stream,
248            Err(Error::NotFound) => return Ok(Vec::new()),
249            Err(e) => return Err(e),
250        };
251
252        // TODO: Don't buffer all values; make this streamable
253        // NOTE: Values are accumulated eagerly; revisit when `llkv-plan` supports
254        // incremental parameter binding.
255        let mut values = Vec::with_capacity(row_count);
256        while let Some(chunk) = stream.next_chunk()? {
257            let batch = chunk.record_batch();
258            if batch.num_columns() == 0 {
259                continue;
260            }
261            let array = batch.column(0);
262            for row_idx in 0..batch.num_rows() {
263                if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
264                    values.push(value);
265                }
266            }
267        }
268
269        Ok(values)
270    }
271
272    /// Scan a set of columns and materialize rows into memory.
273    ///
274    /// Used during constraint validation (multi-column UNIQUE, PRIMARY KEY)
275    /// and CREATE INDEX operations.
276    ///
277    /// NOTE: Similar to [`Self::scan_column_values`], this buffers eagerly pending
278    /// enhancements to the executor pipeline.
279    pub(super) fn scan_multi_column_values(
280        &self,
281        table: &ExecutorTable<P>,
282        field_ids: &[FieldId],
283        snapshot: TransactionSnapshot,
284    ) -> Result<Vec<Vec<PlanValue>>> {
285        if field_ids.is_empty() {
286            return Ok(Vec::new());
287        }
288
289        let table_id = table.table_id();
290        use llkv_expr::{Expr, Filter, Operator};
291        use std::ops::Bound;
292
293        let match_all_filter = Filter {
294            field_id: field_ids[0],
295            op: Operator::Range {
296                lower: Bound::Unbounded,
297                upper: Bound::Unbounded,
298            },
299        };
300        let filter_expr = Expr::Pred(match_all_filter);
301
302        let row_ids = match table.filter_row_ids(&filter_expr) {
303            Ok(ids) => ids,
304            Err(Error::NotFound) => return Ok(Vec::new()),
305            Err(e) => return Err(e),
306        };
307
308        let row_ids = filter_row_ids_for_snapshot(
309            table.table.as_ref(),
310            row_ids,
311            &self.txn_manager,
312            snapshot,
313        )?;
314
315        if row_ids.is_empty() {
316            return Ok(Vec::new());
317        }
318
319        let logical_field_ids: Vec<_> = field_ids
320            .iter()
321            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
322            .collect();
323
324        let total_rows = row_ids.cardinality() as usize;
325        let mut stream =
326            match table.stream_columns(logical_field_ids, &row_ids, GatherNullPolicy::IncludeNulls)
327            {
328                Ok(stream) => stream,
329                Err(Error::NotFound) => return Ok(Vec::new()),
330                Err(e) => return Err(e),
331            };
332
333        let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
334        let mut emitted = 0usize;
335        while let Some(chunk) = stream.next_chunk()? {
336            let batch = chunk.record_batch();
337            if batch.num_columns() == 0 {
338                continue;
339            }
340
341            let base = emitted;
342            let local_len = batch.num_rows();
343            for col_idx in 0..batch.num_columns() {
344                let array = batch.column(col_idx);
345                for local_idx in 0..local_len {
346                    let target_index = base + local_idx;
347                    debug_assert!(
348                        target_index < rows.len(),
349                        "stream chunk produced out-of-bounds row index"
350                    );
351                    if let Some(row) = rows.get_mut(target_index) {
352                        match llkv_plan::plan_value_from_array(array, local_idx) {
353                            Ok(value) => row.push(value),
354                            Err(_) => row.push(PlanValue::Null),
355                        }
356                    }
357                }
358            }
359            emitted += local_len;
360        }
361
362        Ok(rows)
363    }
364
    /// Scan multi-column values with FK-aware visibility semantics.
    ///
    /// This is similar to [`Self::scan_multi_column_values`], but uses FK-specific
    /// MVCC visibility rules where rows deleted by the current transaction are treated
    /// as still visible (matching SQL standard FK constraint checking behavior).
    ///
    /// Returns one inner `Vec` per visible row, with values ordered the same
    /// as `field_ids`. Buffers eagerly, like its sibling scans.
    pub(super) fn scan_multi_column_values_for_fk_check(
        &self,
        table: &ExecutorTable<P>,
        field_ids: &[FieldId],
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<Vec<PlanValue>>> {
        if field_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        // Unbounded range over the first requested column: matches all rows.
        let match_all_filter = Filter {
            field_id: field_ids[0],
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        let row_ids = match table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            // Missing column/table data yields an empty result, not an error.
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Use FK-specific filtering that treats deleted rows as still visible
        let row_ids = llkv_transaction::filter_row_ids_for_fk_check(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let logical_field_ids: Vec<_> = field_ids
            .iter()
            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
            .collect();

        let total_rows = row_ids.cardinality() as usize;
        let mut stream =
            match table.stream_columns(logical_field_ids, &row_ids, GatherNullPolicy::IncludeNulls)
            {
                Ok(stream) => stream,
                Err(Error::NotFound) => return Ok(Vec::new()),
                Err(e) => return Err(e),
            };

        // One pre-sized vector per visible row; each column pass appends its
        // value so rows end up ordered like `field_ids`.
        let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
        let mut emitted = 0usize;
        while let Some(chunk) = stream.next_chunk()? {
            let batch = chunk.record_batch();
            if batch.num_columns() == 0 {
                continue;
            }

            let base = emitted;
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < rows.len(),
                        "stream chunk produced out-of-bounds row index"
                    );
                    if let Some(row) = rows.get_mut(target_index) {
                        match llkv_plan::plan_value_from_array(array, local_idx) {
                            Ok(value) => row.push(value),
                            // Conversion failures degrade to NULL instead of
                            // aborting the constraint scan.
                            Err(_) => row.push(PlanValue::Null),
                        }
                    }
                }
            }
            emitted += local_len;
        }

        Ok(rows)
    }
456
457    /// Collect row values for specific row IDs and field IDs.
458    ///
459    /// Used during foreign key validation to gather referenced/referencing values.
460    pub(super) fn collect_row_values_for_ids(
461        &self,
462        table: &ExecutorTable<P>,
463        row_ids: &Treemap,
464        field_ids: &[FieldId],
465    ) -> Result<Vec<Vec<PlanValue>>> {
466        if row_ids.is_empty() || field_ids.is_empty() {
467            return Ok(Vec::new());
468        }
469
470        let table_id = table.table_id();
471        let logical_field_ids: Vec<LogicalFieldId> = field_ids
472            .iter()
473            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
474            .collect();
475
476        let mut stream = match table.stream_columns(
477            logical_field_ids.clone(),
478            row_ids,
479            GatherNullPolicy::IncludeNulls,
480        ) {
481            Ok(stream) => stream,
482            Err(Error::NotFound) => return Ok(Vec::new()),
483            Err(e) => return Err(e),
484        };
485
486        let mut rows = vec![Vec::with_capacity(field_ids.len()); row_ids.cardinality() as usize];
487        let mut emitted = 0usize;
488        while let Some(chunk) = stream.next_chunk()? {
489            let batch = chunk.record_batch();
490            let base = emitted;
491            let local_len = batch.num_rows();
492            for col_idx in 0..batch.num_columns() {
493                let array = batch.column(col_idx);
494                for local_idx in 0..local_len {
495                    let target_index = base + local_idx;
496                    if let Some(row) = rows.get_mut(target_index) {
497                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
498                        row.push(value);
499                    }
500                }
501            }
502            emitted += local_len;
503        }
504
505        Ok(rows)
506    }
507
    /// Filter row IDs to only include those visible in the given snapshot.
    ///
    /// Used across all DML operations to ensure MVCC transaction isolation.
    ///
    /// Thin wrapper over [`filter_row_ids_for_snapshot`] that supplies this
    /// context's transaction manager.
    pub(super) fn filter_visible_row_ids(
        &self,
        table: &ExecutorTable<P>,
        row_ids: Treemap,
        snapshot: TransactionSnapshot,
    ) -> Result<Treemap> {
        filter_row_ids_for_snapshot(table.table.as_ref(), row_ids, &self.txn_manager, snapshot)
    }
519
520    /// Record that a transaction has inserted rows into a table.
521    ///
522    /// Used for deferred PRIMARY KEY validation at commit time.
523    pub(super) fn record_table_with_new_rows(&self, txn_id: TxnId, canonical_name: String) {
524        if txn_id == TXN_ID_AUTO_COMMIT {
525            return;
526        }
527
528        let mut guard = self.txn_tables_with_new_rows.write().unwrap();
529        guard.entry(txn_id).or_default().insert(canonical_name);
530    }
531
    /// Collect all rows created by a specific transaction.
    ///
    /// Used for PRIMARY KEY validation at commit time.
    ///
    /// Scans the full table and keeps only rows whose MVCC `created_by`
    /// column equals `txn_id` and whose `deleted_by` column is unset — i.e.
    /// rows this transaction inserted and has not subsequently deleted.
    /// Returns one value vector per row, ordered like `table.schema.columns`.
    pub(super) fn collect_rows_created_by_txn(
        &self,
        table: &ExecutorTable<P>,
        txn_id: TxnId,
    ) -> Result<Vec<Vec<PlanValue>>> {
        // Auto-commit work never defers validation; nothing to collect.
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(Vec::new());
        }

        if table.schema.columns.is_empty() {
            return Ok(Vec::new());
        }

        // Anchor a match-all filter on the first user column.
        let Some(first_field_id) = table.schema.first_field_id() else {
            return Ok(Vec::new());
        };
        let filter_expr = translation::expression::full_table_scan_filter(first_field_id);

        let row_ids = table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Stream the MVCC bookkeeping columns (created_by, deleted_by) ahead
        // of the user columns so visibility can be decided per row below.
        let table_id = table.table_id();
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len() + 2);
        logical_fields.push(LogicalFieldId::for_mvcc_created_by(table_id));
        logical_fields.push(LogicalFieldId::for_mvcc_deleted_by(table_id));
        for column in &table.schema.columns {
            logical_fields.push(LogicalFieldId::for_user(table_id, column.field_id));
        }

        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let mut stream = table.stream_columns(
            Arc::clone(&logical_fields),
            row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut rows = Vec::new();
        while let Some(chunk) = stream.next_chunk()? {
            let batch = chunk.record_batch();
            // Defensive: skip chunks that did not materialize every column.
            if batch.num_columns() < table.schema.columns.len() + 2 {
                continue;
            }

            let created_col = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing created_by column in MVCC data".into()))?;
            let deleted_col = batch
                .column(1)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing deleted_by column in MVCC data".into()))?;

            for row_idx in 0..batch.num_rows() {
                // A NULL created_by marks rows written by auto-commit work.
                let created_by = if created_col.is_null(row_idx) {
                    TXN_ID_AUTO_COMMIT
                } else {
                    created_col.value(row_idx)
                };
                if created_by != txn_id {
                    continue;
                }

                // A NULL deleted_by means the row is still live.
                let deleted_by = if deleted_col.is_null(row_idx) {
                    TXN_ID_NONE
                } else {
                    deleted_col.value(row_idx)
                };
                if deleted_by != TXN_ID_NONE {
                    continue;
                }

                // User columns start at index 2, after the two MVCC columns.
                let mut row_values = Vec::with_capacity(table.schema.columns.len());
                for col_idx in 0..table.schema.columns.len() {
                    let array = batch.column(col_idx + 2);
                    let value = llkv_plan::plan_value_from_array(array, row_idx)?;
                    row_values.push(value);
                }
                rows.push(row_values);
            }
        }

        Ok(rows)
    }
623
    /// Validate primary key constraints for rows inserted by a transaction before commit.
    ///
    /// For every table the transaction inserted into (tracked via
    /// [`Self::record_table_with_new_rows`]), the rows created by `txn_id`
    /// are re-checked against the table's PRIMARY KEY. Tables without a
    /// primary key, tables that no longer exist, and transactions with no
    /// pending inserts are all skipped.
    ///
    /// # Errors
    /// Propagates lookup/scan failures and any constraint violation reported
    /// by the constraint service.
    pub(crate) fn validate_primary_keys_for_commit(&self, txn_id: TxnId) -> Result<()> {
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(());
        }

        // Clone the pending set out of the lock so the table scans below do
        // not run while holding the bookkeeping lock.
        let pending_tables = {
            let guard = self.txn_tables_with_new_rows.read().unwrap();
            guard.get(&txn_id).cloned()
        };

        let Some(tables) = pending_tables else {
            return Ok(());
        };

        for canonical_name in tables {
            let table = match self.lookup_table(&canonical_name) {
                Ok(table) => table,
                // Table was dropped after the insert; nothing to validate.
                Err(Error::NotFound) => continue,
                Err(err) => return Err(err),
            };

            let constraint_ctx = self.build_table_constraint_context(table.as_ref())?;
            let Some(primary_key) = constraint_ctx.primary_key.as_ref() else {
                continue;
            };

            let new_rows = self.collect_rows_created_by_txn(table.as_ref(), txn_id)?;
            if new_rows.is_empty() {
                continue;
            }

            // Rows from collect_rows_created_by_txn are already in schema
            // column order, so the order map is the identity permutation.
            let column_order: Vec<usize> = (0..table.schema.columns.len()).collect();
            let table_for_fetch = Arc::clone(&table);
            let snapshot = self.default_snapshot();

            self.constraint_service.validate_primary_key_rows(
                &constraint_ctx.schema_field_ids,
                primary_key,
                &column_order,
                &new_rows,
                // Callback the service uses to fetch existing key values for
                // duplicate detection against committed rows.
                |field_ids| {
                    self.scan_multi_column_values(table_for_fetch.as_ref(), field_ids, snapshot)
                },
            )?;
        }

        Ok(())
    }
673
674    /// Clear any per-transaction bookkeeping maintained by the runtime context.
675    pub(crate) fn clear_transaction_state(&self, txn_id: TxnId) {
676        if txn_id == TXN_ID_AUTO_COMMIT {
677            return;
678        }
679
680        let mut guard = self.txn_tables_with_new_rows.write().unwrap();
681        guard.remove(&txn_id);
682    }
683
684    /// Remove a table from the executor cache.
685    ///
686    /// Forces the table to be reloaded from metadata on next access.
687    /// Used after schema changes (ALTER TABLE, DROP COLUMN, etc.).
688    pub(super) fn remove_table_entry(&self, canonical_name: &str) {
689        let mut tables = self.tables.write().unwrap();
690        if tables.remove(canonical_name).is_some() {
691            tracing::trace!(
692                "remove_table_entry: removed table '{}' from context cache",
693                canonical_name
694            );
695        }
696    }
697
698    /// Check if a table is marked as dropped.
699    pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
700        self.dropped_tables.read().unwrap().contains(canonical_name)
701    }
702}