1use std::fmt;
2use std::sync::Arc;
3use std::sync::RwLock;
4
5use crate::planner::{TablePlanner, collect_row_ids_for_table};
6use crate::types::TableId;
7
8use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt32Array};
9use arrow::datatypes::{DataType, Field, Schema};
10use std::collections::HashMap;
11
12use llkv_column_map::store::{Projection, ROW_ID_COLUMN_NAME};
13use llkv_column_map::{ColumnStore, types::LogicalFieldId};
14use llkv_storage::pager::{MemPager, Pager};
15use simd_r_drive_entry_handle::EntryHandle;
16
17use crate::reserved::is_reserved_table_id;
18use crate::sys_catalog::{ColMeta, SysCatalog, TableMeta};
19use crate::types::FieldId;
20use llkv_expr::{Expr, ScalarExpr};
21use llkv_result::{Error, Result as LlkvResult};
22
/// Records which MVCC system columns a batch schema already contains.
///
/// Produced by `Table::get_mvcc_cache` and consulted by `Table::append` to
/// decide whether `created_by` / `deleted_by` columns must be synthesized.
#[derive(Debug, Clone, Copy)]
struct MvccColumnCache {
    /// `true` when the schema has a field named `CREATED_BY_COLUMN_NAME`.
    has_created_by: bool,
    /// `true` when the schema has a field named `DELETED_BY_COLUMN_NAME`.
    has_deleted_by: bool,
}
30
/// A table whose columns live in a [`ColumnStore`], addressed by `table_id`.
///
/// User columns are stored under [`LogicalFieldId`]s namespaced by this
/// table's id. Column names are kept in the system catalog, which is read
/// through the same store (see `Table::catalog`).
pub struct Table<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Column store backing this table's data and catalog access.
    store: ColumnStore<P>,
    /// Identifier of this table; `Table::new` rejects reserved ids.
    table_id: TableId,
    /// MVCC column layout written by `Table::get_mvcc_cache`; `None` until the
    /// first append inspects a batch schema.
    mvcc_cache: RwLock<Option<MvccColumnCache>>,
}
64
/// Caller-supplied hook applied to the row ids a scan would produce.
///
/// Installed through `ScanStreamOptions::row_id_filter`, where it is shared
/// behind an `Arc`; hence the `Send + Sync` supertraits.
pub trait RowIdFilter<P>: Send + Sync
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Returns the row ids from `row_ids` that should remain for `table`.
    ///
    /// NOTE(review): presumably the result is a subset of the input; whether
    /// order must be preserved is not visible here — confirm with the planner.
    fn filter(&self, table: &Table<P>, row_ids: Vec<u64>) -> LlkvResult<Vec<u64>>;
}
87
88pub struct ScanStreamOptions<P = MemPager>
93where
94 P: Pager<Blob = EntryHandle> + Send + Sync,
95{
96 pub include_nulls: bool,
102 pub order: Option<ScanOrderSpec>,
107 pub row_id_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
112}
113
114impl<P> fmt::Debug for ScanStreamOptions<P>
115where
116 P: Pager<Blob = EntryHandle> + Send + Sync,
117{
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 f.debug_struct("ScanStreamOptions")
120 .field("include_nulls", &self.include_nulls)
121 .field("order", &self.order)
122 .field(
123 "row_id_filter",
124 &self.row_id_filter.as_ref().map(|_| "<RowIdFilter>"),
125 )
126 .finish()
127 }
128}
129
130impl<P> Clone for ScanStreamOptions<P>
131where
132 P: Pager<Blob = EntryHandle> + Send + Sync,
133{
134 fn clone(&self) -> Self {
135 Self {
136 include_nulls: self.include_nulls,
137 order: self.order,
138 row_id_filter: self.row_id_filter.clone(),
139 }
140 }
141}
142
143impl<P> Default for ScanStreamOptions<P>
144where
145 P: Pager<Blob = EntryHandle> + Send + Sync,
146{
147 fn default() -> Self {
148 Self {
149 include_nulls: false,
150 order: None,
151 row_id_filter: None,
152 }
153 }
154}
155
/// Requested ordering for a scan: sort emitted rows by a single column.
#[derive(Clone, Copy, Debug)]
pub struct ScanOrderSpec {
    /// User field id of the column to order by.
    pub field_id: FieldId,
    /// Sort direction.
    pub direction: ScanOrderDirection,
    /// Whether nulls are placed before non-null values.
    pub nulls_first: bool,
    /// How column values are interpreted when comparing rows.
    pub transform: ScanOrderTransform,
}

/// Sort direction for [`ScanOrderSpec`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderDirection {
    /// Smallest values first.
    Ascending,
    /// Largest values first.
    Descending,
}

