llkv_runtime/runtime_context/
table_access.rs

1//! Table access and caching utilities for `RuntimeContext`.
2//!
3//! This module centralizes lower-level table access helpers that were previously
4//! embedded directly inside `mod.rs`. Moving them here keeps the core module
5//! focused on high-level orchestration while these helpers encapsulate caching,
6//! lazy loading, and direct batch interactions.
7
8use crate::{RuntimeContext, RuntimeTableHandle, canonical_table_name};
9use arrow::array::{ArrayRef, RecordBatch, UInt64Builder};
10use arrow::datatypes::{DataType, Field, Schema};
11use llkv_column_map::store::{GatherNullPolicy, ROW_ID_COLUMN_NAME};
12use llkv_executor::{
13    ExecutorColumn, ExecutorMultiColumnUnique, ExecutorRowBatch, ExecutorSchema, ExecutorTable,
14    translation,
15};
16use llkv_result::{Error, Result};
17use llkv_storage::pager::Pager;
18use llkv_table::resolvers::{FieldConstraints, FieldDefinition};
19use llkv_table::{
20    ConstraintKind, FieldId, MultiColumnIndexEntryMeta, RowId, Table, TableConstraintSummaryView,
21};
22use llkv_transaction::{TransactionSnapshot, mvcc};
23use llkv_types::LogicalFieldId;
24use rustc_hash::{FxHashMap, FxHashSet};
25use simd_r_drive_entry_handle::EntryHandle;
26use std::sync::{
27    Arc, RwLock,
28    atomic::{AtomicU64, Ordering},
29};
30
31impl<P> RuntimeContext<P>
32where
33    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
34{
35    /// Exports all rows from a table as a `RowBatch` - internal storage API.
36    /// Use through `RuntimeSession` or `RuntimeTableHandle` instead.
37    pub(crate) fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<ExecutorRowBatch> {
38        let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
39        handle.lazy()?.collect_rows()
40    }
41
    /// Get raw batches from a table including row_ids - internal storage API.
    /// This is used for transaction seeding where we need to preserve existing row_ids.
    /// Use through `RuntimeSession` or transaction context instead.
    ///
    /// Each returned batch places the `row_id` column first, followed by every
    /// user column in schema order. Rows are restricted to those visible under
    /// `snapshot` (MVCC filtering via `filter_visible_row_ids`); an empty `Vec`
    /// is returned when no row survives the filters.
    ///
    /// # Errors
    /// Returns `Error::InvalidArgumentError` when the table name cannot be
    /// resolved, when `filter` references an unknown column, or when a wildcard
    /// scan is requested on a table that has no columns.
    pub(crate) fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<llkv_expr::Expr<'static, String>>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RecordBatch>> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

        // Translate the caller's predicate into executor terms, or synthesize a
        // full-table scan over the first column when no filter was provided.
        let filter_expr = match filter {
            Some(expr) => {
                translation::expression::translate_predicate(expr, table.schema.as_ref(), |name| {
                    Error::InvalidArgumentError(format!(
                        "Binder Error: does not have a column named '{}'",
                        name
                    ))
                })?
            }
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                translation::expression::full_table_scan_filter(field_id)
            }
        };

        // First, get the row_ids that match the filter
        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Drop rows the transaction snapshot is not allowed to see.
        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
        if visible_row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Scan to get the column data without materializing full columns
        let table_id = table.table.table_id();

        // Output schema: row_id first (hence the +1), then one field per user column.
        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len());

        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for column in &table.schema.columns {
            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
            logical_fields.push(logical_field_id);
            // Field metadata records the originating field_id so the batch can
            // later be re-ingested with column identity preserved.
            let field = mvcc::build_field_with_metadata(
                &column.name,
                column.data_type.clone(),
                column.nullable,
                column.field_id,
            );
            fields.push(field);
        }

        let schema = Arc::new(Schema::new(fields));

        if logical_fields.is_empty() {
            // Tables without user columns should still return row_id batches.
            let mut row_id_builder =
                UInt64Builder::with_capacity(visible_row_ids.cardinality() as usize);
            for row_id in visible_row_ids.iter() {
                row_id_builder.append_value(row_id);
            }
            let arrays: Vec<ArrayRef> = vec![Arc::new(row_id_builder.finish()) as ArrayRef];
            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            return Ok(vec![batch]);
        }

        // Stream the visible rows chunk by chunk; IncludeNulls keeps the column
        // arrays positionally aligned with the chunk's row ids.
        let mut stream = table.table.stream_columns(
            Arc::from(logical_fields),
            &visible_row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut batches = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(chunk.batch().num_columns() + 1);

            // Prepend the chunk's row ids as the first output column.
            let mut row_id_builder = UInt64Builder::with_capacity(chunk.len());
            for row_id in chunk.row_ids() {
                row_id_builder.append_value(*row_id);
            }
            arrays.push(Arc::new(row_id_builder.finish()) as ArrayRef);

            let chunk_batch = chunk.into_batch();
            for column_array in chunk_batch.columns() {
                arrays.push(column_array.clone());
            }

            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            batches.push(batch);
        }

        Ok(batches)
    }
