// llkv_runtime/runtime_context/table_access.rs

use crate::{RuntimeContext, RuntimeTableHandle, canonical_table_name};
use arrow::array::{ArrayRef, RecordBatch, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema};
use llkv_column_map::store::{GatherNullPolicy, ROW_ID_COLUMN_NAME};
use llkv_column_map::types::LogicalFieldId;
use llkv_executor::{
    ExecutorColumn, ExecutorMultiColumnUnique, ExecutorRowBatch, ExecutorSchema, ExecutorTable,
    translation,
};
use llkv_result::{Error, Result};
use llkv_storage::pager::Pager;
use llkv_table::resolvers::{FieldConstraints, FieldDefinition};
use llkv_table::{
    ConstraintKind, FieldId, MultiColumnUniqueEntryMeta, RowId, Table, TableConstraintSummaryView,
};
use llkv_transaction::{TransactionSnapshot, mvcc};
use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{
    Arc, RwLock,
    atomic::{AtomicU64, Ordering},
};

impl<P> RuntimeContext<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
35 pub(crate) fn export_table_rows(self: &Arc<Self>, name: &str) -> Result<ExecutorRowBatch> {
38 let handle = RuntimeTableHandle::new(Arc::clone(self), name)?;
39 handle.lazy()?.collect_rows()
40 }
41
    /// Scans `table_name` and returns the matching rows as Arrow batches
    /// whose first column is the physical `row_id` (UInt64), followed by
    /// every user column in schema order.
    ///
    /// Rows are selected by `filter` (or a full-table scan when `None`)
    /// and then narrowed to the ones visible under `snapshot` via the MVCC
    /// visibility check.
    ///
    /// # Errors
    /// Returns `InvalidArgumentError` when the table is unknown, when the
    /// filter references a missing column, or when a zero-column table is
    /// scanned without a filter.
    pub(crate) fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<llkv_expr::Expr<'static, String>>,
        snapshot: TransactionSnapshot,
    ) -> Result<Vec<RecordBatch>> {
        let (_, canonical_name) = canonical_table_name(table_name)?;
        let table = self.lookup_table(&canonical_name)?;

        // Translate the caller's predicate against the executor schema, or
        // synthesize a scan-everything filter anchored on the first column.
        let filter_expr = match filter {
            Some(expr) => {
                translation::expression::translate_predicate(expr, table.schema.as_ref(), |name| {
                    Error::InvalidArgumentError(format!(
                        "Binder Error: does not have a column named '{}'",
                        name
                    ))
                })?
            }
            None => {
                let field_id = table.schema.first_field_id().ok_or_else(|| {
                    Error::InvalidArgumentError(
                        "table has no columns; cannot perform wildcard scan".into(),
                    )
                })?;
                translation::expression::full_table_scan_filter(field_id)
            }
        };

        let row_ids = table.table.filter_row_ids(&filter_expr)?;
        if row_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Drop rows whose MVCC metadata makes them invisible to `snapshot`.
        let visible_row_ids = self.filter_visible_row_ids(table.as_ref(), row_ids, snapshot)?;
        if visible_row_ids.is_empty() {
            return Ok(Vec::new());
        }

        let table_id = table.table.table_id();

        // Output schema: row_id first, then one field per user column, each
        // carrying its field-id metadata for downstream consumers.
        let mut fields: Vec<Field> = Vec::with_capacity(table.schema.columns.len() + 1);
        let mut logical_fields: Vec<LogicalFieldId> =
            Vec::with_capacity(table.schema.columns.len());

        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for column in &table.schema.columns {
            let logical_field_id = LogicalFieldId::for_user(table_id, column.field_id);
            logical_fields.push(logical_field_id);
            let field = mvcc::build_field_with_metadata(
                &column.name,
                column.data_type.clone(),
                column.nullable,
                column.field_id,
            );
            fields.push(field);
        }

        let schema = Arc::new(Schema::new(fields));

        // Zero user columns: emit a single batch containing only row ids.
        if logical_fields.is_empty() {
            let mut row_id_builder = UInt64Builder::with_capacity(visible_row_ids.len());
            for row_id in &visible_row_ids {
                row_id_builder.append_value(*row_id);
            }
            let arrays: Vec<ArrayRef> = vec![Arc::new(row_id_builder.finish()) as ArrayRef];
            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            return Ok(vec![batch]);
        }

        // Stream column data chunk-by-chunk; nulls are kept so each output
        // batch stays aligned with its row-id column.
        let mut stream = table.table.stream_columns(
            Arc::from(logical_fields),
            visible_row_ids,
            GatherNullPolicy::IncludeNulls,
        )?;

        let mut batches = Vec::new();
        while let Some(chunk) = stream.next_batch()? {
            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(chunk.batch().num_columns() + 1);

            // Prepend this chunk's row ids as the first output column.
            let mut row_id_builder = UInt64Builder::with_capacity(chunk.len());
            for row_id in chunk.row_ids() {
                row_id_builder.append_value(*row_id);
            }
            arrays.push(Arc::new(row_id_builder.finish()) as ArrayRef);

            let chunk_batch = chunk.into_batch();
            for column_array in chunk_batch.columns() {
                arrays.push(column_array.clone());
            }

            let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
            batches.push(batch);
        }

        Ok(batches)
    }

