llkv_runtime/runtime_context/table_access.rs

use crate::{RuntimeContext, RuntimeTableHandle, canonical_table_name};
use arrow::array::{ArrayRef, RecordBatch, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema};
use llkv_column_map::store::{GatherNullPolicy, ROW_ID_COLUMN_NAME};
use llkv_executor::{
    ExecutorColumn, ExecutorMultiColumnUnique, ExecutorRowBatch, ExecutorSchema, ExecutorTable,
    translation,
};
use llkv_result::{Error, Result};
use llkv_storage::pager::Pager;
use llkv_table::resolvers::{FieldConstraints, FieldDefinition};
use llkv_table::{
    ConstraintKind, FieldId, MultiColumnIndexEntryMeta, RowId, Table, TableConstraintSummaryView,
};
use llkv_transaction::{TransactionSnapshot, mvcc};
use llkv_types::LogicalFieldId;
use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{
    Arc, RwLock,
    atomic::{AtomicU64, Ordering},
};

impl<P> RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
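    /// Materialize every row of table `name` into a single [`ExecutorRowBatch`].
    ///
    /// Illustrative usage (a sketch; `ctx` and the table name are assumptions,
    /// with `ctx: Arc<RuntimeContext<_>>`):
    /// ```ignore
    /// let rows = ctx.export_table_rows("events")?;
    /// ```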
    pub(crate) fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<ExecutorRowBatch> {
        let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
        handle.lazy()?.collect_rows()
    }

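    /// Scan `table_name` under `snapshot`, returning record batches whose first
    /// column is the synthetic `row_id` followed by every user column.
    ///
    /// Illustrative call (a sketch; the table name and `snapshot` binding are
    /// assumptions):
    /// ```ignore
    /// let batches = ctx.get_batches_with_row_ids("events", None, snapshot)?;
    /// ```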
    pub(crate) fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<llkv_expr::Expr<'static, String>>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RecordBatch>> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

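        // Translate the caller's filter; with no filter, synthesize a
        // full-table scan keyed on the table's first column.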
        let filter_expr = match filter {
            Some(expr) => {
                translation::expression::translate_predicate(expr, table.schema.as_ref(), |name| {
                    Error::InvalidArgumentError(format!(
                        "Binder Error: does not have a column named '{}'",
                        name
                    ))
                })?
            }
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                translation::expression::full_table_scan_filter(field_id)
            }
        };

        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

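        // Retain only rows visible under this transaction's MVCC snapshot.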
        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
        if visible_row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();

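        // Assemble the output schema: `row_id` first, then every user column
        // with its field-id metadata attached.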
        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len());

        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for column in &table.schema.columns {
            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
            logical_fields.push(logical_field_id);
            let field = mvcc::build_field_with_metadata(
                &column.name,
                column.data_type.clone(),
                column.nullable,
                column.field_id,
            );
            fields.push(field);
        }

        let schema = Arc::new(Schema::new(fields));

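        // A table with no user columns still yields row_id-only batches.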
        if logical_fields.is_empty() {
            let mut row_id_builder =
                UInt64Builder::with_capacity(visible_row_ids.cardinality() as usize);
            for row_id in visible_row_ids.iter() {
                row_id_builder.append_value(row_id);
            }
            let arrays: Vec<ArrayRef> = vec![Arc::new(row_id_builder.finish()) as ArrayRef];
            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            return Ok(vec![batch]);
        }

        let mut stream = table.table.stream_columns(
            Arc::from(logical_fields),
            &visible_row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

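        // Prepend each chunk's row ids as a UInt64 column, then forward the
        // gathered user columns unchanged.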
        let mut batches = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(chunk.batch().num_columns() + 1);

            let mut row_id_builder = UInt64Builder::with_capacity(chunk.len());
            for row_id in chunk.row_ids() {
                row_id_builder.append_value(*row_id);
            }
            arrays.push(Arc::new(row_id_builder.finish()) as ArrayRef);

            let chunk_batch = chunk.into_batch();
            for column_array in chunk_batch.columns() {
                arrays.push(column_array.clone());
            }

            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            batches.push(batch);
        }

        Ok(batches)
    }

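    /// Append pre-formed batches that already carry a `row_id` column,
    /// returning the total number of rows written.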
    pub(crate) fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> Result<usize> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

        let mut total_rows = 0;
        for batch in batches {
            if batch.num_rows() == 0 {
                continue;
            }

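            // Direct appends require the caller to have materialized row ids;
            // only the column's presence is checked, the index itself is unused.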
            let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
                Error::InvalidArgumentError(
                    "batch must contain row_id column for direct append".into(),
                )
            })?;

            table.table.append(&batch)?;
            total_rows += batch.num_rows();
        }

        Ok(total_rows)
    }

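    /// Resolve `canonical_name` to an [`ExecutorTable`], consulting the
    /// in-memory cache first, then the catalog and column store, then any
    /// fallback context. Successfully loaded tables are cached for later calls.
    ///
    /// Illustrative usage (a sketch; the table name is an assumption):
    /// ```ignore
    /// let table = ctx.lookup_table("main.events")?;
    /// println!("table id = {}", table.table.table_id());
    /// ```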
    pub fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        {
            let tables = self.tables.read().unwrap();
            if let Some(table) = tables.get(canonical_name) {
                if self.dropped_tables.read().unwrap().contains(canonical_name) {
                    return Err(Error::NotFound);
                }
                tracing::trace!(
                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
                    canonical_name,
                    table.table.table_id(),
                    table.schema.columns.len(),
                    &*self.pager
                );
                return Ok(Arc::clone(table));
            }
        }

        tracing::debug!(
            "[LAZY_LOAD] Loading table '{}' from catalog",
            canonical_name
        );

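        // Not cached: resolve the table id through the catalog, deferring to
        // the fallback context when the name is unknown here.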
        tracing::debug!(
            "[CATALOG_LOOKUP] Looking up table '{}' in catalog @ {:p}",
            canonical_name,
            &*self.catalog
        );
        let catalog_table_id = match self.catalog.table_id(canonical_name) {
            Some(id) => {
                tracing::debug!(
                    "[CATALOG_LOOKUP] Found table '{}' with id={} in catalog",
                    canonical_name,
                    id
                );
                id
            }
            None => {
                tracing::debug!(
                    "[CATALOG_LOOKUP] Table '{}' NOT FOUND in catalog @ {:p}",
                    canonical_name,
                    &*self.catalog
                );
                if let Some(fallback) = self.fallback_lookup.read().unwrap().as_ref() {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' not found in catalog, trying fallback context",
                        canonical_name
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(Error::InvalidArgumentError(format!(
                    "unknown table '{}'",
                    canonical_name
                )));
            }
        };

        let table_id = catalog_table_id;
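
        // Open the physical table from the shared column store; a catalog hit
        // without a store entry indicates the table lives in the fallback context.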
        let table = match Table::from_id_and_store(table_id, Arc::clone(&self.store)) {
            Ok(t) => t,
            Err(e) => {
                if let Some(fallback) = self.fallback_lookup.read().unwrap().as_ref() {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' found in catalog but not in store ({}), trying fallback context",
                        canonical_name,
                        e
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(e);
            }
        };
        let store = table.store();
        let mut logical_fields = store.user_field_ids_for_table(table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());
        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let summary = self
            .catalog_service
            .table_constraint_summary(canonical_name)?;
        let TableConstraintSummaryView {
            table_meta,
            column_metas,
            constraint_records,
            multi_column_uniques,
            constraint_names: _constraint_names,
        } = summary;

        let _table_meta = match table_meta {
            Some(meta) => meta,
            None => {
                if let Some(fallback) = self.fallback_lookup.read().unwrap().as_ref() {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' metadata not found, trying fallback context",
                        canonical_name
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(Error::InvalidArgumentError(format!(
                    "unknown table '{}'",
                    canonical_name
                )));
            }
        };
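        // Constraint records, when present, take precedence over the per-field
        // flags reported by the catalog's field resolver.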
        let catalog_field_resolver = self.catalog.field_resolver(catalog_table_id);
        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
        let mut has_primary_key_records = false;
        let mut has_single_unique_records = false;

        for record in constraint_records
            .iter()
            .filter(|record| record.is_active())
        {
            match &record.kind {
                ConstraintKind::PrimaryKey(pk) => {
                    has_primary_key_records = true;
                    for field_id in &pk.field_ids {
                        metadata_primary_keys.insert(*field_id);
                        metadata_unique_fields.insert(*field_id);
                    }
                }
                ConstraintKind::Unique(unique) => {
                    if unique.field_ids.len() == 1 {
                        has_single_unique_records = true;
                        metadata_unique_fields.insert(unique.field_ids[0]);
                    }
                }
                _ => {}
            }
        }

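        // Build executor columns in field-id order, recording a lowercase
        // name -> index lookup for case-insensitive resolution.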
        let mut executor_columns = Vec::new();
        let mut lookup = FxHashMap::with_capacity_and_hasher(field_ids.len(), Default::default());

        for (idx, lfid) in logical_fields.iter().enumerate() {
            let field_id = lfid.field_id();
            let normalized_index = executor_columns.len();

            let column_name = column_metas
                .get(idx)
                .and_then(|meta| meta.as_ref())
                .and_then(|meta| meta.name.clone())
                .unwrap_or_else(|| format!("col_{}", field_id));

            let normalized = column_name.to_ascii_lowercase();
            lookup.insert(normalized, normalized_index);

            let fallback_constraints: FieldConstraints = catalog_field_resolver
                .as_ref()
                .and_then(|resolver| resolver.field_constraints_by_name(&column_name))
                .unwrap_or_default();

            let metadata_primary = metadata_primary_keys.contains(&field_id);
            let primary_key = if has_primary_key_records {
                metadata_primary
            } else {
                fallback_constraints.primary_key
            };

            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
            let unique = if has_primary_key_records || has_single_unique_records {
                metadata_unique
            } else {
                fallback_constraints.primary_key || fallback_constraints.unique
            };

            let data_type = store.data_type(*lfid)?;
            let nullable = !primary_key;

            executor_columns.push(ExecutorColumn {
                name: column_name,
                data_type,
                nullable,
                primary_key,
                unique,
                field_id,
                check_expr: fallback_constraints.check_expr.clone(),
            });
        }

        let exec_schema = Arc::new(ExecutorSchema {
            columns: executor_columns,
            lookup,
        });

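        // Recover the highest allocated row id by scanning the row-id column;
        // a NotFound scan is treated as an empty table.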
        let max_row_id = {
            use arrow::array::UInt64Array;
            use llkv_column_map::store::rowid_fid;
            use llkv_column_map::store::scan::{
                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
            };

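            // Visitor that folds every scanned u64 chunk into a running maximum.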
            struct MaxRowIdVisitor {
                max: RowId,
            }

            impl PrimitiveVisitor for MaxRowIdVisitor {
                fn u64_chunk(&mut self, values: &UInt64Array) {
                    for i in 0..values.len() {
                        let val = values.value(i);
                        if val > self.max {
                            self.max = val;
                        }
                    }
                }
            }

            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}

            let row_id_field = rowid_fid(LogicalFieldId::for_user(table_id, 1));
            let mut visitor = MaxRowIdVisitor { max: 0 };

            match ScanBuilder::new(table.store(), row_id_field)
                .options(ScanOptions::default())
                .run(&mut visitor)
            {
                Ok(_) => visitor.max,
                Err(llkv_result::Error::NotFound) => 0,
                Err(e) => {
                    tracing::warn!(
                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
                        canonical_name,
                        e
                    );
                    0
                }
            }
        };

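        // A max of 0 means no rows were observed, so allocation restarts from
        // the beginning; otherwise continue past the highest id seen.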
        let next_row_id = if max_row_id > 0 {
            max_row_id.saturating_add(1)
        } else {
            0
        };

        let total_rows = table.total_rows().unwrap_or(0);

        let executor_table = Arc::new(ExecutorTable {
            table: Arc::new(table),
            schema: exec_schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        if !multi_column_uniques.is_empty() {
            let executor_uniques =
                Self::build_executor_multi_column_uniques(&executor_table, &multi_column_uniques);
            executor_table.set_multi_column_uniques(executor_uniques);
        }

        {
            let mut tables = self.tables.write().unwrap();
            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
        }

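        // Mirror the resolved column constraints back into the catalog's field
        // resolver so later schema lookups observe them.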
        if let Some(field_resolver) = self.catalog.field_resolver(catalog_table_id) {
            for col in &executor_table.schema.columns {
                let definition = FieldDefinition::new(&col.name)
                    .with_primary_key(col.primary_key)
                    .with_unique(col.unique)
                    .with_check_expr(col.check_expr.clone());
                let _ = field_resolver.register_field(definition);
            }
            tracing::debug!(
                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
                executor_table.schema.columns.len(),
                canonical_name
            );
        }

        tracing::debug!(
            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
            canonical_name,
            table_id,
            field_ids.len(),
            next_row_id
        );

        Ok(executor_table)
    }

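    /// Translate persisted multi-column UNIQUE metadata into executor-level
    /// column-index lists, skipping entries that reference unknown fields.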
    pub(super) fn build_executor_multi_column_uniques(
        table: &ExecutorTable<P>,
        stored: &[MultiColumnIndexEntryMeta],
    ) -> Vec<ExecutorMultiColumnUnique> {
        let mut results = Vec::with_capacity(stored.len());

        'outer: for entry in stored {
            if entry.column_ids.is_empty() {
                continue;
            }

            let mut column_indices = Vec::with_capacity(entry.column_ids.len());
            for field_id in &entry.column_ids {
                if let Some((idx, _)) = table
                    .schema
                    .columns
                    .iter()
                    .enumerate()
                    .find(|(_, col)| &col.field_id == field_id)
                {
                    column_indices.push(idx);
                } else {
                    tracing::warn!(
                        "[CATALOG] Skipping persisted multi-column UNIQUE {:?} for table_id={} missing field_id {}",
                        entry.index_name,
                        table.table.table_id(),
                        field_id
                    );
                    continue 'outer;
                }
            }

            results.push(ExecutorMultiColumnUnique {
                index_name: entry.index_name.clone(),
                column_indices,
            });
        }

        results
    }

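    /// Clone `table` with the column matching `field_id` flagged UNIQUE,
    /// preserving row counters and multi-column uniques; returns `None` when no
    /// column matches.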
    pub(super) fn rebuild_executor_table_with_unique(
        table: &ExecutorTable<P>,
        field_id: FieldId,
    ) -> Option<Arc<ExecutorTable<P>>> {
        let mut columns = table.schema.columns.clone();
        let mut found = false;
        for column in &mut columns {
            if column.field_id == field_id {
                column.unique = true;
                found = true;
                break;
            }
        }
        if !found {
            return None;
        }

        let schema = Arc::new(ExecutorSchema {
            columns,
            lookup: table.schema.lookup.clone(),
        });

        let next_row_id = table.next_row_id.load(Ordering::SeqCst);
        let total_rows = table.total_rows.load(Ordering::SeqCst);
        let uniques = table.multi_column_uniques();

        Some(Arc::new(ExecutorTable {
            table: Arc::clone(&table.table),
            schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(uniques),
        }))
    }
}