1use croaring::Treemap;
2use std::cmp;
3use std::sync::Arc;
4use std::sync::RwLock;
5
6use arrow::array::{
7 Array, ArrayRef, OffsetSizeTrait, RecordBatch, StringArray, UInt32Array, UInt64Array,
8};
9use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema, UInt64Type};
10use std::collections::HashMap;
11
12use crate::constants::STREAM_BATCH_ROWS;
13use llkv_column_map::ColumnStore;
14use llkv_column_map::ScanBuilder;
15use llkv_column_map::store::scan::ScanOptions;
16use llkv_column_map::store::scan::filter::{FilterDispatch, FilterPrimitive, Utf8Filter};
17use llkv_column_map::store::{GatherNullPolicy, IndexKind, MultiGatherContext, ROW_ID_COLUMN_NAME};
18use llkv_column_map::{
19 llkv_for_each_arrow_boolean, llkv_for_each_arrow_numeric, llkv_for_each_arrow_string,
20};
21use llkv_storage::pager::{MemPager, Pager};
22use llkv_types::ids::{LogicalFieldId, TableId};
23use simd_r_drive_entry_handle::EntryHandle;
24
25use crate::ROW_ID_FIELD_ID;
26use crate::reserved::is_reserved_table_id;
27use crate::sys_catalog::{ColMeta, SysCatalog, TableMeta};
28use crate::types::{FieldId, RowId};
29use llkv_compute::analysis::PredicateFusionCache;
30use llkv_compute::program::{OwnedFilter, OwnedOperator, ProgramCompiler};
31use llkv_compute::rowid::RowIdFilter as RowIdBitmapFilter;
32use llkv_expr::literal::FromLiteral;
33use llkv_expr::typed_predicate::{
34 Predicate, PredicateValue, build_bool_predicate, build_fixed_width_predicate,
35 build_var_width_predicate,
36};
37use llkv_expr::{Expr, Operator};
38use llkv_result::{Error, Result as LlkvResult};
39use llkv_scan::execute::execute_scan;
40use llkv_scan::row_stream::{
41 ColumnProjectionInfo, ProjectionEval, RowIdSource, RowStreamBuilder, ScanRowStream,
42};
43pub use llkv_scan::{
44 RowIdFilter, ScanOrderDirection, ScanOrderSpec, ScanOrderTransform, ScanProjection,
45 ScanStorage, ScanStreamOptions,
46};
47use rustc_hash::{FxHashMap, FxHashSet};
48use std::ops::Bound;
49
/// Memoized result of probing a batch schema for the MVCC bookkeeping
/// columns. Computed from the first appended batch and cached per table.
#[derive(Debug, Clone, Copy)]
struct MvccColumnCache {
    // True when the schema already contains the created_by column.
    has_created_by: bool,
    // True when the schema already contains the deleted_by column.
    has_deleted_by: bool,
}
57
/// A user table: a façade over a shared [`ColumnStore`] that namespaces all
/// field ids under `table_id` and lazily caches MVCC schema facts.
pub struct Table<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Shared column storage backing this table.
    store: Arc<ColumnStore<P>>,
    // Identifier used to namespace logical field ids for this table.
    table_id: TableId,
    // Lazily populated MVCC-column presence cache; None until first use.
    mvcc_cache: RwLock<Option<MvccColumnCache>>,
}
70
/// Row stream produced by [`Table::stream_columns`].
pub type TableScanStream<'table, P> = ScanRowStream<'table, P, Table<P>>;
72
73impl<P> Table<P>
74where
75 P: Pager<Blob = EntryHandle> + Send + Sync,
76{
77 pub fn create_from_columns(
81 display_name: &str,
82 canonical_name: &str,
83 columns: &[llkv_plan::PlanColumnSpec],
84 metadata: Arc<crate::metadata::MetadataManager<P>>,
85 catalog: Arc<crate::catalog::TableCatalog>,
86 store: Arc<ColumnStore<P>>,
87 ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
88 let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
89 service.create_table_from_columns(display_name, canonical_name, columns)
90 }
91
92 pub fn create_from_schema(
94 display_name: &str,
95 canonical_name: &str,
96 schema: &arrow::datatypes::Schema,
97 metadata: Arc<crate::metadata::MetadataManager<P>>,
98 catalog: Arc<crate::catalog::TableCatalog>,
99 store: Arc<ColumnStore<P>>,
100 ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
101 let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
102 service.create_table_from_schema(display_name, canonical_name, schema)
103 }
104
105 #[doc(hidden)]
110 pub fn from_id(table_id: TableId, pager: Arc<P>) -> LlkvResult<Self> {
111 if is_reserved_table_id(table_id) {
112 return Err(Error::ReservedTableId(table_id));
113 }
114
115 tracing::trace!(
116 "Table::from_id: Opening table_id={} with pager at {:p}",
117 table_id,
118 &*pager
119 );
120 let store = ColumnStore::open(pager)?;
121 Ok(Self {
122 store: Arc::new(store),
123 table_id,
124 mvcc_cache: RwLock::new(None),
125 })
126 }
127
128 #[doc(hidden)]
133 pub fn from_id_and_store(table_id: TableId, store: Arc<ColumnStore<P>>) -> LlkvResult<Self> {
134 if is_reserved_table_id(table_id) {
135 return Err(Error::ReservedTableId(table_id));
136 }
137
138 Ok(Self {
139 store,
140 table_id,
141 mvcc_cache: RwLock::new(None),
142 })
143 }
144
145 pub fn register_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
147 let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
148 self.store
149 .register_index(logical_field_id, IndexKind::Sort)?;
150 Ok(())
151 }
152
153 pub fn unregister_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
155 let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
156 match self
157 .store
158 .unregister_index(logical_field_id, IndexKind::Sort)
159 {
160 Ok(()) | Err(Error::NotFound) => Ok(()),
161 Err(err) => Err(err),
162 }
163 }
164
165 pub fn list_registered_indexes(&self, field_id: FieldId) -> LlkvResult<Vec<IndexKind>> {
167 let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
168 match self.store.list_persisted_indexes(logical_field_id) {
169 Ok(kinds) => Ok(kinds),
170 Err(Error::NotFound) => Ok(Vec::new()),
171 Err(err) => Err(err),
172 }
173 }
174
175 fn get_mvcc_cache(&self, schema: &Arc<Schema>) -> MvccColumnCache {
178 {
180 let cache_read = self.mvcc_cache.read().unwrap();
181 if let Some(cache) = *cache_read {
182 return cache;
183 }
184 }
185
186 let has_created_by = schema
188 .fields()
189 .iter()
190 .any(|f| f.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME);
191 let has_deleted_by = schema
192 .fields()
193 .iter()
194 .any(|f| f.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME);
195
196 let cache = MvccColumnCache {
197 has_created_by,
198 has_deleted_by,
199 };
200
201 *self.mvcc_cache.write().unwrap() = Some(cache);
203
204 cache
205 }
206
    /// Appends `batch` to this table.
    ///
    /// Every user field's `field_id` metadata entry is rewritten from the
    /// user-scoped id to the table-namespaced [`LogicalFieldId`] before the
    /// batch is handed to the column store. The system columns (`row_id` and
    /// the MVCC pair) are recognized by name when they lack a field-id entry;
    /// MVCC columns are synthesized with default transaction ids when absent.
    /// Missing column metadata is backfilled into the system catalog.
    pub fn append(&self, batch: &RecordBatch) -> LlkvResult<()> {
        use arrow::array::UInt64Builder;

        // Cached per table: whether batches carry the MVCC columns already.
        let cache = self.get_mvcc_cache(&batch.schema());
        let has_created_by = cache.has_created_by;
        let has_deleted_by = cache.has_deleted_by;

        // +2 leaves room for injected created_by/deleted_by columns.
        let mut new_fields = Vec::with_capacity(batch.schema().fields().len() + 2);
        let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.columns().len() + 2);

        for (idx, field) in batch.schema().fields().iter().enumerate() {
            let maybe_field_id = field.metadata().get(crate::constants::FIELD_ID_META_KEY);
            // System columns may arrive without a field-id annotation; they
            // are recognized by name and handled specially.
            if maybe_field_id.is_none()
                && (field.name() == ROW_ID_COLUMN_NAME
                    || field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME
                    || field.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME)
            {
                if field.name() == ROW_ID_COLUMN_NAME {
                    // row_id passes through untouched.
                    new_fields.push(field.as_ref().clone());
                    new_columns.push(batch.column(idx).clone());
                } else {
                    // MVCC columns get their reserved per-table logical ids
                    // stamped into the field metadata.
                    let lfid = if field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME {
                        LogicalFieldId::for_mvcc_created_by(self.table_id)
                    } else {
                        LogicalFieldId::for_mvcc_deleted_by(self.table_id)
                    };

                    let mut metadata = field.metadata().clone();
                    let lfid_val: u64 = lfid.into();
                    metadata.insert(
                        crate::constants::FIELD_ID_META_KEY.to_string(),
                        lfid_val.to_string(),
                    );

                    let new_field =
                        Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                            .with_metadata(metadata);
                    new_fields.push(new_field);
                    new_columns.push(batch.column(idx).clone());
                }
                continue;
            }

            // All other columns must carry a parseable user field id.
            let raw_field_id = maybe_field_id
                .ok_or_else(|| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' is missing a valid '{}' in its metadata.",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY
                    ))
                })?
                .parse::<u64>()
                .map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' contains an invalid '{}': {}",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY,
                        err
                    ))
                })?;

            // Reject values already in the logical (namespaced) range:
            // callers must supply user-scoped ids only.
            if raw_field_id > FieldId::MAX as u64 {
                return Err(llkv_result::Error::Internal(format!(
                    "Field '{}' expected user FieldId (<= {}) but got logical id '{}'",
                    field.name(),
                    FieldId::MAX,
                    raw_field_id
                )));
            }

            let user_field_id = raw_field_id as FieldId;
            let logical_field_id = LogicalFieldId::for_user(self.table_id, user_field_id);

            // Rewrite the metadata to carry the namespaced logical id.
            let lfid = logical_field_id;
            let mut new_metadata = field.metadata().clone();
            let lfid_val: u64 = lfid.into();
            new_metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );

            let new_field =
                Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                    .with_metadata(new_metadata);
            new_fields.push(new_field);
            new_columns.push(batch.column(idx).clone());

            // Backfill catalog metadata when the column has no record yet,
            // or has a record without a name.
            let need_meta = match self
                .catalog()
                .get_cols_meta(self.table_id, &[user_field_id])
            {
                metas if metas.is_empty() => true,
                metas => metas[0].as_ref().and_then(|m| m.name.as_ref()).is_none(),
            };

            if need_meta {
                let meta = ColMeta {
                    col_id: user_field_id,
                    name: Some(field.name().to_string()),
                    flags: 0,
                    default: None,
                };
                self.catalog().put_col_meta(self.table_id, &meta);
            }
        }

        // Default transaction ids for synthesized MVCC columns; presumably
        // 1 denotes the implicit auto-commit transaction and 0 "no deleter".
        const TXN_ID_AUTO_COMMIT: u64 = 1;
        const TXN_ID_NONE: u64 = 0;
        let row_count = batch.num_rows();

        if !has_created_by {
            // Synthesize created_by = TXN_ID_AUTO_COMMIT for every row.
            let mut created_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                created_by_builder.append_value(TXN_ID_AUTO_COMMIT);
            }
            let created_by_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = created_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::CREATED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(created_by_builder.finish()));
        }

        if !has_deleted_by {
            // Synthesize deleted_by = TXN_ID_NONE for every row.
            let mut deleted_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                deleted_by_builder.append_value(TXN_ID_NONE);
            }
            let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = deleted_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::DELETED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(deleted_by_builder.finish()));
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let namespaced_batch = RecordBatch::try_new(new_schema, new_columns)?;

        tracing::trace!(
            table_id = self.table_id,
            num_columns = namespaced_batch.num_columns(),
            num_rows = namespaced_batch.num_rows(),
            "Attempting append to table"
        );

        if let Err(err) = self.store.append(&namespaced_batch) {
            // Diagnostic path: report which of the batch's logical fields
            // the store does not know about, then propagate the error.
            let batch_field_ids: Vec<LogicalFieldId> = namespaced_batch
                .schema()
                .fields()
                .iter()
                .filter_map(|f| f.metadata().get(crate::constants::FIELD_ID_META_KEY))
                .filter_map(|s| s.parse::<u64>().ok())
                .map(LogicalFieldId::from)
                .collect();

            let missing_fields: Vec<LogicalFieldId> = batch_field_ids
                .iter()
                .filter(|&&field_id| !self.store.has_field(field_id))
                .copied()
                .collect();

            tracing::error!(
                table_id = self.table_id,
                error = ?err,
                batch_field_ids = ?batch_field_ids,
                missing_from_catalog = ?missing_fields,
                "Append failed - some fields missing from catalog"
            );
            return Err(err);
        }
        Ok(())
    }
