1use croaring::Treemap;
2use std::fmt;
3use std::sync::Arc;
4use std::sync::RwLock;
5
6use crate::planner::{TablePlanner, collect_row_ids_for_table};
7use crate::stream::ColumnStream;
8use crate::stream::RowIdSource;
9
10use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt32Array};
11use arrow::datatypes::{DataType, Field, Schema};
12use std::collections::HashMap;
13
14use crate::constants::STREAM_BATCH_ROWS;
15use llkv_column_map::ColumnStore;
16use llkv_column_map::store::{GatherNullPolicy, IndexKind, Projection, ROW_ID_COLUMN_NAME};
17use llkv_storage::pager::{MemPager, Pager};
18use llkv_types::ids::{LogicalFieldId, TableId};
19use simd_r_drive_entry_handle::EntryHandle;
20
21use crate::reserved::is_reserved_table_id;
22use crate::sys_catalog::{ColMeta, SysCatalog, TableMeta};
23use crate::types::FieldId;
24use llkv_expr::{Expr, ScalarExpr};
25use llkv_result::{Error, Result as LlkvResult};
26
/// Cached answer to "does this table's batch schema already carry the MVCC
/// bookkeeping columns?".
///
/// Populated lazily by `Table::get_mvcc_cache` from the first schema it sees
/// and then reused for subsequent appends, so the field-name scan runs once.
#[derive(Debug, Clone, Copy)]
struct MvccColumnCache {
    // Schema contains a `CREATED_BY_COLUMN_NAME` field.
    has_created_by: bool,
    // Schema contains a `DELETED_BY_COLUMN_NAME` field.
    has_deleted_by: bool,
}
34
/// Handle to a single logical table stored inside a shared [`ColumnStore`].
///
/// The handle carries no schema of its own: column layout is derived from the
/// store and the system catalog on demand (see `Table::schema`). Cloning of
/// the underlying store is avoided by holding it behind an `Arc`.
pub struct Table<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Shared column store holding this table's (and possibly other tables') data.
    store: Arc<ColumnStore<P>>,
    // Identifier namespacing every logical field of this table.
    table_id: TableId,
    // Lazily computed MVCC-column presence flags; `None` until first append.
    mvcc_cache: RwLock<Option<MvccColumnCache>>,
}
47
/// Post-filter applied to the set of row ids produced by a scan's predicate.
///
/// Implementations receive the candidate `row_ids` and return the subset that
/// should survive (e.g. for MVCC visibility checks — TODO confirm intended
/// use against callers of `ScanStreamOptions::row_id_filter`).
pub trait RowIdFilter<P>: Send + Sync
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Returns the filtered subset of `row_ids` for `table`.
    fn filter(&self, table: &Table<P>, row_ids: Treemap) -> LlkvResult<Treemap>;
}
70
/// Options controlling how `Table::scan_stream` materializes batches.
pub struct ScanStreamOptions<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // When true, rows with null projected values are emitted instead of skipped.
    pub include_nulls: bool,
    // Optional ordering of the output; `None` streams in storage order.
    pub order: Option<ScanOrderSpec>,
    // Optional row-id post-filter (e.g. visibility filtering); boxed as a
    // trait object so options stay cheap to clone.
    pub row_id_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
}
96
97impl<P> fmt::Debug for ScanStreamOptions<P>
98where
99 P: Pager<Blob = EntryHandle> + Send + Sync,
100{
101 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102 f.debug_struct("ScanStreamOptions")
103 .field("include_nulls", &self.include_nulls)
104 .field("order", &self.order)
105 .field(
106 "row_id_filter",
107 &self.row_id_filter.as_ref().map(|_| "<RowIdFilter>"),
108 )
109 .finish()
110 }
111}
112
113impl<P> Clone for ScanStreamOptions<P>
114where
115 P: Pager<Blob = EntryHandle> + Send + Sync,
116{
117 fn clone(&self) -> Self {
118 Self {
119 include_nulls: self.include_nulls,
120 order: self.order,
121 row_id_filter: self.row_id_filter.clone(),
122 }
123 }
124}
125
126impl<P> Default for ScanStreamOptions<P>
127where
128 P: Pager<Blob = EntryHandle> + Send + Sync,
129{
130 fn default() -> Self {
131 Self {
132 include_nulls: false,
133 order: None,
134 row_id_filter: None,
135 }
136 }
137}
138
/// Requested ordering for a scan's output.
#[derive(Clone, Copy, Debug)]
pub struct ScanOrderSpec {
    /// Column the ordering is computed from.
    pub field_id: FieldId,
    /// Ascending or descending.
    pub direction: ScanOrderDirection,
    /// Whether null keys sort before non-null keys.
    pub nulls_first: bool,
    /// Transform applied to the key before comparison.
    pub transform: ScanOrderTransform,
}
153
/// Sort direction for [`ScanOrderSpec`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderDirection {
    Ascending,
    Descending,
}
162
/// How the ordering key is derived from the column value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderTransform {
    /// Compare Int64 values as-is.
    IdentityInt64,
    /// Compare Int32 values as-is.
    IdentityInt32,
    /// Compare UTF-8 strings as-is.
    IdentityUtf8,
    /// Parse UTF-8 values as integers before comparing — presumably for
    /// string-typed numeric columns; TODO confirm semantics in the planner.
    CastUtf8ToInteger,
}
177
/// One output column of a scan: either a stored column or a computed value.
#[derive(Clone, Debug)]
pub enum ScanProjection {
    /// Project a stored column directly.
    Column(Projection),
    /// Project the result of evaluating a scalar expression per row.
    Computed {
        /// Expression over this table's field ids.
        expr: ScalarExpr<FieldId>,
        /// Output column name for the computed value.
        alias: String,
    },
}
193
194impl ScanProjection {
195 pub fn column<P: Into<Projection>>(proj: P) -> Self {
197 Self::Column(proj.into())
198 }
199
200 pub fn computed<S: Into<String>>(expr: ScalarExpr<FieldId>, alias: S) -> Self {
202 Self::Computed {
203 expr,
204 alias: alias.into(),
205 }
206 }
207}
208
209impl From<Projection> for ScanProjection {
210 fn from(value: Projection) -> Self {
211 ScanProjection::Column(value)
212 }
213}
214
215impl From<&Projection> for ScanProjection {
216 fn from(value: &Projection) -> Self {
217 ScanProjection::Column(value.clone())
218 }
219}
220
221impl From<&ScanProjection> for ScanProjection {
222 fn from(value: &ScanProjection) -> Self {
223 value.clone()
224 }
225}
226
impl<P> Table<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Creates a table (catalog + metadata entries included) from planner
    /// column specs, delegating to a freshly built `CatalogManager`.
    pub fn create_from_columns(
        display_name: &str,
        canonical_name: &str,
        columns: &[llkv_plan::PlanColumnSpec],
        metadata: Arc<crate::metadata::MetadataManager<P>>,
        catalog: Arc<crate::catalog::TableCatalog>,
        store: Arc<ColumnStore<P>>,
    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
        service.create_table_from_columns(display_name, canonical_name, columns)
    }

    /// Same as [`Table::create_from_columns`] but driven by an Arrow schema.
    pub fn create_from_schema(
        display_name: &str,
        canonical_name: &str,
        schema: &arrow::datatypes::Schema,
        metadata: Arc<crate::metadata::MetadataManager<P>>,
        catalog: Arc<crate::catalog::TableCatalog>,
        store: Arc<ColumnStore<P>>,
    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
        service.create_table_from_schema(display_name, canonical_name, schema)
    }

    /// Opens a handle for `table_id`, constructing a new `ColumnStore` over
    /// `pager`.
    ///
    /// # Errors
    /// Returns `Error::ReservedTableId` for system table ids, or any error
    /// from `ColumnStore::open`.
    #[doc(hidden)]
    pub fn from_id(table_id: TableId, pager: Arc<P>) -> LlkvResult<Self> {
        if is_reserved_table_id(table_id) {
            return Err(Error::ReservedTableId(table_id));
        }

        tracing::trace!(
            "Table::from_id: Opening table_id={} with pager at {:p}",
            table_id,
            &*pager
        );
        let store = ColumnStore::open(pager)?;
        Ok(Self {
            store: Arc::new(store),
            table_id,
            mvcc_cache: RwLock::new(None),
        })
    }

    /// Opens a handle for `table_id` reusing an already-open shared store.
    ///
    /// # Errors
    /// Returns `Error::ReservedTableId` for system table ids.
    #[doc(hidden)]
    pub fn from_id_and_store(table_id: TableId, store: Arc<ColumnStore<P>>) -> LlkvResult<Self> {
        if is_reserved_table_id(table_id) {
            return Err(Error::ReservedTableId(table_id));
        }

        Ok(Self {
            store,
            table_id,
            mvcc_cache: RwLock::new(None),
        })
    }

    /// Registers a persisted sort index for the given user column.
    pub fn register_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
        self.store
            .register_index(logical_field_id, IndexKind::Sort)?;
        Ok(())
    }

    /// Removes the sort index for the given user column.
    ///
    /// A missing index (`Error::NotFound`) is treated as success, making the
    /// call idempotent.
    pub fn unregister_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
        match self
            .store
            .unregister_index(logical_field_id, IndexKind::Sort)
        {
            Ok(()) | Err(Error::NotFound) => Ok(()),
            Err(err) => Err(err),
        }
    }

    /// Lists the persisted index kinds for a user column; an unknown column
    /// yields an empty list rather than an error.
    pub fn list_registered_indexes(&self, field_id: FieldId) -> LlkvResult<Vec<IndexKind>> {
        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
        match self.store.list_persisted_indexes(logical_field_id) {
            Ok(kinds) => Ok(kinds),
            Err(Error::NotFound) => Ok(Vec::new()),
            Err(err) => Err(err),
        }
    }

    /// Returns (computing and caching on first use) whether `schema` carries
    /// the MVCC `created_by`/`deleted_by` columns.
    ///
    /// NOTE(review): the cache is stored per table but derived from whichever
    /// schema reaches here first — this assumes all batches appended to a
    /// table agree on MVCC column presence; confirm with callers.
    fn get_mvcc_cache(&self, schema: &Arc<Schema>) -> MvccColumnCache {
        // Fast path: read lock only. Scoped so the read guard drops before we
        // take the write lock below.
        {
            let cache_read = self.mvcc_cache.read().unwrap();
            if let Some(cache) = *cache_read {
                return cache;
            }
        }

        let has_created_by = schema
            .fields()
            .iter()
            .any(|f| f.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME);
        let has_deleted_by = schema
            .fields()
            .iter()
            .any(|f| f.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME);

        let cache = MvccColumnCache {
            has_created_by,
            has_deleted_by,
        };

        // Racing writers may both compute the same value; last write wins,
        // which is harmless since the result is deterministic for a schema.
        *self.mvcc_cache.write().unwrap() = Some(cache);

        cache
    }

    /// Appends a batch to this table, rewriting field metadata so every
    /// column carries its fully-qualified logical field id, and injecting
    /// MVCC `created_by`/`deleted_by` columns when the caller did not supply
    /// them.
    ///
    /// Expects user columns to carry a *user-level* `FieldId` (not a logical
    /// id) under `FIELD_ID_META_KEY`; `row_id` and MVCC columns are exempt.
    ///
    /// # Errors
    /// `Error::Internal` for missing/invalid/out-of-range field-id metadata,
    /// plus any error from Arrow batch construction or the store append.
    pub fn append(&self, batch: &RecordBatch) -> LlkvResult<()> {
        use arrow::array::UInt64Builder;

        let cache = self.get_mvcc_cache(&batch.schema());
        let has_created_by = cache.has_created_by;
        let has_deleted_by = cache.has_deleted_by;

        // +2 leaves room for injected created_by/deleted_by columns.
        let mut new_fields = Vec::with_capacity(batch.schema().fields().len() + 2);
        let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.columns().len() + 2);

        for (idx, field) in batch.schema().fields().iter().enumerate() {
            let maybe_field_id = field.metadata().get(crate::constants::FIELD_ID_META_KEY);
            // System columns (row_id / MVCC) may legitimately arrive without
            // field-id metadata; handle them specially.
            if maybe_field_id.is_none()
                && (field.name() == ROW_ID_COLUMN_NAME
                    || field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME
                    || field.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME)
            {
                if field.name() == ROW_ID_COLUMN_NAME {
                    // row_id passes through untouched.
                    new_fields.push(field.as_ref().clone());
                    new_columns.push(batch.column(idx).clone());
                } else {
                    // Caller-supplied MVCC column: stamp it with the
                    // table-scoped MVCC logical field id.
                    let lfid = if field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME {
                        LogicalFieldId::for_mvcc_created_by(self.table_id)
                    } else {
                        LogicalFieldId::for_mvcc_deleted_by(self.table_id)
                    };

                    let mut metadata = field.metadata().clone();
                    let lfid_val: u64 = lfid.into();
                    metadata.insert(
                        crate::constants::FIELD_ID_META_KEY.to_string(),
                        lfid_val.to_string(),
                    );

                    let new_field =
                        Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                            .with_metadata(metadata);
                    new_fields.push(new_field);
                    new_columns.push(batch.column(idx).clone());
                }
                continue;
            }

            // User column: metadata must hold a parseable user-level field id.
            let raw_field_id = maybe_field_id
                .ok_or_else(|| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' is missing a valid '{}' in its metadata.",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY
                    ))
                })?
                .parse::<u64>()
                .map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' contains an invalid '{}': {}",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY,
                        err
                    ))
                })?;

            // Values above FieldId::MAX indicate the caller passed an
            // already-namespaced logical id, which would double-namespace.
            if raw_field_id > FieldId::MAX as u64 {
                return Err(llkv_result::Error::Internal(format!(
                    "Field '{}' expected user FieldId (<= {}) but got logical id '{}'",
                    field.name(),
                    FieldId::MAX,
                    raw_field_id
                )));
            }

            let user_field_id = raw_field_id as FieldId;
            let logical_field_id = LogicalFieldId::for_user(self.table_id, user_field_id);

            // Rewrite metadata with the namespaced logical id.
            let lfid = logical_field_id;
            let mut new_metadata = field.metadata().clone();
            let lfid_val: u64 = lfid.into();
            new_metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );

            let new_field =
                Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                    .with_metadata(new_metadata);
            new_fields.push(new_field);
            new_columns.push(batch.column(idx).clone());

            // First-seen columns (or columns whose catalog entry lacks a
            // name) get a ColMeta recorded so `schema()` can name them later.
            let need_meta = match self
                .catalog()
                .get_cols_meta(self.table_id, &[user_field_id])
            {
                metas if metas.is_empty() => true,
                metas => metas[0].as_ref().and_then(|m| m.name.as_ref()).is_none(),
            };

            if need_meta {
                let meta = ColMeta {
                    col_id: user_field_id,
                    name: Some(field.name().to_string()),
                    flags: 0,
                    default: None,
                };
                self.catalog().put_col_meta(self.table_id, &meta);
            }
        }

        // Default MVCC markers for non-transactional appends: everything is
        // created by the auto-commit txn and deleted by no one.
        const TXN_ID_AUTO_COMMIT: u64 = 1;
        const TXN_ID_NONE: u64 = 0;
        let row_count = batch.num_rows();

        if !has_created_by {
            let mut created_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                created_by_builder.append_value(TXN_ID_AUTO_COMMIT);
            }
            let created_by_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = created_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::CREATED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(created_by_builder.finish()));
        }

        if !has_deleted_by {
            let mut deleted_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                deleted_by_builder.append_value(TXN_ID_NONE);
            }
            let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = deleted_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::DELETED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(deleted_by_builder.finish()));
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let namespaced_batch = RecordBatch::try_new(new_schema, new_columns)?;

        tracing::trace!(
            table_id = self.table_id,
            num_columns = namespaced_batch.num_columns(),
            num_rows = namespaced_batch.num_rows(),
            "Attempting append to table"
        );

        if let Err(err) = self.store.append(&namespaced_batch) {
            // Diagnostic path: report which batch fields the store does not
            // know about, then propagate the original error unchanged.
            let batch_field_ids: Vec<LogicalFieldId> = namespaced_batch
                .schema()
                .fields()
                .iter()
                .filter_map(|f| f.metadata().get(crate::constants::FIELD_ID_META_KEY))
                .filter_map(|s| s.parse::<u64>().ok())
                .map(LogicalFieldId::from)
                .collect();

            let missing_fields: Vec<LogicalFieldId> = batch_field_ids
                .iter()
                .filter(|&&field_id| !self.store.has_field(field_id))
                .copied()
                .collect();

            tracing::error!(
                table_id = self.table_id,
                error = ?err,
                batch_field_ids = ?batch_field_ids,
                missing_from_catalog = ?missing_fields,
                "Append failed - some fields missing from catalog"
            );
            return Err(err);
        }
        Ok(())
    }

    /// Streams record batches for `projections` over rows matching
    /// `filter_expr`, invoking `on_batch` per batch.
    ///
    /// Convenience wrapper that normalizes the projection iterator into
    /// [`ScanProjection`]s and forwards to
    /// [`Table::scan_stream_with_exprs`].
    pub fn scan_stream<'a, I, T, F>(
        &self,
        projections: I,
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions<P>,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        I: IntoIterator<Item = T>,
        T: Into<ScanProjection>,
        F: FnMut(RecordBatch),
    {
        let stream_projections: Vec<ScanProjection> =
            projections.into_iter().map(|p| p.into()).collect();
        self.scan_stream_with_exprs(&stream_projections, filter_expr, options, on_batch)
    }

    /// Core scan entry point: planning and execution are delegated to
    /// [`TablePlanner`].
    pub fn scan_stream_with_exprs<'a, F>(
        &self,
        projections: &[ScanProjection],
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions<P>,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(RecordBatch),
    {
        TablePlanner::new(self).scan_stream_with_exprs(projections, filter_expr, options, on_batch)
    }

    /// Evaluates `filter_expr` and returns the matching row ids as a bitmap,
    /// converting from a vector source when the planner produced one.
    pub fn filter_row_ids<'a>(&self, filter_expr: &Expr<'a, FieldId>) -> LlkvResult<Treemap> {
        let source = collect_row_ids_for_table(self, filter_expr)?;
        Ok(match source {
            RowIdSource::Bitmap(b) => b,
            RowIdSource::Vector(v) => Treemap::from_iter(v),
        })
    }

    /// Borrowing view of the system catalog backed by this table's store.
    #[inline]
    pub fn catalog(&self) -> SysCatalog<'_, P> {
        SysCatalog::new(&self.store)
    }

    /// Catalog metadata for this table, if any has been recorded.
    #[inline]
    pub fn get_table_meta(&self) -> Option<TableMeta> {
        self.catalog().get_table_meta(self.table_id)
    }

    /// Catalog metadata for the given columns; entries are `None` for
    /// columns with no recorded metadata.
    #[inline]
    pub fn get_cols_meta(&self, col_ids: &[FieldId]) -> Vec<Option<ColMeta>> {
        self.catalog().get_cols_meta(self.table_id, col_ids)
    }

    /// Reconstructs an Arrow schema for this table from the store's known
    /// user columns plus catalog column names.
    ///
    /// The synthesized schema always starts with a non-nullable `row_id`
    /// column; user columns are sorted by field id, marked nullable, named
    /// from catalog metadata (falling back to `col_<id>`), and carry their
    /// user field id under `FIELD_ID_META_KEY`.
    pub fn schema(&self) -> LlkvResult<Arc<Schema>> {
        let mut logical_fields = self.store.user_field_ids_for_table(self.table_id);
        // Deterministic column order: sort by user-level field id.
        logical_fields.sort_by_key(|lfid| lfid.field_id());

        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let metas = self.get_cols_meta(&field_ids);

        let mut fields: Vec<Field> = Vec::with_capacity(1 + field_ids.len());
        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for (idx, lfid) in logical_fields.into_iter().enumerate() {
            let fid = lfid.field_id();
            let dtype = self.store.data_type(lfid)?;
            let name = metas
                .get(idx)
                .and_then(|m| m.as_ref().and_then(|meta| meta.name.clone()))
                .unwrap_or_else(|| format!("col_{}", fid));

            let mut metadata: HashMap<String, String> = HashMap::new();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                fid.to_string(),
            );

            fields.push(Field::new(&name, dtype.clone(), true).with_metadata(metadata));
        }

        Ok(Arc::new(Schema::new(fields)))
    }

    /// Renders this table's schema as a `(name, field_id, data_type)`
    /// record batch — one row per column, with the data type formatted via
    /// `Debug`.
    pub fn schema_recordbatch(&self) -> LlkvResult<RecordBatch> {
        let schema = self.schema()?;
        let fields = schema.fields();

        let mut names: Vec<String> = Vec::with_capacity(fields.len());
        let mut fids: Vec<u32> = Vec::with_capacity(fields.len());
        let mut dtypes: Vec<String> = Vec::with_capacity(fields.len());

        for field in fields.iter() {
            names.push(field.name().to_string());
            // Columns without parseable field-id metadata (e.g. row_id)
            // report 0.
            let fid = field
                .metadata()
                .get(crate::constants::FIELD_ID_META_KEY)
                .and_then(|s| s.parse::<u32>().ok())
                .unwrap_or(0u32);
            fids.push(fid);
            dtypes.push(format!("{:?}", field.data_type()));
        }

        let name_array: ArrayRef = Arc::new(StringArray::from(names));
        let fid_array: ArrayRef = Arc::new(UInt32Array::from(fids));
        let dtype_array: ArrayRef = Arc::new(StringArray::from(dtypes));

        let rb_schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new(crate::constants::FIELD_ID_META_KEY, DataType::UInt32, false),
            Field::new("data_type", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(rb_schema, vec![name_array, fid_array, dtype_array])?;
        Ok(batch)
    }

    /// Builds a [`ColumnStream`] that gathers the given logical columns for
    /// the supplied row ids, batching output at `STREAM_BATCH_ROWS`.
    pub fn stream_columns<'table, 'a>(
        &'table self,
        logical_fields: impl Into<Arc<[LogicalFieldId]>>,
        row_ids: impl crate::stream::RowIdStreamSource<'a>,
        policy: GatherNullPolicy,
    ) -> LlkvResult<ColumnStream<'table, 'a, P>> {
        let total_rows = row_ids.count() as usize;
        let iter = row_ids.into_iter_source();
        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let ctx = self.store.prepare_gather_context(logical_fields.as_ref())?;
        Ok(ColumnStream::new(
            &self.store,
            ctx,
            iter,
            total_rows,
            STREAM_BATCH_ROWS,
            policy,
            logical_fields,
        ))
    }

    /// Direct access to the underlying column store.
    pub fn store(&self) -> &ColumnStore<P> {
        &self.store
    }

    /// This table's identifier.
    #[inline]
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Total stored row count for a single user column.
    pub fn total_rows_for_col(&self, col_id: FieldId) -> llkv_result::Result<u64> {
        let lfid = LogicalFieldId::for_user(self.table_id, col_id);
        self.store.total_rows_for_field(lfid)
    }

    /// Total row count for the table, preferring the shadow row-id column
    /// and falling back to a store-wide count when it is unavailable —
    /// presumably for tables without user column 0; TODO confirm.
    pub fn total_rows(&self) -> llkv_result::Result<u64> {
        use llkv_column_map::store::rowid_fid;
        let rid_lfid = rowid_fid(LogicalFieldId::for_user(self.table_id, 0));
        match self.store.total_rows_for_field(rid_lfid) {
            Ok(n) => Ok(n),
            Err(_) => {
                self.store.total_rows_for_table(self.table_id)
            }
        }
    }
}
791
792#[cfg(test)]
793mod tests {
794 use super::*;
795 use crate::reserved::CATALOG_TABLE_ID;
796 use crate::types::RowId;
797 use arrow::array::Array;
798 use arrow::array::ArrayRef;
799 use arrow::array::{
800 BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt8Array,
801 UInt32Array, UInt64Array,
802 };
803 use arrow::compute::{cast, max, min, sum, unary};
804 use arrow::datatypes::DataType;
805 use llkv_column_map::ColumnStore;
806 use llkv_column_map::store::GatherNullPolicy;
807 use llkv_expr::{BinaryOp, CompareOp, Filter, Operator, ScalarExpr};
808 use std::collections::HashMap;
809 use std::ops::Bound;
810
811 fn setup_test_table() -> Table {
812 let pager = Arc::new(MemPager::default());
813 setup_test_table_with_pager(&pager)
814 }
815
    /// Creates table id 1 on the given pager and appends one batch of four
    /// rows across u64/binary/i32/f64/f32 columns (field ids 10-14).
    fn setup_test_table_with_pager(pager: &Arc<MemPager>) -> Table {
        let table = Table::from_id(1, Arc::clone(pager)).unwrap();
        const COL_A_U64: FieldId = 10;
        const COL_B_BIN: FieldId = 11;
        const COL_C_I32: FieldId = 12;
        const COL_D_F64: FieldId = 13;
        const COL_E_F32: FieldId = 14;

        // Every user column carries its FieldId in field metadata, as
        // required by Table::append.
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_B_BIN.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_F64.to_string(),
            )])),
            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_F32.to_string(),
            )])),
        ]));

        // Row 2 and row 4 deliberately share values (200 / 20 / 2.5 / 2.0)
        // so equality filters match two rows.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
                Arc::new(UInt64Array::from(vec![100, 200, 300, 200])),
                Arc::new(BinaryArray::from(vec![
                    b"foo" as &[u8],
                    b"bar",
                    b"baz",
                    b"qux",
                ])),
                Arc::new(Int32Array::from(vec![10, 20, 30, 20])),
                Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5, 2.5])),
                Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 2.0])),
            ],
        )
        .unwrap();

        table.append(&batch).unwrap();
        table
    }