/// Value interpretation used when ordering by a column.
///
/// NOTE(review): the semantics below are inferred from the variant names;
/// confirm against the planner's ordering implementation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderTransform {
    /// Compare integer values as stored.
    IdentityInteger,
    /// Compare UTF-8 values as stored.
    IdentityUtf8,
    /// Presumably parses UTF-8 values as integers before comparing.
    CastUtf8ToInteger,
}
192
193#[derive(Clone, Debug)]
197pub enum ScanProjection {
198 Column(Projection),
200 Computed {
202 expr: ScalarExpr<FieldId>,
204 alias: String,
206 },
207}
208
209impl ScanProjection {
210 pub fn column<P: Into<Projection>>(proj: P) -> Self {
212 Self::Column(proj.into())
213 }
214
215 pub fn computed<S: Into<String>>(expr: ScalarExpr<FieldId>, alias: S) -> Self {
217 Self::Computed {
218 expr,
219 alias: alias.into(),
220 }
221 }
222}
223
224impl From<Projection> for ScanProjection {
225 fn from(value: Projection) -> Self {
226 ScanProjection::Column(value)
227 }
228}
229
230impl From<&Projection> for ScanProjection {
231 fn from(value: &Projection) -> Self {
232 ScanProjection::Column(value.clone())
233 }
234}
235
236impl From<&ScanProjection> for ScanProjection {
237 fn from(value: &ScanProjection) -> Self {
238 value.clone()
239 }
240}
241
242impl<P> Table<P>
243where
244 P: Pager<Blob = EntryHandle> + Send + Sync,
245{
246 pub fn new(table_id: TableId, pager: Arc<P>) -> LlkvResult<Self> {
258 if is_reserved_table_id(table_id) {
259 return Err(Error::ReservedTableId(table_id));
260 }
261
262 tracing::trace!(
263 "!!! Table::new: Creating table table_id={} with pager at {:p}",
264 table_id,
265 &*pager
266 );
267 let store = ColumnStore::open(pager)?;
268 Ok(Self {
269 store,
270 table_id,
271 mvcc_cache: RwLock::new(None),
272 })
273 }
274
275 fn get_mvcc_cache(&self, schema: &Arc<Schema>) -> MvccColumnCache {
278 {
280 let cache_read = self.mvcc_cache.read().unwrap();
281 if let Some(cache) = *cache_read {
282 return cache;
283 }
284 }
285
286 let has_created_by = schema
288 .fields()
289 .iter()
290 .any(|f| f.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME);
291 let has_deleted_by = schema
292 .fields()
293 .iter()
294 .any(|f| f.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME);
295
296 let cache = MvccColumnCache {
297 has_created_by,
298 has_deleted_by,
299 };
300
301 *self.mvcc_cache.write().unwrap() = Some(cache);
303
304 cache
305 }
306
307 pub fn append(&self, batch: &RecordBatch) -> LlkvResult<()> {
332 use arrow::array::UInt64Builder;
333
334 let cache = self.get_mvcc_cache(&batch.schema());
337 let has_created_by = cache.has_created_by;
338 let has_deleted_by = cache.has_deleted_by;
339
340 let mut new_fields = Vec::with_capacity(batch.schema().fields().len() + 2);
341 let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.columns().len() + 2);
342
343 for (idx, field) in batch.schema().fields().iter().enumerate() {
344 let maybe_field_id = field.metadata().get(crate::constants::FIELD_ID_META_KEY);
345 if maybe_field_id.is_none()
347 && (field.name() == ROW_ID_COLUMN_NAME
348 || field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME
349 || field.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME)
350 {
351 if field.name() == ROW_ID_COLUMN_NAME {
352 new_fields.push(field.as_ref().clone());
353 new_columns.push(batch.column(idx).clone());
354 } else {
355 let lfid = if field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME {
356 LogicalFieldId::for_mvcc_created_by(self.table_id)
357 } else {
358 LogicalFieldId::for_mvcc_deleted_by(self.table_id)
359 };
360
361 let mut metadata = field.metadata().clone();
362 let lfid_val: u64 = lfid.into();
363 metadata.insert(
364 crate::constants::FIELD_ID_META_KEY.to_string(),
365 lfid_val.to_string(),
366 );
367
368 let new_field =
369 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
370 .with_metadata(metadata);
371 new_fields.push(new_field);
372 new_columns.push(batch.column(idx).clone());
373 }
374 continue;
375 }
376
377 let raw_field_id = maybe_field_id
378 .ok_or_else(|| {
379 llkv_result::Error::Internal(format!(
380 "Field '{}' is missing a valid '{}' in its metadata.",
381 field.name(),
382 crate::constants::FIELD_ID_META_KEY
383 ))
384 })?
385 .parse::<u64>()
386 .map_err(|err| {
387 llkv_result::Error::Internal(format!(
388 "Field '{}' contains an invalid '{}': {}",
389 field.name(),
390 crate::constants::FIELD_ID_META_KEY,
391 err
392 ))
393 })?;
394
395 if raw_field_id > FieldId::MAX as u64 {
396 return Err(llkv_result::Error::Internal(format!(
397 "Field '{}' expected user FieldId (<= {}) but got logical id '{}'",
398 field.name(),
399 FieldId::MAX,
400 raw_field_id
401 )));
402 }
403
404 let user_field_id = raw_field_id as FieldId;
405 let logical_field_id = LogicalFieldId::for_user(self.table_id, user_field_id);
406
407 let lfid = logical_field_id;
410 let mut new_metadata = field.metadata().clone();
411 let lfid_val: u64 = lfid.into();
412 new_metadata.insert(
413 crate::constants::FIELD_ID_META_KEY.to_string(),
414 lfid_val.to_string(),
415 );
416
417 let new_field =
418 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
419 .with_metadata(new_metadata);
420 new_fields.push(new_field);
421 new_columns.push(batch.column(idx).clone());
422
423 let need_meta = match self
429 .catalog()
430 .get_cols_meta(self.table_id, &[user_field_id])
431 {
432 metas if metas.is_empty() => true,
433 metas => metas[0].as_ref().and_then(|m| m.name.as_ref()).is_none(),
434 };
435
436 if need_meta {
437 let meta = ColMeta {
438 col_id: user_field_id,
439 name: Some(field.name().to_string()),
440 flags: 0,
441 default: None,
442 };
443 self.put_col_meta(&meta);
444 }
445 }
446
447 const TXN_ID_AUTO_COMMIT: u64 = 1;
452 const TXN_ID_NONE: u64 = 0;
453 let row_count = batch.num_rows();
454
455 if !has_created_by {
456 let mut created_by_builder = UInt64Builder::with_capacity(row_count);
457 for _ in 0..row_count {
458 created_by_builder.append_value(TXN_ID_AUTO_COMMIT);
459 }
460 let created_by_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
461 let mut metadata = HashMap::new();
462 let lfid_val: u64 = created_by_lfid.into();
463 metadata.insert(
464 crate::constants::FIELD_ID_META_KEY.to_string(),
465 lfid_val.to_string(),
466 );
467 new_fields.push(
468 Field::new(
469 llkv_column_map::store::CREATED_BY_COLUMN_NAME,
470 DataType::UInt64,
471 false,
472 )
473 .with_metadata(metadata),
474 );
475 new_columns.push(Arc::new(created_by_builder.finish()));
476 }
477
478 if !has_deleted_by {
479 let mut deleted_by_builder = UInt64Builder::with_capacity(row_count);
480 for _ in 0..row_count {
481 deleted_by_builder.append_value(TXN_ID_NONE);
482 }
483 let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(self.table_id);
484 let mut metadata = HashMap::new();
485 let lfid_val: u64 = deleted_by_lfid.into();
486 metadata.insert(
487 crate::constants::FIELD_ID_META_KEY.to_string(),
488 lfid_val.to_string(),
489 );
490 new_fields.push(
491 Field::new(
492 llkv_column_map::store::DELETED_BY_COLUMN_NAME,
493 DataType::UInt64,
494 false,
495 )
496 .with_metadata(metadata),
497 );
498 new_columns.push(Arc::new(deleted_by_builder.finish()));
499 }
500
501 let new_schema = Arc::new(Schema::new(new_fields));
502 let namespaced_batch = RecordBatch::try_new(new_schema, new_columns)?;
503 self.store.append(&namespaced_batch)
504 }
505
506 pub fn scan_stream<'a, I, T, F>(
514 &self,
515 projections: I,
516 filter_expr: &Expr<'a, FieldId>,
517 options: ScanStreamOptions<P>,
518 on_batch: F,
519 ) -> LlkvResult<()>
520 where
521 I: IntoIterator<Item = T>,
522 T: Into<ScanProjection>,
523 F: FnMut(RecordBatch),
524 {
525 let stream_projections: Vec<ScanProjection> =
526 projections.into_iter().map(|p| p.into()).collect();
527 self.scan_stream_with_exprs(&stream_projections, filter_expr, options, on_batch)
528 }
529
530 pub fn scan_stream_with_exprs<'a, F>(
532 &self,
533 projections: &[ScanProjection],
534 filter_expr: &Expr<'a, FieldId>,
535 options: ScanStreamOptions<P>,
536 on_batch: F,
537 ) -> LlkvResult<()>
538 where
539 F: FnMut(RecordBatch),
540 {
541 TablePlanner::new(self).scan_stream_with_exprs(projections, filter_expr, options, on_batch)
542 }
543
544 pub fn filter_row_ids<'a>(&self, filter_expr: &Expr<'a, FieldId>) -> LlkvResult<Vec<u64>> {
546 collect_row_ids_for_table(self, filter_expr)
547 }
548
549 #[inline]
550 pub fn catalog(&self) -> SysCatalog<'_, P> {
551 SysCatalog::new(&self.store)
552 }
553
554 #[inline]
555 pub fn put_table_meta(&self, meta: &TableMeta) {
556 debug_assert_eq!(meta.table_id, self.table_id);
557 self.catalog().put_table_meta(meta);
558 }
559
560 #[inline]
561 pub fn get_table_meta(&self) -> Option<TableMeta> {
562 self.catalog().get_table_meta(self.table_id)
563 }
564
565 #[inline]
566 pub fn put_col_meta(&self, meta: &ColMeta) {
567 self.catalog().put_col_meta(self.table_id, meta);
568 }
569
570 #[inline]
571 pub fn get_cols_meta(&self, col_ids: &[FieldId]) -> Vec<Option<ColMeta>> {
572 self.catalog().get_cols_meta(self.table_id, col_ids)
573 }
574
575 pub fn schema(&self) -> LlkvResult<Arc<Schema>> {
582 let mut logical_fields = self.store.user_field_ids_for_table(self.table_id);
584 logical_fields.sort_by_key(|lfid| lfid.field_id());
585
586 let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
587 let metas = self.get_cols_meta(&field_ids);
588
589 let mut fields: Vec<Field> = Vec::with_capacity(1 + field_ids.len());
590 fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));
592
593 for (idx, lfid) in logical_fields.into_iter().enumerate() {
594 let fid = lfid.field_id();
595 let dtype = self.store.data_type(lfid)?;
596 let name = metas
597 .get(idx)
598 .and_then(|m| m.as_ref().and_then(|meta| meta.name.clone()))
599 .unwrap_or_else(|| format!("col_{}", fid));
600
601 let mut metadata: HashMap<String, String> = HashMap::new();
602 metadata.insert(
603 crate::constants::FIELD_ID_META_KEY.to_string(),
604 fid.to_string(),
605 );
606
607 fields.push(Field::new(&name, dtype.clone(), true).with_metadata(metadata));
608 }
609
610 Ok(Arc::new(Schema::new(fields)))
611 }
612
613 pub fn schema_recordbatch(&self) -> LlkvResult<RecordBatch> {
617 let schema = self.schema()?;
618 let fields = schema.fields();
619
620 let mut names: Vec<String> = Vec::with_capacity(fields.len());
621 let mut fids: Vec<u32> = Vec::with_capacity(fields.len());
622 let mut dtypes: Vec<String> = Vec::with_capacity(fields.len());
623
624 for field in fields.iter() {
625 names.push(field.name().to_string());
626 let fid = field
627 .metadata()
628 .get(crate::constants::FIELD_ID_META_KEY)
629 .and_then(|s| s.parse::<u32>().ok())
630 .unwrap_or(0u32);
631 fids.push(fid);
632 dtypes.push(format!("{:?}", field.data_type()));
633 }
634
635 let name_array: ArrayRef = Arc::new(StringArray::from(names));
637 let fid_array: ArrayRef = Arc::new(UInt32Array::from(fids));
638 let dtype_array: ArrayRef = Arc::new(StringArray::from(dtypes));
639
640 let rb_schema = Arc::new(Schema::new(vec![
641 Field::new("name", DataType::Utf8, false),
642 Field::new(crate::constants::FIELD_ID_META_KEY, DataType::UInt32, false),
643 Field::new("data_type", DataType::Utf8, false),
644 ]));
645
646 let batch = RecordBatch::try_new(rb_schema, vec![name_array, fid_array, dtype_array])?;
647 Ok(batch)
648 }
649
650 pub fn store(&self) -> &ColumnStore<P> {
651 &self.store
652 }
653
654 #[inline]
655 pub fn table_id(&self) -> TableId {
656 self.table_id
657 }
658
659 pub fn total_rows_for_col(&self, col_id: FieldId) -> llkv_result::Result<u64> {
664 let lfid = LogicalFieldId::for_user(self.table_id, col_id);
665 self.store.total_rows_for_field(lfid)
666 }
667
668 pub fn total_rows(&self) -> llkv_result::Result<u64> {
673 use llkv_column_map::store::rowid_fid;
674 let rid_lfid = rowid_fid(LogicalFieldId::for_user(self.table_id, 0));
675 match self.store.total_rows_for_field(rid_lfid) {
677 Ok(n) => Ok(n),
678 Err(_) => {
679 self.store.total_rows_for_table(self.table_id)
681 }
682 }
683 }
684}
685
686#[cfg(test)]
687mod tests {
688 use super::*;
689 use crate::reserved::CATALOG_TABLE_ID;
690 use crate::types::RowId;
691 use arrow::array::Array;
692 use arrow::array::ArrayRef;
693 use arrow::array::{
694 BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt8Array,
695 UInt32Array, UInt64Array,
696 };
697 use arrow::compute::{cast, max, min, sum, unary};
698 use arrow::datatypes::DataType;
699 use llkv_column_map::ColumnStore;
700 use llkv_column_map::store::GatherNullPolicy;
701 use llkv_expr::{BinaryOp, CompareOp, Filter, Operator, ScalarExpr};
702 use std::collections::HashMap;
703 use std::ops::Bound;
704
705 fn setup_test_table() -> Table {
706 let pager = Arc::new(MemPager::default());
707 setup_test_table_with_pager(&pager)
708 }
709
710 fn setup_test_table_with_pager(pager: &Arc<MemPager>) -> Table {
711 let table = Table::new(1, Arc::clone(pager)).unwrap();
712 const COL_A_U64: FieldId = 10;
713 const COL_B_BIN: FieldId = 11;
714 const COL_C_I32: FieldId = 12;
715 const COL_D_F64: FieldId = 13;
716 const COL_E_F32: FieldId = 14;
717
718 let schema = Arc::new(Schema::new(vec![
719 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
720 Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
721 crate::constants::FIELD_ID_META_KEY.to_string(),
722 COL_A_U64.to_string(),
723 )])),
724 Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
725 crate::constants::FIELD_ID_META_KEY.to_string(),
726 COL_B_BIN.to_string(),
727 )])),
728 Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
729 crate::constants::FIELD_ID_META_KEY.to_string(),
730 COL_C_I32.to_string(),
731 )])),
732 Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
733 crate::constants::FIELD_ID_META_KEY.to_string(),
734 COL_D_F64.to_string(),
735 )])),
736 Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
737 crate::constants::FIELD_ID_META_KEY.to_string(),
738 COL_E_F32.to_string(),
739 )])),
740 ]));
741
742 let batch = RecordBatch::try_new(
743 schema.clone(),
744 vec![
745 Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
746 Arc::new(UInt64Array::from(vec![100, 200, 300, 200])),
747 Arc::new(BinaryArray::from(vec![
748 b"foo" as &[u8],
749 b"bar",
750 b"baz",
751 b"qux",
752 ])),
753 Arc::new(Int32Array::from(vec![10, 20, 30, 20])),
754 Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5, 2.5])),
755 Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 2.0])),
756 ],
757 )
758 .unwrap();
759
760 table.append(&batch).unwrap();
761 table
762 }
763
764 fn gather_single(
765 store: &ColumnStore<MemPager>,
766 field_id: LogicalFieldId,
767 row_ids: &[u64],
768 ) -> ArrayRef {
769 store
770 .gather_rows(&[field_id], row_ids, GatherNullPolicy::ErrorOnMissing)
771 .unwrap()
772 .column(0)
773 .clone()
774 }
775
776 fn pred_expr<'a>(filter: Filter<'a, FieldId>) -> Expr<'a, FieldId> {
777 Expr::Pred(filter)
778 }
779
780 fn proj(table: &Table, field_id: FieldId) -> Projection {
781 Projection::from(LogicalFieldId::for_user(table.table_id, field_id))
782 }
783
784 fn proj_alias<S: Into<String>>(table: &Table, field_id: FieldId, alias: S) -> Projection {
785 Projection::with_alias(LogicalFieldId::for_user(table.table_id, field_id), alias)
786 }
787
788 #[test]
789 fn table_new_rejects_reserved_table_id() {
790 let result = Table::new(CATALOG_TABLE_ID, Arc::new(MemPager::default()));
791 assert!(matches!(
792 result,
793 Err(Error::ReservedTableId(id)) if id == CATALOG_TABLE_ID
794 ));
795 }
796
797 #[test]
798 fn test_append_rejects_logical_field_id_in_metadata() {
799 let table = Table::new(7, Arc::new(MemPager::default())).unwrap();
803
804 const USER_FID: FieldId = 42;
805 let logical: LogicalFieldId = LogicalFieldId::for_user(table.table_id(), USER_FID);
807 let logical_val: u64 = logical.into();
808
809 let schema = Arc::new(Schema::new(vec![
810 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
811 Field::new("bad", DataType::UInt64, false).with_metadata(HashMap::from([(
812 crate::constants::FIELD_ID_META_KEY.to_string(),
813 logical_val.to_string(),
814 )])),
815 ]));
816
817 let batch = RecordBatch::try_new(
818 schema,
819 vec![
820 Arc::new(UInt64Array::from(vec![1u64, 2u64])),
821 Arc::new(UInt64Array::from(vec![10u64, 20u64])),
822 ],
823 )
824 .unwrap();
825
826 let res = table.append(&batch);
827 assert!(matches!(res, Err(Error::Internal(_))));
828 }
829
830 #[test]
831 fn test_scan_with_u64_filter() {
832 let table = setup_test_table();
833 const COL_A_U64: FieldId = 10;
834 const COL_C_I32: FieldId = 12;
835
836 let expr = pred_expr(Filter {
837 field_id: COL_A_U64,
838 op: Operator::Equals(200.into()),
839 });
840
841 let mut vals: Vec<Option<i32>> = Vec::new();
842 table
843 .scan_stream(
844 &[proj(&table, COL_C_I32)],
845 &expr,
846 ScanStreamOptions::default(),
847 |b| {
848 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
849 vals.extend(
850 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
851 );
852 },
853 )
854 .unwrap();
855 assert_eq!(vals, vec![Some(20), Some(20)]);
856 }
857
858 #[test]
859 fn test_scan_with_string_filter() {
860 let pager = Arc::new(MemPager::default());
861 let table = Table::new(500, Arc::clone(&pager)).unwrap();
862
863 const COL_STR: FieldId = 42;
864 let schema = Arc::new(Schema::new(vec![
865 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
866 Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
867 crate::constants::FIELD_ID_META_KEY.to_string(),
868 COL_STR.to_string(),
869 )])),
870 ]));
871
872 let batch = RecordBatch::try_new(
873 schema,
874 vec![
875 Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
876 Arc::new(StringArray::from(vec!["alice", "bob", "albert", "carol"])),
877 ],
878 )
879 .unwrap();
880 table.append(&batch).unwrap();
881
882 let expr = pred_expr(Filter {
883 field_id: COL_STR,
884 op: Operator::starts_with("al", true),
885 });
886
887 let mut collected: Vec<Option<String>> = Vec::new();
888 table
889 .scan_stream(
890 &[proj(&table, COL_STR)],
891 &expr,
892 ScanStreamOptions::default(),
893 |b| {
894 let arr = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
895 collected.extend(arr.iter().map(|v| v.map(|s| s.to_string())));
896 },
897 )
898 .unwrap();
899
900 assert_eq!(
901 collected,
902 vec![Some("alice".to_string()), Some("albert".to_string())]
903 );
904 }
905
906 #[test]
907 fn test_table_reopen_with_shared_pager() {
908 const TABLE_ALPHA: TableId = 42;
909 const TABLE_BETA: TableId = 43;
910 const TABLE_GAMMA: TableId = 44;
911 const COL_ALPHA_U64: FieldId = 100;
912 const COL_ALPHA_I32: FieldId = 101;
913 const COL_ALPHA_U32: FieldId = 102;
914 const COL_ALPHA_I16: FieldId = 103;
915 const COL_BETA_U64: FieldId = 200;
916 const COL_BETA_U8: FieldId = 201;
917 const COL_GAMMA_I16: FieldId = 300;
918
919 let pager = Arc::new(MemPager::default());
920
921 let alpha_rows: Vec<RowId> = vec![1, 2, 3, 4];
922 let alpha_vals_u64: Vec<u64> = vec![10, 20, 30, 40];
923 let alpha_vals_i32: Vec<i32> = vec![-5, 15, 25, 35];
924 let alpha_vals_u32: Vec<u32> = vec![7, 11, 13, 17];
925 let alpha_vals_i16: Vec<i16> = vec![-2, 4, -6, 8];
926
927 let beta_rows: Vec<u64> = vec![101, 102, 103];
928 let beta_vals_u64: Vec<u64> = vec![900, 901, 902];
929 let beta_vals_u8: Vec<u8> = vec![1, 2, 3];
930
931 let gamma_rows: Vec<u64> = vec![501, 502];
932 let gamma_vals_i16: Vec<i16> = vec![123, -321];
933
934 {
936 let table = Table::new(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
937 let schema = Arc::new(Schema::new(vec![
938 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
939 Field::new("alpha_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
940 crate::constants::FIELD_ID_META_KEY.to_string(),
941 COL_ALPHA_U64.to_string(),
942 )])),
943 Field::new("alpha_i32", DataType::Int32, false).with_metadata(HashMap::from([(
944 crate::constants::FIELD_ID_META_KEY.to_string(),
945 COL_ALPHA_I32.to_string(),
946 )])),
947 Field::new("alpha_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
948 crate::constants::FIELD_ID_META_KEY.to_string(),
949 COL_ALPHA_U32.to_string(),
950 )])),
951 Field::new("alpha_i16", DataType::Int16, false).with_metadata(HashMap::from([(
952 crate::constants::FIELD_ID_META_KEY.to_string(),
953 COL_ALPHA_I16.to_string(),
954 )])),
955 ]));
956 let batch = RecordBatch::try_new(
957 schema,
958 vec![
959 Arc::new(UInt64Array::from(alpha_rows.clone())),
960 Arc::new(UInt64Array::from(alpha_vals_u64.clone())),
961 Arc::new(Int32Array::from(alpha_vals_i32.clone())),
962 Arc::new(UInt32Array::from(alpha_vals_u32.clone())),
963 Arc::new(Int16Array::from(alpha_vals_i16.clone())),
964 ],
965 )
966 .unwrap();
967 table.append(&batch).unwrap();
968 }
969
970 {
971 let table = Table::new(TABLE_BETA, Arc::clone(&pager)).unwrap();
972 let schema = Arc::new(Schema::new(vec![
973 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
974 Field::new("beta_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
975 crate::constants::FIELD_ID_META_KEY.to_string(),
976 COL_BETA_U64.to_string(),
977 )])),
978 Field::new("beta_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
979 crate::constants::FIELD_ID_META_KEY.to_string(),
980 COL_BETA_U8.to_string(),
981 )])),
982 ]));
983 let batch = RecordBatch::try_new(
984 schema,
985 vec![
986 Arc::new(UInt64Array::from(beta_rows.clone())),
987 Arc::new(UInt64Array::from(beta_vals_u64.clone())),
988 Arc::new(UInt8Array::from(beta_vals_u8.clone())),
989 ],
990 )
991 .unwrap();
992 table.append(&batch).unwrap();
993 }
994
995 {
996 let table = Table::new(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
997 let schema = Arc::new(Schema::new(vec![
998 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
999 Field::new("gamma_i16", DataType::Int16, false).with_metadata(HashMap::from([(
1000 crate::constants::FIELD_ID_META_KEY.to_string(),
1001 COL_GAMMA_I16.to_string(),
1002 )])),
1003 ]));
1004 let batch = RecordBatch::try_new(
1005 schema,
1006 vec![
1007 Arc::new(UInt64Array::from(gamma_rows.clone())),
1008 Arc::new(Int16Array::from(gamma_vals_i16.clone())),
1009 ],
1010 )
1011 .unwrap();
1012 table.append(&batch).unwrap();
1013 }
1014
1015 {
1017 let table = Table::new(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
1018 let store = table.store();
1019
1020 let expectations: &[(FieldId, DataType)] = &[
1021 (COL_ALPHA_U64, DataType::UInt64),
1022 (COL_ALPHA_I32, DataType::Int32),
1023 (COL_ALPHA_U32, DataType::UInt32),
1024 (COL_ALPHA_I16, DataType::Int16),
1025 ];
1026
1027 for &(col, ref ty) in expectations {
1028 let lfid = LogicalFieldId::for_user(TABLE_ALPHA, col);
1029 assert_eq!(store.data_type(lfid).unwrap(), *ty);
1030 let arr = gather_single(store, lfid, &alpha_rows);
1031 match ty {
1032 DataType::UInt64 => {
1033 let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
1034 assert_eq!(arr.values(), alpha_vals_u64.as_slice());
1035 }
1036 DataType::Int32 => {
1037 let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
1038 assert_eq!(arr.values(), alpha_vals_i32.as_slice());
1039 }
1040 DataType::UInt32 => {
1041 let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
1042 assert_eq!(arr.values(), alpha_vals_u32.as_slice());
1043 }
1044 DataType::Int16 => {
1045 let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
1046 assert_eq!(arr.values(), alpha_vals_i16.as_slice());
1047 }
1048 other => panic!("unexpected dtype {other:?}"),
1049 }
1050 }
1051 }
1052
1053 {
1054 let table = Table::new(TABLE_BETA, Arc::clone(&pager)).unwrap();
1055 let store = table.store();
1056
1057 let lfid_u64 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U64);
1058 assert_eq!(store.data_type(lfid_u64).unwrap(), DataType::UInt64);
1059 let arr_u64 = gather_single(store, lfid_u64, &beta_rows);
1060 let arr_u64 = arr_u64.as_any().downcast_ref::<UInt64Array>().unwrap();
1061 assert_eq!(arr_u64.values(), beta_vals_u64.as_slice());
1062
1063 let lfid_u8 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U8);
1064 assert_eq!(store.data_type(lfid_u8).unwrap(), DataType::UInt8);
1065 let arr_u8 = gather_single(store, lfid_u8, &beta_rows);
1066 let arr_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
1067 assert_eq!(arr_u8.values(), beta_vals_u8.as_slice());
1068 }
1069
1070 {
1071 let table = Table::new(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
1072 let store = table.store();
1073 let lfid = LogicalFieldId::for_user(TABLE_GAMMA, COL_GAMMA_I16);
1074 assert_eq!(store.data_type(lfid).unwrap(), DataType::Int16);
1075 let arr = gather_single(store, lfid, &gamma_rows);
1076 let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
1077 assert_eq!(arr.values(), gamma_vals_i16.as_slice());
1078 }
1079 }
1080
1081 #[test]
1082 fn test_scan_with_i32_filter() {
1083 let table = setup_test_table();
1084 const COL_A_U64: FieldId = 10;
1085 const COL_C_I32: FieldId = 12;
1086
1087 let filter = pred_expr(Filter {
1088 field_id: COL_C_I32,
1089 op: Operator::Equals(20.into()),
1090 });
1091
1092 let mut vals: Vec<Option<u64>> = Vec::new();
1093 table
1094 .scan_stream(
1095 &[proj(&table, COL_A_U64)],
1096 &filter,
1097 ScanStreamOptions::default(),
1098 |b| {
1099 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1100 vals.extend(
1101 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1102 );
1103 },
1104 )
1105 .unwrap();
1106 assert_eq!(vals, vec![Some(200), Some(200)]);
1107 }
1108
1109 #[test]
1110 fn test_scan_with_greater_than_filter() {
1111 let table = setup_test_table();
1112 const COL_A_U64: FieldId = 10;
1113 const COL_C_I32: FieldId = 12;
1114
1115 let filter = pred_expr(Filter {
1116 field_id: COL_C_I32,
1117 op: Operator::GreaterThan(15.into()),
1118 });
1119
1120 let mut vals: Vec<Option<u64>> = Vec::new();
1121 table
1122 .scan_stream(
1123 &[proj(&table, COL_A_U64)],
1124 &filter,
1125 ScanStreamOptions::default(),
1126 |b| {
1127 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1128 vals.extend(
1129 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1130 );
1131 },
1132 )
1133 .unwrap();
1134 assert_eq!(vals, vec![Some(200), Some(300), Some(200)]);
1135 }
1136
1137 #[test]
1138 fn test_scan_with_range_filter() {
1139 let table = setup_test_table();
1140 const COL_A_U64: FieldId = 10;
1141 const COL_C_I32: FieldId = 12;
1142
1143 let filter = pred_expr(Filter {
1144 field_id: COL_A_U64,
1145 op: Operator::Range {
1146 lower: Bound::Included(150.into()),
1147 upper: Bound::Excluded(300.into()),
1148 },
1149 });
1150
1151 let mut vals: Vec<Option<i32>> = Vec::new();
1152 table
1153 .scan_stream(
1154 &[proj(&table, COL_C_I32)],
1155 &filter,
1156 ScanStreamOptions::default(),
1157 |b| {
1158 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1159 vals.extend(
1160 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1161 );
1162 },
1163 )
1164 .unwrap();
1165 assert_eq!(vals, vec![Some(20), Some(20)]);
1166 }
1167
1168 #[test]
1169 fn test_filtered_scan_sum_kernel() {
1170 let table = setup_test_table();
1174 const COL_A_U64: FieldId = 10;
1175
1176 let filter = pred_expr(Filter {
1177 field_id: COL_A_U64,
1178 op: Operator::Range {
1179 lower: Bound::Included(150.into()),
1180 upper: Bound::Excluded(300.into()),
1181 },
1182 });
1183
1184 let mut total: u128 = 0;
1185 table
1186 .scan_stream(
1187 &[proj(&table, COL_A_U64)],
1188 &filter,
1189 ScanStreamOptions::default(),
1190 |b| {
1191 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1192 if let Some(part) = sum(a) {
1193 total += part as u128;
1194 }
1195 },
1196 )
1197 .unwrap();
1198
1199 assert_eq!(total, 400);
1200 }
1201
1202 #[test]
1203 fn test_filtered_scan_sum_i32_kernel() {
1204 let table = setup_test_table();
1209 const COL_A_U64: FieldId = 10;
1210 const COL_C_I32: FieldId = 12;
1211
1212 let candidates = [100.into(), 300.into()];
1213 let filter = pred_expr(Filter {
1214 field_id: COL_A_U64,
1215 op: Operator::In(&candidates),
1216 });
1217
1218 let mut total: i64 = 0;
1219 table
1220 .scan_stream(
1221 &[proj(&table, COL_C_I32)],
1222 &filter,
1223 ScanStreamOptions::default(),
1224 |b| {
1225 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1226 if let Some(part) = sum(a) {
1227 total += part as i64;
1228 }
1229 },
1230 )
1231 .unwrap();
1232 assert_eq!(total, 40);
1233 }
1234
1235 #[test]
1236 fn test_filtered_scan_min_max_kernel() {
1237 let table = setup_test_table();
1242 const COL_A_U64: FieldId = 10;
1243 const COL_C_I32: FieldId = 12;
1244
1245 let candidates = [100.into(), 300.into()];
1246 let filter = pred_expr(Filter {
1247 field_id: COL_A_U64,
1248 op: Operator::In(&candidates),
1249 });
1250
1251 let mut mn: Option<i32> = None;
1252 let mut mx: Option<i32> = None;
1253 table
1254 .scan_stream(
1255 &[proj(&table, COL_C_I32)],
1256 &filter,
1257 ScanStreamOptions::default(),
1258 |b| {
1259 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1260
1261 if let Some(part_min) = min(a) {
1262 mn = Some(mn.map_or(part_min, |m| m.min(part_min)));
1263 }
1264 if let Some(part_max) = max(a) {
1265 mx = Some(mx.map_or(part_max, |m| m.max(part_max)));
1266 }
1267 },
1268 )
1269 .unwrap();
1270 assert_eq!(mn, Some(10));
1271 assert_eq!(mx, Some(30));
1272 }
1273
#[test]
fn test_filtered_scan_float64_column() {
    // Filters directly on a Float64 column (`d_f64 > 2.0`) and projects the
    // same column, collecting every non-null value in scan order.
    let table = setup_test_table();
    const COL_D_F64: FieldId = 13;

    let filter = pred_expr(Filter {
        field_id: COL_D_F64,
        op: Operator::GreaterThan(2.0_f64.into()),
    });

    let projection = [proj(&table, COL_D_F64)];
    let mut got = Vec::new();
    table
        .scan_stream(&projection, &filter, ScanStreamOptions::default(), |batch| {
            let column = batch.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
            // `iter()` yields `None` for null slots; `flatten` drops them.
            got.extend(column.iter().flatten());
        })
        .unwrap();

    assert_eq!(got, vec![2.5, 3.5, 2.5]);
}
1303
#[test]
fn test_filtered_scan_float32_in_operator() {
    // Applies an `IN (2.0, 3.0)` predicate to a Float32 column and checks the
    // non-null projected values of that same column.
    let table = setup_test_table();
    const COL_E_F32: FieldId = 14;

    let candidates = [2.0_f32.into(), 3.0_f32.into()];
    let filter = pred_expr(Filter {
        field_id: COL_E_F32,
        op: Operator::In(&candidates),
    });

    let projection = [proj(&table, COL_E_F32)];
    let mut matched: Vec<f32> = Vec::new();
    table
        .scan_stream(&projection, &filter, ScanStreamOptions::default(), |batch| {
            let column = batch.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
            matched.extend(column.iter().flatten());
        })
        .unwrap();

    assert_eq!(matched, vec![2.0, 3.0, 2.0]);
}
1337
#[test]
fn test_scan_stream_and_expression() {
    // Conjunction across two different columns: `c_i32 > 15 AND a_u64 < 250`,
    // projecting a third column (`e_f32`) including null slots as `None`.
    let table = setup_test_table();
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;
    const COL_E_F32: FieldId = 14;

    let clauses = vec![
        Filter {
            field_id: COL_C_I32,
            op: Operator::GreaterThan(15.into()),
        },
        Filter {
            field_id: COL_A_U64,
            op: Operator::LessThan(250.into()),
        },
    ];
    let expr = Expr::all_of(clauses);

    let projection = [proj(&table, COL_E_F32)];
    let mut vals: Vec<Option<f32>> = Vec::new();
    table
        .scan_stream(&projection, &expr, ScanStreamOptions::default(), |batch| {
            let column = batch.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
            vals.extend(column.iter());
        })
        .unwrap();

    assert_eq!(vals, vec![Some(2.0_f32), Some(2.0_f32)]);
}
1377
#[test]
fn test_scan_stream_or_expression() {
    // Disjunction on one column: `c_i32 = 10 OR c_i32 = 30`, projecting `a_u64`.
    let table = setup_test_table();
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let alternatives = vec![
        Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(10.into()),
        },
        Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(30.into()),
        },
    ];
    let expr = Expr::any_of(alternatives);

    let projection = [proj(&table, COL_A_U64)];
    let mut vals: Vec<Option<u64>> = Vec::new();
    table
        .scan_stream(&projection, &expr, ScanStreamOptions::default(), |batch| {
            let column = batch.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
            vals.extend(column.iter());
        })
        .unwrap();

    assert_eq!(vals, vec![Some(100), Some(300)]);
}
1416
#[test]
fn test_scan_stream_not_predicate() {
    // Negated single predicate: `NOT (c_i32 = 20)`, projecting `a_u64`.
    let table = setup_test_table();
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let equals_twenty = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::Equals(20.into()),
    });
    let expr = Expr::not(equals_twenty);

    let projection = [proj(&table, COL_A_U64)];
    let mut vals: Vec<Option<u64>> = Vec::new();
    table
        .scan_stream(&projection, &expr, ScanStreamOptions::default(), |batch| {
            let column = batch.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
            vals.extend(column.iter());
        })
        .unwrap();

    assert_eq!(vals, vec![Some(100), Some(300)]);
}
1449
#[test]
fn test_scan_stream_not_and_expression() {
    // Negated conjunction: `NOT (a_u64 > 150 AND c_i32 < 40)`, projecting `a_u64`.
    let table = setup_test_table();
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let conjunction = Expr::all_of(vec![
        Filter {
            field_id: COL_A_U64,
            op: Operator::GreaterThan(150.into()),
        },
        Filter {
            field_id: COL_C_I32,
            op: Operator::LessThan(40.into()),
        },
    ]);
    let expr = Expr::not(conjunction);

    let projection = [proj(&table, COL_A_U64)];
    let mut vals: Vec<Option<u64>> = Vec::new();
    table
        .scan_stream(&projection, &expr, ScanStreamOptions::default(), |batch| {
            let column = batch.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
            vals.extend(column.iter());
        })
        .unwrap();

    assert_eq!(vals, vec![Some(100)]);
}
1488
#[test]
fn test_scan_stream_include_nulls_toggle() {
    // Appends two rows (row ids 5 and 6) with `a_u64` = 500/600 and `c_i32` =
    // 40/NULL. The filter `a_u64 > 450` is expected to match only these two rows.
    // The test then checks:
    //   1. a single-column `c_i32` scan with default options drops the NULL,
    //   2. the same scan with `include_nulls: true` keeps it,
    //   3. a two-column (`c_i32`, `d_f64`) scan with default options.
    let pager = Arc::new(MemPager::default());
    let table = setup_test_table_with_pager(&pager);
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;
    const COL_B_BIN: FieldId = 11;
    const COL_D_F64: FieldId = 13;
    const COL_E_F32: FieldId = 14;

    // Attaches the `FIELD_ID_META_KEY` metadata entry to a column definition.
    let with_field_id = |field: Field, field_id: FieldId| {
        field.with_metadata(HashMap::from([(
            crate::constants::FIELD_ID_META_KEY.to_string(),
            field_id.to_string(),
        )]))
    };

    let schema = Arc::new(Schema::new(vec![
        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
        with_field_id(Field::new("a_u64", DataType::UInt64, false), COL_A_U64),
        with_field_id(Field::new("b_bin", DataType::Binary, false), COL_B_BIN),
        with_field_id(Field::new("c_i32", DataType::Int32, true), COL_C_I32),
        with_field_id(Field::new("d_f64", DataType::Float64, false), COL_D_F64),
        with_field_id(Field::new("e_f32", DataType::Float32, false), COL_E_F32),
    ]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(UInt64Array::from(vec![5, 6])),
            Arc::new(UInt64Array::from(vec![500, 600])),
            Arc::new(BinaryArray::from(vec![
                Some(&b"new"[..]),
                Some(&b"alt"[..]),
            ])),
            Arc::new(Int32Array::from(vec![Some(40), None])),
            Arc::new(Float64Array::from(vec![5.5, 6.5])),
            Arc::new(Float32Array::from(vec![5.0, 6.0])),
        ],
    )
    .unwrap();
    table.append(&batch).unwrap();

    let filter = pred_expr(Filter {
        field_id: COL_A_U64,
        op: Operator::GreaterThan(450.into()),
    });

    // 1. Default options: the NULL `c_i32` value is not emitted.
    let mut default_vals: Vec<Option<i32>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_C_I32)],
            &filter,
            ScanStreamOptions::default(),
            |b| {
                let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                default_vals.extend(arr.iter());
            },
        )
        .unwrap();
    assert_eq!(default_vals, vec![Some(40)]);

    // 2. `include_nulls: true`: the NULL row is emitted as `None`.
    let mut include_null_vals: Vec<Option<i32>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_C_I32)],
            &filter,
            ScanStreamOptions {
                include_nulls: true,
                order: None,
                row_id_filter: None,
            },
            |b| {
                let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                include_null_vals.extend(arr.iter());
            },
        )
        .unwrap();
    assert_eq!(include_null_vals, vec![Some(40), None]);

    // 3. Two-column projection with default options. The NULL-`c_i32` row is
    //    expected to still be emitted alongside its `d_f64` value. This scan
    //    previously ran nested inside the callback of scan 2, re-scanning and
    //    re-asserting once per emitted batch; it now runs exactly once.
    let mut paired_vals: Vec<(Option<i32>, Option<f64>)> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_C_I32), proj(&table, COL_D_F64)],
            &filter,
            ScanStreamOptions::default(),
            |b| {
                assert_eq!(b.num_columns(), 2);
                let c_arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                let d_arr = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
                paired_vals.extend(c_arr.iter().zip(d_arr.iter()));
            },
        )
        .unwrap();
    assert_eq!(paired_vals, vec![(Some(40), Some(5.5)), (None, Some(6.5))]);
}
1619
#[test]
fn test_filtered_scan_int_sqrt_float64() {
    // For rows with `c_i32 > 15`, casts the projected `a_u64` column to
    // Float64 and applies `sqrt` element-wise via the `unary` kernel.
    let table = setup_test_table();
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let filter = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::GreaterThan(15.into()),
    });

    let projection = [proj(&table, COL_A_U64)];
    let mut got: Vec<f64> = Vec::new();
    table
        .scan_stream(&projection, &filter, ScanStreamOptions::default(), |batch| {
            let as_float = cast(batch.column(0), &arrow::datatypes::DataType::Float64).unwrap();
            let floats = as_float.as_any().downcast_ref::<Float64Array>().unwrap();

            let roots = unary::<arrow::datatypes::Float64Type, _, arrow::datatypes::Float64Type>(
                floats,
                |v: f64| v.sqrt(),
            );

            got.extend(roots.iter().flatten());
        })
        .unwrap();

    let expected = [200_f64.sqrt(), 300_f64.sqrt(), 200_f64.sqrt()];
    assert_eq!(got, expected);
}
1665
#[test]
fn test_multi_field_kernels_with_filters() {
    use arrow::array::{Int16Array, UInt8Array, UInt32Array};

    // Builds a fresh table (id 2) with integer columns of several widths.
    // Under the filter `c_i32 >= 20` (rows 2..=5 of the appended batch), runs
    // the Arrow `sum`/`min`/`max` kernels and a `cast` + `unary(sqrt)` pipeline,
    // each over a single projected column.
    let table = Table::new(2, Arc::new(MemPager::default())).unwrap();

    const COL_A_U64: FieldId = 20;
    const COL_D_U32: FieldId = 21;
    const COL_E_I16: FieldId = 22;
    const COL_F_U8: FieldId = 23;
    const COL_C_I32: FieldId = 24;

    // Attaches the `FIELD_ID_META_KEY` metadata entry to a column definition.
    let with_field_id = |field: Field, field_id: FieldId| {
        field.with_metadata(HashMap::from([(
            crate::constants::FIELD_ID_META_KEY.to_string(),
            field_id.to_string(),
        )]))
    };

    let schema = Arc::new(Schema::new(vec![
        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
        with_field_id(Field::new("a_u64", DataType::UInt64, false), COL_A_U64),
        with_field_id(Field::new("d_u32", DataType::UInt32, false), COL_D_U32),
        with_field_id(Field::new("e_i16", DataType::Int16, false), COL_E_I16),
        with_field_id(Field::new("f_u8", DataType::UInt8, false), COL_F_U8),
        with_field_id(Field::new("c_i32", DataType::Int32, false), COL_C_I32),
    ]));

    // `schema` is not used after this point, so it is moved rather than cloned.
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(UInt64Array::from(vec![1, 2, 3, 4, 5])),
            Arc::new(UInt64Array::from(vec![100, 225, 400, 900, 1600])),
            Arc::new(UInt32Array::from(vec![7, 8, 9, 10, 11])),
            Arc::new(Int16Array::from(vec![-3, 0, 3, -6, 6])),
            Arc::new(UInt8Array::from(vec![2, 4, 6, 8, 10])),
            Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
        ],
    )
    .unwrap();

    table.append(&batch).unwrap();

    let filter = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::GreaterThanOrEquals(20.into()),
    });

    // sum(d_u32), widened to u128 so the accumulation cannot overflow.
    let mut d_sum: u128 = 0;
    table
        .scan_stream(
            &[proj(&table, COL_D_U32)],
            &filter,
            ScanStreamOptions::default(),
            |b| {
                let a = b.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
                d_sum += sum(a).map_or(0, u128::from);
            },
        )
        .unwrap();
    assert_eq!(d_sum, 8 + 9 + 10 + 11);

    // min(e_i16)
    let mut e_min: Option<i16> = None;
    table
        .scan_stream(
            &[proj(&table, COL_E_I16)],
            &filter,
            ScanStreamOptions::default(),
            |b| {
                let a = b.column(0).as_any().downcast_ref::<Int16Array>().unwrap();
                if let Some(part_min) = min(a) {
                    e_min = Some(e_min.map_or(part_min, |m| m.min(part_min)));
                }
            },
        )
        .unwrap();
    assert_eq!(e_min, Some(-6));

    // max(f_u8)
    let mut f_max: Option<u8> = None;
    table
        .scan_stream(
            &[proj(&table, COL_F_U8)],
            &filter,
            ScanStreamOptions::default(),
            |b| {
                let a = b.column(0).as_any().downcast_ref::<UInt8Array>().unwrap();
                if let Some(part_max) = max(a) {
                    f_max = Some(f_max.map_or(part_max, |m| m.max(part_max)));
                }
            },
        )
        .unwrap();
    assert_eq!(f_max, Some(10));

    // sqrt(a_u64 as f64); the a_u64 values were chosen as perfect squares.
    let mut got: Vec<f64> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_A_U64)],
            &filter,
            ScanStreamOptions::default(),
            |b| {
                let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
                let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
                let sqrt_arr = unary::<
                    arrow::datatypes::Float64Type,
                    _,
                    arrow::datatypes::Float64Type,
                >(f64_arr, |v: f64| v.sqrt());
                got.extend(sqrt_arr.iter().flatten());
            },
        )
        .unwrap();
    let expected = [15.0_f64, 20.0, 30.0, 40.0];
    assert_eq!(got, expected);
}
1809
#[test]
fn test_scan_with_in_filter() {
    // `c_i32 IN (10, 30)`, projecting `a_u64` with null slots kept as `None`.
    let table = setup_test_table();
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let candidates = [10.into(), 30.into()];
    let filter = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::In(&candidates),
    });

    let projection = [proj(&table, COL_A_U64)];
    let mut vals: Vec<Option<u64>> = Vec::new();
    table
        .scan_stream(&projection, &filter, ScanStreamOptions::default(), |batch| {
            let column = batch.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
            vals.extend(column.iter());
        })
        .unwrap();
    assert_eq!(vals, vec![Some(100), Some(300)]);
}
1839
#[test]
fn test_scan_stream_single_column_batches() {
    // With a single projection, every emitted batch must have exactly one
    // column; the non-null `a_u64` values of rows with `c_i32 = 20` are checked.
    let table = setup_test_table();
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let filter = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::Equals(20.into()),
    });

    let projection = [proj(&table, COL_A_U64)];
    let mut seen_cols = Vec::<u64>::new();
    table
        .scan_stream(&projection, &filter, ScanStreamOptions::default(), |batch| {
            assert_eq!(batch.num_columns(), 1);
            let column = batch.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
            seen_cols.extend(column.iter().flatten());
        })
        .unwrap();

    assert_eq!(seen_cols, vec![200, 200]);
}
1874
#[test]
fn test_scan_with_multiple_projection_columns() {
    // Two projected columns: checks the output schema names (the field ids as
    // strings) and that values come back row-aligned.
    let table = setup_test_table();
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let filter = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::Equals(20.into()),
    });

    let [name_a, name_c] = [COL_A_U64.to_string(), COL_C_I32.to_string()];
    let projections = [proj(&table, COL_A_U64), proj(&table, COL_C_I32)];

    let mut combined: Vec<(Option<u64>, Option<i32>)> = Vec::new();
    table
        .scan_stream(&projections, &filter, ScanStreamOptions::default(), |batch| {
            assert_eq!(batch.num_columns(), 2);
            let schema = batch.schema();
            assert_eq!(schema.field(0).name(), &name_a);
            assert_eq!(schema.field(1).name(), &name_c);

            let left = batch.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
            let right = batch.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
            combined.extend(left.iter().zip(right.iter()));
        })
        .unwrap();

    assert_eq!(combined, vec![(Some(200), Some(20)), (Some(200), Some(20))]);
}
1912
#[test]
fn test_scan_stream_projection_validation() {
    // An empty projection list must be rejected with `InvalidArgumentError`.
    // Projecting the same field twice (once plain, once aliased) is allowed and
    // yields two identical columns.
    let table = setup_test_table();
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let filter = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::Equals(20.into()),
    });

    let no_projections: [Projection; 0] = [];
    let outcome = table.scan_stream(
        &no_projections,
        &filter,
        ScanStreamOptions::default(),
        |_batch| {},
    );
    assert!(matches!(outcome, Err(Error::InvalidArgumentError(_))));

    let duplicate = [
        proj(&table, COL_A_U64),
        proj_alias(&table, COL_A_U64, "alias_a"),
    ];
    let mut collected = Vec::<u64>::new();
    table
        .scan_stream(&duplicate, &filter, ScanStreamOptions::default(), |batch| {
            assert_eq!(batch.num_columns(), 2);
            let schema = batch.schema();
            assert_eq!(schema.field(0).name(), &COL_A_U64.to_string());
            assert_eq!(schema.field(1).name(), "alias_a");

            let plain = batch.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
            let aliased = batch.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
            // Interleave per row: plain value first, then aliased value,
            // skipping whichever is null.
            for (first, second) in plain.iter().zip(aliased.iter()) {
                collected.extend(first);
                collected.extend(second);
            }
        })
        .unwrap();
    assert_eq!(collected, vec![200, 200, 200, 200]);
}
1957
#[test]
fn test_scan_stream_computed_projection() {
    // Mixes a plain column projection with a computed `a_u64 * 2` projection;
    // the computed column is read back as Float64.
    let table = setup_test_table();
    const COL_A_U64: FieldId = 10;

    let doubled = ScalarExpr::binary(
        ScalarExpr::column(COL_A_U64),
        BinaryOp::Multiply,
        ScalarExpr::literal(2),
    );
    let projections = [
        ScanProjection::column(proj(&table, COL_A_U64)),
        ScanProjection::computed(doubled, "a_times_two"),
    ];

    // `a_u64 >= 0` is intended to let every row through.
    let filter = pred_expr(Filter {
        field_id: COL_A_U64,
        op: Operator::GreaterThanOrEquals(0.into()),
    });

    let mut computed: Vec<(u64, f64)> = Vec::new();
    table
        .scan_stream_with_exprs(&projections, &filter, ScanStreamOptions::default(), |batch| {
            assert_eq!(batch.num_columns(), 2);
            let base = batch.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
            let derived = batch.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
            // Keep only rows where both sides are non-null.
            computed.extend(
                base.iter()
                    .zip(derived.iter())
                    .filter_map(|(lhs, rhs)| lhs.zip(rhs)),
            );
        })
        .unwrap();

    let expected = vec![(100, 200.0), (200, 400.0), (300, 600.0), (200, 400.0)];
    assert_eq!(computed, expected);
}
1998
#[test]
fn test_scan_stream_multi_column_filter_compare() {
    // Filter that references two columns in one expression:
    // `a_u64 + c_i32 > 220`, projecting `a_u64`.
    let table = setup_test_table();
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let combined = ScalarExpr::binary(
        ScalarExpr::column(COL_A_U64),
        BinaryOp::Add,
        ScalarExpr::column(COL_C_I32),
    );
    let expr = Expr::Compare {
        left: combined,
        op: CompareOp::Gt,
        right: ScalarExpr::literal(220_i64),
    };

    let projection = [proj(&table, COL_A_U64)];
    let mut vals: Vec<Option<u64>> = Vec::new();
    table
        .scan_stream(&projection, &expr, ScanStreamOptions::default(), |batch| {
            let column = batch.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
            vals.extend(column.iter());
        })
        .unwrap();

    let non_null: Vec<u64> = vals.into_iter().flatten().collect();
    assert_eq!(non_null, vec![300]);
}
2036}