439
440 pub fn scan_stream<'a, I, T, F>(
448 &self,
449 projections: I,
450 filter_expr: &Expr<'a, FieldId>,
451 options: ScanStreamOptions<P>,
452 on_batch: F,
453 ) -> LlkvResult<()>
454 where
455 I: IntoIterator<Item = T>,
456 T: Into<ScanProjection>,
457 F: FnMut(RecordBatch),
458 {
459 let stream_projections: Vec<ScanProjection> =
460 projections.into_iter().map(|p| p.into()).collect();
461 self.scan_stream_with_exprs(&stream_projections, filter_expr, options, on_batch)
462 }
463
464 pub fn scan_stream_with_exprs<'a, F>(
470 &self,
471 projections: &[ScanProjection],
472 filter_expr: &Expr<'a, FieldId>,
473 options: ScanStreamOptions<P>,
474 on_batch: F,
475 ) -> LlkvResult<()>
476 where
477 F: FnMut(RecordBatch),
478 {
479 let mut cb = on_batch;
480 execute_scan(
481 self,
482 self.table_id,
483 projections,
484 filter_expr,
485 options,
486 &mut cb,
487 )
488 }
489
490 pub fn filter_row_ids<'a>(&self, filter_expr: &Expr<'a, FieldId>) -> LlkvResult<Treemap> {
491 let source = self.collect_row_ids_for_table(filter_expr)?;
492 Ok(match source {
493 RowIdSource::Bitmap(b) => b,
494 RowIdSource::Vector(v) => Treemap::from_iter(v),
495 })
496 }
497
    /// Returns a handle to the system catalog backed by this table's store.
    #[inline]
    pub fn catalog(&self) -> SysCatalog<'_, P> {
        SysCatalog::new(&self.store)
    }

    /// Fetches this table's metadata record from the system catalog, if any.
    #[inline]
    pub fn get_table_meta(&self) -> Option<TableMeta> {
        self.catalog().get_table_meta(self.table_id)
    }

    /// Fetches column metadata for `col_ids`; entries are `None` for columns
    /// without catalog records.
    #[inline]
    pub fn get_cols_meta(&self, col_ids: &[FieldId]) -> Vec<Option<ColMeta>> {
        self.catalog().get_cols_meta(self.table_id, col_ids)
    }
512
513 pub fn schema(&self) -> LlkvResult<Arc<Schema>> {
520 let mut logical_fields = self.store.user_field_ids_for_table(self.table_id);
522 logical_fields.sort_by_key(|lfid| lfid.field_id());
523
524 let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
525 let metas = self.get_cols_meta(&field_ids);
526
527 let mut fields: Vec<Field> = Vec::with_capacity(1 + field_ids.len());
528 fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));
530
531 for (idx, lfid) in logical_fields.into_iter().enumerate() {
532 let fid = lfid.field_id();
533 let dtype = self.store.data_type(lfid)?;
534 let name = metas
535 .get(idx)
536 .and_then(|m| m.as_ref().and_then(|meta| meta.name.clone()))
537 .unwrap_or_else(|| format!("col_{}", fid));
538
539 let mut metadata: HashMap<String, String> = HashMap::new();
540 metadata.insert(
541 crate::constants::FIELD_ID_META_KEY.to_string(),
542 fid.to_string(),
543 );
544
545 fields.push(Field::new(&name, dtype.clone(), true).with_metadata(metadata));
546 }
547
548 Ok(Arc::new(Schema::new(fields)))
549 }
550
551 pub fn schema_recordbatch(&self) -> LlkvResult<RecordBatch> {
555 let schema = self.schema()?;
556 let fields = schema.fields();
557
558 let mut names: Vec<String> = Vec::with_capacity(fields.len());
559 let mut fids: Vec<u32> = Vec::with_capacity(fields.len());
560 let mut dtypes: Vec<String> = Vec::with_capacity(fields.len());
561
562 for field in fields.iter() {
563 names.push(field.name().to_string());
564 let fid = field
565 .metadata()
566 .get(crate::constants::FIELD_ID_META_KEY)
567 .and_then(|s| s.parse::<u32>().ok())
568 .unwrap_or(0u32);
569 fids.push(fid);
570 dtypes.push(format!("{:?}", field.data_type()));
571 }
572
573 let name_array: ArrayRef = Arc::new(StringArray::from(names));
575 let fid_array: ArrayRef = Arc::new(UInt32Array::from(fids));
576 let dtype_array: ArrayRef = Arc::new(StringArray::from(dtypes));
577
578 let rb_schema = Arc::new(Schema::new(vec![
579 Field::new("name", DataType::Utf8, false),
580 Field::new(crate::constants::FIELD_ID_META_KEY, DataType::UInt32, false),
581 Field::new("data_type", DataType::Utf8, false),
582 ]));
583
584 let batch = RecordBatch::try_new(rb_schema, vec![name_array, fid_array, dtype_array])?;
585 Ok(batch)
586 }
587
    /// Streams the given logical columns for `row_ids` as record batches of
    /// up to `STREAM_BATCH_ROWS` rows. Output names come from catalog
    /// metadata, falling back to `col_<id>`.
    pub fn stream_columns<'table>(
        &'table self,
        logical_fields: impl Into<Arc<[LogicalFieldId]>>,
        row_ids: impl Into<RowIdSource>,
        policy: GatherNullPolicy,
    ) -> LlkvResult<TableScanStream<'table, P>> {
        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let mut projection_evals = Vec::with_capacity(logical_fields.len());
        let mut schema_fields = Vec::with_capacity(logical_fields.len());
        // Deduplicated gather targets: each distinct lfid is fetched once
        // even when it is projected multiple times.
        let mut unique_index: FxHashMap<LogicalFieldId, usize> = FxHashMap::default();
        let mut unique_lfids: Vec<LogicalFieldId> = Vec::new();

        for &lfid in logical_fields.iter() {
            let dtype = self.store.data_type(lfid)?;
            if let std::collections::hash_map::Entry::Vacant(entry) = unique_index.entry(lfid) {
                entry.insert(unique_lfids.len());
                unique_lfids.push(lfid);
            }

            // Prefer the catalog's recorded column name, if any.
            let field_name = self
                .catalog()
                .get_cols_meta(self.table_id, &[lfid.field_id()])
                .into_iter()
                .flatten()
                .next()
                .and_then(|meta| meta.name)
                .unwrap_or_else(|| format!("col_{}", lfid.field_id()));

            projection_evals.push(ProjectionEval::Column(ColumnProjectionInfo {
                logical_field_id: lfid,
                data_type: dtype.clone(),
                output_name: field_name.clone(),
            }));
            schema_fields.push(Field::new(field_name, dtype, true));
        }

        let schema = Arc::new(Schema::new(schema_fields));
        // Every projection slot passes through unchanged (all None).
        let passthrough_fields = vec![None; projection_evals.len()];
        let row_source = row_ids.into();

        // NOTE(review): the bare `false`/`true` arguments are positional
        // RowStreamBuilder flags — confirm their meaning against the builder.
        RowStreamBuilder::new(
            self,
            self.table_id,
            Arc::clone(&schema),
            Arc::new(unique_lfids),
            Arc::new(projection_evals),
            Arc::new(passthrough_fields),
            Arc::new(unique_index),
            Arc::new(FxHashSet::default()),
            false,
            policy,
            row_source,
            STREAM_BATCH_ROWS,
            true,
        )
        .build()
    }
646
    /// Borrows the underlying column store.
    pub fn store(&self) -> &ColumnStore<P> {
        &self.store
    }

    /// This table's identifier.
    #[inline]
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Total number of rows present in column `col_id` of this table.
    pub fn total_rows_for_col(&self, col_id: FieldId) -> llkv_result::Result<u64> {
        let lfid = LogicalFieldId::for_user(self.table_id, col_id);
        self.store.total_rows_for_field(lfid)
    }
664
665 pub fn total_rows(&self) -> llkv_result::Result<u64> {
670 use llkv_column_map::store::rowid_fid;
671 let rid_lfid = rowid_fid(LogicalFieldId::for_user(self.table_id, 0));
672 match self.store.total_rows_for_field(rid_lfid) {
674 Ok(n) => Ok(n),
675 Err(_) => {
676 self.store.total_rows_for_table(self.table_id)
678 }
679 }
680 }
681}
682
// The macros below stamp out visitor-trait methods for every Arrow type
// dispatched by the `llkv_for_each_arrow_*` for-each macros. Each accepts the
// full ten-argument signature those dispatchers supply and uses only the
// pieces it needs; unused arguments are named with a leading underscore.

// Emits a no-op `chunk` method: value-only chunks carry no row ids, so
// there is nothing to collect.
macro_rules! impl_row_id_ignore_chunk {
    (
        $_base:ident,
        $chunk:ident,
        $_chunk_with_rids:ident,
        $_run:ident,
        $_run_with_rids:ident,
        $array_ty:ty,
        $_arrow_ty:ty,
        $_dtype:expr,
        $_native_ty:ty,
        $_cast:expr
    ) => {
        fn $chunk(&mut self, _values: &$array_ty) {}
    };
}

// Emits a no-op sorted-`run` method for the same reason as above.
macro_rules! impl_row_id_ignore_sorted_run {
    (
        $_base:ident,
        $_chunk:ident,
        $_chunk_with_rids:ident,
        $run:ident,
        $_run_with_rids:ident,
        $array_ty:ty,
        $_arrow_ty:ty,
        $_dtype:expr,
        $_native_ty:ty,
        $_cast:expr
    ) => {
        fn $run(&mut self, _values: &$array_ty, _start: usize, _len: usize) {}
    };
}

// Emits a chunk-with-row-ids method that folds the ids into a
// `RowIdScanCollector` bitmap (values are ignored).
macro_rules! impl_row_id_collect_chunk_with_rids {
    (
        $_base:ident,
        $_chunk:ident,
        $chunk_with_rids:ident,
        $_run:ident,
        $_run_with_rids:ident,
        $array_ty:ty,
        $_arrow_ty:ty,
        $_dtype:expr,
        $_native_ty:ty,
        $_cast:expr
    ) => {
        fn $chunk_with_rids(&mut self, _: &$array_ty, row_ids: &UInt64Array) {
            self.extend_from_array(row_ids);
        }
    };
}

// Emits a sorted-run-with-row-ids method that folds the run's ids into a
// `RowIdScanCollector` bitmap (values are ignored).
macro_rules! impl_row_id_collect_sorted_run_with_rids {
    (
        $_base:ident,
        $_chunk:ident,
        $_chunk_with_rids:ident,
        $_run:ident,
        $run_with_rids:ident,
        $array_ty:ty,
        $_arrow_ty:ty,
        $_dtype:expr,
        $_native_ty:ty,
        $_cast:expr
    ) => {
        fn $run_with_rids(
            &mut self,
            _: &$array_ty,
            row_ids: &UInt64Array,
            start: usize,
            len: usize,
        ) {
            self.extend_from_slice(row_ids, start, len);
        }
    };
}