146 pub(crate) fn append_batches_with_row_ids(
150 &self,
151 table_name: &str,
152 batches: Vec<RecordBatch>,
153 ) -> Result<usize> {
154 let (_, canonical_name) = canonical_table_name(table_name)?;
155 let table = self.lookup_table(&canonical_name)?;
156
157 let mut total_rows = 0;
158 for batch in batches {
159 if batch.num_rows() == 0 {
160 continue;
161 }
162
163 let _row_id_idx = batch.schema().index_of(ROW_ID_COLUMN_NAME).map_err(|_| {
165 Error::InvalidArgumentError(
166 "batch must contain row_id column for direct append".into(),
167 )
168 })?;
169
170 table.table.append(&batch)?;
172 total_rows += batch.num_rows();
173 }
174
175 Ok(total_rows)
176 }
177
    /// Resolves `canonical_name` to its [`ExecutorTable`], serving from the
    /// in-memory cache when possible and otherwise lazily loading the table
    /// from the catalog (and caching the result).
    ///
    /// The lazy-load path rebuilds the executor schema from persisted
    /// constraint records (falling back to catalog field-resolver metadata
    /// when no PRIMARY KEY / single-column UNIQUE records exist), seeds
    /// `next_row_id` by scanning for the maximum stored row id, restores
    /// persisted multi-column UNIQUE constraints, and re-registers field
    /// definitions with the catalog resolver.
    ///
    /// # Errors
    /// * `Error::NotFound` — the table is cached but marked dropped.
    /// * `Error::InvalidArgumentError` — the catalog does not know the name.
    pub fn lookup_table(&self, canonical_name: &str) -> Result<Arc<ExecutorTable<P>>> {
        {
            // Fast path: cache hit under the read lock, unless the table has
            // been marked dropped since it was cached.
            let tables = self.tables.read().unwrap();
            if let Some(table) = tables.get(canonical_name) {
                if self.dropped_tables.read().unwrap().contains(canonical_name) {
                    return Err(Error::NotFound);
                }
                tracing::trace!(
                    "=== LOOKUP_TABLE '{}' (cached) table_id={} columns={} context_pager={:p} ===",
                    canonical_name,
                    table.table.table_id(),
                    table.schema.columns.len(),
                    &*self.pager
                );
                return Ok(Arc::clone(table));
            }
        }
        // NOTE(review): the read lock is released here, so two threads that
        // miss simultaneously both run the load below; the later
        // `tables.insert` overwrites the earlier entry — confirm this
        // duplicated work is acceptable.
        tracing::debug!(
            "[LAZY_LOAD] Loading table '{}' from catalog",
            canonical_name
        );

        let catalog_table_id = self.catalog.table_id(canonical_name).ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;

        let table_id = catalog_table_id;
        let table = Table::from_id_and_store(table_id, Arc::clone(&self.store))?;
        let store = table.store();
        // Collect this table's user columns in ascending field-id order.
        let mut logical_fields = store.user_field_ids_for_table(table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());
        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let summary = self
            .catalog_service
            .table_constraint_summary(canonical_name)?;
        let TableConstraintSummaryView {
            table_meta,
            column_metas,
            constraint_records,
            multi_column_uniques,
        } = summary;
        // Only the *presence* of table metadata matters here; its contents
        // are not consumed.
        let _table_meta = table_meta.ok_or_else(|| {
            Error::InvalidArgumentError(format!("unknown table '{}'", canonical_name))
        })?;
        let catalog_field_resolver = self.catalog.field_resolver(catalog_table_id);
        // Active constraint records, when present, are authoritative; the
        // resolver constraints gathered below are only a fallback.
        let mut metadata_primary_keys: FxHashSet<FieldId> = FxHashSet::default();
        let mut metadata_unique_fields: FxHashSet<FieldId> = FxHashSet::default();
        let mut has_primary_key_records = false;
        let mut has_single_unique_records = false;

        for record in constraint_records
            .iter()
            .filter(|record| record.is_active())
        {
            match &record.kind {
                ConstraintKind::PrimaryKey(pk) => {
                    has_primary_key_records = true;
                    for field_id in &pk.field_ids {
                        metadata_primary_keys.insert(*field_id);
                        // PRIMARY KEY implies UNIQUE on each member column.
                        metadata_unique_fields.insert(*field_id);
                    }
                }
                ConstraintKind::Unique(unique) => {
                    // Multi-column UNIQUEs are restored separately below via
                    // `multi_column_uniques`; only track single-column ones.
                    if unique.field_ids.len() == 1 {
                        has_single_unique_records = true;
                        metadata_unique_fields.insert(unique.field_ids[0]);
                    }
                }
                _ => {}
            }
        }

        let mut executor_columns = Vec::new();
        let mut lookup = FxHashMap::with_capacity_and_hasher(field_ids.len(), Default::default());

        for (idx, lfid) in logical_fields.iter().enumerate() {
            let field_id = lfid.field_id();
            let normalized_index = executor_columns.len();

            // Prefer the persisted column name; synthesize one otherwise.
            let column_name = column_metas
                .get(idx)
                .and_then(|meta| meta.as_ref())
                .and_then(|meta| meta.name.clone())
                .unwrap_or_else(|| format!("col_{}", field_id));

            // Column-name lookup is case-insensitive.
            let normalized = column_name.to_ascii_lowercase();
            lookup.insert(normalized, normalized_index);

            let fallback_constraints: FieldConstraints = catalog_field_resolver
                .as_ref()
                .and_then(|resolver| resolver.field_constraints_by_name(&column_name))
                .unwrap_or_default();

            // Constraint records win over resolver fallback when any exist.
            let metadata_primary = metadata_primary_keys.contains(&field_id);
            let primary_key = if has_primary_key_records {
                metadata_primary
            } else {
                fallback_constraints.primary_key
            };

            let metadata_unique = metadata_primary || metadata_unique_fields.contains(&field_id);
            let unique = if has_primary_key_records || has_single_unique_records {
                metadata_unique
            } else {
                fallback_constraints.primary_key || fallback_constraints.unique
            };

            let data_type = store.data_type(*lfid)?;
            // Primary-key columns are implicitly NOT NULL.
            let nullable = !primary_key;

            executor_columns.push(ExecutorColumn {
                name: column_name,
                data_type,
                nullable,
                primary_key,
                unique,
                field_id,
                check_expr: fallback_constraints.check_expr.clone(),
            });
        }

        let exec_schema = Arc::new(ExecutorSchema {
            columns: executor_columns,
            lookup,
        });

        // Seed next_row_id from the highest row id currently stored.
        let max_row_id = {
            use arrow::array::UInt64Array;
            use llkv_column_map::store::rowid_fid;
            use llkv_column_map::store::scan::{
                PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
                PrimitiveWithRowIdsVisitor, ScanBuilder, ScanOptions,
            };

            // Scan visitor that tracks the maximum u64 value seen.
            struct MaxRowIdVisitor {
                max: RowId,
            }

            impl PrimitiveVisitor for MaxRowIdVisitor {
                fn u64_chunk(&mut self, values: &UInt64Array) {
                    for i in 0..values.len() {
                        let val = values.value(i);
                        if val > self.max {
                            self.max = val;
                        }
                    }
                }
            }

            // Unused visitor flavors fall back to default no-op impls.
            impl PrimitiveWithRowIdsVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedVisitor for MaxRowIdVisitor {}
            impl PrimitiveSortedWithRowIdsVisitor for MaxRowIdVisitor {}

            // NOTE(review): the row-id shadow column is derived from user
            // field 1 — this assumes every table has a column with
            // field_id 1; confirm against the column-map layout.
            let row_id_field = rowid_fid(LogicalFieldId::for_user(table_id, 1));
            let mut visitor = MaxRowIdVisitor { max: 0 };

            match ScanBuilder::new(table.store(), row_id_field)
                .options(ScanOptions::default())
                .run(&mut visitor)
            {
                Ok(_) => visitor.max,
                // No stored chunks yet: treat the table as empty.
                Err(llkv_result::Error::NotFound) => 0,
                Err(e) => {
                    // Best effort: fall back to 0 rather than failing the load.
                    tracing::warn!(
                        "[LAZY_LOAD] Failed to scan max row_id for table '{}': {}",
                        canonical_name,
                        e
                    );
                    0
                }
            }
        };

        // NOTE(review): a max of 0 is treated as "empty table" and restarts
        // ids at 0 — this presumes no live row ever has row_id 0; confirm
        // row-id assignment starts at 1.
        let next_row_id = if max_row_id > 0 {
            max_row_id.saturating_add(1)
        } else {
            0
        };

        let total_rows = table.total_rows().unwrap_or(0);

        let executor_table = Arc::new(ExecutorTable {
            table: Arc::new(table),
            schema: exec_schema,
            next_row_id: AtomicU64::new(next_row_id),
            total_rows: AtomicU64::new(total_rows),
            multi_column_uniques: RwLock::new(Vec::new()),
        });

        // Restore persisted multi-column UNIQUE constraints, if any.
        if !multi_column_uniques.is_empty() {
            let executor_uniques =
                Self::build_executor_multi_column_uniques(&executor_table, &multi_column_uniques);
            executor_table.set_multi_column_uniques(executor_uniques);
        }

        {
            // Publish the loaded table for subsequent lookups.
            let mut tables = self.tables.write().unwrap();
            tables.insert(canonical_name.to_string(), Arc::clone(&executor_table));
        }

        // Re-register column definitions with the catalog's field resolver;
        // individual failures are deliberately ignored (best effort).
        if let Some(field_resolver) = self.catalog.field_resolver(catalog_table_id) {
            for col in &executor_table.schema.columns {
                let definition = FieldDefinition::new(&col.name)
                    .with_primary_key(col.primary_key)
                    .with_unique(col.unique)
                    .with_check_expr(col.check_expr.clone());
                let _ = field_resolver.register_field(definition);
            }
            tracing::debug!(
                "[CATALOG] Registered {} field(s) for lazy-loaded table '{}'",
                executor_table.schema.columns.len(),
                canonical_name
            );
        }

        tracing::debug!(
            "[LAZY_LOAD] Loaded table '{}' (id={}) with {} columns, next_row_id={}",
            canonical_name,
            table_id,
            field_ids.len(),
            next_row_id
        );

        Ok(executor_table)
    }

