llkv_runtime/runtime_context/table_access.rs

//! Table access and caching utilities for `RuntimeContext`.
//!
//! This module centralizes lower-level table access helpers that were previously
//! embedded directly inside `mod.rs`. Moving them here keeps the core module
//! focused on high-level orchestration while these helpers encapsulate caching,
//! lazy loading, and direct batch interactions.
8use crate::{RuntimeContext, RuntimeTableHandle, canonical_table_name};
9use arrow::array::{ArrayRef, RecordBatch, UInt64Builder};
10use arrow::datatypes::{DataType, Field, Schema};
11use llkv_column_map::store::{GatherNullPolicy, ROW_ID_COLUMN_NAME};
12use llkv_executor::{
13    ExecutorColumn, ExecutorMultiColumnUnique, ExecutorRowBatch, ExecutorSchema, ExecutorTable,
14    TableStorageAdapter, translation,
15};
16use llkv_result::{Error, Result};
17use llkv_storage::pager::Pager;
18use llkv_table::resolvers::{FieldConstraints, FieldDefinition};
19use llkv_table::{
20    ConstraintKind, FieldId, MultiColumnIndexEntryMeta, RowId, RowStream, Table,
21    TableConstraintSummaryView,
22};
23use llkv_transaction::{TransactionSnapshot, mvcc};
24use llkv_types::LogicalFieldId;
25use rustc_hash::{FxHashMap, FxHashSet};
26use simd_r_drive_entry_handle::EntryHandle;
27use std::sync::{
28    Arc, RwLock,
29    atomic::{AtomicU64, Ordering},
30};
31
32impl<P> RuntimeContext<P>
33where
34    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
35{
36    /// Exports all rows from a table as a `RowBatch` - internal storage API.
37    /// Use through `RuntimeSession` or `RuntimeTableHandle` instead.
38    pub(crate) fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<ExecutorRowBatch> {
39        let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
40        handle.lazy()?.collect_rows()
41    }
42
    /// Get raw batches from a table including row_ids - internal storage API.
    /// This is used for transaction seeding where we need to preserve existing row_ids.
    /// Use through `RuntimeSession` or transaction context instead.
    ///
    /// Returns one `RecordBatch` per streamed chunk; each batch carries the
    /// `row_id` column first, followed by every user column of the table in
    /// schema order. Rows are restricted to those visible under `snapshot`.
    /// An empty `Vec` means no rows matched the filter (or none were visible).
    pub(crate) fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<llkv_expr::Expr<'static, String>>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RecordBatch>> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

        // Translate the optional string-keyed predicate into a field-id
        // predicate; with no filter, fall back to a full scan keyed on the
        // first column (tables with zero columns cannot be wildcard-scanned).
        let filter_expr = match filter {
            Some(expr) => {
                translation::expression::translate_predicate(expr, table.schema.as_ref(), |name| {
                    Error::InvalidArgumentError(format!(
                        "Binder Error: does not have a column named '{}'",
                        name
                    ))
                })?
            }
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                translation::expression::full_table_scan_filter(field_id)
            }
        };

        // First, get the row_ids that match the filter
        let row_ids = table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // MVCC visibility: drop rows not visible to this transaction snapshot.
        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
        if visible_row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Scan to get the column data without materializing full columns
        let table_id = table.table_id();

        // Output schema: row_id first (non-nullable UInt64), then every user
        // column with its field-id metadata preserved so round-tripping the
        // batch keeps column identity.
        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len());

        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for column in &table.schema.columns {
            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
            logical_fields.push(logical_field_id);
            let field = mvcc::build_field_with_metadata(
                &column.name,
                column.data_type.clone(),
                column.nullable,
                column.field_id,
            );
            fields.push(field);
        }

        let schema = Arc::new(Schema::new(fields));

        if logical_fields.is_empty() {
            // Tables without user columns should still return row_id batches.
            let mut row_id_builder =
                UInt64Builder::with_capacity(visible_row_ids.cardinality() as usize);
            for row_id in visible_row_ids.iter() {
                row_id_builder.append_value(row_id);
            }
            let arrays: Vec<ArrayRef> = vec![Arc::new(row_id_builder.finish()) as ArrayRef];
            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            return Ok(vec![batch]);
        }

        // Stream column data chunk-by-chunk; IncludeNulls keeps rows whose
        // user columns are null so the row_id set stays complete.
        let mut stream = table.stream_columns(
            Arc::from(logical_fields),
            &visible_row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut batches = Vec::new();
        while let Some(chunk) = stream.next_chunk()? {
            let batch = chunk.record_batch();
            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns() + 1);

            // The stream is expected to supply row ids alongside each chunk;
            // their absence is an invariant violation, hence expect().
            let row_ids = chunk
                .row_ids
                .expect("table access requires row ids when streaming");
            let mut row_id_builder = UInt64Builder::with_capacity(row_ids.len());
            for idx in 0..row_ids.len() {
                row_id_builder.append_value(row_ids.value(idx));
            }
            arrays.push(Arc::new(row_id_builder.finish()) as ArrayRef);

            for column_array in batch.columns() {
                arrays.push(column_array.clone());
            }

            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            batches.push(batch);
        }

        Ok(batches)
    }
