llkv_runtime/runtime_context/table_access.rs

use crate::{RuntimeContext, RuntimeTableHandle, canonical_table_name};

use arrow::array::{ArrayRef, RecordBatch, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema};
use llkv_column_map::store::{GatherNullPolicy, ROW_ID_COLUMN_NAME};
use llkv_column_map::types::LogicalFieldId;
use llkv_executor::{
    ExecutorColumn, ExecutorMultiColumnUnique, ExecutorRowBatch, ExecutorSchema, ExecutorTable,
    translation,
};
use llkv_result::{Error, Result};
use llkv_storage::pager::Pager;
use llkv_table::resolvers::{FieldConstraints, FieldDefinition};
use llkv_table::{
    ConstraintKind, FieldId, MultiColumnIndexEntryMeta, RowId, Table, TableConstraintSummaryView,
};
use llkv_transaction::{TransactionSnapshot, mvcc};
use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{
    Arc, RwLock,
    atomic::{AtomicU64, Ordering},
};

impl<P> RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    pub(crate) fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<ExecutorRowBatch> {
        let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
        handle.lazy()?.collect_rows()
    }

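    /// Scans a table and returns Arrow batches that include an explicit
    /// `row_id` column ahead of the user columns. Rows are narrowed by the
    /// optional filter expression and then by MVCC visibility under the
    /// given snapshot.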
    pub(crate) fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<llkv_expr::Expr<'static, String>>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RecordBatch>> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

        let filter_expr = match filter {
            Some(expr) => {
                translation::expression::translate_predicate(expr, table.schema.as_ref(), |name| {
                    Error::InvalidArgumentError(format!(
                        "Binder Error: does not have a column named '{}'",
                        name
                    ))
                })?
            }
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                translation::expression::full_table_scan_filter(field_id)
            }
        };

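        // Resolve matching row ids first, then drop any not visible to the
        // snapshot; either step may leave nothing to return.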
        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
        if visible_row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();

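        // Output schema: a non-nullable row_id column followed by one field
        // per user column, tagged with MVCC field-id metadata.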
        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len());

        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for column in &table.schema.columns {
            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
            logical_fields.push(logical_field_id);
            let field = mvcc::build_field_with_metadata(
                &column.name,
                column.data_type.clone(),
                column.nullable,
                column.field_id,
            );
            fields.push(field);
        }

        let schema = Arc::new(Schema::new(fields));

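        // A table with no user columns still produces a single batch holding
        // just the visible row ids.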
        if logical_fields.is_empty() {
            let mut row_id_builder = UInt64Builder::with_capacity(visible_row_ids.len());
            for row_id in &visible_row_ids {
                row_id_builder.append_value(*row_id);
            }
            let arrays: Vec<ArrayRef> = vec![Arc::new(row_id_builder.finish()) as ArrayRef];
            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            return Ok(vec![batch]);
        }

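        // Stream the user columns in chunks, prefixing each chunk's row ids
        // so every emitted batch matches the schema built above.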
        let mut stream = table.table.stream_columns(
            Arc::from(logical_fields),
            visible_row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut batches = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(chunk.batch().num_columns() + 1);

            let mut row_id_builder = UInt64Builder::with_capacity(chunk.len());
            for row_id in chunk.row_ids() {
                row_id_builder.append_value(*row_id);
            }
            arrays.push(Arc::new(row_id_builder.finish()) as ArrayRef);

            let chunk_batch = chunk.into_batch();
            for column_array in chunk_batch.columns() {
                arrays.push(column_array.clone());
            }

            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            batches.push(batch);
        }

        Ok(batches)
    }

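    /// Appends pre-formed batches directly to a table. Every non-empty batch
    /// must already carry a `row_id` column; returns the total number of
    /// rows appended.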
    pub(crate) fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> Result<usize> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

        let mut total_rows = 0;
        for batch in batches {
            if batch.num_rows() == 0 {
                continue;
            }

            let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
                Error::InvalidArgumentError(
                    "batch must contain row_id column for direct append".into(),
                )
            })?;

            table.table.append(&batch)?;
            total_rows += batch.num_rows();
        }

        Ok(total_rows)
    }

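    /// Resolves a table by canonical name. Serves cached entries when
    /// possible; otherwise lazily reconstructs the executor table from the
    /// catalog and column store, restoring constraints and row-id state.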
    pub fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        {
            let tables = self.tables.read().unwrap();
            if let Some(table) = tables.get(canonical_name) {
                if self.dropped_tables.read().unwrap().contains(canonical_name) {
                    return Err(Error::NotFound);
                }
                tracing::trace!(
                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
                    canonical_name,
                    table.table.table_id(),
                    table.schema.columns.len(),
                    &*self.pager
                );
                return Ok(Arc::clone(table));
            }
        }

        tracing::debug!(
            "[LAZY_LOAD] Loading table '{}' from catalog",
            canonical_name
        );

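        // Ask the catalog for the table id; unknown names are retried against
        // the fallback context (if any) before erroring out.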
        tracing::debug!(
            "[CATALOG_LOOKUP] Looking up table '{}' in catalog @ {:p}",
            canonical_name,
            &*self.catalog
        );
        let catalog_table_id = match self.catalog.table_id(canonical_name) {
            Some(id) => {
                tracing::debug!(
                    "[CATALOG_LOOKUP] Found table '{}' with id={} in catalog",
                    canonical_name,
                    id
                );
                id
            }
            None => {
                tracing::debug!(
                    "[CATALOG_LOOKUP] Table '{}' NOT FOUND in catalog @ {:p}",
                    canonical_name,
                    &*self.catalog
                );
                if let Some(fallback) = &self.fallback_lookup {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' not found in catalog, trying fallback context",
                        canonical_name
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(Error::InvalidArgumentError(format!(
                    "unknown table '{}'",
                    canonical_name
                )));
            }
        };

        let table_id = catalog_table_id;

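        // Open the physical table from the store. A table present in the
        // catalog but missing from this store is also retried against the
        // fallback context.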
        let table = match Table::from_id_and_store(table_id, Arc::clone(&self.store)) {
            Ok(t) => t,
            Err(e) => {
                if let Some(fallback) = &self.fallback_lookup {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' found in catalog but not in store ({}), trying fallback context",
                        canonical_name,
                        e
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(e);
            }
        };
        let store = table.store();
        let mut logical_fields = store.user_field_ids_for_table(table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());
        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let summary = self
            .catalog_service
            .table_constraint_summary(canonical_name)?;
        let TableConstraintSummaryView {
            table_meta,
            column_metas,
            constraint_records,
            multi_column_uniques,
        } = summary;

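        // Absent table metadata is treated like an unknown table: try the
        // fallback context, then report an error.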
        let _table_meta = match table_meta {
            Some(meta) => meta,
            None => {
                if let Some(fallback) = &self.fallback_lookup {
                    tracing::debug!(
                        "[LAZY_LOAD] Table '{}' metadata not found, trying fallback context",
                        canonical_name
                    );
                    return fallback.lookup_table(canonical_name);
                }
                return Err(Error::InvalidArgumentError(format!(
                    "unknown table '{}'",
                    canonical_name
                )));
            }
        };
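        // Gather PRIMARY KEY and single-column UNIQUE constraints from the
        // persisted records; when such records exist they take precedence
        // over the catalog's per-field fallback constraints below.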
        let catalog_field_resolver = self.catalog.field_resolver(catalog_table_id);
        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
        let mut has_primary_key_records = false;
        let mut has_single_unique_records = false;

        for record in constraint_records
            .iter()
            .filter(|record| record.is_active())
        {
            match &record.kind {
                ConstraintKind::PrimaryKey(pk) => {
                    has_primary_key_records = true;
                    for field_id in &pk.field_ids {
                        metadata_primary_keys.insert(*field_id);
                        metadata_unique_fields.insert(*field_id);
                    }
                }
                ConstraintKind::Unique(unique) => {
                    if unique.field_ids.len() == 1 {
                        has_single_unique_records = true;
                        metadata_unique_fields.insert(unique.field_ids[0]);
                    }
                }
                _ => {}
            }
        }

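        // Rebuild the executor columns in field-id order, deriving each
        // column's name, constraints, and data type.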
        let mut executor_columns = Vec::new();
        let mut lookup = FxHashMap::with_capacity_and_hasher(field_ids.len(), Default::default());

        for (idx, lfid) in logical_fields.iter().enumerate() {
            let field_id = lfid.field_id();
            let normalized_index = executor_columns.len();

            let column_name = column_metas
                .get(idx)
                .and_then(|meta| meta.as_ref())
                .and_then(|meta| meta.name.clone())
                .unwrap_or_else(|| format!("col_{}", field_id));

            let normalized = column_name.to_ascii_lowercase();
            lookup.insert(normalized, normalized_index);

            let fallback_constraints: FieldConstraints = catalog_field_resolver
                .as_ref()
                .and_then(|resolver| resolver.field_constraints_by_name(&column_name))
                .unwrap_or_default();

            let metadata_primary = metadata_primary_keys.contains(&field_id);
            let primary_key = if has_primary_key_records {
                metadata_primary
            } else {
                fallback_constraints.primary_key
            };

            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
            let unique = if has_primary_key_records || has_single_unique_records {
                metadata_unique
            } else {
                fallback_constraints.primary_key || fallback_constraints.unique
            };

            let data_type = store.data_type(*lfid)?;
            let nullable = !primary_key;

            executor_columns.push(ExecutorColumn {
                name: column_name,
                data_type,
                nullable,
                primary_key,
                unique,
                field_id,
                check_expr: fallback_constraints.check_expr.clone(),
            });
        }

        let exec_schema = Arc::new(ExecutorSchema {
            columns: executor_columns,
            lookup,
        });

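        // Recover the highest persisted row id so newly appended rows can
        // continue the sequence.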
        let max_row_id = {
            use arrow::array::UInt64Array;
            use llkv_column_map::store::rowid_fid;
            use llkv_column_map::store::scan::{
                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
            };

            struct MaxRowIdVisitor {
                max: RowId,
            }

            impl PrimitiveVisitor for MaxRowIdVisitor {
                fn u64_chunk(&mut self, values: &UInt64Array) {
                    for i in 0..values.len() {
                        let val = values.value(i);
                        if val > self.max {
                            self.max = val;
                        }
                    }
                }
            }

            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}

            let row_id_field = rowid_fid(LogicalFieldId::for_user(table_id, 1));
            let mut visitor = MaxRowIdVisitor { max: 0 };

            match ScanBuilder::new(table.store(), row_id_field)
                .options(ScanOptions::default())
                .run(&mut visitor)
            {
                Ok(_) => visitor.max,
                Err(llkv_result::Error::NotFound) => 0,
                Err(e) => {
                    tracing::warn!(
                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
                        canonical_name,
                        e
                    );
                    0
                }
            }
        };

        let next_row_id = if max_row_id > 0 {
            max_row_id.saturating_add(1)
        } else {
            0
        };

        let total_rows = table.total_rows().unwrap_or(0);

        let executor_table = Arc::new(ExecutorTable {
            table: Arc::new(table),
            schema: exec_schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        if !multi_column_uniques.is_empty() {
            let executor_uniques =
                Self::build_executor_multi_column_uniques(&executor_table, &multi_column_uniques);
            executor_table.set_multi_column_uniques(executor_uniques);
        }

        {
            let mut tables = self.tables.write().unwrap();
            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
        }

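        // Re-register field definitions so the catalog's field resolver
        // reflects the lazily loaded schema.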
        if let Some(field_resolver) = self.catalog.field_resolver(catalog_table_id) {
            for col in &executor_table.schema.columns {
                let definition = FieldDefinition::new(&col.name)
                    .with_primary_key(col.primary_key)
                    .with_unique(col.unique)
                    .with_check_expr(col.check_expr.clone());
                let _ = field_resolver.register_field(definition);
            }
            tracing::debug!(
                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
                executor_table.schema.columns.len(),
                canonical_name
            );
        }

        tracing::debug!(
            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
            canonical_name,
            table_id,
            field_ids.len(),
            next_row_id
        );

        Ok(executor_table)
    }

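    /// Converts persisted multi-column UNIQUE index metadata into executor
    /// constraints, mapping stored field ids to column indices. Entries that
    /// reference a field missing from the schema are skipped with a warning.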
    pub(super) fn build_executor_multi_column_uniques(
        table: &ExecutorTable<P>,
        stored: &[MultiColumnIndexEntryMeta],
    ) -> Vec<ExecutorMultiColumnUnique> {
        let mut results = Vec::with_capacity(stored.len());

        'outer: for entry in stored {
            if entry.column_ids.is_empty() {
                continue;
            }

            let mut column_indices = Vec::with_capacity(entry.column_ids.len());
            for field_id in &entry.column_ids {
                if let Some((idx, _)) = table
                    .schema
                    .columns
                    .iter()
                    .enumerate()
                    .find(|(_, col)| &col.field_id == field_id)
                {
                    column_indices.push(idx);
                } else {
                    tracing::warn!(
                        "[CATALOG] Skipping persisted multi-column UNIQUE {:?} for table_id={} missing field_id {}",
                        entry.index_name,
                        table.table.table_id(),
                        field_id
                    );
                    continue 'outer;
                }
            }

            results.push(ExecutorMultiColumnUnique {
                index_name: entry.index_name.clone(),
                column_indices,
            });
        }

        results
    }

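    /// Clones the executor table with the given field marked UNIQUE, sharing
    /// the underlying table handle. Returns `None` if no column has that
    /// field id.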
    pub(super) fn rebuild_executor_table_with_unique(
        table: &ExecutorTable<P>,
        field_id: FieldId,
    ) -> Option<Arc<ExecutorTable<P>>> {
        let mut columns = table.schema.columns.clone();
        let mut found = false;
        for column in &mut columns {
            if column.field_id == field_id {
                column.unique = true;
                found = true;
                break;
            }
        }
        if !found {
            return None;
        }

        let schema = Arc::new(ExecutorSchema {
            columns,
            lookup: table.schema.lookup.clone(),
        });

        let next_row_id = table.next_row_id.load(Ordering::SeqCst);
        let total_rows = table.total_rows.load(Ordering::SeqCst);
        let uniques = table.multi_column_uniques();

        Some(Arc::new(ExecutorTable {
            table: Arc::clone(&table.table),
            schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(uniques),
        }))
    }
}