llkv_runtime/runtime_context/
table_access.rs

1//! Table access and caching utilities for `RuntimeContext`.
2//!
3//! This module centralizes lower-level table access helpers that were previously
4//! embedded directly inside `mod.rs`. Moving them here keeps the core module
5//! focused on high-level orchestration while these helpers encapsulate caching,
6//! lazy loading, and direct batch interactions.
7
8use crate::{RuntimeContext, RuntimeTableHandle, canonical_table_name};
9use arrow::array::{ArrayRef, RecordBatch, UInt64Builder};
10use arrow::datatypes::{DataType, Field, Schema};
11use llkv_column_map::store::{GatherNullPolicy, ROW_ID_COLUMN_NAME};
12use llkv_column_map::types::LogicalFieldId;
13use llkv_executor::{
14    ExecutorColumn, ExecutorMultiColumnUnique, ExecutorRowBatch, ExecutorSchema, ExecutorTable,
15    translation,
16};
17use llkv_result::{Error, Result};
18use llkv_storage::pager::Pager;
19use llkv_table::resolvers::{FieldConstraints, FieldDefinition};
20use llkv_table::{
21    ConstraintKind, FieldId, MultiColumnIndexEntryMeta, RowId, Table, TableConstraintSummaryView,
22};
23use llkv_transaction::{TransactionSnapshot, mvcc};
24use rustc_hash::{FxHashMap, FxHashSet};
25use simd_r_drive_entry_handle::EntryHandle;
26use std::sync::{
27    Arc, RwLock,
28    atomic::{AtomicU64, Ordering},
29};
30
31impl<P> RuntimeContext<P>
32where
33    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
34{
35    /// Exports all rows from a table as a `RowBatch` - internal storage API.
36    /// Use through `RuntimeSession` or `RuntimeTableHandle` instead.
37    pub(crate) fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<ExecutorRowBatch> {
38        let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
39        handle.lazy()?.collect_rows()
40    }
41
42    /// Get raw batches from a table including row_ids - internal storage API.
43    /// This is used for transaction seeding where we need to preserve existing row_ids.
44    /// Use through `RuntimeSession` or transaction context instead.
45    pub(crate) fn get_batches_with_row_ids(
46        &self,
47        table_name: &str,
48        filter: Option<llkv_expr::Expr<'static, String>>,
49        snapshot: TransactionSnapshot,
50    ) -> Result<Vec<RecordBatch>> {
51        let (_, canonical_name) = canonical_table_name(table_name)?;
52        let table = self.lookup_table(&canonical_name)?;
53
54        let filter_expr = match filter {
55            Some(expr) => {
56                translation::expression::translate_predicate(expr, table.schema.as_ref(), |name| {
57                    Error::InvalidArgumentError(format!(
58                        "Binder Error: does not have a column named '{}'",
59                        name
60                    ))
61                })?
62            }
63            None => {
64                let field_id = table.schema.first_field_id().ok_or_else(|| {
65                    Error::InvalidArgumentError(
66                        "table has no columns; cannot perform wildcard scan".into(),
67                    )
68                })?;
69                translation::expression::full_table_scan_filter(field_id)
70            }
71        };
72
73        // First, get the row_ids that match the filter
74        let row_ids = table.table.filter_row_ids(&filter_expr)?;
75        if row_ids.is_empty() {
76            return Ok(Vec::new());
77        }
78
79        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
80        if visible_row_ids.is_empty() {
81            return Ok(Vec::new());
82        }
83
84        // Scan to get the column data without materializing full columns
85        let table_id = table.table.table_id();
86
87        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
88        let mut logical_fields: Vec<LogicalFieldId> =
89            Vec::with_capacity(table.schema.columns.len());
90
91        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));
92
93        for column in &table.schema.columns {
94            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
95            logical_fields.push(logical_field_id);
96            let field = mvcc::build_field_with_metadata(
97                &column.name,
98                column.data_type.clone(),
99                column.nullable,
100                column.field_id,
101            );
102            fields.push(field);
103        }
104
105        let schema = Arc::new(Schema::new(fields));
106
107        if logical_fields.is_empty() {
108            // Tables without user columns should still return row_id batches.
109            let mut row_id_builder = UInt64Builder::with_capacity(visible_row_ids.len());
110            for row_id in &visible_row_ids {
111                row_id_builder.append_value(*row_id);
112            }
113            let arrays: Vec<ArrayRef> = vec![Arc::new(row_id_builder.finish()) as ArrayRef];
114            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
115            return Ok(vec![batch]);
116        }
117
118        let mut stream = table.table.stream_columns(
119            Arc::from(logical_fields),
120            visible_row_ids,
121            GatherNullPolicy::IncludeNulls,
122        )?;
123
124        let mut batches = Vec::new();
125        while let Some(chunk) = stream.next_batch()? {
126            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(chunk.batch().num_columns() + 1);
127
128            let mut row_id_builder = UInt64Builder::with_capacity(chunk.len());
129            for row_id in chunk.row_ids() {
130                row_id_builder.append_value(*row_id);
131            }
132            arrays.push(Arc::new(row_id_builder.finish()) as ArrayRef);
133
134            let chunk_batch = chunk.into_batch();
135            for column_array in chunk_batch.columns() {
136                arrays.push(column_array.clone());
137            }
138
139            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
140            batches.push(batch);
141        }
142
143        Ok(batches)
144    }
145
146    /// Append batches directly to a table, preserving row_ids - internal storage API.
147    /// This is used for transaction seeding where we need to preserve existing row_ids.
148    /// Use through `RuntimeSession` or transaction context instead.
149    pub(crate) fn append_batches_with_row_ids(
150        &self,
151        table_name: &str,
152        batches: Vec<RecordBatch>,
153    ) -> Result<usize> {
154        let (_, canonical_name) = canonical_table_name(table_name)?;
155        let table = self.lookup_table(&canonical_name)?;
156
157        let mut total_rows = 0;
158        for batch in batches {
159            if batch.num_rows() == 0 {
160                continue;
161            }
162
163            // Verify the batch has a row_id column
164            let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
165                Error::InvalidArgumentError(
166                    "batch must contain row_id column for direct append".into(),
167                )
168            })?;
169
170            // Append the batch directly to the underlying table
171            table.table.append(&batch)?;
172            total_rows += batch.num_rows();
173        }
174
175        Ok(total_rows)
176    }
177
178    /// Looks up a table in the executor cache, lazily loading it from metadata if not already cached.
179    /// This is the primary method for obtaining table references for query execution.
180    pub fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
181        // Fast path: check if table is already loaded
182        {
183            let tables = self.tables.read().unwrap();
184            if let Some(table) = tables.get(canonical_name) {
185                // Check if table has been dropped
186                if self.dropped_tables.read().unwrap().contains(canonical_name) {
187                    // Table was dropped - treat as not found
188                    return Err(Error::NotFound);
189                }
190                tracing::trace!(
191                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
192                    canonical_name,
193                    table.table.table_id(),
194                    table.schema.columns.len(),
195                    &*self.pager
196                );
197                return Ok(Arc::clone(table));
198            }
199        } // Release read lock
200
201        // Slow path: load table from catalog (happens once per table)
202        tracing::debug!(
203            "[LAZY_LOAD] Loading table '{}' from catalog",
204            canonical_name
205        );
206
207        // Check catalog first for table existence
208        tracing::debug!(
209            "[CATALOG_LOOKUP] Looking up table '{}' in catalog @ {:p}",
210            canonical_name,
211            &*self.catalog
212        );
213        let catalog_table_id = match self.catalog.table_id(canonical_name) {
214            Some(id) => {
215                tracing::debug!(
216                    "[CATALOG_LOOKUP] Found table '{}' with id={} in catalog",
217                    canonical_name,
218                    id
219                );
220                id
221            }
222            None => {
223                tracing::debug!(
224                    "[CATALOG_LOOKUP] Table '{}' NOT FOUND in catalog @ {:p}",
225                    canonical_name,
226                    &*self.catalog
227                );
228                // Table not found in catalog - try fallback if available
229                if let Some(fallback) = &self.fallback_lookup {
230                    tracing::debug!(
231                        "[LAZY_LOAD] Table '{}' not found in catalog, trying fallback context",
232                        canonical_name
233                    );
234                    return fallback.lookup_table(canonical_name);
235                }
236                return Err(Error::InvalidArgumentError(format!(
237                    "unknown table '{}'",
238                    canonical_name
239                )));
240            }
241        };
242
243        let table_id = catalog_table_id;
244
245        // Try to load the table from our store. If it fails, try fallback context.
246        let table = match Table::from_id_and_store(table_id, Arc::clone(&self.store)) {
247            Ok(t) => t,
248            Err(e) => {
249                // Table exists in catalog but not in our store - try fallback
250                if let Some(fallback) = &self.fallback_lookup {
251                    tracing::debug!(
252                        "[LAZY_LOAD] Table '{}' found in catalog but not in store ({}), trying fallback context",
253                        canonical_name,
254                        e
255                    );
256                    return fallback.lookup_table(canonical_name);
257                }
258                return Err(e);
259            }
260        };
261        let store = table.store();
262        let mut logical_fields = store.user_field_ids_for_table(table_id);
263        logical_fields.sort_by_key(|lfid| lfid.field_id());
264        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
265        let summary = self
266            .catalog_service
267            .table_constraint_summary(canonical_name)?;
268        let TableConstraintSummaryView {
269            table_meta,
270            column_metas,
271            constraint_records,
272            multi_column_uniques,
273        } = summary;
274
275        // If table_meta is None, the table metadata isn't in our context's store.
276        // Try fallback context before erroring.
277        let _table_meta = match table_meta {
278            Some(meta) => meta,
279            None => {
280                if let Some(fallback) = &self.fallback_lookup {
281                    tracing::debug!(
282                        "[LAZY_LOAD] Table '{}' metadata not found, trying fallback context",
283                        canonical_name
284                    );
285                    return fallback.lookup_table(canonical_name);
286                }
287                return Err(Error::InvalidArgumentError(format!(
288                    "unknown table '{}'",
289                    canonical_name
290                )));
291            }
292        };
293        let catalog_field_resolver = self.catalog.field_resolver(catalog_table_id);
294        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
295        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
296        let mut has_primary_key_records = false;
297        let mut has_single_unique_records = false;
298
299        for record in constraint_records
300            .iter()
301            .filter(|record| record.is_active())
302        {
303            match &record.kind {
304                ConstraintKind::PrimaryKey(pk) => {
305                    has_primary_key_records = true;
306                    for field_id in &pk.field_ids {
307                        metadata_primary_keys.insert(*field_id);
308                        metadata_unique_fields.insert(*field_id);
309                    }
310                }
311                ConstraintKind::Unique(unique) => {
312                    if unique.field_ids.len() == 1 {
313                        has_single_unique_records = true;
314                        metadata_unique_fields.insert(unique.field_ids[0]);
315                    }
316                }
317                _ => {}
318            }
319        }
320
321        // Build ExecutorSchema from metadata manager snapshots
322        let mut executor_columns = Vec::new();
323        let mut lookup = FxHashMap::with_capacity_and_hasher(field_ids.len(), Default::default());
324
325        for (idx, lfid) in logical_fields.iter().enumerate() {
326            let field_id = lfid.field_id();
327            let normalized_index = executor_columns.len();
328
329            let column_name = column_metas
330                .get(idx)
331                .and_then(|meta| meta.as_ref())
332                .and_then(|meta| meta.name.clone())
333                .unwrap_or_else(|| format!("col_{}", field_id));
334
335            let normalized = column_name.to_ascii_lowercase();
336            lookup.insert(normalized, normalized_index);
337
338            let fallback_constraints: FieldConstraints = catalog_field_resolver
339                .as_ref()
340                .and_then(|resolver| resolver.field_constraints_by_name(&column_name))
341                .unwrap_or_default();
342
343            let metadata_primary = metadata_primary_keys.contains(&field_id);
344            let primary_key = if has_primary_key_records {
345                metadata_primary
346            } else {
347                fallback_constraints.primary_key
348            };
349
350            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
351            let unique = if has_primary_key_records || has_single_unique_records {
352                metadata_unique
353            } else {
354                fallback_constraints.primary_key || fallback_constraints.unique
355            };
356
357            let data_type = store.data_type(*lfid)?;
358            let nullable = !primary_key;
359
360            executor_columns.push(ExecutorColumn {
361                name: column_name,
362                data_type,
363                nullable,
364                primary_key,
365                unique,
366                field_id,
367                check_expr: fallback_constraints.check_expr.clone(),
368            });
369        }
370
371        let exec_schema = Arc::new(ExecutorSchema {
372            columns: executor_columns,
373            lookup,
374        });
375
376        // Find the maximum row_id in the table to set next_row_id correctly
377        let max_row_id = {
378            use arrow::array::UInt64Array;
379            use llkv_column_map::store::rowid_fid;
380            use llkv_column_map::store::scan::{
381                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
382                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
383            };
384
385            struct MaxRowIdVisitor {
386                max: RowId,
387            }
388
389            impl PrimitiveVisitor for MaxRowIdVisitor {
390                fn u64_chunk(&mut self, values: &UInt64Array) {
391                    for i in 0..values.len() {
392                        let val = values.value(i);
393                        if val > self.max {
394                            self.max = val;
395                        }
396                    }
397                }
398            }
399
400            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
401            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
402            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}
403
404            // Scan the row_id column for any user field in this table
405            let row_id_field = rowid_fid(LogicalFieldId::for_user(table_id, 1));
406            let mut visitor = MaxRowIdVisitor { max: 0 };
407
408            match ScanBuilder::new(table.store(), row_id_field)
409                .options(ScanOptions::default())
410                .run(&mut visitor)
411            {
412                Ok(_) => visitor.max,
413                Err(llkv_result::Error::NotFound) => 0,
414                Err(e) => {
415                    tracing::warn!(
416                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
417                        canonical_name,
418                        e
419                    );
420                    0
421                }
422            }
423        };
424
425        let next_row_id = if max_row_id > 0 {
426            max_row_id.saturating_add(1)
427        } else {
428            0
429        };
430
431        // Get the actual persisted row count from table metadata
432        // This is an O(1) catalog lookup that reads ColumnDescriptor.total_row_count
433        // Fallback to 0 for truly empty tables
434        let total_rows = table.total_rows().unwrap_or(0);
435
436        let executor_table = Arc::new(ExecutorTable {
437            table: Arc::new(table),
438            schema: exec_schema,
439            next_row_id: AtomicU64::new(next_row_id),
440            total_rows: AtomicU64::new(total_rows),
441            multi_column_uniques: RwLock::new(Vec::new()),
442        });
443
444        if !multi_column_uniques.is_empty() {
445            let executor_uniques =
446                Self::build_executor_multi_column_uniques(&executor_table, &multi_column_uniques);
447            executor_table.set_multi_column_uniques(executor_uniques);
448        }
449
450        // Cache the loaded table
451        {
452            let mut tables = self.tables.write().unwrap();
453            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
454        }
455
456        // Register fields in catalog (may already be registered from RuntimeContext::new())
457        if let Some(field_resolver) = self.catalog.field_resolver(catalog_table_id) {
458            for col in &executor_table.schema.columns {
459                let definition = FieldDefinition::new(&col.name)
460                    .with_primary_key(col.primary_key)
461                    .with_unique(col.unique)
462                    .with_check_expr(col.check_expr.clone());
463                let _ = field_resolver.register_field(definition); // Ignore "already exists" errors
464            }
465            tracing::debug!(
466                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
467                executor_table.schema.columns.len(),
468                canonical_name
469            );
470        }
471
472        tracing::debug!(
473            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
474            canonical_name,
475            table_id,
476            field_ids.len(),
477            next_row_id
478        );
479
480        Ok(executor_table)
481    }
482
483    pub(super) fn build_executor_multi_column_uniques(
484        table: &ExecutorTable<P>,
485        stored: &[MultiColumnIndexEntryMeta],
486    ) -> Vec<ExecutorMultiColumnUnique> {
487        let mut results = Vec::with_capacity(stored.len());
488
489        'outer: for entry in stored {
490            if entry.column_ids.is_empty() {
491                continue;
492            }
493
494            let mut column_indices = Vec::with_capacity(entry.column_ids.len());
495            for field_id in &entry.column_ids {
496                if let Some((idx, _)) = table
497                    .schema
498                    .columns
499                    .iter()
500                    .enumerate()
501                    .find(|(_, col)| &col.field_id == field_id)
502                {
503                    column_indices.push(idx);
504                } else {
505                    tracing::warn!(
506                        "[CATALOG] Skipping persisted multi-column UNIQUE {:?} for table_id={} missing field_id {}",
507                        entry.index_name,
508                        table.table.table_id(),
509                        field_id
510                    );
511                    continue 'outer;
512                }
513            }
514
515            results.push(ExecutorMultiColumnUnique {
516                index_name: entry.index_name.clone(),
517                column_indices,
518            });
519        }
520
521        results
522    }
523
524    pub(super) fn rebuild_executor_table_with_unique(
525        table: &ExecutorTable<P>,
526        field_id: FieldId,
527    ) -> Option<Arc<ExecutorTable<P>>> {
528        let mut columns = table.schema.columns.clone();
529        let mut found = false;
530        for column in &mut columns {
531            if column.field_id == field_id {
532                column.unique = true;
533                found = true;
534                break;
535            }
536        }
537        if !found {
538            return None;
539        }
540
541        let schema = Arc::new(ExecutorSchema {
542            columns,
543            lookup: table.schema.lookup.clone(),
544        });
545
546        let next_row_id = table.next_row_id.load(Ordering::SeqCst);
547        let total_rows = table.total_rows.load(Ordering::SeqCst);
548        let uniques = table.multi_column_uniques();
549
550        Some(Arc::new(ExecutorTable {
551            table: Arc::clone(&table.table),
552            schema,
553            next_row_id: AtomicU64::new(next_row_id),
554            total_rows: AtomicU64::new(total_rows),
555            multi_column_uniques: RwLock::new(uniques),
556        }))
557    }
558}