150
151    /// Append batches directly to a table, preserving row_ids - internal storage API.
152    /// This is used for transaction seeding where we need to preserve existing row_ids.
153    /// Use through `RuntimeSession` or transaction context instead.
154    pub(crate) fn append_batches_with_row_ids(
155        &self,
156        table_name: &str,
157        batches: Vec<RecordBatch>,
158    ) -> Result<usize> {
159        let (_, canonical_name) = canonical_table_name(table_name)?;
160        let table = self.lookup_table(&canonical_name)?;
161
162        let mut total_rows = 0;
163        for batch in batches {
164            if batch.num_rows() == 0 {
165                continue;
166            }
167
168            // Verify the batch has a row_id column
169            let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
170                Error::InvalidArgumentError(
171                    "batch must contain row_id column for direct append".into(),
172                )
173            })?;
174
175            // Append the batch directly to the underlying table
176            table.table.append(&batch)?;
177            total_rows += batch.num_rows();
178        }
179
180        Ok(total_rows)
181    }
182
    /// Looks up a table in the executor cache, lazily loading it from metadata if not already cached.
    /// This is the primary method for obtaining table references for query execution.
    ///
    /// Resolution order: (1) in-memory executor cache, (2) this context's
    /// catalog + store, (3) an optional fallback context (`fallback_lookup`)
    /// consulted whenever the catalog, the store, or the table metadata is
    /// missing the entry. Returns `Error::NotFound` for cached-but-dropped
    /// tables and `InvalidArgumentError` for names unknown everywhere.
    pub fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        // Fast path: check if table is already loaded
        {
            let tables = self.tables.read().unwrap();
            if let Some(table) = tables.get(canonical_name) {
                // Check if table has been dropped
                if self.dropped_tables.read().unwrap().contains(canonical_name) {
                    // Table was dropped - treat as not found
                    return Err(Error::NotFound);
                }
                tracing::trace!(
                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
                    canonical_name,
                    table.table_id(),
                    table.schema.columns.len(),
                    &*self.pager
                );
                return Ok(Arc::clone(table));
            }
        } // Release read lock

        // Slow path: load table from catalog (happens once per table).
        // NOTE(review): two threads that miss the cache concurrently will both
        // run the load below and the second insert wins; presumably benign
        // since both builds read the same persisted state — confirm.
        tracing::debug!(
            "[LAZY_LOAD] Loading table '{}' from catalog",
            canonical_name
        );

        // Check catalog first for table existence
        tracing::debug!(
            "[CATALOG_LOOKUP] Looking up table '{}' in catalog @ {:p}",
            canonical_name,
            &*self.catalog
        );
        let catalog_table_id = match self.catalog.table_id(canonical_name) {
            Some(id) => {
                tracing::debug!(
                    "[CATALOG_LOOKUP] Found table '{}' with id={} in catalog",
                    canonical_name,
                    id
                );
                id
            }
            None => {
                tracing::debug!(
                    "[CATALOG_LOOKUP] Table '{}' NOT FOUND in catalog @ {:p}",
                    canonical_name,
                    &*self.catalog
                );
                // Table not found in catalog - try fallback if available
                if let Some(fallback) = self.fallback_lookup.read().unwrap().as_ref() {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' not found in catalog, trying fallback context",
                        canonical_name
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(Error::InvalidArgumentError(format!(
                    "unknown table '{}'",
                    canonical_name
                )));
            }
        };

        let table_id = catalog_table_id;

        // Try to load the table from our store. If it fails, try fallback context.
        let table = match Table::from_id_and_store(table_id, Arc::clone(&self.store)) {
            Ok(t) => t,
            Err(e) => {
                // Table exists in catalog but not in our store - try fallback
                if let Some(fallback) = self.fallback_lookup.read().unwrap().as_ref() {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' found in catalog but not in store ({}), trying fallback context",
                        canonical_name,
                        e
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(e);
            }
        };
        let store = table.store();
        // Sort by field id so executor column order is deterministic and
        // aligns positionally with the column_metas snapshot below.
        let mut logical_fields = store.user_field_ids_for_table(table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());
        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let summary = self
            .catalog_service
            .table_constraint_summary(canonical_name)?;
        let TableConstraintSummaryView {
            table_meta,
            column_metas,
            constraint_records,
            multi_column_uniques,
            constraint_names: _constraint_names,
        } = summary;

        // If table_meta is None, the table metadata isn't in our context's store.
        // Try fallback context before erroring.
        let _table_meta = match table_meta {
            Some(meta) => meta,
            None => {
                if let Some(fallback) = self.fallback_lookup.read().unwrap().as_ref() {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' metadata not found, trying fallback context",
                        canonical_name
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(Error::InvalidArgumentError(format!(
                    "unknown table '{}'",
                    canonical_name
                )));
            }
        };
        let catalog_field_resolver = self.catalog.field_resolver(catalog_table_id);
        // Collect per-field PK/UNIQUE facts from active constraint records.
        // The has_* flags record whether any such records exist at all: when
        // they do, they take precedence over the per-field catalog-resolver
        // fallback used in the column loop below.
        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
        let mut has_primary_key_records = false;
        let mut has_single_unique_records = false;

        for record in constraint_records
            .iter()
            .filter(|record| record.is_active())
        {
            match &record.kind {
                ConstraintKind::PrimaryKey(pk) => {
                    has_primary_key_records = true;
                    for field_id in &pk.field_ids {
                        metadata_primary_keys.insert(*field_id);
                        // PK columns are implicitly unique as well.
                        metadata_unique_fields.insert(*field_id);
                    }
                }
                ConstraintKind::Unique(unique) => {
                    // Only single-column UNIQUEs feed per-column flags here;
                    // multi-column uniques are handled separately below.
                    if unique.field_ids.len() == 1 {
                        has_single_unique_records = true;
                        metadata_unique_fields.insert(unique.field_ids[0]);
                    }
                }
                _ => {}
            }
        }

        // Build ExecutorSchema from metadata manager snapshots
        let mut executor_columns = Vec::new();
        let mut lookup = FxHashMap::with_capacity_and_hasher(field_ids.len(), Default::default());

        for (idx, lfid) in logical_fields.iter().enumerate() {
            let field_id = lfid.field_id();
            let normalized_index = executor_columns.len();

            // NOTE(review): this pairs column_metas positionally with the
            // sorted logical_fields; assumes both are in field-id order —
            // confirm against table_constraint_summary's contract.
            let column_name = column_metas
                .get(idx)
                .and_then(|meta| meta.as_ref())
                .and_then(|meta| meta.name.clone())
                .unwrap_or_else(|| format!("col_{}", field_id));

            // Name lookup is case-insensitive via lowercased keys.
            let normalized = column_name.to_ascii_lowercase();
            lookup.insert(normalized, normalized_index);

            let fallback_constraints: FieldConstraints = catalog_field_resolver
                .as_ref()
                .and_then(|resolver| resolver.field_constraints_by_name(&column_name))
                .unwrap_or_default();

            // Constraint records win when present; otherwise fall back to the
            // catalog field resolver's view.
            let metadata_primary = metadata_primary_keys.contains(&field_id);
            let primary_key = if has_primary_key_records {
                metadata_primary
            } else {
                fallback_constraints.primary_key
            };

            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
            let unique = if has_primary_key_records || has_single_unique_records {
                metadata_unique
            } else {
                fallback_constraints.primary_key || fallback_constraints.unique
            };

            let data_type = store.data_type(*lfid)?;
            // Primary-key columns are the only ones forced non-nullable here;
            // all other columns are treated as nullable.
            let nullable = !primary_key;

            executor_columns.push(ExecutorColumn {
                name: column_name,
                data_type,
                nullable,
                primary_key,
                unique,
                field_id,
                check_expr: fallback_constraints.check_expr.clone(),
            });
        }

        let exec_schema = Arc::new(ExecutorSchema {
            columns: executor_columns,
            lookup,
        });

        // Find the maximum row_id in the table to set next_row_id correctly
        let max_row_id = {
            use arrow::array::UInt64Array;
            use llkv_column_map::store::rowid_fid;
            use llkv_column_map::store::scan::{
                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
            };

            // Visitor that tracks the running maximum over u64 chunks; the
            // other visitor traits are no-op markers required by ScanBuilder.
            struct MaxRowIdVisitor {
                max: RowId,
            }

            impl PrimitiveVisitor for MaxRowIdVisitor {
                fn u64_chunk(&mut self, values: &UInt64Array) {
                    for i in 0..values.len() {
                        let val = values.value(i);
                        if val > self.max {
                            self.max = val;
                        }
                    }
                }
            }

            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}

            // Scan the row_id column for any user field in this table.
            // NOTE(review): hard-codes user field id 1; if a table's first
            // field id differs, the scan presumably yields NotFound and
            // max_row_id falls back to 0 — TODO confirm field-id assignment
            // always starts at 1.
            let row_id_field = rowid_fid(LogicalFieldId::for_user(table_id, 1));
            let mut visitor = MaxRowIdVisitor { max: 0 };

            match ScanBuilder::new(table.store(), row_id_field)
                .options(ScanOptions::default())
                .run(&mut visitor)
            {
                Ok(_) => visitor.max,
                // NotFound just means no row-id column data yet (empty table).
                Err(llkv_result::Error::NotFound) => 0,
                Err(e) => {
                    // Best-effort: log and treat as empty rather than failing
                    // the whole lookup.
                    tracing::warn!(
                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
                        canonical_name,
                        e
                    );
                    0
                }
            }
        };

        // NOTE(review): a table whose only row has row_id 0 is
        // indistinguishable from an empty table here; both yield
        // next_row_id == 0 — confirm row ids are allocated starting above 0.
        let next_row_id = if max_row_id > 0 {
            max_row_id.saturating_add(1)
        } else {
            0
        };

        // Get the actual persisted row count from table metadata
        // This is an O(1) catalog lookup that reads ColumnDescriptor.total_row_count
        // Fallback to 0 for truly empty tables
        let total_rows = table.total_rows().unwrap_or(0);

        let table = Arc::new(table);
        let executor_table = Arc::new(ExecutorTable {
            storage: Arc::new(TableStorageAdapter::new(Arc::clone(&table))),
            table,
            schema: exec_schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        // Attach persisted multi-column UNIQUE indexes, resolved against the
        // freshly built executor schema.
        if !multi_column_uniques.is_empty() {
            let executor_uniques =
                Self::build_executor_multi_column_uniques(&executor_table, &multi_column_uniques);
            executor_table.set_multi_column_uniques(executor_uniques);
        }

        // Cache the loaded table
        {
            let mut tables = self.tables.write().unwrap();
            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
        }

        // Register fields in catalog (may already be registered from RuntimeContext::new())
        if let Some(field_resolver) = self.catalog.field_resolver(catalog_table_id) {
            for col in &executor_table.schema.columns {
                let definition = FieldDefinition::new(&col.name)
                    .with_primary_key(col.primary_key)
                    .with_unique(col.unique)
                    .with_check_expr(col.check_expr.clone());
                let _ = field_resolver.register_field(definition); // Ignore "already exists" errors
            }
            tracing::debug!(
                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
                executor_table.schema.columns.len(),
                canonical_name
            );
        }

        tracing::debug!(
            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
            canonical_name,
            table_id,
            field_ids.len(),
            next_row_id
        );

        Ok(executor_table)
    }
490
491    pub(super) fn build_executor_multi_column_uniques(
492        table: &ExecutorTable<P>,
493        stored: &[MultiColumnIndexEntryMeta],
494    ) -> Vec<ExecutorMultiColumnUnique> {
495        let mut results = Vec::with_capacity(stored.len());
496
497        'outer: for entry in stored {
498            if entry.column_ids.is_empty() {
499                continue;
500            }
501
502            let mut column_indices = Vec::with_capacity(entry.column_ids.len());
503            for field_id in &entry.column_ids {
504                if let Some((idx, _)) = table
505                    .schema
506                    .columns
507                    .iter()
508                    .enumerate()
509                    .find(|(_, col)| &col.field_id == field_id)
510                {
511                    column_indices.push(idx);
512                } else {
513                    tracing::warn!(
514                        "[CATALOG] Skipping persisted multi-column UNIQUE {:?} for table_id={} missing field_id {}",
515                        entry.index_name,
516                        table.table_id(),
517                        field_id
518                    );
519                    continue 'outer;
520                }
521            }
522
523            results.push(ExecutorMultiColumnUnique {
524                index_name: entry.index_name.clone(),
525                column_indices,
526            });
527        }
528
529        results
530    }
531
532    pub(super) fn rebuild_executor_table_with_unique(
533        table: &ExecutorTable<P>,
534        field_id: FieldId,
535    ) -> Option<Arc<ExecutorTable<P>>> {
536        let mut columns = table.schema.columns.clone();
537        let mut found = false;
538        for column in &mut columns {
539            if column.field_id == field_id {
540                column.unique = true;
541                found = true;
542                break;
543            }
544        }
545        if !found {
546            return None;
547        }
548
549        let schema = Arc::new(ExecutorSchema {
550            columns,
551            lookup: table.schema.lookup.clone(),
552        });
553
554        let next_row_id = table.next_row_id.load(Ordering::SeqCst);
555        let total_rows = table.total_rows.load(Ordering::SeqCst);
556        let uniques = table.multi_column_uniques();
557
558        Some(Arc::new(ExecutorTable {
559            storage: Arc::new(TableStorageAdapter::new(Arc::clone(&table.table))),
560            table: Arc::clone(&table.table),
561            schema,
562            next_row_id: AtomicU64::new(next_row_id),
563            total_rows: AtomicU64::new(total_rows),
564            multi_column_uniques: RwLock::new(uniques),
565        }))
566    }
567}