869
870 fn gather_single(
871 store: &ColumnStore<MemPager>,
872 field_id: LogicalFieldId,
873 row_ids: &[u64],
874 ) -> ArrayRef {
875 store
876 .gather_rows(&[field_id], row_ids, GatherNullPolicy::ErrorOnMissing)
877 .unwrap()
878 .column(0)
879 .clone()
880 }
881
    /// Wraps a single predicate filter in an `Expr` tree.
    fn pred_expr<'a>(filter: Filter<'a, FieldId>) -> Expr<'a, FieldId> {
        Expr::Pred(filter)
    }
885
    /// Builds a projection for `field_id` namespaced to `table`.
    fn proj(table: &Table, field_id: FieldId) -> Projection {
        Projection::from(LogicalFieldId::for_user(table.table_id, field_id))
    }
889
    /// Like `proj` but with an explicit output column alias.
    fn proj_alias<S: Into<String>>(table: &Table, field_id: FieldId, alias: S) -> Projection {
        Projection::with_alias(LogicalFieldId::for_user(table.table_id, field_id), alias)
    }
893
894 #[test]
895 fn table_new_rejects_reserved_table_id() {
896 let result = Table::from_id(CATALOG_TABLE_ID, Arc::new(MemPager::default()));
897 assert!(matches!(
898 result,
899 Err(Error::ReservedTableId(id)) if id == CATALOG_TABLE_ID
900 ));
901 }
902
    /// Appending a batch whose field metadata holds an already-namespaced
    /// *logical* field id (rather than a user-level id) must fail, since it
    /// would be double-namespaced by `Table::append`.
    #[test]
    fn test_append_rejects_logical_field_id_in_metadata() {
        let table = Table::from_id(7, Arc::new(MemPager::default())).unwrap();

        const USER_FID: FieldId = 42;
        // Deliberately encode the full logical id in the metadata.
        let logical: LogicalFieldId = LogicalFieldId::for_user(table.table_id(), USER_FID);
        let logical_val: u64 = logical.into();

        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("bad", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                logical_val.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2u64])),
                Arc::new(UInt64Array::from(vec![10u64, 20u64])),
            ],
        )
        .unwrap();

        let res = table.append(&batch);
        assert!(matches!(res, Err(Error::Internal(_))));
    }