146
147    /// Append batches directly to a table, preserving row_ids - internal storage API.
148    /// This is used for transaction seeding where we need to preserve existing row_ids.
149    /// Use through `RuntimeSession` or transaction context instead.
150    pub(crate) fn append_batches_with_row_ids(
151        &self,
152        table_name: &str,
153        batches: Vec<RecordBatch>,
154    ) -> Result<usize> {
155        let (_, canonical_name) = canonical_table_name(table_name)?;
156        let table = self.lookup_table(&canonical_name)?;
157
158        let mut total_rows = 0;
159        for batch in batches {
160            if batch.num_rows() == 0 {
161                continue;
162            }
163
164            // Verify the batch has a row_id column
165            let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
166                Error::InvalidArgumentError(
167                    "batch must contain row_id column for direct append".into(),
168                )
169            })?;
170
171            // Append the batch directly to the underlying table
172            table.table.append(&batch)?;
173            total_rows += batch.num_rows();
174        }
175
176        Ok(total_rows)
177    }
178
    /// Looks up a table in the executor cache, lazily loading it from metadata if not already cached.
    /// This is the primary method for obtaining table references for query execution.
    ///
    /// Resolution order:
    /// 1. The in-memory executor cache (`self.tables`), unless the table is
    ///    listed in `dropped_tables`.
    /// 2. This context's catalog + column store (lazy load; result is cached).
    /// 3. A fallback context (`self.fallback_lookup`), when configured, for
    ///    tables the catalog knows but this context's store does not hold.
    ///
    /// # Errors
    /// Returns `Error::NotFound` for dropped tables, and
    /// `Error::InvalidArgumentError` for names unknown to the catalog when no
    /// fallback context resolves them.
    pub fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        // Fast path: check if table is already loaded
        {
            let tables = self.tables.read().unwrap();
            if let Some(table) = tables.get(canonical_name) {
                // Check if table has been dropped
                if self.dropped_tables.read().unwrap().contains(canonical_name) {
                    // Table was dropped - treat as not found
                    return Err(Error::NotFound);
                }
                tracing::trace!(
                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
                    canonical_name,
                    table.table.table_id(),
                    table.schema.columns.len(),
                    &*self.pager
                );
                return Ok(Arc::clone(table));
            }
        } // Release read lock

        // Slow path: load table from catalog (happens once per table)
        tracing::debug!(
            "[LAZY_LOAD] Loading table '{}' from catalog",
            canonical_name
        );

        // Check catalog first for table existence
        tracing::debug!(
            "[CATALOG_LOOKUP] Looking up table '{}' in catalog @ {:p}",
            canonical_name,
            &*self.catalog
        );
        let catalog_table_id = match self.catalog.table_id(canonical_name) {
            Some(id) => {
                tracing::debug!(
                    "[CATALOG_LOOKUP] Found table '{}' with id={} in catalog",
                    canonical_name,
                    id
                );
                id
            }
            None => {
                tracing::debug!(
                    "[CATALOG_LOOKUP] Table '{}' NOT FOUND in catalog @ {:p}",
                    canonical_name,
                    &*self.catalog
                );
                // Table not found in catalog - try fallback if available
                if let Some(fallback) = self.fallback_lookup.read().unwrap().as_ref() {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' not found in catalog, trying fallback context",
                        canonical_name
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(Error::InvalidArgumentError(format!(
                    "unknown table '{}'",
                    canonical_name
                )));
            }
        };

        let table_id = catalog_table_id;

        // Try to load the table from our store. If it fails, try fallback context.
        let table = match Table::from_id_and_store(table_id, Arc::clone(&self.store)) {
            Ok(t) => t,
            Err(e) => {
                // Table exists in catalog but not in our store - try fallback
                if let Some(fallback) = self.fallback_lookup.read().unwrap().as_ref() {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' found in catalog but not in store ({}), trying fallback context",
                        canonical_name,
                        e
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(e);
            }
        };
        let store = table.store();
        // Discover the table's user columns from the store; sorting by field_id
        // yields a deterministic column order.
        let mut logical_fields = store.user_field_ids_for_table(table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());
        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let summary = self
            .catalog_service
            .table_constraint_summary(canonical_name)?;
        let TableConstraintSummaryView {
            table_meta,
            column_metas,
            constraint_records,
            multi_column_uniques,
            constraint_names: _constraint_names,
        } = summary;

        // If table_meta is None, the table metadata isn't in our context's store.
        // Try fallback context before erroring.
        let _table_meta = match table_meta {
            Some(meta) => meta,
            None => {
                if let Some(fallback) = self.fallback_lookup.read().unwrap().as_ref() {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' metadata not found, trying fallback context",
                        canonical_name
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(Error::InvalidArgumentError(format!(
                    "unknown table '{}'",
                    canonical_name
                )));
            }
        };
        let catalog_field_resolver = self.catalog.field_resolver(catalog_table_id);
        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
        let mut has_primary_key_records = false;
        let mut has_single_unique_records = false;

        // Collect PK/UNIQUE facts from the active constraint records. The
        // `has_*` flags decide below whether these records take precedence over
        // the per-field constraints reported by the catalog field resolver.
        for record in constraint_records
            .iter()
            .filter(|record| record.is_active())
        {
            match &record.kind {
                ConstraintKind::PrimaryKey(pk) => {
                    has_primary_key_records = true;
                    for field_id in &pk.field_ids {
                        metadata_primary_keys.insert(*field_id);
                        // PK fields are implicitly unique as well.
                        metadata_unique_fields.insert(*field_id);
                    }
                }
                ConstraintKind::Unique(unique) => {
                    // Multi-column UNIQUEs are handled separately via
                    // `multi_column_uniques`; only track single-column ones here.
                    if unique.field_ids.len() == 1 {
                        has_single_unique_records = true;
                        metadata_unique_fields.insert(unique.field_ids[0]);
                    }
                }
                _ => {}
            }
        }

        // Build ExecutorSchema from metadata manager snapshots
        let mut executor_columns = Vec::new();
        let mut lookup = FxHashMap::with_capacity_and_hasher(field_ids.len(), Default::default());

        for (idx, lfid) in logical_fields.iter().enumerate() {
            let field_id = lfid.field_id();
            let normalized_index = executor_columns.len();

            // Prefer the persisted column name; fall back to a synthetic
            // "col_<field_id>" name when metadata is missing.
            let column_name = column_metas
                .get(idx)
                .and_then(|meta| meta.as_ref())
                .and_then(|meta| meta.name.clone())
                .unwrap_or_else(|| format!("col_{}", field_id));

            // Name lookups are case-insensitive: key by the lowercased name.
            let normalized = column_name.to_ascii_lowercase();
            lookup.insert(normalized, normalized_index);

            let fallback_constraints: FieldConstraints = catalog_field_resolver
                .as_ref()
                .and_then(|resolver| resolver.field_constraints_by_name(&column_name))
                .unwrap_or_default();

            // Constraint records, when present, override the resolver's view.
            let metadata_primary = metadata_primary_keys.contains(&field_id);
            let primary_key = if has_primary_key_records {
                metadata_primary
            } else {
                fallback_constraints.primary_key
            };

            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
            let unique = if has_primary_key_records || has_single_unique_records {
                metadata_unique
            } else {
                fallback_constraints.primary_key || fallback_constraints.unique
            };

            let data_type = store.data_type(*lfid)?;
            // Primary-key columns are treated as NOT NULL; all others nullable.
            let nullable = !primary_key;

            executor_columns.push(ExecutorColumn {
                name: column_name,
                data_type,
                nullable,
                primary_key,
                unique,
                field_id,
                check_expr: fallback_constraints.check_expr.clone(),
            });
        }

        let exec_schema = Arc::new(ExecutorSchema {
            columns: executor_columns,
            lookup,
        });

        // Find the maximum row_id in the table to set next_row_id correctly
        let max_row_id = {
            use arrow::array::UInt64Array;
            use llkv_column_map::store::rowid_fid;
            use llkv_column_map::store::scan::{
                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
            };

            // Visitor that tracks the largest u64 value seen across all chunks.
            struct MaxRowIdVisitor {
                max: RowId,
            }

            impl PrimitiveVisitor for MaxRowIdVisitor {
                fn u64_chunk(&mut self, values: &UInt64Array) {
                    for i in 0..values.len() {
                        let val = values.value(i);
                        if val > self.max {
                            self.max = val;
                        }
                    }
                }
            }

            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}

            // Scan the row_id column for any user field in this table
            // NOTE(review): this hard-codes user field_id 1; a table without a
            // field 1 hits the NotFound arm below and falls back to max=0 —
            // confirm that every table always has field_id 1 populated.
            let row_id_field = rowid_fid(LogicalFieldId::for_user(table_id, 1));
            let mut visitor = MaxRowIdVisitor { max: 0 };

            match ScanBuilder::new(table.store(), row_id_field)
                .options(ScanOptions::default())
                .run(&mut visitor)
            {
                Ok(_) => visitor.max,
                // No row_id column yet (e.g. empty table) — treat as max 0.
                Err(llkv_result::Error::NotFound) => 0,
                Err(e) => {
                    tracing::warn!(
                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
                        canonical_name,
                        e
                    );
                    0
                }
            }
        };

        // Empty tables keep next_row_id at 0; otherwise continue past the max.
        let next_row_id = if max_row_id > 0 {
            max_row_id.saturating_add(1)
        } else {
            0
        };

        // Get the actual persisted row count from table metadata
        // This is an O(1) catalog lookup that reads ColumnDescriptor.total_row_count
        // Fallback to 0 for truly empty tables
        let total_rows = table.total_rows().unwrap_or(0);

        let executor_table = Arc::new(ExecutorTable {
            table: Arc::new(table),
            schema: exec_schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        // Rehydrate persisted multi-column UNIQUE constraints onto the handle.
        if !multi_column_uniques.is_empty() {
            let executor_uniques =
                Self::build_executor_multi_column_uniques(&executor_table, &multi_column_uniques);
            executor_table.set_multi_column_uniques(executor_uniques);
        }

        // Cache the loaded table
        {
            let mut tables = self.tables.write().unwrap();
            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
        }

        // Register fields in catalog (may already be registered from RuntimeContext::new())
        if let Some(field_resolver) = self.catalog.field_resolver(catalog_table_id) {
            for col in &executor_table.schema.columns {
                let definition = FieldDefinition::new(&col.name)
                    .with_primary_key(col.primary_key)
                    .with_unique(col.unique)
                    .with_check_expr(col.check_expr.clone());
                let _ = field_resolver.register_field(definition); // Ignore "already exists" errors
            }
            tracing::debug!(
                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
                executor_table.schema.columns.len(),
                canonical_name
            );
        }

        tracing::debug!(
            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
            canonical_name,
            table_id,
            field_ids.len(),
            next_row_id
        );

        Ok(executor_table)
    }
484
485    pub(super) fn build_executor_multi_column_uniques(
486        table: &ExecutorTable<P>,
487        stored: &[MultiColumnIndexEntryMeta],
488    ) -> Vec<ExecutorMultiColumnUnique> {
489        let mut results = Vec::with_capacity(stored.len());
490
491        'outer: for entry in stored {
492            if entry.column_ids.is_empty() {
493                continue;
494            }
495
496            let mut column_indices = Vec::with_capacity(entry.column_ids.len());
497            for field_id in &entry.column_ids {
498                if let Some((idx, _)) = table
499                    .schema
500                    .columns
501                    .iter()
502                    .enumerate()
503                    .find(|(_, col)| &col.field_id == field_id)
504                {
505                    column_indices.push(idx);
506                } else {
507                    tracing::warn!(
508                        "[CATALOG] Skipping persisted multi-column UNIQUE {:?} for table_id={} missing field_id {}",
509                        entry.index_name,
510                        table.table.table_id(),
511                        field_id
512                    );
513                    continue 'outer;
514                }
515            }
516
517            results.push(ExecutorMultiColumnUnique {
518                index_name: entry.index_name.clone(),
519                column_indices,
520            });
521        }
522
523        results
524    }
525
526    pub(super) fn rebuild_executor_table_with_unique(
527        table: &ExecutorTable<P>,
528        field_id: FieldId,
529    ) -> Option<Arc<ExecutorTable<P>>> {
530        let mut columns = table.schema.columns.clone();
531        let mut found = false;
532        for column in &mut columns {
533            if column.field_id == field_id {
534                column.unique = true;
535                found = true;
536                break;
537            }
538        }
539        if !found {
540            return None;
541        }
542
543        let schema = Arc::new(ExecutorSchema {
544            columns,
545            lookup: table.schema.lookup.clone(),
546        });
547
548        let next_row_id = table.next_row_id.load(Ordering::SeqCst);
549        let total_rows = table.total_rows.load(Ordering::SeqCst);
550        let uniques = table.multi_column_uniques();
551
552        Some(Arc::new(ExecutorTable {
553            table: Arc::clone(&table.table),
554            schema,
555            next_row_id: AtomicU64::new(next_row_id),
556            total_rows: AtomicU64::new(total_rows),
557            multi_column_uniques: RwLock::new(uniques),
558        }))
559    }
560}