// Emits a chunk-with-row-ids method that streams the ids through a
// `RowIdChunkEmitter`.
macro_rules! impl_row_id_stream_chunk_with_rids {
    (
        $_base:ident,
        $_chunk:ident,
        $chunk_with_rids:ident,
        $_run:ident,
        $_run_with_rids:ident,
        $array_ty:ty,
        $_arrow_ty:ty,
        $_dtype:expr,
        $_native_ty:ty,
        $_cast:expr
    ) => {
        fn $chunk_with_rids(&mut self, _: &$array_ty, row_ids: &UInt64Array) {
            self.extend_from_array(row_ids);
        }
    };
}

// Emits a sorted-run-with-row-ids method that streams the run through a
// `RowIdChunkEmitter`, honoring its reverse-run setting.
macro_rules! impl_row_id_stream_sorted_run_with_rids {
    (
        $_base:ident,
        $_chunk:ident,
        $_chunk_with_rids:ident,
        $_run:ident,
        $run_with_rids:ident,
        $array_ty:ty,
        $_arrow_ty:ty,
        $_dtype:expr,
        $_native_ty:ty,
        $_cast:expr
    ) => {
        fn $run_with_rids(
            &mut self,
            _: &$array_ty,
            row_ids: &UInt64Array,
            start: usize,
            len: usize,
        ) {
            self.extend_sorted_run(row_ids, start, len);
        }
    };
}
804
/// Scan visitor that accumulates every row id it sees into a [`Treemap`].
#[derive(Default)]
struct RowIdScanCollector {
    // Bitmap of all row ids visited so far.
    row_ids: Treemap,
}
809
810impl RowIdScanCollector {
811 fn extend_from_array(&mut self, row_ids: &UInt64Array) {
812 for idx in 0..row_ids.len() {
813 self.row_ids.add(row_ids.value(idx));
814 }
815 }
816
817 fn extend_from_slice(&mut self, row_ids: &UInt64Array, start: usize, len: usize) {
818 if len == 0 {
819 return;
820 }
821 let end = (start + len).min(row_ids.len());
822 for idx in start..end {
823 self.row_ids.add(row_ids.value(idx));
824 }
825 }
826
827 fn into_inner(self) -> Treemap {
828 self.row_ids
829 }
830}
831
// Visitor wiring for `RowIdScanCollector`: variants without row ids are
// no-ops; variants that carry row ids fold them into the bitmap.
impl llkv_column_map::scan::PrimitiveVisitor for RowIdScanCollector {
    llkv_for_each_arrow_numeric!(impl_row_id_ignore_chunk);
    llkv_for_each_arrow_boolean!(impl_row_id_ignore_chunk);
    llkv_for_each_arrow_string!(impl_row_id_ignore_chunk);
}

impl llkv_column_map::scan::PrimitiveSortedVisitor for RowIdScanCollector {
    llkv_for_each_arrow_numeric!(impl_row_id_ignore_sorted_run);
    llkv_for_each_arrow_boolean!(impl_row_id_ignore_sorted_run);
    llkv_for_each_arrow_string!(impl_row_id_ignore_sorted_run);
}

impl llkv_column_map::scan::PrimitiveWithRowIdsVisitor for RowIdScanCollector {
    llkv_for_each_arrow_numeric!(impl_row_id_collect_chunk_with_rids);
    llkv_for_each_arrow_boolean!(impl_row_id_collect_chunk_with_rids);
    llkv_for_each_arrow_string!(impl_row_id_collect_chunk_with_rids);
}

impl llkv_column_map::scan::PrimitiveSortedWithRowIdsVisitor for RowIdScanCollector {
    llkv_for_each_arrow_numeric!(impl_row_id_collect_sorted_run_with_rids);
    llkv_for_each_arrow_boolean!(impl_row_id_collect_sorted_run_with_rids);
    llkv_for_each_arrow_string!(impl_row_id_collect_sorted_run_with_rids);

    // Rows whose value is null still exist; collect their ids as well.
    fn null_run(&mut self, row_ids: &UInt64Array, start: usize, len: usize) {
        self.extend_from_slice(row_ids, start, len);
    }
}
859
/// Scan visitor that buffers row ids and hands them to `on_chunk` in slices
/// of at most `chunk_size`. The first callback error is latched, suppresses
/// all further emission, and is reported by `finish`.
struct RowIdChunkEmitter<'a> {
    // Maximum number of row ids per emitted chunk (clamped to >= 1 in new()).
    chunk_size: usize,
    // Row ids accepted but not yet handed to the consumer.
    buffer: Vec<RowId>,
    // When true, sorted runs are emitted in reverse (descending) order.
    reverse_sorted_runs: bool,
    // Consumer invoked with each full (or final partial) chunk.
    on_chunk: &'a mut dyn FnMut(&[RowId]) -> LlkvResult<()>,
    // First error returned by `on_chunk`, if any.
    error: Option<Error>,
}
867
impl<'a> RowIdChunkEmitter<'a> {
    /// Creates an emitter. `chunk_size` is clamped to at least 1 so the
    /// buffering arithmetic below cannot stall on a zero-sized chunk.
    fn new(
        chunk_size: usize,
        reverse_sorted_runs: bool,
        on_chunk: &'a mut dyn FnMut(&[RowId]) -> LlkvResult<()>,
    ) -> Self {
        let chunk_size = cmp::max(1, chunk_size);
        Self {
            chunk_size,
            buffer: Vec::with_capacity(chunk_size),
            reverse_sorted_runs,
            on_chunk,
            error: None,
        }
    }

    /// Feeds every value of `row_ids` to the consumer in chunk-sized pieces.
    /// NOTE(review): reads `values()` directly, which ignores any validity
    /// bitmap — assumes row-id arrays never contain nulls; confirm upstream.
    fn extend_from_array(&mut self, row_ids: &UInt64Array) {
        if self.error.is_some() {
            return;
        }

        // Fast path: with an empty buffer, whole chunks can be emitted
        // straight from the array's value slice without copying.
        if self.buffer.is_empty() {
            let values = row_ids.values();
            let mut offset = 0;
            while offset < values.len() {
                let remaining = values.len() - offset;
                if remaining >= self.chunk_size {
                    if let Err(err) = (self.on_chunk)(&values[offset..offset + self.chunk_size]) {
                        self.error = Some(err);
                        return;
                    }
                    offset += self.chunk_size;
                } else {
                    // Tail shorter than one chunk: buffer it for later.
                    self.buffer.extend_from_slice(&values[offset..]);
                    break;
                }
            }
            return;
        }

        // Slow path: top up the partially filled buffer, flushing every time
        // it reaches chunk_size.
        let values = row_ids.values();
        let mut offset = 0;
        while offset < values.len() {
            let remaining = self.chunk_size - self.buffer.len();
            let available = values.len() - offset;
            let count = remaining.min(available);

            self.buffer
                .extend_from_slice(&values[offset..offset + count]);
            offset += count;

            if self.buffer.len() >= self.chunk_size {
                self.flush();
            }
        }
    }

    /// Feeds `row_ids[start..start + len]` (clamped to the array's length)
    /// through the buffer, flushing full chunks as they form.
    fn extend_from_slice(&mut self, row_ids: &UInt64Array, start: usize, len: usize) {
        if self.error.is_some() || len == 0 {
            return;
        }
        let end = (start + len).min(row_ids.len());
        let values = &row_ids.values()[start..end];

        let mut offset = 0;
        while offset < values.len() {
            let remaining = self.chunk_size - self.buffer.len();
            let available = values.len() - offset;
            let count = remaining.min(available);

            self.buffer
                .extend_from_slice(&values[offset..offset + count]);
            offset += count;

            if self.buffer.len() >= self.chunk_size {
                self.flush();
            }
        }
    }

    /// Feeds a sorted run, reversing its order element-by-element when
    /// `reverse_sorted_runs` is set; otherwise forwards it unchanged.
    fn extend_sorted_run(&mut self, row_ids: &UInt64Array, start: usize, len: usize) {
        if self.reverse_sorted_runs {
            if self.error.is_some() || len == 0 {
                return;
            }
            let mut idx = (start + len).min(row_ids.len());
            while idx > start {
                idx -= 1;
                self.push(row_ids.value(idx));
            }
        } else {
            self.extend_from_slice(row_ids, start, len);
        }
    }

    /// Appends a single row id, flushing when the buffer fills.
    fn push(&mut self, value: RowId) {
        if self.error.is_some() {
            return;
        }
        self.buffer.push(value);
        if self.buffer.len() >= self.chunk_size {
            self.flush();
        }
    }

    /// Emits the buffered ids (if any) to the consumer. On callback failure
    /// the error is latched and the buffer deliberately left intact.
    fn flush(&mut self) {
        if self.error.is_some() || self.buffer.is_empty() {
            return;
        }
        if let Err(err) = (self.on_chunk)(self.buffer.as_slice()) {
            self.error = Some(err);
        } else {
            self.buffer.clear();
        }
    }

    /// Flushes any trailing partial chunk and reports the first latched
    /// callback error, if one occurred.
    fn finish(mut self) -> LlkvResult<()> {
        self.flush();
        match self.error {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }
}
994
// Visitor wiring for `RowIdChunkEmitter`: value-only variants are no-ops;
// row-id-carrying variants stream into the emitter. Note that only the
// numeric and boolean type families are wired here — unlike
// `RowIdScanCollector`, the string family is not dispatched.
impl<'a> llkv_column_map::scan::PrimitiveVisitor for RowIdChunkEmitter<'a> {
    llkv_for_each_arrow_numeric!(impl_row_id_ignore_chunk);
    llkv_for_each_arrow_boolean!(impl_row_id_ignore_chunk);
}

impl<'a> llkv_column_map::scan::PrimitiveSortedVisitor for RowIdChunkEmitter<'a> {
    llkv_for_each_arrow_numeric!(impl_row_id_ignore_sorted_run);
    llkv_for_each_arrow_boolean!(impl_row_id_ignore_sorted_run);
}

impl<'a> llkv_column_map::scan::PrimitiveWithRowIdsVisitor for RowIdChunkEmitter<'a> {
    llkv_for_each_arrow_numeric!(impl_row_id_stream_chunk_with_rids);
    llkv_for_each_arrow_boolean!(impl_row_id_stream_chunk_with_rids);
}

impl<'a> llkv_column_map::scan::PrimitiveSortedWithRowIdsVisitor for RowIdChunkEmitter<'a> {
    llkv_for_each_arrow_numeric!(impl_row_id_stream_sorted_run_with_rids);
    llkv_for_each_arrow_boolean!(impl_row_id_stream_sorted_run_with_rids);

    // Null runs still name real rows; stream their ids like any other run.
    fn null_run(&mut self, row_ids: &UInt64Array, start: usize, len: usize) {
        self.extend_sorted_run(row_ids, start, len);
    }
}
1018
// Adapts `Table` to the scan executor's storage interface. Every method
// delegates either to the table's own helpers or directly to the store.
impl<P> ScanStorage<P> for Table<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn table_id(&self) -> TableId {
        self.table_id
    }

    fn field_data_type(&self, fid: LogicalFieldId) -> LlkvResult<DataType> {
        self.store.data_type(fid)
    }

    fn total_rows(&self) -> LlkvResult<u64> {
        // Disambiguate from the trait method of the same name.
        Table::total_rows(self)
    }

    fn prepare_gather_context(
        &self,
        logical_fields: &[LogicalFieldId],
    ) -> LlkvResult<MultiGatherContext> {
        self.store.prepare_gather_context(logical_fields)
    }

    fn gather_row_window_with_context(
        &self,
        logical_fields: &[LogicalFieldId],
        row_ids: &[u64],
        null_policy: GatherNullPolicy,
        ctx: Option<&mut MultiGatherContext>,
    ) -> LlkvResult<RecordBatch> {
        self.store
            .gather_row_window_with_context(logical_fields, row_ids, null_policy, ctx)
    }

