llkv_runtime/runtime_context/table_access.rs

1//! Table access and caching utilities for `RuntimeContext`.
2//!
3//! This module centralizes lower-level table access helpers that were previously
4//! embedded directly inside `mod.rs`. Moving them here keeps the core module
5//! focused on high-level orchestration while these helpers encapsulate caching,
6//! lazy loading, and direct batch interactions.
7
8use crate::{RuntimeContext, RuntimeTableHandle, canonical_table_name};
9use arrow::array::{ArrayRef, RecordBatch, UInt64Builder};
10use arrow::datatypes::{DataType, Field, Schema};
11use llkv_column_map::store::{GatherNullPolicy, ROW_ID_COLUMN_NAME};
12use llkv_column_map::types::LogicalFieldId;
13use llkv_executor::{
14    ExecutorColumn, ExecutorMultiColumnUnique, ExecutorRowBatch, ExecutorSchema, ExecutorTable,
15    translation,
16};
17use llkv_result::{Error, Result};
18use llkv_storage::pager::Pager;
19use llkv_table::resolvers::{FieldConstraints, FieldDefinition};
20use llkv_table::{
21    ConstraintKind, FieldId, MultiColumnUniqueEntryMeta, RowId, Table, TableConstraintSummaryView,
22};
23use llkv_transaction::{TransactionSnapshot, mvcc};
24use rustc_hash::{FxHashMap, FxHashSet};
25use simd_r_drive_entry_handle::EntryHandle;
26use std::sync::{
27    Arc, RwLock,
28    atomic::{AtomicU64, Ordering},
29};
30
31impl<P> RuntimeContext<P>
32where
33    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
34{
35    /// Exports all rows from a table as a `RowBatch` - internal storage API.
36    /// Use through `RuntimeSession` or `RuntimeTableHandle` instead.
37    pub(crate) fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<ExecutorRowBatch> {
38        let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
39        handle.lazy()?.collect_rows()
40    }
41
42    /// Get raw batches from a table including row_ids - internal storage API.
43    /// This is used for transaction seeding where we need to preserve existing row_ids.
44    /// Use through `RuntimeSession` or transaction context instead.
45    pub(crate) fn get_batches_with_row_ids(
46        &self,
47        table_name: &str,
48        filter: Option<llkv_expr::Expr<'static, String>>,
49        snapshot: TransactionSnapshot,
50    ) -> Result<Vec<RecordBatch>> {
51        let (_, canonical_name) = canonical_table_name(table_name)?;
52        let table = self.lookup_table(&canonical_name)?;
53
54        let filter_expr = match filter {
55            Some(expr) => {
56                translation::expression::translate_predicate(expr, table.schema.as_ref(), |name| {
57                    Error::InvalidArgumentError(format!(
58                        "Binder Error: does not have a column named '{}'",
59                        name
60                    ))
61                })?
62            }
63            None => {
64                let field_id = table.schema.first_field_id().ok_or_else(|| {
65                    Error::InvalidArgumentError(
66                        "table has no columns; cannot perform wildcard scan".into(),
67                    )
68                })?;
69                translation::expression::full_table_scan_filter(field_id)
70            }
71        };
72
73        // First, get the row_ids that match the filter
74        let row_ids = table.table.filter_row_ids(&filter_expr)?;
75        if row_ids.is_empty() {
76            return Ok(Vec::new());
77        }
78
79        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
80        if visible_row_ids.is_empty() {
81            return Ok(Vec::new());
82        }
83
84        // Scan to get the column data without materializing full columns
85        let table_id = table.table.table_id();
86
87        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
88        let mut logical_fields: Vec<LogicalFieldId> =
89            Vec::with_capacity(table.schema.columns.len());
90
91        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));
92
93        for column in &table.schema.columns {
94            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
95            logical_fields.push(logical_field_id);
96            let field = mvcc::build_field_with_metadata(
97                &column.name,
98                column.data_type.clone(),
99                column.nullable,
100                column.field_id,
101            );
102            fields.push(field);
103        }
104
105        let schema = Arc::new(Schema::new(fields));
106
107        if logical_fields.is_empty() {
108            // Tables without user columns should still return row_id batches.
109            let mut row_id_builder = UInt64Builder::with_capacity(visible_row_ids.len());
110            for row_id in &visible_row_ids {
111                row_id_builder.append_value(*row_id);
112            }
113            let arrays: Vec<ArrayRef> = vec![Arc::new(row_id_builder.finish()) as ArrayRef];
114            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
115            return Ok(vec![batch]);
116        }
117
118        let mut stream = table.table.stream_columns(
119            Arc::from(logical_fields),
120            visible_row_ids,
121            GatherNullPolicy::IncludeNulls,
122        )?;
123
124        let mut batches = Vec::new();
125        while let Some(chunk) = stream.next_batch()? {
126            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(chunk.batch().num_columns() + 1);
127
128            let mut row_id_builder = UInt64Builder::with_capacity(chunk.len());
129            for row_id in chunk.row_ids() {
130                row_id_builder.append_value(*row_id);
131            }
132            arrays.push(Arc::new(row_id_builder.finish()) as ArrayRef);
133
134            let chunk_batch = chunk.into_batch();
135            for column_array in chunk_batch.columns() {
136                arrays.push(column_array.clone());
137            }
138
139            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
140            batches.push(batch);
141        }
142
143        Ok(batches)
144    }
145
146    /// Append batches directly to a table, preserving row_ids - internal storage API.
147    /// This is used for transaction seeding where we need to preserve existing row_ids.
148    /// Use through `RuntimeSession` or transaction context instead.
149    pub(crate) fn append_batches_with_row_ids(
150        &self,
151        table_name: &str,
152        batches: Vec<RecordBatch>,
153    ) -> Result<usize> {
154        let (_, canonical_name) = canonical_table_name(table_name)?;
155        let table = self.lookup_table(&canonical_name)?;
156
157        let mut total_rows = 0;
158        for batch in batches {
159            if batch.num_rows() == 0 {
160                continue;
161            }
162
163            // Verify the batch has a row_id column
164            let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
165                Error::InvalidArgumentError(
166                    "batch must contain row_id column for direct append".into(),
167                )
168            })?;
169
170            // Append the batch directly to the underlying table
171            table.table.append(&batch)?;
172            total_rows += batch.num_rows();
173        }
174
175        Ok(total_rows)
176    }
177
    /// Looks up a table in the executor cache, lazily loading it from metadata if not already cached.
    /// This is the primary method for obtaining table references for query execution.
    ///
    /// Returns `Error::NotFound` for tables recorded in `dropped_tables`, and an
    /// `InvalidArgumentError` when the name is unknown to the catalog.
    pub fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        // Fast path: check if table is already loaded
        {
            let tables = self.tables.read().unwrap();
            if let Some(table) = tables.get(canonical_name) {
                // Check if table has been dropped
                if self.dropped_tables.read().unwrap().contains(canonical_name) {
                    // Table was dropped - treat as not found
                    return Err(Error::NotFound);
                }
                tracing::trace!(
                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
                    canonical_name,
                    table.table.table_id(),
                    table.schema.columns.len(),
                    &*self.pager
                );
                return Ok(Arc::clone(table));
            }
        } // Release read lock

        // Slow path: load table from catalog (happens once per table)
        tracing::debug!(
            "[LAZY_LOAD] Loading table '{}' from catalog",
            canonical_name
        );

        // Check catalog first for table existence
        let catalog_table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;

        let table_id = catalog_table_id;
        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
        let store = table.store();
        // Sort by field id so the column order is deterministic and lines up
        // with the catalog's column_metas consumed below.
        let mut logical_fields = store.user_field_ids_for_table(table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());
        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let summary = self
            .catalog_service
            .table_constraint_summary(canonical_name)?;
        let TableConstraintSummaryView {
            table_meta,
            column_metas,
            constraint_records,
            multi_column_uniques,
        } = summary;
        // Used purely as an existence check; the metadata itself is not read.
        let _table_meta = table_meta.ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;
        let catalog_field_resolver = self.catalog.field_resolver(catalog_table_id);
        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
        let mut has_primary_key_records = false;
        let mut has_single_unique_records = false;

        // Collect PK/UNIQUE membership from active constraint records. The
        // `has_*_records` flags decide below whether these records are
        // authoritative or whether we fall back to per-field resolver data.
        for record in constraint_records
            .iter()
            .filter(|record| record.is_active())
        {
            match &record.kind {
                ConstraintKind::PrimaryKey(pk) => {
                    has_primary_key_records = true;
                    for field_id in &pk.field_ids {
                        metadata_primary_keys.insert(*field_id);
                        metadata_unique_fields.insert(*field_id);
                    }
                }
                ConstraintKind::Unique(unique) => {
                    // Multi-column uniques are attached separately via
                    // `multi_column_uniques`; only single-column entries
                    // influence the per-column flags here.
                    if unique.field_ids.len() == 1 {
                        has_single_unique_records = true;
                        metadata_unique_fields.insert(unique.field_ids[0]);
                    }
                }
                _ => {}
            }
        }

        // Build ExecutorSchema from metadata manager snapshots
        let mut executor_columns = Vec::new();
        let mut lookup = FxHashMap::with_capacity_and_hasher(field_ids.len(), Default::default());

        for (idx, lfid) in logical_fields.iter().enumerate() {
            let field_id = lfid.field_id();
            let normalized_index = executor_columns.len();

            // NOTE(review): assumes column_metas is ordered to match the
            // field-id-sorted logical_fields — confirm against the catalog
            // service contract.
            let column_name = column_metas
                .get(idx)
                .and_then(|meta| meta.as_ref())
                .and_then(|meta| meta.name.clone())
                .unwrap_or_else(|| format!("col_{}", field_id));

            // Name lookups are case-insensitive via lowercase normalization.
            let normalized = column_name.to_ascii_lowercase();
            lookup.insert(normalized, normalized_index);

            let fallback_constraints: FieldConstraints = catalog_field_resolver
                .as_ref()
                .and_then(|resolver| resolver.field_constraints_by_name(&column_name))
                .unwrap_or_default();

            // Constraint records win when any exist; otherwise fall back to
            // the resolver's per-column constraint snapshot.
            let metadata_primary = metadata_primary_keys.contains(&field_id);
            let primary_key = if has_primary_key_records {
                metadata_primary
            } else {
                fallback_constraints.primary_key
            };

            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
            let unique = if has_primary_key_records || has_single_unique_records {
                metadata_unique
            } else {
                fallback_constraints.primary_key || fallback_constraints.unique
            };

            let data_type = store.data_type(*lfid)?;
            // Primary key columns are implicitly NOT NULL; all other columns
            // are treated as nullable here.
            let nullable = !primary_key;

            executor_columns.push(ExecutorColumn {
                name: column_name,
                data_type,
                nullable,
                primary_key,
                unique,
                field_id,
                check_expr: fallback_constraints.check_expr.clone(),
            });
        }

        let exec_schema = Arc::new(ExecutorSchema {
            columns: executor_columns,
            lookup,
        });

        // Find the maximum row_id in the table to set next_row_id correctly
        let max_row_id = {
            use arrow::array::UInt64Array;
            use llkv_column_map::store::rowid_fid;
            use llkv_column_map::store::scan::{
                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
            };

            // Visitor tracking the running maximum over u64 chunks; the other
            // visitor traits fall back to their default (no-op) impls.
            struct MaxRowIdVisitor {
                max: RowId,
            }

            impl PrimitiveVisitor for MaxRowIdVisitor {
                fn u64_chunk(&mut self, values: &UInt64Array) {
                    for i in 0..values.len() {
                        let val = values.value(i);
                        if val > self.max {
                            self.max = val;
                        }
                    }
                }
            }

            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}

            // Scan the row_id column for any user field in this table
            // NOTE(review): hard-codes field_id 1 as the scan anchor; assumes
            // every table has a user field with id 1 — confirm this invariant.
            let row_id_field = rowid_fid(LogicalFieldId::for_user(table_id, 1));
            let mut visitor = MaxRowIdVisitor { max: 0 };

            match ScanBuilder::new(table.store(), row_id_field)
                .options(ScanOptions::default())
                .run(&mut visitor)
            {
                Ok(_) => visitor.max,
                // A missing row_id column simply means the table has no rows.
                Err(llkv_result::Error::NotFound) => 0,
                Err(e) => {
                    // Best effort: log and fall back to 0 rather than failing
                    // the whole table load.
                    tracing::warn!(
                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
                        canonical_name,
                        e
                    );
                    0
                }
            }
        };

        // NOTE(review): an empty table and a table whose only row_id is 0 both
        // produce max_row_id == 0 and thus next_row_id == 0 — presumably row
        // ids start at 1 so this is unambiguous; verify.
        let next_row_id = if max_row_id > 0 {
            max_row_id.saturating_add(1)
        } else {
            0
        };

        // Get the actual persisted row count from table metadata
        // This is an O(1) catalog lookup that reads ColumnDescriptor.total_row_count
        // Fallback to 0 for truly empty tables
        let total_rows = table.total_rows().unwrap_or(0);

        let executor_table = Arc::new(ExecutorTable {
            table: Arc::new(table),
            schema: exec_schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        // Attach persisted multi-column UNIQUE constraints, dropping any whose
        // columns no longer resolve against the freshly built schema.
        if !multi_column_uniques.is_empty() {
            let executor_uniques =
                Self::build_executor_multi_column_uniques(&executor_table, &multi_column_uniques);
            executor_table.set_multi_column_uniques(executor_uniques);
        }

        // Cache the loaded table
        // NOTE(review): two threads can race past the fast path and both load;
        // the later insert wins, which looks benign since both handles wrap
        // the same underlying table — confirm.
        {
            let mut tables = self.tables.write().unwrap();
            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
        }

        // Register fields in catalog (may already be registered from RuntimeContext::new())
        if let Some(field_resolver) = self.catalog.field_resolver(catalog_table_id) {
            for col in &executor_table.schema.columns {
                let definition = FieldDefinition::new(&col.name)
                    .with_primary_key(col.primary_key)
                    .with_unique(col.unique)
                    .with_check_expr(col.check_expr.clone());
                let _ = field_resolver.register_field(definition); // Ignore "already exists" errors
            }
            tracing::debug!(
                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
                executor_table.schema.columns.len(),
                canonical_name
            );
        }

        tracing::debug!(
            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
            canonical_name,
            table_id,
            field_ids.len(),
            next_row_id
        );

        Ok(executor_table)
    }
419
420    pub(super) fn build_executor_multi_column_uniques(
421        table: &ExecutorTable<P>,
422        stored: &[MultiColumnUniqueEntryMeta],
423    ) -> Vec<ExecutorMultiColumnUnique> {
424        let mut results = Vec::with_capacity(stored.len());
425
426        'outer: for entry in stored {
427            if entry.column_ids.is_empty() {
428                continue;
429            }
430
431            let mut column_indices = Vec::with_capacity(entry.column_ids.len());
432            for field_id in &entry.column_ids {
433                if let Some((idx, _)) = table
434                    .schema
435                    .columns
436                    .iter()
437                    .enumerate()
438                    .find(|(_, col)| &col.field_id == field_id)
439                {
440                    column_indices.push(idx);
441                } else {
442                    tracing::warn!(
443                        "[CATALOG] Skipping persisted multi-column UNIQUE {:?} for table_id={} missing field_id {}",
444                        entry.index_name,
445                        table.table.table_id(),
446                        field_id
447                    );
448                    continue 'outer;
449                }
450            }
451
452            results.push(ExecutorMultiColumnUnique {
453                index_name: entry.index_name.clone(),
454                column_indices,
455            });
456        }
457
458        results
459    }
460
461    pub(super) fn rebuild_executor_table_with_unique(
462        table: &ExecutorTable<P>,
463        field_id: FieldId,
464    ) -> Option<Arc<ExecutorTable<P>>> {
465        let mut columns = table.schema.columns.clone();
466        let mut found = false;
467        for column in &mut columns {
468            if column.field_id == field_id {
469                column.unique = true;
470                found = true;
471                break;
472            }
473        }
474        if !found {
475            return None;
476        }
477
478        let schema = Arc::new(ExecutorSchema {
479            columns,
480            lookup: table.schema.lookup.clone(),
481        });
482
483        let next_row_id = table.next_row_id.load(Ordering::SeqCst);
484        let total_rows = table.total_rows.load(Ordering::SeqCst);
485        let uniques = table.multi_column_uniques();
486
487        Some(Arc::new(ExecutorTable {
488            table: Arc::clone(&table.table),
489            schema,
490            next_row_id: AtomicU64::new(next_row_id),
491            total_rows: AtomicU64::new(total_rows),
492            multi_column_uniques: RwLock::new(uniques),
493        }))
494    }
495}