935
    /// Equality filter on the u64 column (== 200) matches fixture rows 2 and
    /// 4; the projected i32 column yields 20 for both.
    #[test]
    fn test_scan_with_u64_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let expr = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::Equals(200.into()),
        });

        let mut vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(20), Some(20)]);
    }
963
    /// `starts_with("al", case_sensitive)` on a Utf8 column returns only
    /// "alice" and "albert", in row order.
    #[test]
    fn test_scan_with_string_filter() {
        let pager = Arc::new(MemPager::default());
        let table = Table::from_id(500, Arc::clone(&pager)).unwrap();

        const COL_STR: FieldId = 42;
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_STR.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
                Arc::new(StringArray::from(vec!["alice", "bob", "albert", "carol"])),
            ],
        )
        .unwrap();
        table.append(&batch).unwrap();

        let expr = pred_expr(Filter {
            field_id: COL_STR,
            op: Operator::starts_with("al".to_string(), true),
        });

        let mut collected: Vec<Option<String>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_STR)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
                    collected.extend(arr.iter().map(|v| v.map(|s| s.to_string())));
                },
            )
            .unwrap();

        assert_eq!(
            collected,
            vec![Some("alice".to_string()), Some("albert".to_string())]
        );
    }
1011
    /// Writes three tables with distinct schemas through one shared pager,
    /// then reopens each table id and verifies per-table data types and
    /// values survived — i.e. tables are isolated within a shared store.
    #[test]
    fn test_table_reopen_with_shared_pager() {
        const TABLE_ALPHA: TableId = 42;
        const TABLE_BETA: TableId = 43;
        const TABLE_GAMMA: TableId = 44;
        const COL_ALPHA_U64: FieldId = 100;
        const COL_ALPHA_I32: FieldId = 101;
        const COL_ALPHA_U32: FieldId = 102;
        const COL_ALPHA_I16: FieldId = 103;
        const COL_BETA_U64: FieldId = 200;
        const COL_BETA_U8: FieldId = 201;
        const COL_GAMMA_I16: FieldId = 300;

        let pager = Arc::new(MemPager::default());

        let alpha_rows: Vec<RowId> = vec![1, 2, 3, 4];
        let alpha_vals_u64: Vec<u64> = vec![10, 20, 30, 40];
        let alpha_vals_i32: Vec<i32> = vec![-5, 15, 25, 35];
        let alpha_vals_u32: Vec<u32> = vec![7, 11, 13, 17];
        let alpha_vals_i16: Vec<i16> = vec![-2, 4, -6, 8];

        let beta_rows: Vec<RowId> = vec![101, 102, 103];
        let beta_vals_u64: Vec<u64> = vec![900, 901, 902];
        let beta_vals_u8: Vec<u8> = vec![1, 2, 3];

        let gamma_rows: Vec<RowId> = vec![501, 502];
        let gamma_vals_i16: Vec<i16> = vec![123, -321];

        // Write phase: each scope drops its Table handle before reopening.
        {
            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("alpha_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U64.to_string(),
                )])),
                Field::new("alpha_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I32.to_string(),
                )])),
                Field::new("alpha_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U32.to_string(),
                )])),
                Field::new("alpha_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(alpha_rows.clone())),
                    Arc::new(UInt64Array::from(alpha_vals_u64.clone())),
                    Arc::new(Int32Array::from(alpha_vals_i32.clone())),
                    Arc::new(UInt32Array::from(alpha_vals_u32.clone())),
                    Arc::new(Int16Array::from(alpha_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        {
            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("beta_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U64.to_string(),
                )])),
                Field::new("beta_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U8.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(beta_rows.clone())),
                    Arc::new(UInt64Array::from(beta_vals_u64.clone())),
                    Arc::new(UInt8Array::from(beta_vals_u8.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        {
            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("gamma_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_GAMMA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(gamma_rows.clone())),
                    Arc::new(Int16Array::from(gamma_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // Reopen phase: fresh handles over the same pager must see the data.
        {
            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let expectations: &[(FieldId, DataType)] = &[
                (COL_ALPHA_U64, DataType::UInt64),
                (COL_ALPHA_I32, DataType::Int32),
                (COL_ALPHA_U32, DataType::UInt32),
                (COL_ALPHA_I16, DataType::Int16),
            ];

            for &(col, ref ty) in expectations {
                let lfid = LogicalFieldId::for_user(TABLE_ALPHA, col);
                assert_eq!(store.data_type(lfid).unwrap(), *ty);
                let arr = gather_single(store, lfid, &alpha_rows);
                match ty {
                    DataType::UInt64 => {
                        let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u64.as_slice());
                    }
                    DataType::Int32 => {
                        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i32.as_slice());
                    }
                    DataType::UInt32 => {
                        let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u32.as_slice());
                    }
                    DataType::Int16 => {
                        let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i16.as_slice());
                    }
                    other => panic!("unexpected dtype {other:?}"),
                }
            }
        }

        {
            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let lfid_u64 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U64);
            assert_eq!(store.data_type(lfid_u64).unwrap(), DataType::UInt64);
            let arr_u64 = gather_single(store, lfid_u64, &beta_rows);
            let arr_u64 = arr_u64.as_any().downcast_ref::<UInt64Array>().unwrap();
            assert_eq!(arr_u64.values(), beta_vals_u64.as_slice());

            let lfid_u8 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U8);
            assert_eq!(store.data_type(lfid_u8).unwrap(), DataType::UInt8);
            let arr_u8 = gather_single(store, lfid_u8, &beta_rows);
            let arr_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
            assert_eq!(arr_u8.values(), beta_vals_u8.as_slice());
        }

        {
            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let store = table.store();
            let lfid = LogicalFieldId::for_user(TABLE_GAMMA, COL_GAMMA_I16);
            assert_eq!(store.data_type(lfid).unwrap(), DataType::Int16);
            let arr = gather_single(store, lfid, &gamma_rows);
            let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
            assert_eq!(arr.values(), gamma_vals_i16.as_slice());
        }
    }
1186
    /// Equality filter on the i32 column (== 20) matches fixture rows 2 and
    /// 4; the projected u64 column yields 200 for both.
    #[test]
    fn test_scan_with_i32_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(20.into()),
        });

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(200), Some(200)]);
    }