    fn filter_row_ids<'expr>(&self, filter_expr: &Expr<'expr, FieldId>) -> LlkvResult<Treemap> {
        Table::filter_row_ids(self, filter_expr)
    }

    // Evaluates a single leaf predicate, normalizing the result to a bitmap.
    fn filter_leaf(&self, filter: &OwnedFilter) -> LlkvResult<Treemap> {
        let source = self.collect_row_ids_for_filter(filter)?;
        Ok(match source {
            RowIdSource::Bitmap(b) => b,
            RowIdSource::Vector(v) => Treemap::from_iter(v),
        })
    }

    fn filter_fused(
        &self,
        field_id: FieldId,
        filters: &[OwnedFilter],
        cache: &PredicateFusionCache,
    ) -> LlkvResult<RowIdSource> {
        self.collect_fused_predicates(field_id, filters, cache)
    }

    fn all_row_ids(&self) -> LlkvResult<Treemap> {
        self.compute_table_row_ids()
    }

    fn sorted_row_ids_full_table(&self, order_spec: ScanOrderSpec) -> LlkvResult<Option<Vec<u64>>> {
        self.collect_full_table_sorted_row_ids(order_spec)
    }

    fn stream_row_ids(
        &self,
        chunk_size: usize,
        on_chunk: &mut dyn FnMut(&[u64]) -> LlkvResult<()>,
    ) -> LlkvResult<()> {
        self.stream_table_row_ids(chunk_size, on_chunk)
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
1094
1095impl<P> Table<P>
1096where
1097 P: Pager<Blob = EntryHandle> + Send + Sync,
1098{
1099 fn collect_row_ids_for_table<'expr>(
1100 &self,
1101 filter_expr: &Expr<'expr, FieldId>,
1102 ) -> LlkvResult<RowIdSource> {
1103 let fusion_cache = PredicateFusionCache::from_expr(filter_expr);
1104 let mut all_rows_cache: FxHashMap<FieldId, Treemap> = FxHashMap::default();
1105 let filter_arc = Arc::new(filter_expr.clone());
1106 let programs = ProgramCompiler::new(filter_arc).compile()?;
1107 llkv_scan::predicate::collect_row_ids_for_program(
1108 self,
1109 &programs,
1110 &fusion_cache,
1111 &mut all_rows_cache,
1112 )
1113 }
1114
    /// Resolves a single leaf filter to the set of row ids it matches,
    /// dispatching on the target column's data type.
    fn collect_row_ids_for_filter(&self, filter: &OwnedFilter) -> LlkvResult<RowIdSource> {
        // Filters on the row-id pseudo-column bypass column storage.
        if filter.field_id == ROW_ID_FIELD_ID {
            let op = filter.op.to_operator();
            let row_ids = self.collect_row_ids_for_rowid_filter(&op)?;
            return Ok(RowIdSource::Bitmap(row_ids));
        }

        let filter_lfid = LogicalFieldId::for_user(self.table_id, filter.field_id);
        let dtype = self.store.data_type(filter_lfid)?;

        // Null checks are answered from presence information alone.
        match &filter.op {
            OwnedOperator::IsNotNull => {
                let mut cache = FxHashMap::default();
                let non_null = self.collect_all_row_ids_for_field(filter.field_id, &mut cache)?;
                return Ok(RowIdSource::Bitmap(non_null));
            }
            OwnedOperator::IsNull => {
                // IS NULL = all table rows minus rows where the field is set.
                let all_row_ids = self.compute_table_row_ids()?;
                if all_row_ids.is_empty() {
                    return Ok(RowIdSource::Bitmap(Treemap::new()));
                }
                let mut cache = FxHashMap::default();
                let non_null = self.collect_all_row_ids_for_field(filter.field_id, &mut cache)?;
                let null_ids = all_row_ids - non_null;
                return Ok(RowIdSource::Bitmap(null_ids));
            }
            _ => {}
        }

        // A fully unbounded range matches every row; skip the column scan.
        if let OwnedOperator::Range {
            lower: Bound::Unbounded,
            upper: Bound::Unbounded,
        } = &filter.op
        {
            let all_rows = self.compute_table_row_ids()?;
            return Ok(RowIdSource::Bitmap(all_rows));
        }

        // Everything else runs a typed predicate scan over the column.
        let op = filter.op.to_operator();
        let row_ids = match &dtype {
            DataType::Utf8 => self.collect_matching_row_ids_string::<i32>(filter_lfid, &op),
            DataType::LargeUtf8 => self.collect_matching_row_ids_string::<i64>(filter_lfid, &op),
            DataType::Boolean => self.collect_matching_row_ids_bool(filter_lfid, &op),
            other => llkv_column_map::with_integer_arrow_type!(
                other.clone(),
                |ArrowTy| self.collect_matching_row_ids::<ArrowTy>(filter_lfid, &op),
                Err(Error::Internal(format!(
                    "Filtering on type {:?} is not supported",
                    other
                )))
            ),
        }?;

        Ok(RowIdSource::Bitmap(row_ids))
    }
1170
1171 fn collect_fused_predicates(
1172 &self,
1173 _field_id: FieldId,
1174 filters: &[OwnedFilter],
1175 _cache: &PredicateFusionCache,
1176 ) -> LlkvResult<RowIdSource> {
1177 let mut result: Option<Treemap> = None;
1178
1179 for filter in filters {
1180 let rows = match self.collect_row_ids_for_filter(filter)? {
1181 RowIdSource::Bitmap(b) => b,
1182 RowIdSource::Vector(v) => Treemap::from_iter(v),
1183 };
1184
1185 result = Some(match result {
1186 Some(acc) => acc & rows,
1187 None => rows,
1188 });
1189
1190 if let Some(ref r) = result
1191 && r.is_empty()
1192 {
1193 return Ok(RowIdSource::Bitmap(Treemap::new()));
1194 }
1195 }
1196
1197 Ok(RowIdSource::Bitmap(result.unwrap_or_default()))
1198 }
1199
1200 fn collect_all_row_ids_for_field(
1201 &self,
1202 field_id: FieldId,
1203 cache: &mut FxHashMap<FieldId, Treemap>,
1204 ) -> LlkvResult<Treemap> {
1205 if let Some(rows) = cache.get(&field_id) {
1206 return Ok(rows.clone());
1207 }
1208
1209 let lfid = LogicalFieldId::for_user(self.table_id, field_id);
1210 let mut collector = RowIdScanCollector::default();
1211 ScanBuilder::new(self.store(), lfid)
1212 .options(ScanOptions {
1213 with_row_ids: true,
1214 ..Default::default()
1215 })
1216 .run(&mut collector)?;
1217
1218 let rows = collector.into_inner();
1219 cache.insert(field_id, rows.clone());
1220 Ok(rows)
1221 }
1222
1223 fn collect_matching_row_ids<T>(
1224 &self,
1225 lfid: LogicalFieldId,
1226 op: &Operator,
1227 ) -> LlkvResult<Treemap>
1228 where
1229 T: ArrowPrimitiveType
1230 + FilterPrimitive<Native = <T as ArrowPrimitiveType>::Native>
1231 + FilterDispatch<Value = <T as ArrowPrimitiveType>::Native>,
1232 <T as ArrowPrimitiveType>::Native: PartialOrd + Copy + FromLiteral + PredicateValue,
1233 {
1234 let predicate = build_fixed_width_predicate::<T>(op).map_err(Error::predicate_build)?;
1235 let row_ids =
1236 <T as FilterPrimitive>::run_nullable_filter(self.store(), lfid, |v| match v {
1237 Some(val) => predicate.matches(PredicateValue::borrowed(&val)),
1238 None => false,
1239 })?;
1240 Ok(Treemap::from_iter(row_ids))
1241 }
1242
1243 fn collect_matching_row_ids_string<O>(
1244 &self,
1245 lfid: LogicalFieldId,
1246 op: &Operator,
1247 ) -> LlkvResult<Treemap>
1248 where
1249 O: OffsetSizeTrait + llkv_column_map::store::scan::StringContainsKernel,
1250 {
1251 let predicate = build_var_width_predicate(op).map_err(Error::predicate_build)?;
1252 let row_ids = Utf8Filter::<O>::run_filter(self.store(), lfid, &predicate)?;
1253 Ok(Treemap::from_iter(row_ids))
1254 }
1255
1256 fn collect_matching_row_ids_bool(
1257 &self,
1258 lfid: LogicalFieldId,
1259 op: &Operator,
1260 ) -> LlkvResult<Treemap> {
1261 let predicate = build_bool_predicate(op).map_err(Error::predicate_build)?;
1262
1263 let row_ids = arrow::datatypes::BooleanType::run_nullable_filter(
1264 self.store(),
1265 lfid,
1266 |val: Option<bool>| match val {
1267 Some(v) => predicate.matches(&v),
1268 None => false,
1269 },
1270 )?;
1271 Ok(Treemap::from_iter(row_ids))
1272 }
1273
    /// Evaluates `op` against the table's own row-id column.
    ///
    /// Materializes the full set of live row ids first; if the table is
    /// empty there is nothing to filter, so an empty bitmap is returned
    /// without invoking the operator.
    fn collect_row_ids_for_rowid_filter(&self, op: &Operator<'_>) -> LlkvResult<Treemap> {
        let all_row_ids = self.compute_table_row_ids()?;
        if all_row_ids.is_empty() {
            return Ok(Treemap::new());
        }
        RowIdBitmapFilter::filter_by_operator(&all_row_ids, op)
    }
1281
1282 fn collect_full_table_sorted_row_ids(
1283 &self,
1284 order_spec: ScanOrderSpec,
1285 ) -> LlkvResult<Option<Vec<u64>>> {
1286 use llkv_column_map::store::rowid_fid;
1287
1288 if !matches!(
1289 order_spec.transform,
1290 ScanOrderTransform::IdentityInt64
1291 | ScanOrderTransform::IdentityInt32
1292 | ScanOrderTransform::IdentityUtf8
1293 ) {
1294 return Ok(None);
1295 }
1296
1297 let lfid = LogicalFieldId::for_user(self.table_id, order_spec.field_id);
1298 let dtype = match self.store.data_type(lfid) {
1299 Ok(dt) => dt,
1300 Err(Error::NotFound) => return Ok(None),
1301 Err(err) => return Err(err),
1302 };
1303
1304 if !Self::order_transform_matches_dtype(order_spec.transform, &dtype) {
1305 return Ok(None);
1306 }
1307
1308 let mut ordered: Vec<u64> = Vec::new();
1309
1310 if let Ok(total_rows) = self.total_rows()
1311 && let Ok(cap) = usize::try_from(total_rows)
1312 {
1313 ordered.reserve(cap);
1314 }
1315
1316 let mut on_chunk = |chunk: &[RowId]| -> LlkvResult<()> {
1317 ordered.extend_from_slice(chunk);
1318 Ok(())
1319 };
1320 let reverse_sorted_runs = matches!(order_spec.direction, ScanOrderDirection::Descending);
1321 let mut emitter =
1322 RowIdChunkEmitter::new(STREAM_BATCH_ROWS, reverse_sorted_runs, &mut on_chunk);
1323 let options = ScanOptions {
1324 sorted: true,
1325 reverse: matches!(order_spec.direction, ScanOrderDirection::Descending),
1326 with_row_ids: true,
1327 include_nulls: true,
1328 nulls_first: order_spec.nulls_first,
1329 anchor_row_id_field: Some(rowid_fid(lfid)),
1330 ..Default::default()
1331 };
1332
1333 match ScanBuilder::new(self.store(), lfid)
1334 .options(options)
1335 .run(&mut emitter)
1336 {
1337 Ok(()) => emitter.finish()?,
1338 Err(Error::NotFound) => return Ok(None),
1339 Err(err) => return Err(err),
1340 }
1341
1342 Ok(Some(ordered))
1343 }
1344
1345 fn order_transform_matches_dtype(transform: ScanOrderTransform, dtype: &DataType) -> bool {
1346 match transform {
1347 ScanOrderTransform::IdentityInt64 => matches!(dtype, DataType::Int64),
1348 ScanOrderTransform::IdentityInt32 => matches!(dtype, DataType::Int32),
1349 ScanOrderTransform::IdentityUtf8 => matches!(dtype, DataType::Utf8),
1350 ScanOrderTransform::CastUtf8ToInteger => false,
1351 }
1352 }
1353
1354 fn compute_table_row_ids(&self) -> LlkvResult<Treemap> {
1355 use llkv_column_map::store::rowid_fid;
1356
1357 if let Some(rows) = self.collect_row_ids_from_mvcc()? {
1358 return Ok(rows);
1359 }
1360
1361 let fields = self.store.user_field_ids_for_table(self.table_id);
1362 if fields.is_empty() {
1363 return Ok(Treemap::new());
1364 }
1365
1366 let expected = self
1367 .store
1368 .total_rows_for_table(self.table_id)
1369 .unwrap_or_default();
1370
1371 if expected > 0
1372 && let Some(&first_field) = fields.first()
1373 {
1374 let rid_shadow = rowid_fid(first_field);
1375 let mut collector = RowIdScanCollector::default();
1376
1377 match ScanBuilder::new(self.store(), rid_shadow)
1378 .options(ScanOptions {
1379 with_row_ids: true,
1380 ..Default::default()
1381 })
1382 .run(&mut collector)
1383 {
1384 Ok(_) => {
1385 let row_ids = collector.into_inner();
1386 if row_ids.cardinality() == expected {
1387 return Ok(row_ids);
1388 }
1389 }
1390 Err(llkv_result::Error::NotFound) => {}
1391 Err(_) => {}
1392 }
1393 }
1394
1395 let mut collected = Treemap::new();
1396
1397 for lfid in fields.clone() {
1398 let mut collector = RowIdScanCollector::default();
1399 ScanBuilder::new(self.store(), lfid)
1400 .options(ScanOptions {
1401 with_row_ids: true,
1402 ..Default::default()
1403 })
1404 .run(&mut collector)?;
1405 let rows = collector.into_inner();
1406 collected.or_inplace(&rows);
1407
1408 if expected > 0 && collected.cardinality() >= expected {
1409 break;
1410 }
1411 }
1412
1413 Ok(collected)
1414 }
1415
1416 fn collect_row_ids_from_mvcc(&self) -> LlkvResult<Option<Treemap>> {
1417 let Some(rows) = self.fetch_mvcc_row_ids()? else {
1418 return Ok(None);
1419 };
1420 Ok(Some(Treemap::from_iter(rows)))
1421 }
1422
1423 fn stream_table_row_ids(
1424 &self,
1425 chunk_size: usize,
1426 on_chunk: &mut dyn FnMut(&[RowId]) -> LlkvResult<()>,
1427 ) -> LlkvResult<()> {
1428 use llkv_column_map::store::rowid_fid;
1429
1430 if self.try_stream_row_ids_from_mvcc(chunk_size, on_chunk)? {
1431 return Ok(());
1432 }
1433
1434 let fields = self.store.user_field_ids_for_table(self.table_id);
1435 if fields.is_empty() {
1436 return Ok(());
1437 }
1438
1439 let Some(&first_field) = fields.first() else {
1440 return Ok(());
1441 };
1442
1443 let rid_shadow = rowid_fid(first_field);
1444 let mut emitter = RowIdChunkEmitter::new(chunk_size, false, on_chunk);
1445 let scan_result = ScanBuilder::new(self.store(), rid_shadow)
1446 .options(ScanOptions {
1447 with_row_ids: true,
1448 ..Default::default()
1449 })
1450 .run(&mut emitter);
1451
1452 match scan_result {
1453 Ok(()) => emitter.finish(),
1454 Err(Error::NotFound) => {
1455 let _ = emitter.finish();
1456 let all_rows = self.compute_table_row_ids()?;
1457 if all_rows.is_empty() {
1458 return Ok(());
1459 }
1460
1461 let chunk_cap = cmp::max(1, chunk_size);
1462 let mut chunk = Vec::with_capacity(chunk_cap);
1463 for row_id in all_rows.iter() {
1464 chunk.push(row_id);
1465 if chunk.len() >= chunk_cap {
1466 (on_chunk)(chunk.as_slice())?;
1467 chunk.clear();
1468 }
1469 }
1470 if !chunk.is_empty() {
1471 (on_chunk)(chunk.as_slice())?;
1472 }
1473 Ok(())
1474 }
1475 Err(err) => {
1476 let _ = emitter.finish();
1477 Err(err)
1478 }
1479 }
1480 }
1481
1482 fn try_stream_row_ids_from_mvcc(
1483 &self,
1484 chunk_size: usize,
1485 on_chunk: &mut dyn FnMut(&[RowId]) -> LlkvResult<()>,
1486 ) -> LlkvResult<bool> {
1487 let created_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
1488 let mut emitter = RowIdChunkEmitter::new(chunk_size, false, on_chunk);
1489
1490 let scan_result = ScanBuilder::new(self.store(), created_lfid)
1491 .options(ScanOptions {
1492 with_row_ids: true,
1493 ..Default::default()
1494 })
1495 .run(&mut emitter);
1496
1497 match scan_result {
1498 Ok(()) => {
1499 emitter.finish()?;
1500 Ok(true)
1501 }
1502 Err(Error::NotFound) => Ok(false),
1503 Err(err) => Err(err),
1504 }
1505 }
1506
1507 fn fetch_mvcc_row_ids(&self) -> LlkvResult<Option<Vec<RowId>>> {
1508 let created_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
1509 match self
1510 .store
1511 .filter_row_ids::<UInt64Type>(created_lfid, &Predicate::All)
1512 {
1513 Ok(rows) => Ok(Some(rows)),
1514 Err(Error::NotFound) => Ok(None),
1515 Err(err) => Err(err),
1516 }
1517 }
1518}
1519
1520#[cfg(test)]
1521mod tests {
1522 use super::*;
1523 use crate::reserved::CATALOG_TABLE_ID;
1524 use crate::types::RowId;
1525 use arrow::array::Array;
1526 use arrow::array::ArrayRef;
1527 use arrow::array::{
1528 BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt8Array,
1529 UInt32Array, UInt64Array,
1530 };
1531 use arrow::compute::{cast, max, min, sum, unary};
1532 use arrow::datatypes::DataType;
1533 use llkv_column_map::ColumnStore;
1534 use llkv_column_map::store::{GatherNullPolicy, Projection};
1535 use llkv_expr::{BinaryOp, CompareOp, Filter, Operator, ScalarExpr};
1536 use std::collections::HashMap;
1537 use std::ops::Bound;
1538
    // Builds a fresh in-memory table populated with the standard four-row
    // fixture; the pager is private to this table.
    fn setup_test_table() -> Table {
        let pager = Arc::new(MemPager::default());
        setup_test_table_with_pager(&pager)
    }
1543
1544 fn setup_test_table_with_pager(pager: &Arc<MemPager>) -> Table {
1545 let table = Table::from_id(1, Arc::clone(pager)).unwrap();
1546 const COL_A_U64: FieldId = 10;
1547 const COL_B_BIN: FieldId = 11;
1548 const COL_C_I32: FieldId = 12;
1549 const COL_D_F64: FieldId = 13;
1550 const COL_E_F32: FieldId = 14;
1551
1552 let schema = Arc::new(Schema::new(vec![
1553 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1554 Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1555 crate::constants::FIELD_ID_META_KEY.to_string(),
1556 COL_A_U64.to_string(),
1557 )])),
1558 Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
1559 crate::constants::FIELD_ID_META_KEY.to_string(),
1560 COL_B_BIN.to_string(),
1561 )])),
1562 Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
1563 crate::constants::FIELD_ID_META_KEY.to_string(),
1564 COL_C_I32.to_string(),
1565 )])),
1566 Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
1567 crate::constants::FIELD_ID_META_KEY.to_string(),
1568 COL_D_F64.to_string(),
1569 )])),
1570 Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
1571 crate::constants::FIELD_ID_META_KEY.to_string(),
1572 COL_E_F32.to_string(),
1573 )])),
1574 ]));
1575
1576 let batch = RecordBatch::try_new(
1577 schema.clone(),
1578 vec![
1579 Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
1580 Arc::new(UInt64Array::from(vec![100, 200, 300, 200])),
1581 Arc::new(BinaryArray::from(vec![
1582 b"foo" as &[u8],
1583 b"bar",
1584 b"baz",
1585 b"qux",
1586 ])),
1587 Arc::new(Int32Array::from(vec![10, 20, 30, 20])),
1588 Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5, 2.5])),
1589 Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 2.0])),
1590 ],
1591 )
1592 .unwrap();
1593
1594 table.append(&batch).unwrap();
1595 table
1596 }
1597
1598 fn gather_single(
1599 store: &ColumnStore<MemPager>,
1600 field_id: LogicalFieldId,
1601 row_ids: &[u64],
1602 ) -> ArrayRef {
1603 store
1604 .gather_rows(&[field_id], row_ids, GatherNullPolicy::ErrorOnMissing)
1605 .unwrap()
1606 .column(0)
1607 .clone()
1608 }
1609
    // Wraps a single filter in a predicate expression node.
    fn pred_expr<'a>(filter: Filter<'a, FieldId>) -> Expr<'a, FieldId> {
        Expr::Pred(filter)
    }