420 pub(super) fn build_executor_multi_column_uniques(
421 table: &ExecutorTable<P>,
422 stored: &[MultiColumnUniqueEntryMeta],
423 ) -> Vec<ExecutorMultiColumnUnique> {
424 let mut results = Vec::with_capacity(stored.len());
425
426 'outer: for entry in stored {
427 if entry.column_ids.is_empty() {
428 continue;
429 }
430
431 let mut column_indices = Vec::with_capacity(entry.column_ids.len());
432 for field_id in &entry.column_ids {
433 if let Some((idx, _)) = table
434 .schema
435 .columns
436 .iter()
437 .enumerate()
438 .find(|(_, col)| &col.field_id == field_id)
439 {
440 column_indices.push(idx);
441 } else {
442 tracing::warn!(
443 "[CATALOG] Skipping persisted multi-column UNIQUE {:?} for table_id={} missing field_id {}",
444 entry.index_name,
445 table.table.table_id(),
446 field_id
447 );
448 continue 'outer;
449 }
450 }
451
452 results.push(ExecutorMultiColumnUnique {
453 index_name: entry.index_name.clone(),
454 column_indices,
455 });
456 }
457
458 results
459 }
460
461 pub(super) fn rebuild_executor_table_with_unique(
462 table: &ExecutorTable<P>,
463 field_id: FieldId,
464 ) -> Option<Arc<ExecutorTable<P>>> {
465 let mut columns = table.schema.columns.clone();
466 let mut found = false;
467 for column in &mut columns {
468 if column.field_id == field_id {
469 column.unique = true;
470 found = true;
471 break;
472 }
473 }
474 if !found {
475 return None;
476 }
477
478 let schema = Arc::new(ExecutorSchema {
479 columns,
480 lookup: table.schema.lookup.clone(),
481 });
482
483 let next_row_id = table.next_row_id.load(Ordering::SeqCst);
484 let total_rows = table.total_rows.load(Ordering::SeqCst);
485 let uniques = table.multi_column_uniques();
486
487 Some(Arc::new(ExecutorTable {
488 table: Arc::clone(&table.table),
489 schema,
490 next_row_id: AtomicU64::new(next_row_id),
491 total_rows: AtomicU64::new(total_rows),
492 multi_column_uniques: RwLock::new(uniques),
493 }))
494 }
}