llkv_runtime/runtime_context/utils.rs

//! General utility methods for RuntimeContext operations.
//!
//! This module contains shared helper functions used across multiple DML and DDL operations:
//! - Type coercion for INSERT/UPDATE value normalization
//! - Column scanning for INDEX creation and constraint validation
//! - Multi-column scanning for UNIQUE/PRIMARY KEY/FOREIGN KEY constraints
//! - Row collection for FK validation
//! - MVCC visibility filtering for transaction isolation
//! - Transaction state tracking
//! - Table cache management

use arrow::array::{Array, UInt64Array};
use arrow::datatypes::{DataType, IntervalUnit};
use croaring::Treemap;
use llkv_column_map::store::GatherNullPolicy;
use llkv_compute::date::parse_date32_literal;
use llkv_compute::scalar::decimal::{
    align_decimal_to_scale, decimal_truthy, truncate_decimal_to_i64,
};
use llkv_executor::{ExecutorColumn, ExecutorTable, translation};
use llkv_plan::PlanValue;
use llkv_result::{Error, Result};
use llkv_storage::pager::Pager;
use llkv_table::FieldId;
use llkv_transaction::{TransactionSnapshot, TxnId, filter_row_ids_for_snapshot};
use llkv_types::LogicalFieldId;
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::Arc;

use crate::TXN_ID_AUTO_COMMIT;
use crate::TXN_ID_NONE;

use super::RuntimeContext;

impl<P> RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Coerce a PlanValue to match the expected column type.
    ///
    /// Used during INSERT and UPDATE operations to ensure type compatibility.
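    ///
    /// A rough, illustrative sketch of a few coercions (not exhaustive; `ctx`
    /// and the column bindings are assumptions for the example):
    ///
    /// ```rust,ignore
    /// // INT64 column: the fractional part of a float literal is truncated.
    /// let v = ctx.coerce_plan_value_for_column(PlanValue::Float(1.9), &int_column)?;
    /// // v is PlanValue::Integer(1)
    ///
    /// // BOOLEAN column: recognized string spellings map to 0/1 integers.
    /// let v = ctx.coerce_plan_value_for_column(PlanValue::String("true".into()), &bool_column)?;
    /// // v is PlanValue::Integer(1)
    ///
    /// // Numeric columns reject string literals with an InvalidArgumentError.
    /// let err = ctx.coerce_plan_value_for_column(PlanValue::String("abc".into()), &int_column);
    /// assert!(err.is_err());
    /// ```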
    pub(super) fn coerce_plan_value_for_column(
        &self,
        value: PlanValue,
        column: &ExecutorColumn,
    ) -> Result<PlanValue> {
        match value {
            PlanValue::Null => Ok(PlanValue::Null),
            PlanValue::Decimal(decimal) => match &column.data_type {
                DataType::Decimal128(precision, scale) => {
                    let aligned = align_decimal_to_scale(decimal, *precision, *scale).map_err(
                        |err| {
                            Error::InvalidArgumentError(format!(
                                "decimal literal {} incompatible with DECIMAL({}, {}) column '{}': {err}",
                                decimal, precision, scale, column.name
                            ))
                        },
                    )?;
                    Ok(PlanValue::Decimal(aligned))
                }
                DataType::Int64 => {
                    let coerced = truncate_decimal_to_i64(decimal).map_err(|err| {
                        Error::InvalidArgumentError(format!(
                            "decimal literal {} incompatible with INT column '{}': {err}",
                            decimal, column.name
                        ))
                    })?;
                    Ok(PlanValue::Integer(coerced))
                }
                DataType::Float64 => Ok(PlanValue::Float(decimal.to_f64())),
                DataType::Boolean => Ok(PlanValue::Integer(if decimal_truthy(decimal) {
                    1
                } else {
                    0
                })),
                DataType::Utf8 => Ok(PlanValue::String(decimal.to_string())),
                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign decimal literal to DATE column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign decimal literal to STRUCT column '{}'",
                    column.name
                ))),
                other => Err(Error::InvalidArgumentError(format!(
                    "unsupported target type {:?} for DECIMAL literal in column '{}'",
                    other, column.name
                ))),
            },
            PlanValue::Integer(v) => match &column.data_type {
                DataType::Int64 => Ok(PlanValue::Integer(v)),
                DataType::Float64 => Ok(PlanValue::Float(v as f64)),
                DataType::Boolean => Ok(PlanValue::Integer(if v != 0 { 1 } else { 0 })),
                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
                DataType::Date32 => {
                    let casted = i32::try_from(v).map_err(|_| {
                        Error::InvalidArgumentError(format!(
                            "integer literal out of range for DATE column '{}'",
                            column.name
                        ))
                    })?;
                    Ok(PlanValue::Date32(casted))
                }
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign integer to STRUCT column '{}'",
                    column.name
                ))),
                _ => Ok(PlanValue::Integer(v)),
            },
            PlanValue::Float(v) => match &column.data_type {
                DataType::Int64 => Ok(PlanValue::Integer(v as i64)),
                DataType::Float64 => Ok(PlanValue::Float(v)),
                DataType::Boolean => Ok(PlanValue::Integer(if v != 0.0 { 1 } else { 0 })),
                DataType::Utf8 => Ok(PlanValue::String(v.to_string())),
                DataType::Date32 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign floating-point value to DATE column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign floating-point value to STRUCT column '{}'",
                    column.name
                ))),
                _ => Ok(PlanValue::Float(v)),
            },
            PlanValue::String(s) => match &column.data_type {
                DataType::Boolean => {
                    let normalized = s.trim().to_ascii_lowercase();
                    match normalized.as_str() {
                        "true" | "t" | "1" => Ok(PlanValue::Integer(1)),
                        "false" | "f" | "0" => Ok(PlanValue::Integer(0)),
                        _ => Err(Error::InvalidArgumentError(format!(
                            "cannot assign string '{}' to BOOLEAN column '{}'",
                            s, column.name
                        ))),
                    }
                }
                DataType::Utf8 => Ok(PlanValue::String(s)),
                DataType::Date32 => {
                    let days = parse_date32_literal(&s)?;
                    Ok(PlanValue::Date32(days))
                }
                DataType::Int64 | DataType::Float64 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign string '{}' to numeric column '{}'",
                    s, column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign string to STRUCT column '{}'",
                    column.name
                ))),
                _ => Ok(PlanValue::String(s)),
            },
            PlanValue::Struct(map) => match &column.data_type {
                DataType::Struct(_) => Ok(PlanValue::Struct(map)),
                _ => Err(Error::InvalidArgumentError(format!(
                    "cannot assign struct value to column '{}'",
                    column.name
                ))),
            },
            PlanValue::Interval(interval) => match &column.data_type {
                DataType::Interval(IntervalUnit::MonthDayNano) => Ok(PlanValue::Interval(interval)),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign INTERVAL literal to STRUCT column '{}'",
                    column.name
                ))),
                other => Err(Error::InvalidArgumentError(format!(
                    "unsupported target type {:?} for INTERVAL literal in column '{}'",
                    other, column.name
                ))),
            },
            PlanValue::Date32(days) => match &column.data_type {
                DataType::Date32 => Ok(PlanValue::Date32(days)),
                DataType::Int64 => Ok(PlanValue::Integer(i64::from(days))),
                DataType::Float64 => Ok(PlanValue::Float(days as f64)),
                DataType::Utf8 => Err(Error::InvalidArgumentError(format!(
                    "cannot assign DATE literal to TEXT column '{}'",
                    column.name
                ))),
                DataType::Boolean => Err(Error::InvalidArgumentError(format!(
                    "cannot assign DATE literal to BOOLEAN column '{}'",
                    column.name
                ))),
                DataType::Struct(_) => Err(Error::InvalidArgumentError(format!(
                    "cannot assign DATE literal to STRUCT column '{}'",
                    column.name
                ))),
                other => Err(Error::InvalidArgumentError(format!(
                    "unsupported target type {:?} for DATE literal in column '{}'",
                    other, column.name
                ))),
            },
        }
    }

    /// Scan a single column and materialize values into memory.
    ///
    /// Used during CREATE INDEX operations and constraint validation.
    ///
    /// NOTE: Current implementation buffers the entire result set; convert to a
    /// streaming iterator once executor-side consumers support incremental consumption.
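    ///
    /// Illustrative usage, e.g. when backfilling a freshly created index (the
    /// surrounding bindings are assumptions for this sketch):
    ///
    /// ```rust,ignore
    /// let values = ctx.scan_column_values(&exec_table, indexed_field_id, snapshot)?;
    /// for value in &values {
    ///     // feed each visible value into the index builder
    /// }
    /// ```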
    pub(super) fn scan_column_values(
        &self,
        table: &ExecutorTable<P>,
        field_id: FieldId,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<PlanValue>> {
        let table_id = table.table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        // Create a filter that matches all rows (unbounded range)
        let match_all_filter = Filter {
            field_id,
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        // Get all matching row_ids first
        let row_ids = match table.table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Apply MVCC filtering manually using filter_row_ids_for_snapshot
        let row_ids = filter_row_ids_for_snapshot(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Gather the column values for visible rows
        let logical_field_id = LogicalFieldId::for_user(table_id, field_id);
        let row_count = row_ids.cardinality() as usize;
        let mut stream = match table.table.stream_columns(
            vec![logical_field_id],
            &row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // TODO: Values are accumulated eagerly; make this streamable once
        // `llkv-plan` supports incremental parameter binding.
        let mut values = Vec::with_capacity(row_count);
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() == 0 {
                continue;
            }
            let array = batch.column(0);
            for row_idx in 0..batch.num_rows() {
                if let Ok(value) = llkv_plan::plan_value_from_array(array, row_idx) {
                    values.push(value);
                }
            }
        }

        Ok(values)
    }

    /// Scan a set of columns and materialize rows into memory.
    ///
    /// Used during constraint validation (multi-column UNIQUE, PRIMARY KEY)
    /// and CREATE INDEX operations.
    ///
    /// NOTE: Similar to [`Self::scan_column_values`], this buffers eagerly pending
    /// enhancements to the executor pipeline.
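    ///
    /// The result is row-major: one inner `Vec` per visible row, with values in
    /// `field_ids` order. Illustrative sketch for a two-column UNIQUE constraint
    /// (the bindings are assumptions for the example):
    ///
    /// ```rust,ignore
    /// let rows = ctx.scan_multi_column_values(&exec_table, &[a_fid, b_fid], snapshot)?;
    /// // rows[i] == vec![value_of_a_for_row_i, value_of_b_for_row_i]
    /// ```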
    pub(super) fn scan_multi_column_values(
        &self,
        table: &ExecutorTable<P>,
        field_ids: &[FieldId],
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<Vec<PlanValue>>> {
        if field_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        let match_all_filter = Filter {
            field_id: field_ids[0],
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        let row_ids = match table.table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        let row_ids = filter_row_ids_for_snapshot(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let logical_field_ids: Vec<_> = field_ids
            .iter()
            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
            .collect();

        let total_rows = row_ids.cardinality() as usize;
        let mut stream = match table.table.stream_columns(
            logical_field_ids,
            &row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() == 0 {
                continue;
            }

            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < rows.len(),
                        "stream chunk produced out-of-bounds row index"
                    );
                    if let Some(row) = rows.get_mut(target_index) {
                        match llkv_plan::plan_value_from_array(array, local_idx) {
                            Ok(value) => row.push(value),
                            Err(_) => row.push(PlanValue::Null),
                        }
                    }
                }
            }
        }

        Ok(rows)
    }

    /// Scan multi-column values with FK-aware visibility semantics.
    ///
    /// This is similar to [`Self::scan_multi_column_values`], but uses FK-specific
    /// MVCC visibility rules where rows deleted by the current transaction are treated
    /// as still visible (matching SQL standard FK constraint checking behavior).
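    ///
    /// Illustrative difference (the bindings are assumptions for this sketch):
    /// a row deleted earlier in the same transaction is excluded by
    /// [`Self::scan_multi_column_values`] but still returned here, so FK
    /// constraint checks continue to observe its values.
    ///
    /// ```rust,ignore
    /// let fk_rows =
    ///     ctx.scan_multi_column_values_for_fk_check(&parent_table, &[pk_fid], snapshot)?;
    /// ```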
    pub(super) fn scan_multi_column_values_for_fk_check(
        &self,
        table: &ExecutorTable<P>,
        field_ids: &[FieldId],
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<Vec<PlanValue>>> {
        if field_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        let match_all_filter = Filter {
            field_id: field_ids[0],
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        };
        let filter_expr = Expr::Pred(match_all_filter);

        let row_ids = match table.table.filter_row_ids(&filter_expr) {
            Ok(ids) => ids,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        // Use FK-specific filtering that treats deleted rows as still visible
        let row_ids = llkv_transaction::filter_row_ids_for_fk_check(
            table.table.as_ref(),
            row_ids,
            &self.txn_manager,
            snapshot,
        )?;

        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let logical_field_ids: Vec<_> = field_ids
            .iter()
            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
            .collect();

        let total_rows = row_ids.cardinality() as usize;
        let mut stream = match table.table.stream_columns(
            logical_field_ids,
            &row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        let mut rows = vec![Vec::with_capacity(field_ids.len()); total_rows];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() == 0 {
                continue;
            }

            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    debug_assert!(
                        target_index < rows.len(),
                        "stream chunk produced out-of-bounds row index"
                    );
                    if let Some(row) = rows.get_mut(target_index) {
                        match llkv_plan::plan_value_from_array(array, local_idx) {
                            Ok(value) => row.push(value),
                            Err(_) => row.push(PlanValue::Null),
                        }
                    }
                }
            }
        }

        Ok(rows)
    }

    /// Collect row values for specific row IDs and field IDs.
    ///
    /// Used during foreign key validation to gather referenced/referencing values.
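    ///
    /// Illustrative sketch (the row-id set and field ids are assumptions for
    /// the example): given the row IDs touched by a DELETE on a parent table,
    /// gather the referenced key values so child tables can be checked.
    ///
    /// ```rust,ignore
    /// let key_rows =
    ///     ctx.collect_row_values_for_ids(&parent_table, &deleted_row_ids, &[pk_fid])?;
    /// ```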
    pub(super) fn collect_row_values_for_ids(
        &self,
        table: &ExecutorTable<P>,
        row_ids: &Treemap,
        field_ids: &[FieldId],
    ) -> Result<Vec<Vec<PlanValue>>> {
        if row_ids.is_empty() || field_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();
        let logical_field_ids: Vec<LogicalFieldId> = field_ids
            .iter()
            .map(|&fid| LogicalFieldId::for_user(table_id, fid))
            .collect();

        let mut stream = match table.table.stream_columns(
            logical_field_ids.clone(),
            row_ids,
            GatherNullPolicy::IncludeNulls,
        ) {
            Ok(stream) => stream,
            Err(Error::NotFound) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        let mut rows = vec![Vec::with_capacity(field_ids.len()); row_ids.cardinality() as usize];
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            let base = chunk.row_offset();
            let local_len = batch.num_rows();
            for col_idx in 0..batch.num_columns() {
                let array = batch.column(col_idx);
                for local_idx in 0..local_len {
                    let target_index = base + local_idx;
                    if let Some(row) = rows.get_mut(target_index) {
                        let value = llkv_plan::plan_value_from_array(array, local_idx)?;
                        row.push(value);
                    }
                }
            }
        }

        Ok(rows)
    }

    /// Filter row IDs to only include those visible in the given snapshot.
    ///
    /// Used across all DML operations to ensure MVCC transaction isolation.
    pub(super) fn filter_visible_row_ids(
        &self,
        table: &ExecutorTable<P>,
        row_ids: Treemap,
        snapshot: TransactionSnapshot,
    ) -> Result<Treemap> {
        filter_row_ids_for_snapshot(table.table.as_ref(), row_ids, &self.txn_manager, snapshot)
    }

    /// Record that a transaction has inserted rows into a table.
    ///
    /// Used for deferred PRIMARY KEY validation at commit time.
    pub(super) fn record_table_with_new_rows(&self, txn_id: TxnId, canonical_name: String) {
        if txn_id == TXN_ID_AUTO_COMMIT {
            return;
        }

        let mut guard = self.txn_tables_with_new_rows.write().unwrap();
        guard.entry(txn_id).or_default().insert(canonical_name);
    }

    /// Collect all rows created by a specific transaction.
    ///
    /// Used for PRIMARY KEY validation at commit time.
    pub(super) fn collect_rows_created_by_txn(
        &self,
        table: &ExecutorTable<P>,
        txn_id: TxnId,
    ) -> Result<Vec<Vec<PlanValue>>> {
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(Vec::new());
        }

        if table.schema.columns.is_empty() {
            return Ok(Vec::new());
        }

        let Some(first_field_id) = table.schema.first_field_id() else {
            return Ok(Vec::new());
        };
        let filter_expr = translation::expression::full_table_scan_filter(first_field_id);

        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len() + 2);
        logical_fields.push(LogicalFieldId::for_mvcc_created_by(table_id));
        logical_fields.push(LogicalFieldId::for_mvcc_deleted_by(table_id));
        for column in &table.schema.columns {
            logical_fields.push(LogicalFieldId::for_user(table_id, column.field_id));
        }

        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let mut stream = table.table.stream_columns(
            Arc::clone(&logical_fields),
            row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut rows = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let batch = chunk.batch();
            if batch.num_columns() < table.schema.columns.len() + 2 {
                continue;
            }

            let created_col = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing created_by column in MVCC data".into()))?;
            let deleted_col = batch
                .column(1)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("missing deleted_by column in MVCC data".into()))?;

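            // Keep only rows this transaction created (created_by == txn_id) and
            // did not subsequently delete (deleted_by still TXN_ID_NONE). NULL
            // MVCC columns fall back to the auto-commit / "none" sentinels.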
            for row_idx in 0..batch.num_rows() {
                let created_by = if created_col.is_null(row_idx) {
                    TXN_ID_AUTO_COMMIT
                } else {
                    created_col.value(row_idx)
                };
                if created_by != txn_id {
                    continue;
                }

                let deleted_by = if deleted_col.is_null(row_idx) {
                    TXN_ID_NONE
                } else {
                    deleted_col.value(row_idx)
                };
                if deleted_by != TXN_ID_NONE {
                    continue;
                }

                let mut row_values = Vec::with_capacity(table.schema.columns.len());
                for col_idx in 0..table.schema.columns.len() {
                    let array = batch.column(col_idx + 2);
                    let value = llkv_plan::plan_value_from_array(array, row_idx)?;
                    row_values.push(value);
                }
                rows.push(row_values);
            }
        }

        Ok(rows)
    }

    /// Validate primary key constraints for rows inserted by a transaction before commit.
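    ///
    /// Illustrative call site (a sketch; the surrounding commit flow is an
    /// assumption): invoked before the transaction is made durable, so a
    /// duplicate key inserted by `txn_id` can be reported before the commit
    /// succeeds.
    ///
    /// ```rust,ignore
    /// ctx.validate_primary_keys_for_commit(txn_id)?;
    /// ```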
    pub(crate) fn validate_primary_keys_for_commit(&self, txn_id: TxnId) -> Result<()> {
        if txn_id == TXN_ID_AUTO_COMMIT {
            return Ok(());
        }

        let pending_tables = {
            let guard = self.txn_tables_with_new_rows.read().unwrap();
            guard.get(&txn_id).cloned()
        };

        let Some(tables) = pending_tables else {
            return Ok(());
        };

        for canonical_name in tables {
            let table = match self.lookup_table(&canonical_name) {
                Ok(table) => table,
                Err(Error::NotFound) => continue,
                Err(err) => return Err(err),
            };

            let constraint_ctx = self.build_table_constraint_context(table.as_ref())?;
            let Some(primary_key) = constraint_ctx.primary_key.as_ref() else {
                continue;
            };

            let new_rows = self.collect_rows_created_by_txn(table.as_ref(), txn_id)?;
            if new_rows.is_empty() {
                continue;
            }

            let column_order: Vec<usize> = (0..table.schema.columns.len()).collect();
            let table_for_fetch = Arc::clone(&table);
            let snapshot = self.default_snapshot();

            self.constraint_service.validate_primary_key_rows(
                &constraint_ctx.schema_field_ids,
                primary_key,
                &column_order,
                &new_rows,
                |field_ids| {
                    self.scan_multi_column_values(table_for_fetch.as_ref(), field_ids, snapshot)
                },
            )?;
        }

        Ok(())
    }

    /// Clear any per-transaction bookkeeping maintained by the runtime context.
    pub(crate) fn clear_transaction_state(&self, txn_id: TxnId) {
        if txn_id == TXN_ID_AUTO_COMMIT {
            return;
        }

        let mut guard = self.txn_tables_with_new_rows.write().unwrap();
        guard.remove(&txn_id);
    }

    /// Remove a table from the executor cache.
    ///
    /// Forces the table to be reloaded from metadata on next access.
    /// Used after schema changes (ALTER TABLE, DROP COLUMN, etc.).
    pub(super) fn remove_table_entry(&self, canonical_name: &str) {
        let mut tables = self.tables.write().unwrap();
        if tables.remove(canonical_name).is_some() {
            tracing::trace!(
                "remove_table_entry: removed table '{}' from context cache",
                canonical_name
            );
        }
    }

    /// Check if a table is marked as dropped.
    pub fn is_table_marked_dropped(&self, canonical_name: &str) -> bool {
        self.dropped_tables.read().unwrap().contains(canonical_name)
    }
}