1214
1215 #[test]
1216 fn test_scan_with_greater_than_filter() {
1217 let table = setup_test_table();
1218 const COL_A_U64: FieldId = 10;
1219 const COL_C_I32: FieldId = 12;
1220
1221 let filter = pred_expr(Filter {
1222 field_id: COL_C_I32,
1223 op: Operator::GreaterThan(15.into()),
1224 });
1225
1226 let mut vals: Vec<Option<u64>> = Vec::new();
1227 table
1228 .scan_stream(
1229 &[proj(&table, COL_A_U64)],
1230 &filter,
1231 ScanStreamOptions::default(),
1232 |b| {
1233 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1234 vals.extend(
1235 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1236 );
1237 },
1238 )
1239 .unwrap();
1240 assert_eq!(vals, vec![Some(200), Some(300), Some(200)]);
1241 }
1242
1243 #[test]
1244 fn test_scan_with_range_filter() {
1245 let table = setup_test_table();
1246 const COL_A_U64: FieldId = 10;
1247 const COL_C_I32: FieldId = 12;
1248
1249 let filter = pred_expr(Filter {
1250 field_id: COL_A_U64,
1251 op: Operator::Range {
1252 lower: Bound::Included(150.into()),
1253 upper: Bound::Excluded(300.into()),
1254 },
1255 });
1256
1257 let mut vals: Vec<Option<i32>> = Vec::new();
1258 table
1259 .scan_stream(
1260 &[proj(&table, COL_C_I32)],
1261 &filter,
1262 ScanStreamOptions::default(),
1263 |b| {
1264 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1265 vals.extend(
1266 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1267 );
1268 },
1269 )
1270 .unwrap();
1271 assert_eq!(vals, vec![Some(20), Some(20)]);
1272 }
1273
1274 #[test]
1275 fn test_filtered_scan_sum_kernel() {
1276 let table = setup_test_table();
1280 const COL_A_U64: FieldId = 10;
1281
1282 let filter = pred_expr(Filter {
1283 field_id: COL_A_U64,
1284 op: Operator::Range {
1285 lower: Bound::Included(150.into()),
1286 upper: Bound::Excluded(300.into()),
1287 },
1288 });
1289
1290 let mut total: u128 = 0;
1291 table
1292 .scan_stream(
1293 &[proj(&table, COL_A_U64)],
1294 &filter,
1295 ScanStreamOptions::default(),
1296 |b| {
1297 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1298 if let Some(part) = sum(a) {
1299 total += part as u128;
1300 }
1301 },
1302 )
1303 .unwrap();
1304
1305 assert_eq!(total, 400);
1306 }
1307
1308 #[test]
1309 fn test_filtered_scan_sum_i32_kernel() {
1310 let table = setup_test_table();
1315 const COL_A_U64: FieldId = 10;
1316 const COL_C_I32: FieldId = 12;
1317
1318 let candidates = [100.into(), 300.into()];
1319 let filter = pred_expr(Filter {
1320 field_id: COL_A_U64,
1321 op: Operator::In(&candidates),
1322 });
1323
1324 let mut total: i64 = 0;
1325 table
1326 .scan_stream(
1327 &[proj(&table, COL_C_I32)],
1328 &filter,
1329 ScanStreamOptions::default(),
1330 |b| {
1331 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1332 if let Some(part) = sum(a) {
1333 total += part as i64;
1334 }
1335 },
1336 )
1337 .unwrap();
1338 assert_eq!(total, 40);
1339 }
1340
1341 #[test]
1342 fn test_filtered_scan_min_max_kernel() {
1343 let table = setup_test_table();
1348 const COL_A_U64: FieldId = 10;
1349 const COL_C_I32: FieldId = 12;
1350
1351 let candidates = [100.into(), 300.into()];
1352 let filter = pred_expr(Filter {
1353 field_id: COL_A_U64,
1354 op: Operator::In(&candidates),
1355 });
1356
1357 let mut mn: Option<i32> = None;
1358 let mut mx: Option<i32> = None;
1359 table
1360 .scan_stream(
1361 &[proj(&table, COL_C_I32)],
1362 &filter,
1363 ScanStreamOptions::default(),
1364 |b| {
1365 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1366
1367 if let Some(part_min) = min(a) {
1368 mn = Some(mn.map_or(part_min, |m| m.min(part_min)));
1369 }
1370 if let Some(part_max) = max(a) {
1371 mx = Some(mx.map_or(part_max, |m| m.max(part_max)));
1372 }
1373 },
1374 )
1375 .unwrap();
1376 assert_eq!(mn, Some(10));
1377 assert_eq!(mx, Some(30));
1378 }
1379
1380 #[test]
1381 fn test_filtered_scan_float64_column() {
1382 let table = setup_test_table();
1383 const COL_D_F64: FieldId = 13;
1384
1385 let filter = pred_expr(Filter {
1386 field_id: COL_D_F64,
1387 op: Operator::GreaterThan(2.0_f64.into()),
1388 });
1389
1390 let mut got = Vec::new();
1391 table
1392 .scan_stream(
1393 &[proj(&table, COL_D_F64)],
1394 &filter,
1395 ScanStreamOptions::default(),
1396 |b| {
1397 let arr = b.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
1398 for i in 0..arr.len() {
1399 if arr.is_valid(i) {
1400 got.push(arr.value(i));
1401 }
1402 }
1403 },
1404 )
1405 .unwrap();
1406
1407 assert_eq!(got, vec![2.5, 3.5, 2.5]);
1408 }
1409
1410 #[test]
1411 fn test_filtered_scan_float32_in_operator() {
1412 let table = setup_test_table();
1413 const COL_E_F32: FieldId = 14;
1414
1415 let candidates = [2.0_f32.into(), 3.0_f32.into()];
1416 let filter = pred_expr(Filter {
1417 field_id: COL_E_F32,
1418 op: Operator::In(&candidates),
1419 });
1420
1421 let mut vals: Vec<Option<f32>> = Vec::new();
1422 table
1423 .scan_stream(
1424 &[proj(&table, COL_E_F32)],
1425 &filter,
1426 ScanStreamOptions::default(),
1427 |b| {
1428 let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
1429 vals.extend((0..arr.len()).map(|i| {
1430 if arr.is_null(i) {
1431 None
1432 } else {
1433 Some(arr.value(i))
1434 }
1435 }));
1436 },
1437 )
1438 .unwrap();
1439
1440 let collected: Vec<f32> = vals.into_iter().flatten().collect();
1441 assert_eq!(collected, vec![2.0, 3.0, 2.0]);
1442 }
1443
1444 #[test]
1445 fn test_scan_stream_and_expression() {
1446 let table = setup_test_table();
1447 const COL_A_U64: FieldId = 10;
1448 const COL_C_I32: FieldId = 12;
1449 const COL_E_F32: FieldId = 14;
1450
1451 let expr = Expr::all_of(vec![
1452 Filter {
1453 field_id: COL_C_I32,
1454 op: Operator::GreaterThan(15.into()),
1455 },
1456 Filter {
1457 field_id: COL_A_U64,
1458 op: Operator::LessThan(250.into()),
1459 },
1460 ]);
1461
1462 let mut vals: Vec<Option<f32>> = Vec::new();
1463 table
1464 .scan_stream(
1465 &[proj(&table, COL_E_F32)],
1466 &expr,
1467 ScanStreamOptions::default(),
1468 |b| {
1469 let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
1470 vals.extend((0..arr.len()).map(|i| {
1471 if arr.is_null(i) {
1472 None
1473 } else {
1474 Some(arr.value(i))
1475 }
1476 }));
1477 },
1478 )
1479 .unwrap();
1480
1481 assert_eq!(vals, vec![Some(2.0_f32), Some(2.0_f32)]);
1482 }
1483
1484 #[test]
1485 fn test_scan_stream_or_expression() {
1486 let table = setup_test_table();
1487 const COL_A_U64: FieldId = 10;
1488 const COL_C_I32: FieldId = 12;
1489
1490 let expr = Expr::any_of(vec![
1491 Filter {
1492 field_id: COL_C_I32,
1493 op: Operator::Equals(10.into()),
1494 },
1495 Filter {
1496 field_id: COL_C_I32,
1497 op: Operator::Equals(30.into()),
1498 },
1499 ]);
1500
1501 let mut vals: Vec<Option<u64>> = Vec::new();
1502 table
1503 .scan_stream(
1504 &[proj(&table, COL_A_U64)],
1505 &expr,
1506 ScanStreamOptions::default(),
1507 |b| {
1508 let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1509 vals.extend((0..arr.len()).map(|i| {
1510 if arr.is_null(i) {
1511 None
1512 } else {
1513 Some(arr.value(i))
1514 }
1515 }));
1516 },
1517 )
1518 .unwrap();
1519
1520 assert_eq!(vals, vec![Some(100), Some(300)]);
1521 }
1522
1523 #[test]
1524 fn test_scan_stream_not_predicate() {
1525 let table = setup_test_table();
1526 const COL_A_U64: FieldId = 10;
1527 const COL_C_I32: FieldId = 12;
1528
1529 let expr = Expr::not(pred_expr(Filter {
1530 field_id: COL_C_I32,
1531 op: Operator::Equals(20.into()),
1532 }));
1533
1534 let mut vals: Vec<Option<u64>> = Vec::new();
1535 table
1536 .scan_stream(
1537 &[proj(&table, COL_A_U64)],
1538 &expr,
1539 ScanStreamOptions::default(),
1540 |b| {
1541 let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1542 vals.extend((0..arr.len()).map(|i| {
1543 if arr.is_null(i) {
1544 None
1545 } else {
1546 Some(arr.value(i))
1547 }
1548 }));
1549 },
1550 )
1551 .unwrap();
1552
1553 assert_eq!(vals, vec![Some(100), Some(300)]);
1554 }
1555
1556 #[test]
1557 fn test_scan_stream_not_and_expression() {
1558 let table = setup_test_table();
1559 const COL_A_U64: FieldId = 10;
1560 const COL_C_I32: FieldId = 12;
1561
1562 let expr = Expr::not(Expr::all_of(vec![
1563 Filter {
1564 field_id: COL_A_U64,
1565 op: Operator::GreaterThan(150.into()),
1566 },
1567 Filter {
1568 field_id: COL_C_I32,
1569 op: Operator::LessThan(40.into()),
1570 },
1571 ]));
1572
1573 let mut vals: Vec<Option<u64>> = Vec::new();
1574 table
1575 .scan_stream(
1576 &[proj(&table, COL_A_U64)],
1577 &expr,
1578 ScanStreamOptions::default(),
1579 |b| {
1580 let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1581 vals.extend((0..arr.len()).map(|i| {
1582 if arr.is_null(i) {
1583 None
1584 } else {
1585 Some(arr.value(i))
1586 }
1587 }));
1588 },
1589 )
1590 .unwrap();
1591
1592 assert_eq!(vals, vec![Some(100)]);
1593 }
1594
    #[test]
    fn test_scan_stream_include_nulls_toggle() {
        // Appends two extra rows (row ids 5 and 6; c_i32 = 40 and NULL) to
        // the fixture table, then checks that `include_nulls` controls
        // whether the NULL row appears in a single-column scan.
        let pager = Arc::new(MemPager::default());
        let table = setup_test_table_with_pager(&pager);
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;
        const COL_B_BIN: FieldId = 11;
        const COL_D_F64: FieldId = 13;
        const COL_E_F32: FieldId = 14;

        // Schema mirrors the fixture's columns; c_i32 is the only nullable one.
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_B_BIN.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, true).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_F64.to_string(),
            )])),
            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_F32.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![5, 6])),
                Arc::new(UInt64Array::from(vec![500, 600])),
                Arc::new(BinaryArray::from(vec![
                    Some(&b"new"[..]),
                    Some(&b"alt"[..]),
                ])),
                // Row id 6 carries the NULL in the nullable c_i32 column.
                Arc::new(Int32Array::from(vec![Some(40), None])),
                Arc::new(Float64Array::from(vec![5.5, 6.5])),
                Arc::new(Float32Array::from(vec![5.0, 6.0])),
            ],
        )
        .unwrap();
        table.append(&batch).unwrap();

        // Predicate matches only the two newly appended rows (a_u64 > 450).
        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::GreaterThan(450.into()),
        });

        // Default options: the NULL row is dropped from a single-column scan.
        let mut default_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    default_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(default_vals, vec![Some(40)]);

        // include_nulls = true: the NULL row must now be surfaced.
        let mut include_null_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions {
                    include_nulls: true,
                    order: None,
                    row_id_filter: None,
                },
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();

                    // NOTE(review): this inner scan runs from inside the outer
                    // scan's callback, so it also exercises a re-entrant scan
                    // on the same table. With default options but TWO
                    // projected columns, the row whose c_i32 is NULL is still
                    // emitted (as (None, Some(6.5))) — presumably the
                    // null-dropping policy only prunes rows that are null in
                    // every projected column; confirm against scan_stream's
                    // null handling.
                    let mut paired_vals: Vec<(Option<i32>, Option<f64>)> = Vec::new();
                    table
                        .scan_stream(
                            &[proj(&table, COL_C_I32), proj(&table, COL_D_F64)],
                            &filter,
                            ScanStreamOptions::default(),
                            |b| {
                                assert_eq!(b.num_columns(), 2);
                                let c_arr =
                                    b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                                let d_arr =
                                    b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
                                for i in 0..b.num_rows() {
                                    let c_val = if c_arr.is_null(i) {
                                        None
                                    } else {
                                        Some(c_arr.value(i))
                                    };
                                    let d_val = if d_arr.is_null(i) {
                                        None
                                    } else {
                                        Some(d_arr.value(i))
                                    };
                                    paired_vals.push((c_val, d_val));
                                }
                            },
                        )
                        .unwrap();
                    assert_eq!(paired_vals, vec![(Some(40), Some(5.5)), (None, Some(6.5))]);
                    include_null_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(include_null_vals, vec![Some(40), None]);
    }