1613
    // Projection for a user column of `table`, identified by field id.
    fn proj(table: &Table, field_id: FieldId) -> Projection {
        Projection::from(LogicalFieldId::for_user(table.table_id, field_id))
    }
1617
    // Like `proj`, but with an output column alias.
    fn proj_alias<S: Into<String>>(table: &Table, field_id: FieldId, alias: S) -> Projection {
        Projection::with_alias(LogicalFieldId::for_user(table.table_id, field_id), alias)
    }
1621
    #[test]
    fn row_id_chunk_emitter_reverses_sorted_runs() {
        // With reverse = true, the emitter must flip an ascending run into
        // descending order even while chunking (chunk size 2 here).
        let array = UInt64Array::from(vec![10_u64, 20, 30, 40, 50]);
        let mut emitted: Vec<RowId> = Vec::new();

        {
            let mut on_chunk = |chunk: &[RowId]| -> LlkvResult<()> {
                emitted.extend_from_slice(chunk);
                Ok(())
            };
            // Scope the emitter so its &mut borrow of `on_chunk` (and thus
            // `emitted`) ends before the assertion below.
            let mut emitter = RowIdChunkEmitter::new(2, true, &mut on_chunk);
            emitter.extend_sorted_run(&array, 0, array.len());
            emitter.finish().unwrap();
        }

        assert_eq!(emitted, vec![50, 40, 30, 20, 10]);
    }
1639
    #[test]
    fn table_new_rejects_reserved_table_id() {
        // The catalog table id is reserved for internal use; constructing a
        // user table with it must fail with the dedicated error variant.
        let result = Table::from_id(CATALOG_TABLE_ID, Arc::new(MemPager::default()));
        assert!(matches!(
            result,
            Err(Error::ReservedTableId(id)) if id == CATALOG_TABLE_ID
        ));
    }
1648
    #[test]
    fn test_append_rejects_logical_field_id_in_metadata() {
        // Field metadata must carry the bare user FieldId; smuggling a fully
        // qualified LogicalFieldId value in must be rejected by append().
        let table = Table::from_id(7, Arc::new(MemPager::default())).unwrap();

        const USER_FID: FieldId = 42;
        let logical: LogicalFieldId = LogicalFieldId::for_user(table.table_id(), USER_FID);
        let logical_val: u64 = logical.into();

        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("bad", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                logical_val.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2u64])),
                Arc::new(UInt64Array::from(vec![10u64, 20u64])),
            ],
        )
        .unwrap();

        let res = table.append(&batch);
        assert!(matches!(res, Err(Error::Internal(_))));
    }
