llkv_runtime/runtime_context/table_access.rs

use crate::{RuntimeContext, RuntimeTableHandle, canonical_table_name};
use arrow::array::{ArrayRef, RecordBatch, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema};
use llkv_column_map::store::{GatherNullPolicy, ROW_ID_COLUMN_NAME};
use llkv_executor::{
    ExecutorColumn, ExecutorMultiColumnUnique, ExecutorRowBatch, ExecutorSchema, ExecutorTable,
    TableStorageAdapter, translation,
};
use llkv_result::{Error, Result};
use llkv_storage::pager::Pager;
use llkv_table::resolvers::{FieldConstraints, FieldDefinition};
use llkv_table::{
    ConstraintKind, FieldId, MultiColumnIndexEntryMeta, RowId, RowStream, Table,
    TableConstraintSummaryView,
};
use llkv_transaction::{TransactionSnapshot, mvcc};
use llkv_types::LogicalFieldId;
use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{
    Arc, RwLock,
    atomic::{AtomicU64, Ordering},
};

impl<P> RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
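    /// Collect every visible row of table `name` into a single owned
    /// `ExecutorRowBatch` by opening a lazy handle and draining it.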
    pub(crate) fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<ExecutorRowBatch> {
        let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
        handle.lazy()?.collect_rows()
    }

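    /// Scan `table_name` under the given transaction `snapshot` and return
    /// Arrow batches whose first column is the internal row id, followed by
    /// every user column. When `filter` is `None` the scan degrades to a
    /// full-table scan keyed on the table's first field.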
    pub(crate) fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<llkv_expr::Expr<'static, String>>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RecordBatch>> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

        let filter_expr = match filter {
            Some(expr) => {
                translation::expression::translate_predicate(expr, table.schema.as_ref(), |name| {
                    Error::InvalidArgumentError(format!(
                        "Binder Error: does not have a column named '{}'",
                        name
                    ))
                })?
            }
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                translation::expression::full_table_scan_filter(field_id)
            }
        };

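        // Resolve matching row ids first, then drop rows that the MVCC
        // snapshot cannot see before any column data is gathered.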
        let row_ids = table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
        if visible_row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table_id();

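        // Assemble the output schema: the row-id column first, then each user
        // column carrying its MVCC field metadata.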
        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len());

        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for column in &table.schema.columns {
            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
            logical_fields.push(logical_field_id);
            let field = mvcc::build_field_with_metadata(
                &column.name,
                column.data_type.clone(),
                column.nullable,
                column.field_id,
            );
            fields.push(field);
        }

        let schema = Arc::new(Schema::new(fields));

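        // A table with no user columns still has visible row ids: emit a
        // single batch containing only the row-id column.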
        if logical_fields.is_empty() {
            let mut row_id_builder =
                UInt64Builder::with_capacity(visible_row_ids.cardinality() as usize);
            for row_id in visible_row_ids.iter() {
                row_id_builder.append_value(row_id);
            }
            let arrays: Vec<ArrayRef> = vec![Arc::new(row_id_builder.finish()) as ArrayRef];
            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            return Ok(vec![batch]);
        }

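        // Stream the selected columns chunk by chunk, prefixing each emitted
        // batch with that chunk's row ids so callers can correlate rows.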
        let mut stream = table.stream_columns(
            Arc::from(logical_fields),
            &visible_row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut batches = Vec::new();
        while let Some(chunk) = stream.next_chunk()? {
            let batch = chunk.record_batch();
            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns() + 1);

            let row_ids = chunk
                .row_ids
                .expect("table access requires row ids when streaming");
            let mut row_id_builder = UInt64Builder::with_capacity(row_ids.len());
            for idx in 0..row_ids.len() {
                row_id_builder.append_value(row_ids.value(idx));
            }
            arrays.push(Arc::new(row_id_builder.finish()) as ArrayRef);

            for column_array in batch.columns() {
                arrays.push(column_array.clone());
            }

            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            batches.push(batch);
        }

        Ok(batches)
    }

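    /// Append pre-formed batches that already carry a row_id column directly
    /// to `table_name`, bypassing row-id allocation. Returns the number of
    /// rows written.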
    pub(crate) fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> Result<usize> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

        let mut total_rows = 0;
        for batch in batches {
            if batch.num_rows() == 0 {
                continue;
            }

            let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
                Error::InvalidArgumentError(
                    "batch must contain row_id column for direct append".into(),
                )
            })?;

            table.table.append(&batch)?;
            total_rows += batch.num_rows();
        }

        Ok(total_rows)
    }

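    /// Resolve `canonical_name` to an `ExecutorTable`, serving from the
    /// in-context cache when possible and otherwise lazily rebuilding the
    /// executor-side metadata from the catalog and column store. Falls back
    /// to a parent context when the table is unknown here.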
    pub fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        {
            let tables = self.tables.read().unwrap();
            if let Some(table) = tables.get(canonical_name) {
                if self.dropped_tables.read().unwrap().contains(canonical_name) {
                    return Err(Error::NotFound);
                }
                tracing::trace!(
                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
                    canonical_name,
                    table.table_id(),
                    table.schema.columns.len(),
                    &*self.pager
                );
                return Ok(Arc::clone(table));
            }
        }

        tracing::debug!(
            "[LAZY_LOAD] Loading table '{}' from catalog",
            canonical_name
        );

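        // Resolve the table id through the shared catalog; if this context
        // does not know the table, defer to the fallback (parent) context.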
        tracing::debug!(
            "[CATALOG_LOOKUP] Looking up table '{}' in catalog @ {:p}",
            canonical_name,
            &*self.catalog
        );
        let catalog_table_id = match self.catalog.table_id(canonical_name) {
            Some(id) => {
                tracing::debug!(
                    "[CATALOG_LOOKUP] Found table '{}' with id={} in catalog",
                    canonical_name,
                    id
                );
                id
            }
            None => {
                tracing::debug!(
                    "[CATALOG_LOOKUP] Table '{}' NOT FOUND in catalog @ {:p}",
                    canonical_name,
                    &*self.catalog
                );
                if let Some(fallback) = self.fallback_lookup.read().unwrap().as_ref() {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' not found in catalog, trying fallback context",
                        canonical_name
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(Error::InvalidArgumentError(format!(
                    "unknown table '{}'",
                    canonical_name
                )));
            }
        };

        let table_id = catalog_table_id;

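        // Open the physical table from the column store; a miss here with a
        // fallback configured means the table lives in the parent context.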
        let table = match Table::from_id_and_store(table_id, Arc::clone(&self.store)) {
            Ok(t) => t,
            Err(e) => {
                if let Some(fallback) = self.fallback_lookup.read().unwrap().as_ref() {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' found in catalog but not in store ({}), trying fallback context",
                        canonical_name,
                        e
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(e);
            }
        };
        let store = table.store();
        let mut logical_fields = store.user_field_ids_for_table(table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());
        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let summary = self
            .catalog_service
            .table_constraint_summary(canonical_name)?;
        let TableConstraintSummaryView {
            table_meta,
            column_metas,
            constraint_records,
            multi_column_uniques,
            constraint_names: _constraint_names,
        } = summary;

        let _table_meta = match table_meta {
            Some(meta) => meta,
            None => {
                if let Some(fallback) = self.fallback_lookup.read().unwrap().as_ref() {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' metadata not found, trying fallback context",
                        canonical_name
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(Error::InvalidArgumentError(format!(
                    "unknown table '{}'",
                    canonical_name
                )));
            }
        };
        let catalog_field_resolver = self.catalog.field_resolver(catalog_table_id);
        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
        let mut has_primary_key_records = false;
        let mut has_single_unique_records = false;

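        // Fold active constraint records into PRIMARY KEY / UNIQUE field sets.
        // The `has_*` flags record whether persisted constraint records exist
        // at all, so the per-column loop below knows when to trust them over
        // the legacy field-resolver flags.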
        for record in constraint_records
            .iter()
            .filter(|record| record.is_active())
        {
            match &record.kind {
                ConstraintKind::PrimaryKey(pk) => {
                    has_primary_key_records = true;
                    for field_id in &pk.field_ids {
                        metadata_primary_keys.insert(*field_id);
                        metadata_unique_fields.insert(*field_id);
                    }
                }
                ConstraintKind::Unique(unique) => {
                    if unique.field_ids.len() == 1 {
                        has_single_unique_records = true;
                        metadata_unique_fields.insert(unique.field_ids[0]);
                    }
                }
                _ => {}
            }
        }

        let mut executor_columns = Vec::new();
        let mut lookup = FxHashMap::with_capacity_and_hasher(field_ids.len(), Default::default());

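        // Rebuild each executor column: recover its name from column metadata
        // (or synthesize one), register a case-insensitive lookup entry, and
        // derive PRIMARY KEY / UNIQUE flags, preferring persisted constraint
        // records over the field resolver's fallback constraints.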
        for (idx, lfid) in logical_fields.iter().enumerate() {
            let field_id = lfid.field_id();
            let normalized_index = executor_columns.len();

            let column_name = column_metas
                .get(idx)
                .and_then(|meta| meta.as_ref())
                .and_then(|meta| meta.name.clone())
                .unwrap_or_else(|| format!("col_{}", field_id));

            let normalized = column_name.to_ascii_lowercase();
            lookup.insert(normalized, normalized_index);

            let fallback_constraints: FieldConstraints = catalog_field_resolver
                .as_ref()
                .and_then(|resolver| resolver.field_constraints_by_name(&column_name))
                .unwrap_or_default();

            let metadata_primary = metadata_primary_keys.contains(&field_id);
            let primary_key = if has_primary_key_records {
                metadata_primary
            } else {
                fallback_constraints.primary_key
            };

            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
            let unique = if has_primary_key_records || has_single_unique_records {
                metadata_unique
            } else {
                fallback_constraints.primary_key || fallback_constraints.unique
            };

            let data_type = store.data_type(*lfid)?;
            let nullable = !primary_key;

            executor_columns.push(ExecutorColumn {
                name: column_name,
                data_type,
                nullable,
                primary_key,
                unique,
                field_id,
                check_expr: fallback_constraints.check_expr.clone(),
            });
        }

        let exec_schema = Arc::new(ExecutorSchema {
            columns: executor_columns,
            lookup,
        });

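        // Recover the row-id high-water mark by scanning the table's row-id
        // shadow column with a minimal visitor that only tracks the maximum.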
        let max_row_id = {
            use arrow::array::UInt64Array;
            use llkv_column_map::store::rowid_fid;
            use llkv_column_map::store::scan::{
                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
            };

            struct MaxRowIdVisitor {
                max: RowId,
            }

            impl PrimitiveVisitor for MaxRowIdVisitor {
                fn u64_chunk(&mut self, values: &UInt64Array) {
                    for i in 0..values.len() {
                        let val = values.value(i);
                        if val > self.max {
                            self.max = val;
                        }
                    }
                }
            }

            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}

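            // Derive the row-id shadow field for this table (via user field
            // id 1 as a representative).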
            let row_id_field = rowid_fid(LogicalFieldId::for_user(table_id, 1));
            let mut visitor = MaxRowIdVisitor { max: 0 };

            match ScanBuilder::new(table.store(), row_id_field)
                .options(ScanOptions::default())
                .run(&mut visitor)
            {
                Ok(_) => visitor.max,
                Err(llkv_result::Error::NotFound) => 0,
                Err(e) => {
                    tracing::warn!(
                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
                        canonical_name,
                        e
                    );
                    0
                }
            }
        };

        let next_row_id = if max_row_id > 0 {
            max_row_id.saturating_add(1)
        } else {
            0
        };

        let total_rows = table.total_rows().unwrap_or(0);

        let table = Arc::new(table);
        let executor_table = Arc::new(ExecutorTable {
            storage: Arc::new(TableStorageAdapter::new(Arc::clone(&table))),
            table,
            schema: exec_schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

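        // Rehydrate any persisted multi-column UNIQUE constraints now that
        // the executor schema exists to resolve their column indices.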
        if !multi_column_uniques.is_empty() {
            let executor_uniques =
                Self::build_executor_multi_column_uniques(&executor_table, &multi_column_uniques);
            executor_table.set_multi_column_uniques(executor_uniques);
        }

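        // Cache the rebuilt table so subsequent lookups hit the fast path.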
        {
            let mut tables = self.tables.write().unwrap();
            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
        }

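        // Re-register each column with the catalog's field resolver so later
        // constraint lookups see the flags derived above.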
        if let Some(field_resolver) = self.catalog.field_resolver(catalog_table_id) {
            for col in &executor_table.schema.columns {
                let definition = FieldDefinition::new(&col.name)
                    .with_primary_key(col.primary_key)
                    .with_unique(col.unique)
                    .with_check_expr(col.check_expr.clone());
                let _ = field_resolver.register_field(definition);
            }
            tracing::debug!(
                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
                executor_table.schema.columns.len(),
                canonical_name
            );
        }

        tracing::debug!(
            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
            canonical_name,
            table_id,
            field_ids.len(),
            next_row_id
        );

        Ok(executor_table)
    }

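    /// Map persisted multi-column UNIQUE index metadata onto executor column
    /// indices. Entries referencing a field id that no longer exists in the
    /// schema are skipped with a warning rather than failing the load.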
    pub(super) fn build_executor_multi_column_uniques(
        table: &ExecutorTable<P>,
        stored: &[MultiColumnIndexEntryMeta],
    ) -> Vec<ExecutorMultiColumnUnique> {
        let mut results = Vec::with_capacity(stored.len());

        'outer: for entry in stored {
            if entry.column_ids.is_empty() {
                continue;
            }

            let mut column_indices = Vec::with_capacity(entry.column_ids.len());
            for field_id in &entry.column_ids {
                if let Some((idx, _)) = table
                    .schema
                    .columns
                    .iter()
                    .enumerate()
                    .find(|(_, col)| &col.field_id == field_id)
                {
                    column_indices.push(idx);
                } else {
                    tracing::warn!(
                        "[CATALOG] Skipping persisted multi-column UNIQUE {:?} for table_id={} missing field_id {}",
                        entry.index_name,
                        table.table_id(),
                        field_id
                    );
                    continue 'outer;
                }
            }

            results.push(ExecutorMultiColumnUnique {
                index_name: entry.index_name.clone(),
                column_indices,
            });
        }

        results
    }

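    /// Clone `table` with the column for `field_id` marked UNIQUE, sharing
    /// the underlying storage and carrying over row counters and existing
    /// multi-column uniques. Returns `None` if no column has that field id.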
    pub(super) fn rebuild_executor_table_with_unique(
        table: &ExecutorTable<P>,
        field_id: FieldId,
    ) -> Option<Arc<ExecutorTable<P>>> {
        let mut columns = table.schema.columns.clone();
        let mut found = false;
        for column in &mut columns {
            if column.field_id == field_id {
                column.unique = true;
                found = true;
                break;
            }
        }
        if !found {
            return None;
        }

        let schema = Arc::new(ExecutorSchema {
            columns,
            lookup: table.schema.lookup.clone(),
        });

        let next_row_id = table.next_row_id.load(Ordering::SeqCst);
        let total_rows = table.total_rows.load(Ordering::SeqCst);
        let uniques = table.multi_column_uniques();

        Some(Arc::new(ExecutorTable {
            storage: Arc::new(TableStorageAdapter::new(Arc::clone(&table.table))),
            table: Arc::clone(&table.table),
            schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(uniques),
        }))
    }
}