1725
1726 #[test]
1727 fn test_filtered_scan_int_sqrt_float64() {
1728 let table = setup_test_table();
1734 const COL_A_U64: FieldId = 10;
1735 const COL_C_I32: FieldId = 12;
1736
1737 let filter = pred_expr(Filter {
1738 field_id: COL_C_I32,
1739 op: Operator::GreaterThan(15.into()),
1740 });
1741
1742 let mut got: Vec<f64> = Vec::new();
1743 table
1744 .scan_stream(
1745 &[proj(&table, COL_A_U64)],
1746 &filter,
1747 ScanStreamOptions::default(),
1748 |b| {
1749 let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
1750 let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
1751
1752 let sqrt_arr = unary::<
1754 arrow::datatypes::Float64Type,
1755 _,
1756 arrow::datatypes::Float64Type,
1757 >(f64_arr, |v: f64| v.sqrt());
1758
1759 for i in 0..sqrt_arr.len() {
1760 if !sqrt_arr.is_null(i) {
1761 got.push(sqrt_arr.value(i));
1762 }
1763 }
1764 },
1765 )
1766 .unwrap();
1767
1768 let expected = [200_f64.sqrt(), 300_f64.sqrt(), 200_f64.sqrt()];
1769 assert_eq!(got, expected);
1770 }
1771
1772 #[test]
1773 fn test_multi_field_kernels_with_filters() {
1774 use arrow::array::{Int16Array, UInt8Array, UInt32Array};
1778
1779 let table = Table::from_id(2, Arc::new(MemPager::default())).unwrap();
1780
1781 const COL_A_U64: FieldId = 20;
1782 const COL_D_U32: FieldId = 21;
1783 const COL_E_I16: FieldId = 22;
1784 const COL_F_U8: FieldId = 23;
1785 const COL_C_I32: FieldId = 24;
1786
1787 let schema = Arc::new(Schema::new(vec![
1788 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1789 Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1790 crate::constants::FIELD_ID_META_KEY.to_string(),
1791 COL_A_U64.to_string(),
1792 )])),
1793 Field::new("d_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
1794 crate::constants::FIELD_ID_META_KEY.to_string(),
1795 COL_D_U32.to_string(),
1796 )])),
1797 Field::new("e_i16", DataType::Int16, false).with_metadata(HashMap::from([(
1798 crate::constants::FIELD_ID_META_KEY.to_string(),
1799 COL_E_I16.to_string(),
1800 )])),
1801 Field::new("f_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
1802 crate::constants::FIELD_ID_META_KEY.to_string(),
1803 COL_F_U8.to_string(),
1804 )])),
1805 Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
1806 crate::constants::FIELD_ID_META_KEY.to_string(),
1807 COL_C_I32.to_string(),
1808 )])),
1809 ]));
1810
1811 let batch = RecordBatch::try_new(
1813 schema.clone(),
1814 vec![
1815 Arc::new(UInt64Array::from(vec![1, 2, 3, 4, 5])),
1816 Arc::new(UInt64Array::from(vec![100, 225, 400, 900, 1600])),
1817 Arc::new(UInt32Array::from(vec![7, 8, 9, 10, 11])),
1818 Arc::new(Int16Array::from(vec![-3, 0, 3, -6, 6])),
1819 Arc::new(UInt8Array::from(vec![2, 4, 6, 8, 10])),
1820 Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
1821 ],
1822 )
1823 .unwrap();
1824
1825 table.append(&batch).unwrap();
1826
1827 let filter = pred_expr(Filter {
1829 field_id: COL_C_I32,
1830 op: Operator::GreaterThanOrEquals(20.into()),
1831 });
1832
1833 let mut d_sum: u128 = 0;
1835 table
1836 .scan_stream(
1837 &[proj(&table, COL_D_U32)],
1838 &filter,
1839 ScanStreamOptions::default(),
1840 |b| {
1841 let a = b.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
1842 if let Some(part) = sum(a) {
1843 d_sum += part as u128;
1844 }
1845 },
1846 )
1847 .unwrap();
1848 assert_eq!(d_sum, (8 + 9 + 10 + 11) as u128);
1849
1850 let mut e_min: Option<i16> = None;
1852 table
1853 .scan_stream(
1854 &[proj(&table, COL_E_I16)],
1855 &filter,
1856 ScanStreamOptions::default(),
1857 |b| {
1858 let a = b.column(0).as_any().downcast_ref::<Int16Array>().unwrap();
1859 if let Some(part_min) = min(a) {
1860 e_min = Some(e_min.map_or(part_min, |m| m.min(part_min)));
1861 }
1862 },
1863 )
1864 .unwrap();
1865 assert_eq!(e_min, Some(-6));
1866
1867 let mut f_max: Option<u8> = None;
1869 table
1870 .scan_stream(
1871 &[proj(&table, COL_F_U8)],
1872 &filter,
1873 ScanStreamOptions::default(),
1874 |b| {
1875 let a = b
1876 .column(0)
1877 .as_any()
1878 .downcast_ref::<arrow::array::UInt8Array>()
1879 .unwrap();
1880 if let Some(part_max) = max(a) {
1881 f_max = Some(f_max.map_or(part_max, |m| m.max(part_max)));
1882 }
1883 },
1884 )
1885 .unwrap();
1886 assert_eq!(f_max, Some(10));
1887
1888 let mut got: Vec<f64> = Vec::new();
1890 table
1891 .scan_stream(
1892 &[proj(&table, COL_A_U64)],
1893 &filter,
1894 ScanStreamOptions::default(),
1895 |b| {
1896 let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
1897 let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
1898 let sqrt_arr = unary::<
1899 arrow::datatypes::Float64Type,
1900 _,
1901 arrow::datatypes::Float64Type,
1902 >(f64_arr, |v: f64| v.sqrt());
1903
1904 for i in 0..sqrt_arr.len() {
1905 if !sqrt_arr.is_null(i) {
1906 got.push(sqrt_arr.value(i));
1907 }
1908 }
1909 },
1910 )
1911 .unwrap();
1912 let expected = [15.0_f64, 20.0, 30.0, 40.0];
1913 assert_eq!(got, expected);
1914 }
1915
1916 #[test]
1917 fn test_scan_with_in_filter() {
1918 let table = setup_test_table();
1919 const COL_A_U64: FieldId = 10;
1920 const COL_C_I32: FieldId = 12;
1921
1922 let candidates = [10.into(), 30.into()];
1924 let filter = pred_expr(Filter {
1925 field_id: COL_C_I32,
1926 op: Operator::In(&candidates),
1927 });
1928
1929 let mut vals: Vec<Option<u64>> = Vec::new();
1930 table
1931 .scan_stream(
1932 &[proj(&table, COL_A_U64)],
1933 &filter,
1934 ScanStreamOptions::default(),
1935 |b| {
1936 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1937 vals.extend(
1938 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1939 );
1940 },
1941 )
1942 .unwrap();
1943 assert_eq!(vals, vec![Some(100), Some(300)]);
1944 }
1945
1946 #[test]
1947 fn test_scan_stream_single_column_batches() {
1948 let table = setup_test_table();
1949 const COL_A_U64: FieldId = 10;
1950 const COL_C_I32: FieldId = 12;
1951
1952 let filter = pred_expr(Filter {
1954 field_id: COL_C_I32,
1955 op: Operator::Equals(20.into()),
1956 });
1957
1958 let mut seen_cols = Vec::<u64>::new();
1959 table
1960 .scan_stream(
1961 &[proj(&table, COL_A_U64)],
1962 &filter,
1963 ScanStreamOptions::default(),
1964 |b| {
1965 assert_eq!(b.num_columns(), 1);
1966 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1967 for i in 0..a.len() {
1969 if !a.is_null(i) {
1970 seen_cols.push(a.value(i));
1971 }
1972 }
1973 },
1974 )
1975 .unwrap();
1976
1977 assert_eq!(seen_cols, vec![200, 200]);
1979 }
1980
1981 #[test]
1982 fn test_scan_with_multiple_projection_columns() {
1983 let table = setup_test_table();
1984 const COL_A_U64: FieldId = 10;
1985 const COL_C_I32: FieldId = 12;
1986
1987 let filter = pred_expr(Filter {
1988 field_id: COL_C_I32,
1989 op: Operator::Equals(20.into()),
1990 });
1991
1992 let expected_names = [COL_A_U64.to_string(), COL_C_I32.to_string()];
1993
1994 let mut combined: Vec<(Option<u64>, Option<i32>)> = Vec::new();
1995 table
1996 .scan_stream(
1997 &[proj(&table, COL_A_U64), proj(&table, COL_C_I32)],
1998 &filter,
1999 ScanStreamOptions::default(),
2000 |b| {
2001 assert_eq!(b.num_columns(), 2);
2002 assert_eq!(b.schema().field(0).name(), &expected_names[0]);
2003 assert_eq!(b.schema().field(1).name(), &expected_names[1]);
2004
2005 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2006 let c = b.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
2007 for i in 0..b.num_rows() {
2008 let left = if a.is_null(i) { None } else { Some(a.value(i)) };
2009 let right = if c.is_null(i) { None } else { Some(c.value(i)) };
2010 combined.push((left, right));
2011 }
2012 },
2013 )
2014 .unwrap();
2015
2016 assert_eq!(combined, vec![(Some(200), Some(20)), (Some(200), Some(20))]);
2017 }
2018
2019 #[test]
2020 fn test_scan_stream_projection_validation() {
2021 let table = setup_test_table();
2022 const COL_A_U64: FieldId = 10;
2023 const COL_C_I32: FieldId = 12;
2024
2025 let filter = pred_expr(Filter {
2026 field_id: COL_C_I32,
2027 op: Operator::Equals(20.into()),
2028 });
2029
2030 let empty: [Projection; 0] = [];
2031 let result = table.scan_stream(&empty, &filter, ScanStreamOptions::default(), |_batch| {});
2032 assert!(matches!(result, Err(Error::InvalidArgumentError(_))));
2033
2034 let duplicate = [
2039 proj(&table, COL_A_U64),
2040 proj_alias(&table, COL_A_U64, "alias_a"),
2041 ];
2042 let mut collected = Vec::<u64>::new();
2043 table
2044 .scan_stream(&duplicate, &filter, ScanStreamOptions::default(), |b| {
2045 assert_eq!(b.num_columns(), 2);
2046 assert_eq!(b.schema().field(0).name(), &COL_A_U64.to_string());
2047 assert_eq!(b.schema().field(1).name(), "alias_a");
2048 let a0 = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2049 let a1 = b.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
2050 for i in 0..b.num_rows() {
2051 if !a0.is_null(i) {
2052 collected.push(a0.value(i));
2053 }
2054 if !a1.is_null(i) {
2055 collected.push(a1.value(i));
2056 }
2057 }
2058 })
2059 .unwrap();
2060 assert_eq!(collected, vec![200, 200, 200, 200]);
2062 }
2063
2064 #[test]
2065 fn test_scan_stream_computed_projection() {
2066 let table = setup_test_table();
2067 const COL_A_U64: FieldId = 10;
2068
2069 let projections = [
2070 ScanProjection::column(proj(&table, COL_A_U64)),
2071 ScanProjection::computed(
2072 ScalarExpr::binary(
2073 ScalarExpr::column(COL_A_U64),
2074 BinaryOp::Multiply,
2075 ScalarExpr::literal(2),
2076 ),
2077 "a_times_two",
2078 ),
2079 ];
2080
2081 let filter = pred_expr(Filter {
2082 field_id: COL_A_U64,
2083 op: Operator::GreaterThanOrEquals(0.into()),
2084 });
2085
2086 let mut computed: Vec<(u64, f64)> = Vec::new();
2087 table
2088 .scan_stream_with_exprs(&projections, &filter, ScanStreamOptions::default(), |b| {
2089 assert_eq!(b.num_columns(), 2);
2090 let base = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2091 let comp = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
2092 for i in 0..b.num_rows() {
2093 if base.is_null(i) || comp.is_null(i) {
2094 continue;
2095 }
2096 computed.push((base.value(i), comp.value(i)));
2097 }
2098 })
2099 .unwrap();
2100
2101 let expected = vec![(100, 200.0), (200, 400.0), (300, 600.0), (200, 400.0)];
2102 assert_eq!(computed, expected);
2103 }
2104
2105 #[test]
2106 fn test_scan_stream_multi_column_filter_compare() {
2107 let table = setup_test_table();
2108 const COL_A_U64: FieldId = 10;
2109 const COL_C_I32: FieldId = 12;
2110
2111 let expr = Expr::Compare {
2112 left: ScalarExpr::binary(
2113 ScalarExpr::column(COL_A_U64),
2114 BinaryOp::Add,
2115 ScalarExpr::column(COL_C_I32),
2116 ),
2117 op: CompareOp::Gt,
2118 right: ScalarExpr::literal(220_i64),
2119 };
2120
2121 let mut vals: Vec<Option<u64>> = Vec::new();
2122 table
2123 .scan_stream(
2124 &[proj(&table, COL_A_U64)],
2125 &expr,
2126 ScanStreamOptions::default(),
2127 |b| {
2128 let col = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2129 for i in 0..b.num_rows() {
2130 vals.push(if col.is_null(i) {
2131 None
2132 } else {
2133 Some(col.value(i))
2134 });
2135 }
2136 },
2137 )
2138 .unwrap();
2139
2140 assert_eq!(vals.into_iter().flatten().collect::<Vec<_>>(), vec![300]);
2141 }
2142}