1681
1682 #[test]
1683 fn test_scan_with_u64_filter() {
1684 let table = setup_test_table();
1685 const COL_A_U64: FieldId = 10;
1686 const COL_C_I32: FieldId = 12;
1687
1688 let expr = pred_expr(Filter {
1689 field_id: COL_A_U64,
1690 op: Operator::Equals(200.into()),
1691 });
1692
1693 let mut vals: Vec<Option<i32>> = Vec::new();
1694 table
1695 .scan_stream(
1696 &[proj(&table, COL_C_I32)],
1697 &expr,
1698 ScanStreamOptions::default(),
1699 |b| {
1700 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1701 vals.extend(
1702 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1703 );
1704 },
1705 )
1706 .unwrap();
1707 assert_eq!(vals, vec![Some(20), Some(20)]);
1708 }
1709
1710 #[test]
1711 fn test_scan_with_string_filter() {
1712 let pager = Arc::new(MemPager::default());
1713 let table = Table::from_id(500, Arc::clone(&pager)).unwrap();
1714
1715 const COL_STR: FieldId = 42;
1716 let schema = Arc::new(Schema::new(vec![
1717 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1718 Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
1719 crate::constants::FIELD_ID_META_KEY.to_string(),
1720 COL_STR.to_string(),
1721 )])),
1722 ]));
1723
1724 let batch = RecordBatch::try_new(
1725 schema,
1726 vec![
1727 Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
1728 Arc::new(StringArray::from(vec!["alice", "bob", "albert", "carol"])),
1729 ],
1730 )
1731 .unwrap();
1732 table.append(&batch).unwrap();
1733
1734 let expr = pred_expr(Filter {
1735 field_id: COL_STR,
1736 op: Operator::starts_with("al".to_string(), true),
1737 });
1738
1739 let mut collected: Vec<Option<String>> = Vec::new();
1740 table
1741 .scan_stream(
1742 &[proj(&table, COL_STR)],
1743 &expr,
1744 ScanStreamOptions::default(),
1745 |b| {
1746 let arr = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
1747 collected.extend(arr.iter().map(|v| v.map(|s| s.to_string())));
1748 },
1749 )
1750 .unwrap();
1751
1752 assert_eq!(
1753 collected,
1754 vec![Some("alice".to_string()), Some("albert".to_string())]
1755 );
1756 }
1757
    #[test]
    fn test_table_reopen_with_shared_pager() {
        // Three tables share one MemPager. Each is created and populated in
        // its own scope (dropping the Table handle), then reopened by id to
        // verify that schema (dtype) and data round-trip through the pager.
        const TABLE_ALPHA: TableId = 42;
        const TABLE_BETA: TableId = 43;
        const TABLE_GAMMA: TableId = 44;
        const COL_ALPHA_U64: FieldId = 100;
        const COL_ALPHA_I32: FieldId = 101;
        const COL_ALPHA_U32: FieldId = 102;
        const COL_ALPHA_I16: FieldId = 103;
        const COL_BETA_U64: FieldId = 200;
        const COL_BETA_U8: FieldId = 201;
        const COL_GAMMA_I16: FieldId = 300;

        let pager = Arc::new(MemPager::default());

        let alpha_rows: Vec<RowId> = vec![1, 2, 3, 4];
        let alpha_vals_u64: Vec<u64> = vec![10, 20, 30, 40];
        let alpha_vals_i32: Vec<i32> = vec![-5, 15, 25, 35];
        let alpha_vals_u32: Vec<u32> = vec![7, 11, 13, 17];
        let alpha_vals_i16: Vec<i16> = vec![-2, 4, -6, 8];

        let beta_rows: Vec<RowId> = vec![101, 102, 103];
        let beta_vals_u64: Vec<u64> = vec![900, 901, 902];
        let beta_vals_u8: Vec<u8> = vec![1, 2, 3];

        let gamma_rows: Vec<RowId> = vec![501, 502];
        let gamma_vals_i16: Vec<i16> = vec![123, -321];

        // --- Write phase: alpha (4 typed columns). ---
        {
            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("alpha_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U64.to_string(),
                )])),
                Field::new("alpha_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I32.to_string(),
                )])),
                Field::new("alpha_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U32.to_string(),
                )])),
                Field::new("alpha_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(alpha_rows.clone())),
                    Arc::new(UInt64Array::from(alpha_vals_u64.clone())),
                    Arc::new(Int32Array::from(alpha_vals_i32.clone())),
                    Arc::new(UInt32Array::from(alpha_vals_u32.clone())),
                    Arc::new(Int16Array::from(alpha_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // --- Write phase: beta (2 columns). ---
        {
            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("beta_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U64.to_string(),
                )])),
                Field::new("beta_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U8.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(beta_rows.clone())),
                    Arc::new(UInt64Array::from(beta_vals_u64.clone())),
                    Arc::new(UInt8Array::from(beta_vals_u8.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // --- Write phase: gamma (1 column). ---
        {
            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("gamma_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_GAMMA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(gamma_rows.clone())),
                    Arc::new(Int16Array::from(gamma_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // --- Read phase: reopen alpha and verify each column's dtype and
        // values gathered by the original row ids. ---
        {
            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let expectations: &[(FieldId, DataType)] = &[
                (COL_ALPHA_U64, DataType::UInt64),
                (COL_ALPHA_I32, DataType::Int32),
                (COL_ALPHA_U32, DataType::UInt32),
                (COL_ALPHA_I16, DataType::Int16),
            ];

            for &(col, ref ty) in expectations {
                let lfid = LogicalFieldId::for_user(TABLE_ALPHA, col);
                assert_eq!(store.data_type(lfid).unwrap(), *ty);
                let arr = gather_single(store, lfid, &alpha_rows);
                match ty {
                    DataType::UInt64 => {
                        let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u64.as_slice());
                    }
                    DataType::Int32 => {
                        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i32.as_slice());
                    }
                    DataType::UInt32 => {
                        let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u32.as_slice());
                    }
                    DataType::Int16 => {
                        let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i16.as_slice());
                    }
                    other => panic!("unexpected dtype {other:?}"),
                }
            }
        }

        // --- Read phase: reopen beta. ---
        {
            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let lfid_u64 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U64);
            assert_eq!(store.data_type(lfid_u64).unwrap(), DataType::UInt64);
            let arr_u64 = gather_single(store, lfid_u64, &beta_rows);
            let arr_u64 = arr_u64.as_any().downcast_ref::<UInt64Array>().unwrap();
            assert_eq!(arr_u64.values(), beta_vals_u64.as_slice());

            let lfid_u8 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U8);
            assert_eq!(store.data_type(lfid_u8).unwrap(), DataType::UInt8);
            let arr_u8 = gather_single(store, lfid_u8, &beta_rows);
            let arr_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
            assert_eq!(arr_u8.values(), beta_vals_u8.as_slice());
        }

        // --- Read phase: reopen gamma. ---
        {
            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let store = table.store();
            let lfid = LogicalFieldId::for_user(TABLE_GAMMA, COL_GAMMA_I16);
            assert_eq!(store.data_type(lfid).unwrap(), DataType::Int16);
            let arr = gather_single(store, lfid, &gamma_rows);
            let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
            assert_eq!(arr.values(), gamma_vals_i16.as_slice());
        }
    }
1932
1933 #[test]
1934 fn test_scan_with_i32_filter() {
1935 let table = setup_test_table();
1936 const COL_A_U64: FieldId = 10;
1937 const COL_C_I32: FieldId = 12;
1938
1939 let filter = pred_expr(Filter {
1940 field_id: COL_C_I32,
1941 op: Operator::Equals(20.into()),
1942 });
1943
1944 let mut vals: Vec<Option<u64>> = Vec::new();
1945 table
1946 .scan_stream(
1947 &[proj(&table, COL_A_U64)],
1948 &filter,
1949 ScanStreamOptions::default(),
1950 |b| {
1951 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1952 vals.extend(
1953 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1954 );
1955 },
1956 )
1957 .unwrap();
1958 assert_eq!(vals, vec![Some(200), Some(200)]);
1959 }
1960
1961 #[test]
1962 fn test_scan_with_greater_than_filter() {
1963 let table = setup_test_table();
1964 const COL_A_U64: FieldId = 10;
1965 const COL_C_I32: FieldId = 12;
1966
1967 let filter = pred_expr(Filter {
1968 field_id: COL_C_I32,
1969 op: Operator::GreaterThan(15.into()),
1970 });
1971
1972 let mut vals: Vec<Option<u64>> = Vec::new();
1973 table
1974 .scan_stream(
1975 &[proj(&table, COL_A_U64)],
1976 &filter,
1977 ScanStreamOptions::default(),
1978 |b| {
1979 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1980 vals.extend(
1981 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1982 );
1983 },
1984 )
1985 .unwrap();
1986 assert_eq!(vals, vec![Some(200), Some(300), Some(200)]);
1987 }
1988
1989 #[test]
1990 fn test_scan_with_range_filter() {
1991 let table = setup_test_table();
1992 const COL_A_U64: FieldId = 10;
1993 const COL_C_I32: FieldId = 12;
1994
1995 let filter = pred_expr(Filter {
1996 field_id: COL_A_U64,
1997 op: Operator::Range {
1998 lower: Bound::Included(150.into()),
1999 upper: Bound::Excluded(300.into()),
2000 },
2001 });
2002
2003 let mut vals: Vec<Option<i32>> = Vec::new();
2004 table
2005 .scan_stream(
2006 &[proj(&table, COL_C_I32)],
2007 &filter,
2008 ScanStreamOptions::default(),
2009 |b| {
2010 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
2011 vals.extend(
2012 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
2013 );
2014 },
2015 )
2016 .unwrap();
2017 assert_eq!(vals, vec![Some(20), Some(20)]);
2018 }
2019
2020 #[test]
2021 fn test_filtered_scan_sum_kernel() {
2022 let table = setup_test_table();
2026 const COL_A_U64: FieldId = 10;
2027
2028 let filter = pred_expr(Filter {
2029 field_id: COL_A_U64,
2030 op: Operator::Range {
2031 lower: Bound::Included(150.into()),
2032 upper: Bound::Excluded(300.into()),
2033 },
2034 });
2035
2036 let mut total: u128 = 0;
2037 table
2038 .scan_stream(
2039 &[proj(&table, COL_A_U64)],
2040 &filter,
2041 ScanStreamOptions::default(),
2042 |b| {
2043 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2044 if let Some(part) = sum(a) {
2045 total += part as u128;
2046 }
2047 },
2048 )
2049 .unwrap();
2050
2051 assert_eq!(total, 400);
2052 }
2053
2054 #[test]
2055 fn test_filtered_scan_sum_i32_kernel() {
2056 let table = setup_test_table();
2061 const COL_A_U64: FieldId = 10;
2062 const COL_C_I32: FieldId = 12;
2063
2064 let candidates = [100.into(), 300.into()];
2065 let filter = pred_expr(Filter {
2066 field_id: COL_A_U64,
2067 op: Operator::In(&candidates),
2068 });
2069
2070 let mut total: i64 = 0;
2071 table
2072 .scan_stream(
2073 &[proj(&table, COL_C_I32)],
2074 &filter,
2075 ScanStreamOptions::default(),
2076 |b| {
2077 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
2078 if let Some(part) = sum(a) {
2079 total += part as i64;
2080 }
2081 },
2082 )
2083 .unwrap();
2084 assert_eq!(total, 40);
2085 }
2086
2087 #[test]
2088 fn test_filtered_scan_min_max_kernel() {
2089 let table = setup_test_table();
2094 const COL_A_U64: FieldId = 10;
2095 const COL_C_I32: FieldId = 12;
2096
2097 let candidates = [100.into(), 300.into()];
2098 let filter = pred_expr(Filter {
2099 field_id: COL_A_U64,
2100 op: Operator::In(&candidates),
2101 });
2102
2103 let mut mn: Option<i32> = None;
2104 let mut mx: Option<i32> = None;
2105 table
2106 .scan_stream(
2107 &[proj(&table, COL_C_I32)],
2108 &filter,
2109 ScanStreamOptions::default(),
2110 |b| {
2111 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
2112
2113 if let Some(part_min) = min(a) {
2114 mn = Some(mn.map_or(part_min, |m| m.min(part_min)));
2115 }
2116 if let Some(part_max) = max(a) {
2117 mx = Some(mx.map_or(part_max, |m| m.max(part_max)));
2118 }
2119 },
2120 )
2121 .unwrap();
2122 assert_eq!(mn, Some(10));
2123 assert_eq!(mx, Some(30));
2124 }
2125
2126 #[test]
2127 fn test_filtered_scan_float64_column() {
2128 let table = setup_test_table();
2129 const COL_D_F64: FieldId = 13;
2130
2131 let filter = pred_expr(Filter {
2132 field_id: COL_D_F64,
2133 op: Operator::GreaterThan(2.0_f64.into()),
2134 });
2135
2136 let mut got = Vec::new();
2137 table
2138 .scan_stream(
2139 &[proj(&table, COL_D_F64)],
2140 &filter,
2141 ScanStreamOptions::default(),
2142 |b| {
2143 let arr = b.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
2144 for i in 0..arr.len() {
2145 if arr.is_valid(i) {
2146 got.push(arr.value(i));
2147 }
2148 }
2149 },
2150 )
2151 .unwrap();
2152
2153 assert_eq!(got, vec![2.5, 3.5, 2.5]);
2154 }
2155
2156 #[test]
2157 fn test_filtered_scan_float32_in_operator() {
2158 let table = setup_test_table();
2159 const COL_E_F32: FieldId = 14;
2160
2161 let candidates = [2.0_f32.into(), 3.0_f32.into()];
2162 let filter = pred_expr(Filter {
2163 field_id: COL_E_F32,
2164 op: Operator::In(&candidates),
2165 });
2166
2167 let mut vals: Vec<Option<f32>> = Vec::new();
2168 table
2169 .scan_stream(
2170 &[proj(&table, COL_E_F32)],
2171 &filter,
2172 ScanStreamOptions::default(),
2173 |b| {
2174 let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
2175 vals.extend((0..arr.len()).map(|i| {
2176 if arr.is_null(i) {
2177 None
2178 } else {
2179 Some(arr.value(i))
2180 }
2181 }));
2182 },
2183 )
2184 .unwrap();
2185
2186 let collected: Vec<f32> = vals.into_iter().flatten().collect();
2187 assert_eq!(collected, vec![2.0, 3.0, 2.0]);
2188 }
2189
2190 #[test]
2191 fn test_scan_stream_and_expression() {
2192 let table = setup_test_table();
2193 const COL_A_U64: FieldId = 10;
2194 const COL_C_I32: FieldId = 12;
2195 const COL_E_F32: FieldId = 14;
2196
2197 let expr = Expr::all_of(vec![
2198 Filter {
2199 field_id: COL_C_I32,
2200 op: Operator::GreaterThan(15.into()),
2201 },
2202 Filter {
2203 field_id: COL_A_U64,
2204 op: Operator::LessThan(250.into()),
2205 },
2206 ]);
2207
2208 let mut vals: Vec<Option<f32>> = Vec::new();
2209 table
2210 .scan_stream(
2211 &[proj(&table, COL_E_F32)],
2212 &expr,
2213 ScanStreamOptions::default(),
2214 |b| {
2215 let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
2216 vals.extend((0..arr.len()).map(|i| {
2217 if arr.is_null(i) {
2218 None
2219 } else {
2220 Some(arr.value(i))
2221 }
2222 }));
2223 },
2224 )
2225 .unwrap();
2226
2227 assert_eq!(vals, vec![Some(2.0_f32), Some(2.0_f32)]);
2228 }
2229
2230 #[test]
2231 fn test_scan_stream_or_expression() {
2232 let table = setup_test_table();
2233 const COL_A_U64: FieldId = 10;
2234 const COL_C_I32: FieldId = 12;
2235
2236 let expr = Expr::any_of(vec![
2237 Filter {
2238 field_id: COL_C_I32,
2239 op: Operator::Equals(10.into()),
2240 },
2241 Filter {
2242 field_id: COL_C_I32,
2243 op: Operator::Equals(30.into()),
2244 },
2245 ]);
2246
2247 let mut vals: Vec<Option<u64>> = Vec::new();
2248 table
2249 .scan_stream(
2250 &[proj(&table, COL_A_U64)],
2251 &expr,
2252 ScanStreamOptions::default(),
2253 |b| {
2254 let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2255 vals.extend((0..arr.len()).map(|i| {
2256 if arr.is_null(i) {
2257 None
2258 } else {
2259 Some(arr.value(i))
2260 }
2261 }));
2262 },
2263 )
2264 .unwrap();
2265
2266 assert_eq!(vals, vec![Some(100), Some(300)]);
2267 }
2268
2269 #[test]
2270 fn test_scan_stream_not_predicate() {
2271 let table = setup_test_table();
2272 const COL_A_U64: FieldId = 10;
2273 const COL_C_I32: FieldId = 12;
2274
2275 let expr = Expr::not(pred_expr(Filter {
2276 field_id: COL_C_I32,
2277 op: Operator::Equals(20.into()),
2278 }));
2279
2280 let mut vals: Vec<Option<u64>> = Vec::new();
2281 table
2282 .scan_stream(
2283 &[proj(&table, COL_A_U64)],
2284 &expr,
2285 ScanStreamOptions::default(),
2286 |b| {
2287 let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2288 vals.extend((0..arr.len()).map(|i| {
2289 if arr.is_null(i) {
2290 None
2291 } else {
2292 Some(arr.value(i))
2293 }
2294 }));
2295 },
2296 )
2297 .unwrap();
2298
2299 assert_eq!(vals, vec![Some(100), Some(300)]);
2300 }
2301
2302 #[test]
2303 fn test_scan_stream_not_and_expression() {
2304 let table = setup_test_table();
2305 const COL_A_U64: FieldId = 10;
2306 const COL_C_I32: FieldId = 12;
2307
2308 let expr = Expr::not(Expr::all_of(vec![
2309 Filter {
2310 field_id: COL_A_U64,
2311 op: Operator::GreaterThan(150.into()),
2312 },
2313 Filter {
2314 field_id: COL_C_I32,
2315 op: Operator::LessThan(40.into()),
2316 },
2317 ]));
2318
2319 let mut vals: Vec<Option<u64>> = Vec::new();
2320 table
2321 .scan_stream(
2322 &[proj(&table, COL_A_U64)],
2323 &expr,
2324 ScanStreamOptions::default(),
2325 |b| {
2326 let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2327 vals.extend((0..arr.len()).map(|i| {
2328 if arr.is_null(i) {
2329 None
2330 } else {
2331 Some(arr.value(i))
2332 }
2333 }));
2334 },
2335 )
2336 .unwrap();
2337
2338 assert_eq!(vals, vec![Some(100)]);
2339 }
2340
    #[test]
    fn test_scan_stream_include_nulls_toggle() {
        // Verifies the `include_nulls` scan option: with default options a row
        // whose projected column is NULL is dropped from the output, while
        // `include_nulls: true` keeps the row and surfaces the NULL.
        let pager = Arc::new(MemPager::default());
        let table = setup_test_table_with_pager(&pager);
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;
        const COL_B_BIN: FieldId = 11;
        const COL_D_F64: FieldId = 13;
        const COL_E_F32: FieldId = 14;

        // Only c_i32 is declared nullable; the appended batch below exercises it.
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_B_BIN.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, true).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_F64.to_string(),
            )])),
            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_F32.to_string(),
            )])),
        ]));

        // Append two extra rows (row ids 5 and 6); row 6 has c_i32 = NULL.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![5, 6])),
                Arc::new(UInt64Array::from(vec![500, 600])),
                Arc::new(BinaryArray::from(vec![
                    Some(&b"new"[..]),
                    Some(&b"alt"[..]),
                ])),
                Arc::new(Int32Array::from(vec![Some(40), None])),
                Arc::new(Float64Array::from(vec![5.5, 6.5])),
                Arc::new(Float32Array::from(vec![5.0, 6.0])),
            ],
        )
        .unwrap();
        table.append(&batch).unwrap();

        // Selects only the freshly appended rows (a = 500 and a = 600).
        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::GreaterThan(450.into()),
        });

        // Default options: the row whose c_i32 is NULL must be filtered out.
        let mut default_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    default_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(default_vals, vec![Some(40)]);

        // Same filter with include_nulls: the NULL row is now emitted.
        let mut include_null_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions {
                    include_nulls: true,
                    order: None,
                    row_id_filter: None,
                    include_row_ids: true,
                },
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();

                    // NOTE(review): this second scan is deliberately issued from
                    // *inside* the callback of the include_nulls scan, so it also
                    // acts as a re-entrancy check on the same table. The assert on
                    // `paired_vals` below runs once per outer batch, which assumes
                    // the outer scan yields a single batch here — TODO confirm.
                    let mut paired_vals: Vec<(Option<i32>, Option<f64>)> = Vec::new();
                    table
                        .scan_stream(
                            &[proj(&table, COL_C_I32), proj(&table, COL_D_F64)],
                            &filter,
                            ScanStreamOptions::default(),
                            |b| {
                                assert_eq!(b.num_columns(), 2);
                                let c_arr =
                                    b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                                let d_arr =
                                    b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
                                for i in 0..b.num_rows() {
                                    let c_val = if c_arr.is_null(i) {
                                        None
                                    } else {
                                        Some(c_arr.value(i))
                                    };
                                    let d_val = if d_arr.is_null(i) {
                                        None
                                    } else {
                                        Some(d_arr.value(i))
                                    };
                                    paired_vals.push((c_val, d_val));
                                }
                            },
                        )
                        .unwrap();
                    // Multi-column projection keeps a row when any projected
                    // column is non-null, so the (None, Some(6.5)) pair appears
                    // even with default options.
                    assert_eq!(paired_vals, vec![(Some(40), Some(5.5)), (None, Some(6.5))]);
                    include_null_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(include_null_vals, vec![Some(40), None]);
    }
2472
2473 #[test]
2474 fn test_filtered_scan_int_sqrt_float64() {
2475 let table = setup_test_table();
2481 const COL_A_U64: FieldId = 10;
2482 const COL_C_I32: FieldId = 12;
2483
2484 let filter = pred_expr(Filter {
2485 field_id: COL_C_I32,
2486 op: Operator::GreaterThan(15.into()),
2487 });
2488
2489 let mut got: Vec<f64> = Vec::new();
2490 table
2491 .scan_stream(
2492 &[proj(&table, COL_A_U64)],
2493 &filter,
2494 ScanStreamOptions::default(),
2495 |b| {
2496 let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
2497 let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
2498
2499 let sqrt_arr = unary::<
2501 arrow::datatypes::Float64Type,
2502 _,
2503 arrow::datatypes::Float64Type,
2504 >(f64_arr, |v: f64| v.sqrt());
2505
2506 for i in 0..sqrt_arr.len() {
2507 if !sqrt_arr.is_null(i) {
2508 got.push(sqrt_arr.value(i));
2509 }
2510 }
2511 },
2512 )
2513 .unwrap();
2514
2515 let expected = [200_f64.sqrt(), 300_f64.sqrt(), 200_f64.sqrt()];
2516 assert_eq!(got, expected);
2517 }
2518
2519 #[test]
2520 fn test_multi_field_kernels_with_filters() {
2521 use arrow::array::{Int16Array, UInt8Array, UInt32Array};
2525
2526 let table = Table::from_id(2, Arc::new(MemPager::default())).unwrap();
2527
2528 const COL_A_U64: FieldId = 20;
2529 const COL_D_U32: FieldId = 21;
2530 const COL_E_I16: FieldId = 22;
2531 const COL_F_U8: FieldId = 23;
2532 const COL_C_I32: FieldId = 24;
2533
2534 let schema = Arc::new(Schema::new(vec![
2535 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
2536 Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
2537 crate::constants::FIELD_ID_META_KEY.to_string(),
2538 COL_A_U64.to_string(),
2539 )])),
2540 Field::new("d_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
2541 crate::constants::FIELD_ID_META_KEY.to_string(),
2542 COL_D_U32.to_string(),
2543 )])),
2544 Field::new("e_i16", DataType::Int16, false).with_metadata(HashMap::from([(
2545 crate::constants::FIELD_ID_META_KEY.to_string(),
2546 COL_E_I16.to_string(),
2547 )])),
2548 Field::new("f_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
2549 crate::constants::FIELD_ID_META_KEY.to_string(),
2550 COL_F_U8.to_string(),
2551 )])),
2552 Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
2553 crate::constants::FIELD_ID_META_KEY.to_string(),
2554 COL_C_I32.to_string(),
2555 )])),
2556 ]));
2557
2558 let batch = RecordBatch::try_new(
2560 schema.clone(),
2561 vec![
2562 Arc::new(UInt64Array::from(vec![1, 2, 3, 4, 5])),
2563 Arc::new(UInt64Array::from(vec![100, 225, 400, 900, 1600])),
2564 Arc::new(UInt32Array::from(vec![7, 8, 9, 10, 11])),
2565 Arc::new(Int16Array::from(vec![-3, 0, 3, -6, 6])),
2566 Arc::new(UInt8Array::from(vec![2, 4, 6, 8, 10])),
2567 Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
2568 ],
2569 )
2570 .unwrap();
2571
2572 table.append(&batch).unwrap();
2573
2574 let filter = pred_expr(Filter {
2576 field_id: COL_C_I32,
2577 op: Operator::GreaterThanOrEquals(20.into()),
2578 });
2579
2580 let mut d_sum: u128 = 0;
2582 table
2583 .scan_stream(
2584 &[proj(&table, COL_D_U32)],
2585 &filter,
2586 ScanStreamOptions::default(),
2587 |b| {
2588 let a = b.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
2589 if let Some(part) = sum(a) {
2590 d_sum += part as u128;
2591 }
2592 },
2593 )
2594 .unwrap();
2595 assert_eq!(d_sum, (8 + 9 + 10 + 11) as u128);
2596
2597 let mut e_min: Option<i16> = None;
2599 table
2600 .scan_stream(
2601 &[proj(&table, COL_E_I16)],
2602 &filter,
2603 ScanStreamOptions::default(),
2604 |b| {
2605 let a = b.column(0).as_any().downcast_ref::<Int16Array>().unwrap();
2606 if let Some(part_min) = min(a) {
2607 e_min = Some(e_min.map_or(part_min, |m| m.min(part_min)));
2608 }
2609 },
2610 )
2611 .unwrap();
2612 assert_eq!(e_min, Some(-6));
2613
2614 let mut f_max: Option<u8> = None;
2616 table
2617 .scan_stream(
2618 &[proj(&table, COL_F_U8)],
2619 &filter,
2620 ScanStreamOptions::default(),
2621 |b| {
2622 let a = b
2623 .column(0)
2624 .as_any()
2625 .downcast_ref::<arrow::array::UInt8Array>()
2626 .unwrap();
2627 if let Some(part_max) = max(a) {
2628 f_max = Some(f_max.map_or(part_max, |m| m.max(part_max)));
2629 }
2630 },
2631 )
2632 .unwrap();
2633 assert_eq!(f_max, Some(10));
2634
2635 let mut got: Vec<f64> = Vec::new();
2637 table
2638 .scan_stream(
2639 &[proj(&table, COL_A_U64)],
2640 &filter,
2641 ScanStreamOptions::default(),
2642 |b| {
2643 let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
2644 let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
2645 let sqrt_arr = unary::<
2646 arrow::datatypes::Float64Type,
2647 _,
2648 arrow::datatypes::Float64Type,
2649 >(f64_arr, |v: f64| v.sqrt());
2650
2651 for i in 0..sqrt_arr.len() {
2652 if !sqrt_arr.is_null(i) {
2653 got.push(sqrt_arr.value(i));
2654 }
2655 }
2656 },
2657 )
2658 .unwrap();
2659 let expected = [15.0_f64, 20.0, 30.0, 40.0];
2660 assert_eq!(got, expected);
2661 }
2662
2663 #[test]
2664 fn test_scan_with_in_filter() {
2665 let table = setup_test_table();
2666 const COL_A_U64: FieldId = 10;
2667 const COL_C_I32: FieldId = 12;
2668
2669 let candidates = [10.into(), 30.into()];
2671 let filter = pred_expr(Filter {
2672 field_id: COL_C_I32,
2673 op: Operator::In(&candidates),
2674 });
2675
2676 let mut vals: Vec<Option<u64>> = Vec::new();
2677 table
2678 .scan_stream(
2679 &[proj(&table, COL_A_U64)],
2680 &filter,
2681 ScanStreamOptions::default(),
2682 |b| {
2683 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2684 vals.extend(
2685 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
2686 );
2687 },
2688 )
2689 .unwrap();
2690 assert_eq!(vals, vec![Some(100), Some(300)]);
2691 }
2692
2693 #[test]
2694 fn test_scan_stream_single_column_batches() {
2695 let table = setup_test_table();
2696 const COL_A_U64: FieldId = 10;
2697 const COL_C_I32: FieldId = 12;
2698
2699 let filter = pred_expr(Filter {
2701 field_id: COL_C_I32,
2702 op: Operator::Equals(20.into()),
2703 });
2704
2705 let mut seen_cols = Vec::<u64>::new();
2706 table
2707 .scan_stream(
2708 &[proj(&table, COL_A_U64)],
2709 &filter,
2710 ScanStreamOptions::default(),
2711 |b| {
2712 assert_eq!(b.num_columns(), 1);
2713 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2714 for i in 0..a.len() {
2716 if !a.is_null(i) {
2717 seen_cols.push(a.value(i));
2718 }
2719 }
2720 },
2721 )
2722 .unwrap();
2723
2724 assert_eq!(seen_cols, vec![200, 200]);
2726 }
2727
2728 #[test]
2729 fn test_scan_with_multiple_projection_columns() {
2730 let table = setup_test_table();
2731 const COL_A_U64: FieldId = 10;
2732 const COL_C_I32: FieldId = 12;
2733
2734 let filter = pred_expr(Filter {
2735 field_id: COL_C_I32,
2736 op: Operator::Equals(20.into()),
2737 });
2738
2739 let expected_names = [COL_A_U64.to_string(), COL_C_I32.to_string()];
2740
2741 let mut combined: Vec<(Option<u64>, Option<i32>)> = Vec::new();
2742 table
2743 .scan_stream(
2744 &[proj(&table, COL_A_U64), proj(&table, COL_C_I32)],
2745 &filter,
2746 ScanStreamOptions::default(),
2747 |b| {
2748 assert_eq!(b.num_columns(), 2);
2749 assert_eq!(b.schema().field(0).name(), &expected_names[0]);
2750 assert_eq!(b.schema().field(1).name(), &expected_names[1]);
2751
2752 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2753 let c = b.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
2754 for i in 0..b.num_rows() {
2755 let left = if a.is_null(i) { None } else { Some(a.value(i)) };
2756 let right = if c.is_null(i) { None } else { Some(c.value(i)) };
2757 combined.push((left, right));
2758 }
2759 },
2760 )
2761 .unwrap();
2762
2763 assert_eq!(combined, vec![(Some(200), Some(20)), (Some(200), Some(20))]);
2764 }
2765
2766 #[test]
2767 fn test_scan_stream_projection_validation() {
2768 let table = setup_test_table();
2769 const COL_A_U64: FieldId = 10;
2770 const COL_C_I32: FieldId = 12;
2771
2772 let filter = pred_expr(Filter {
2773 field_id: COL_C_I32,
2774 op: Operator::Equals(20.into()),
2775 });
2776
2777 let empty: [Projection; 0] = [];
2778 let result = table.scan_stream(&empty, &filter, ScanStreamOptions::default(), |_batch| {});
2779 assert!(matches!(result, Err(Error::InvalidArgumentError(_))));
2780
2781 let duplicate = [
2786 proj(&table, COL_A_U64),
2787 proj_alias(&table, COL_A_U64, "alias_a"),
2788 ];
2789 let mut collected = Vec::<u64>::new();
2790 table
2791 .scan_stream(&duplicate, &filter, ScanStreamOptions::default(), |b| {
2792 assert_eq!(b.num_columns(), 2);
2793 assert_eq!(b.schema().field(0).name(), &COL_A_U64.to_string());
2794 assert_eq!(b.schema().field(1).name(), "alias_a");
2795 let a0 = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2796 let a1 = b.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
2797 for i in 0..b.num_rows() {
2798 if !a0.is_null(i) {
2799 collected.push(a0.value(i));
2800 }
2801 if !a1.is_null(i) {
2802 collected.push(a1.value(i));
2803 }
2804 }
2805 })
2806 .unwrap();
2807 assert_eq!(collected, vec![200, 200, 200, 200]);
2809 }
2810
2811 #[test]
2812 fn test_scan_stream_computed_projection() {
2813 let table = setup_test_table();
2814 const COL_A_U64: FieldId = 10;
2815
2816 let projections = [
2817 ScanProjection::column(proj(&table, COL_A_U64)),
2818 ScanProjection::computed(
2819 ScalarExpr::binary(
2820 ScalarExpr::column(COL_A_U64),
2821 BinaryOp::Multiply,
2822 ScalarExpr::literal(2),
2823 ),
2824 "a_times_two",
2825 ),
2826 ];
2827
2828 let filter = pred_expr(Filter {
2829 field_id: COL_A_U64,
2830 op: Operator::GreaterThanOrEquals(0.into()),
2831 });
2832
2833 let mut computed: Vec<(u64, f64)> = Vec::new();
2834 table
2835 .scan_stream_with_exprs(&projections, &filter, ScanStreamOptions::default(), |b| {
2836 assert_eq!(b.num_columns(), 2);
2837 let base = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2838 let comp = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
2839 for i in 0..b.num_rows() {
2840 if base.is_null(i) || comp.is_null(i) {
2841 continue;
2842 }
2843 computed.push((base.value(i), comp.value(i)));
2844 }
2845 })
2846 .unwrap();
2847
2848 let expected = vec![(100, 200.0), (200, 400.0), (300, 600.0), (200, 400.0)];
2849 assert_eq!(computed, expected);
2850 }
2851
2852 #[test]
2853 fn test_scan_stream_multi_column_filter_compare() {
2854 let table = setup_test_table();
2855 const COL_A_U64: FieldId = 10;
2856 const COL_C_I32: FieldId = 12;
2857
2858 let expr = Expr::Compare {
2859 left: ScalarExpr::binary(
2860 ScalarExpr::column(COL_A_U64),
2861 BinaryOp::Add,
2862 ScalarExpr::column(COL_C_I32),
2863 ),
2864 op: CompareOp::Gt,
2865 right: ScalarExpr::literal(220_i64),
2866 };
2867
2868 let mut vals: Vec<Option<u64>> = Vec::new();
2869 table
2870 .scan_stream(
2871 &[proj(&table, COL_A_U64)],
2872 &expr,
2873 ScanStreamOptions::default(),
2874 |b| {
2875 let col = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2876 for i in 0..b.num_rows() {
2877 vals.push(if col.is_null(i) {
2878 None
2879 } else {
2880 Some(col.value(i))
2881 });
2882 }
2883 },
2884 )
2885 .unwrap();
2886
2887 assert_eq!(vals.into_iter().flatten().collect::<Vec<_>>(), vec![300]);
2888 }
2889}