1use std::fmt;
2use std::sync::Arc;
3use std::sync::RwLock;
4
5use crate::planner::{TablePlanner, collect_row_ids_for_table};
6use crate::stream::ColumnStream;
7use crate::types::TableId;
8
9use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt32Array};
10use arrow::datatypes::{DataType, Field, Schema};
11use std::collections::HashMap;
12
13use crate::constants::STREAM_BATCH_ROWS;
14use llkv_column_map::store::{GatherNullPolicy, IndexKind, Projection, ROW_ID_COLUMN_NAME};
15use llkv_column_map::{ColumnStore, types::LogicalFieldId};
16use llkv_storage::pager::{MemPager, Pager};
17use simd_r_drive_entry_handle::EntryHandle;
18
19use crate::reserved::is_reserved_table_id;
20use crate::sys_catalog::{ColMeta, SysCatalog, TableMeta};
21use crate::types::{FieldId, RowId};
22use llkv_expr::{Expr, ScalarExpr};
23use llkv_result::{Error, Result as LlkvResult};
24
/// Cached presence flags for the MVCC bookkeeping columns in an appended
/// batch's schema; populated lazily by `Table::get_mvcc_cache`.
#[derive(Debug, Clone, Copy)]
struct MvccColumnCache {
    // True when the schema already carries the `created_by` MVCC column.
    has_created_by: bool,
    // True when the schema already carries the `deleted_by` MVCC column.
    has_deleted_by: bool,
}
32
/// Handle to one logical table stored inside a shared [`ColumnStore`].
///
/// Generic over the pager backend `P` (defaults to the in-memory [`MemPager`]).
pub struct Table<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Column store shared by all tables opened against the same pager.
    store: Arc<ColumnStore<P>>,
    // Identifier of this table within the store.
    table_id: TableId,
    // Lazily computed MVCC column-presence flags; see `get_mvcc_cache`.
    mvcc_cache: RwLock<Option<MvccColumnCache>>,
}
45
/// Hook for post-filtering the row ids matched by a scan
/// (e.g. MVCC visibility checks supplied by a transaction layer).
pub trait RowIdFilter<P>: Send + Sync
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Returns the subset of `row_ids` that should remain visible for `table`.
    fn filter(&self, table: &Table<P>, row_ids: Vec<RowId>) -> LlkvResult<Vec<RowId>>;
}
68
/// Options controlling [`Table::scan_stream`] behavior.
pub struct ScanStreamOptions<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Include rows whose projected values are null (defaults to `false`).
    pub include_nulls: bool,
    /// Optional ordering applied to the emitted rows.
    pub order: Option<ScanOrderSpec>,
    /// Optional post-filter over matched row ids (e.g. MVCC visibility).
    pub row_id_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
}
94
95impl<P> fmt::Debug for ScanStreamOptions<P>
96where
97 P: Pager<Blob = EntryHandle> + Send + Sync,
98{
99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 f.debug_struct("ScanStreamOptions")
101 .field("include_nulls", &self.include_nulls)
102 .field("order", &self.order)
103 .field(
104 "row_id_filter",
105 &self.row_id_filter.as_ref().map(|_| "<RowIdFilter>"),
106 )
107 .finish()
108 }
109}
110
111impl<P> Clone for ScanStreamOptions<P>
112where
113 P: Pager<Blob = EntryHandle> + Send + Sync,
114{
115 fn clone(&self) -> Self {
116 Self {
117 include_nulls: self.include_nulls,
118 order: self.order,
119 row_id_filter: self.row_id_filter.clone(),
120 }
121 }
122}
123
124impl<P> Default for ScanStreamOptions<P>
125where
126 P: Pager<Blob = EntryHandle> + Send + Sync,
127{
128 fn default() -> Self {
129 Self {
130 include_nulls: false,
131 order: None,
132 row_id_filter: None,
133 }
134 }
135}
136
/// Ordering requested for a scan's output rows.
#[derive(Clone, Copy, Debug)]
pub struct ScanOrderSpec {
    /// Column whose values drive the ordering.
    pub field_id: FieldId,
    /// Ascending or descending order.
    pub direction: ScanOrderDirection,
    /// Whether null values sort before non-null values.
    pub nulls_first: bool,
    /// Transform applied to key values before comparison.
    pub transform: ScanOrderTransform,
}
151
/// Sort direction used by [`ScanOrderSpec`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderDirection {
    /// Smallest key values first.
    Ascending,
    /// Largest key values first.
    Descending,
}
160
/// How sort-key values are interpreted before comparison.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderTransform {
    /// Compare integer values as-is.
    IdentityInteger,
    /// Compare UTF-8 values as-is.
    IdentityUtf8,
    /// Convert UTF-8 values to integers before comparing.
    CastUtf8ToInteger,
}
173
/// One output column of a scan: either a stored column or a computed expression.
#[derive(Clone, Debug)]
pub enum ScanProjection {
    /// Project a stored column directly.
    Column(Projection),
    /// Project the result of evaluating a scalar expression.
    Computed {
        /// Expression evaluated over the table's fields.
        expr: ScalarExpr<FieldId>,
        /// Output column name for the computed values.
        alias: String,
    },
}
189
190impl ScanProjection {
191 pub fn column<P: Into<Projection>>(proj: P) -> Self {
193 Self::Column(proj.into())
194 }
195
196 pub fn computed<S: Into<String>>(expr: ScalarExpr<FieldId>, alias: S) -> Self {
198 Self::Computed {
199 expr,
200 alias: alias.into(),
201 }
202 }
203}
204
205impl From<Projection> for ScanProjection {
206 fn from(value: Projection) -> Self {
207 ScanProjection::Column(value)
208 }
209}
210
211impl From<&Projection> for ScanProjection {
212 fn from(value: &Projection) -> Self {
213 ScanProjection::Column(value.clone())
214 }
215}
216
217impl From<&ScanProjection> for ScanProjection {
218 fn from(value: &ScanProjection) -> Self {
219 value.clone()
220 }
221}
222
223impl<P> Table<P>
224where
225 P: Pager<Blob = EntryHandle> + Send + Sync,
226{
    /// Creates and registers a new table described by `columns`.
    ///
    /// Thin wrapper that delegates to
    /// `CatalogManager::create_table_from_columns`.
    pub fn create_from_columns(
        display_name: &str,
        canonical_name: &str,
        columns: &[llkv_plan::ColumnSpec],
        metadata: Arc<crate::metadata::MetadataManager<P>>,
        catalog: Arc<crate::catalog::TableCatalog>,
        store: Arc<ColumnStore<P>>,
    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
        service.create_table_from_columns(display_name, canonical_name, columns)
    }
241
    /// Creates and registers a new table described by an Arrow `schema`.
    ///
    /// Thin wrapper that delegates to
    /// `CatalogManager::create_table_from_schema`.
    pub fn create_from_schema(
        display_name: &str,
        canonical_name: &str,
        schema: &arrow::datatypes::Schema,
        metadata: Arc<crate::metadata::MetadataManager<P>>,
        catalog: Arc<crate::catalog::TableCatalog>,
        store: Arc<ColumnStore<P>>,
    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
        service.create_table_from_schema(display_name, canonical_name, schema)
    }
254
255 #[doc(hidden)]
260 pub fn from_id(table_id: TableId, pager: Arc<P>) -> LlkvResult<Self> {
261 if is_reserved_table_id(table_id) {
262 return Err(Error::ReservedTableId(table_id));
263 }
264
265 tracing::trace!(
266 "Table::from_id: Opening table_id={} with pager at {:p}",
267 table_id,
268 &*pager
269 );
270 let store = ColumnStore::open(pager)?;
271 Ok(Self {
272 store: Arc::new(store),
273 table_id,
274 mvcc_cache: RwLock::new(None),
275 })
276 }
277
278 #[doc(hidden)]
283 pub fn from_id_and_store(table_id: TableId, store: Arc<ColumnStore<P>>) -> LlkvResult<Self> {
284 if is_reserved_table_id(table_id) {
285 return Err(Error::ReservedTableId(table_id));
286 }
287
288 Ok(Self {
289 store,
290 table_id,
291 mvcc_cache: RwLock::new(None),
292 })
293 }
294
295 pub fn register_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
297 let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
298 self.store
299 .register_index(logical_field_id, IndexKind::Sort)?;
300 Ok(())
301 }
302
303 pub fn unregister_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
305 let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
306 match self
307 .store
308 .unregister_index(logical_field_id, IndexKind::Sort)
309 {
310 Ok(()) | Err(Error::NotFound) => Ok(()),
311 Err(err) => Err(err),
312 }
313 }
314
315 pub fn list_registered_indexes(&self, field_id: FieldId) -> LlkvResult<Vec<IndexKind>> {
317 let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
318 match self.store.list_persisted_indexes(logical_field_id) {
319 Ok(kinds) => Ok(kinds),
320 Err(Error::NotFound) => Ok(Vec::new()),
321 Err(err) => Err(err),
322 }
323 }
324
325 fn get_mvcc_cache(&self, schema: &Arc<Schema>) -> MvccColumnCache {
328 {
330 let cache_read = self.mvcc_cache.read().unwrap();
331 if let Some(cache) = *cache_read {
332 return cache;
333 }
334 }
335
336 let has_created_by = schema
338 .fields()
339 .iter()
340 .any(|f| f.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME);
341 let has_deleted_by = schema
342 .fields()
343 .iter()
344 .any(|f| f.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME);
345
346 let cache = MvccColumnCache {
347 has_created_by,
348 has_deleted_by,
349 };
350
351 *self.mvcc_cache.write().unwrap() = Some(cache);
353
354 cache
355 }
356
    /// Appends `batch` to the table, namespacing field ids and injecting MVCC
    /// bookkeeping columns.
    ///
    /// For each user column the `field_id` metadata entry is rewritten from a
    /// user-scope id to a table-scoped [`LogicalFieldId`]. `row_id`,
    /// `created_by`, and `deleted_by` columns supplied by the caller are
    /// passed through (MVCC columns get their reserved logical ids). When the
    /// batch lacks MVCC columns, synthetic ones are appended: `created_by`
    /// filled with the auto-commit txn id and `deleted_by` with "not deleted".
    ///
    /// Also backfills per-column catalog metadata (column names) the first
    /// time a column is seen.
    ///
    /// # Errors
    /// Returns `Error::Internal` when a user column is missing a `field_id`
    /// metadata entry, carries an unparsable one, or carries an id outside
    /// the user range; propagates store append failures.
    pub fn append(&self, batch: &RecordBatch) -> LlkvResult<()> {
        use arrow::array::UInt64Builder;

        // Presence of MVCC columns is cached per table; see get_mvcc_cache.
        let cache = self.get_mvcc_cache(&batch.schema());
        let has_created_by = cache.has_created_by;
        let has_deleted_by = cache.has_deleted_by;

        // +2 leaves room for the synthetic created_by/deleted_by columns.
        let mut new_fields = Vec::with_capacity(batch.schema().fields().len() + 2);
        let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.columns().len() + 2);

        for (idx, field) in batch.schema().fields().iter().enumerate() {
            let maybe_field_id = field.metadata().get(crate::constants::FIELD_ID_META_KEY);
            // System columns (row_id / MVCC) may legitimately lack a field id.
            if maybe_field_id.is_none()
                && (field.name() == ROW_ID_COLUMN_NAME
                    || field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME
                    || field.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME)
            {
                if field.name() == ROW_ID_COLUMN_NAME {
                    // row_id passes through untouched.
                    new_fields.push(field.as_ref().clone());
                    new_columns.push(batch.column(idx).clone());
                } else {
                    // Caller-supplied MVCC column: stamp its reserved logical id.
                    let lfid = if field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME {
                        LogicalFieldId::for_mvcc_created_by(self.table_id)
                    } else {
                        LogicalFieldId::for_mvcc_deleted_by(self.table_id)
                    };

                    let mut metadata = field.metadata().clone();
                    let lfid_val: u64 = lfid.into();
                    metadata.insert(
                        crate::constants::FIELD_ID_META_KEY.to_string(),
                        lfid_val.to_string(),
                    );

                    let new_field =
                        Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                            .with_metadata(metadata);
                    new_fields.push(new_field);
                    new_columns.push(batch.column(idx).clone());
                }
                continue;
            }

            // User columns must carry a parsable field id.
            let raw_field_id = maybe_field_id
                .ok_or_else(|| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' is missing a valid '{}' in its metadata.",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY
                    ))
                })?
                .parse::<u64>()
                .map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' contains an invalid '{}': {}",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY,
                        err
                    ))
                })?;

            // Reject ids that are already table-namespaced logical ids.
            if raw_field_id > FieldId::MAX as u64 {
                return Err(llkv_result::Error::Internal(format!(
                    "Field '{}' expected user FieldId (<= {}) but got logical id '{}'",
                    field.name(),
                    FieldId::MAX,
                    raw_field_id
                )));
            }

            let user_field_id = raw_field_id as FieldId;
            let logical_field_id = LogicalFieldId::for_user(self.table_id, user_field_id);

            // Rewrite the metadata so the store sees the namespaced id.
            let lfid = logical_field_id;
            let mut new_metadata = field.metadata().clone();
            let lfid_val: u64 = lfid.into();
            new_metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );

            let new_field =
                Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                    .with_metadata(new_metadata);
            new_fields.push(new_field);
            new_columns.push(batch.column(idx).clone());

            // Backfill the column name into the catalog if absent.
            let need_meta = match self
                .catalog()
                .get_cols_meta(self.table_id, &[user_field_id])
            {
                metas if metas.is_empty() => true,
                metas => metas[0].as_ref().and_then(|m| m.name.as_ref()).is_none(),
            };

            if need_meta {
                let meta = ColMeta {
                    col_id: user_field_id,
                    name: Some(field.name().to_string()),
                    flags: 0,
                    default: None,
                };
                self.catalog().put_col_meta(self.table_id, &meta);
            }
        }

        // Txn id stamped on rows appended outside an explicit transaction.
        const TXN_ID_AUTO_COMMIT: u64 = 1;
        // Sentinel meaning "not deleted".
        const TXN_ID_NONE: u64 = 0;
        let row_count = batch.num_rows();

        if !has_created_by {
            // Synthesize created_by = auto-commit for every row.
            let mut created_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                created_by_builder.append_value(TXN_ID_AUTO_COMMIT);
            }
            let created_by_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = created_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::CREATED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(created_by_builder.finish()));
        }

        if !has_deleted_by {
            // Synthesize deleted_by = "not deleted" for every row.
            let mut deleted_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                deleted_by_builder.append_value(TXN_ID_NONE);
            }
            let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = deleted_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::DELETED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(deleted_by_builder.finish()));
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let namespaced_batch = RecordBatch::try_new(new_schema, new_columns)?;

        tracing::trace!(
            table_id = self.table_id,
            num_columns = namespaced_batch.num_columns(),
            num_rows = namespaced_batch.num_rows(),
            "Attempting append to table"
        );

        if let Err(err) = self.store.append(&namespaced_batch) {
            // Diagnostics: figure out which field ids the store doesn't know.
            let batch_field_ids: Vec<LogicalFieldId> = namespaced_batch
                .schema()
                .fields()
                .iter()
                .filter_map(|f| f.metadata().get(crate::constants::FIELD_ID_META_KEY))
                .filter_map(|s| s.parse::<u64>().ok())
                .map(LogicalFieldId::from)
                .collect();

            let missing_fields: Vec<LogicalFieldId> = batch_field_ids
                .iter()
                .filter(|&&field_id| !self.store.has_field(field_id))
                .copied()
                .collect();

            tracing::error!(
                table_id = self.table_id,
                error = ?err,
                batch_field_ids = ?batch_field_ids,
                missing_from_catalog = ?missing_fields,
                "Append failed - some fields missing from catalog"
            );
            return Err(err);
        }
        Ok(())
    }
589
590 pub fn scan_stream<'a, I, T, F>(
598 &self,
599 projections: I,
600 filter_expr: &Expr<'a, FieldId>,
601 options: ScanStreamOptions<P>,
602 on_batch: F,
603 ) -> LlkvResult<()>
604 where
605 I: IntoIterator<Item = T>,
606 T: Into<ScanProjection>,
607 F: FnMut(RecordBatch),
608 {
609 let stream_projections: Vec<ScanProjection> =
610 projections.into_iter().map(|p| p.into()).collect();
611 self.scan_stream_with_exprs(&stream_projections, filter_expr, options, on_batch)
612 }
613
614 pub fn scan_stream_with_exprs<'a, F>(
616 &self,
617 projections: &[ScanProjection],
618 filter_expr: &Expr<'a, FieldId>,
619 options: ScanStreamOptions<P>,
620 on_batch: F,
621 ) -> LlkvResult<()>
622 where
623 F: FnMut(RecordBatch),
624 {
625 TablePlanner::new(self).scan_stream_with_exprs(projections, filter_expr, options, on_batch)
626 }
627
    /// Collects the row ids of all rows matching `filter_expr`.
    pub fn filter_row_ids<'a>(&self, filter_expr: &Expr<'a, FieldId>) -> LlkvResult<Vec<RowId>> {
        collect_row_ids_for_table(self, filter_expr)
    }
631
    /// Returns a system-catalog view over this table's column store.
    #[inline]
    pub fn catalog(&self) -> SysCatalog<'_, P> {
        SysCatalog::new(&self.store)
    }
636
    /// Fetches this table's catalog metadata, if recorded.
    #[inline]
    pub fn get_table_meta(&self) -> Option<TableMeta> {
        self.catalog().get_table_meta(self.table_id)
    }
641
    /// Fetches catalog metadata for each of `col_ids` (`None` where absent).
    #[inline]
    pub fn get_cols_meta(&self, col_ids: &[FieldId]) -> Vec<Option<ColMeta>> {
        self.catalog().get_cols_meta(self.table_id, col_ids)
    }
646
647 pub fn schema(&self) -> LlkvResult<Arc<Schema>> {
654 let mut logical_fields = self.store.user_field_ids_for_table(self.table_id);
656 logical_fields.sort_by_key(|lfid| lfid.field_id());
657
658 let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
659 let metas = self.get_cols_meta(&field_ids);
660
661 let mut fields: Vec<Field> = Vec::with_capacity(1 + field_ids.len());
662 fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));
664
665 for (idx, lfid) in logical_fields.into_iter().enumerate() {
666 let fid = lfid.field_id();
667 let dtype = self.store.data_type(lfid)?;
668 let name = metas
669 .get(idx)
670 .and_then(|m| m.as_ref().and_then(|meta| meta.name.clone()))
671 .unwrap_or_else(|| format!("col_{}", fid));
672
673 let mut metadata: HashMap<String, String> = HashMap::new();
674 metadata.insert(
675 crate::constants::FIELD_ID_META_KEY.to_string(),
676 fid.to_string(),
677 );
678
679 fields.push(Field::new(&name, dtype.clone(), true).with_metadata(metadata));
680 }
681
682 Ok(Arc::new(Schema::new(fields)))
683 }
684
685 pub fn schema_recordbatch(&self) -> LlkvResult<RecordBatch> {
689 let schema = self.schema()?;
690 let fields = schema.fields();
691
692 let mut names: Vec<String> = Vec::with_capacity(fields.len());
693 let mut fids: Vec<u32> = Vec::with_capacity(fields.len());
694 let mut dtypes: Vec<String> = Vec::with_capacity(fields.len());
695
696 for field in fields.iter() {
697 names.push(field.name().to_string());
698 let fid = field
699 .metadata()
700 .get(crate::constants::FIELD_ID_META_KEY)
701 .and_then(|s| s.parse::<u32>().ok())
702 .unwrap_or(0u32);
703 fids.push(fid);
704 dtypes.push(format!("{:?}", field.data_type()));
705 }
706
707 let name_array: ArrayRef = Arc::new(StringArray::from(names));
709 let fid_array: ArrayRef = Arc::new(UInt32Array::from(fids));
710 let dtype_array: ArrayRef = Arc::new(StringArray::from(dtypes));
711
712 let rb_schema = Arc::new(Schema::new(vec![
713 Field::new("name", DataType::Utf8, false),
714 Field::new(crate::constants::FIELD_ID_META_KEY, DataType::UInt32, false),
715 Field::new("data_type", DataType::Utf8, false),
716 ]));
717
718 let batch = RecordBatch::try_new(rb_schema, vec![name_array, fid_array, dtype_array])?;
719 Ok(batch)
720 }
721
722 pub fn stream_columns(
724 &self,
725 logical_fields: impl Into<Arc<[LogicalFieldId]>>,
726 row_ids: Vec<RowId>,
727 policy: GatherNullPolicy,
728 ) -> LlkvResult<ColumnStream<'_, P>> {
729 let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
730 let ctx = self.store.prepare_gather_context(logical_fields.as_ref())?;
731 Ok(ColumnStream::new(
732 &self.store,
733 ctx,
734 row_ids,
735 STREAM_BATCH_ROWS,
736 policy,
737 logical_fields,
738 ))
739 }
740
    /// Borrows the underlying column store.
    pub fn store(&self) -> &ColumnStore<P> {
        &self.store
    }
744
    /// Returns this table's identifier.
    #[inline]
    pub fn table_id(&self) -> TableId {
        self.table_id
    }
749
    /// Total number of rows persisted for user column `col_id`.
    pub fn total_rows_for_col(&self, col_id: FieldId) -> llkv_result::Result<u64> {
        let lfid = LogicalFieldId::for_user(self.table_id, col_id);
        self.store.total_rows_for_field(lfid)
    }
758
759 pub fn total_rows(&self) -> llkv_result::Result<u64> {
764 use llkv_column_map::store::rowid_fid;
765 let rid_lfid = rowid_fid(LogicalFieldId::for_user(self.table_id, 0));
766 match self.store.total_rows_for_field(rid_lfid) {
768 Ok(n) => Ok(n),
769 Err(_) => {
770 self.store.total_rows_for_table(self.table_id)
772 }
773 }
774 }
775}
776
777#[cfg(test)]
778mod tests {
779 use super::*;
780 use crate::reserved::CATALOG_TABLE_ID;
781 use crate::types::RowId;
782 use arrow::array::Array;
783 use arrow::array::ArrayRef;
784 use arrow::array::{
785 BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt8Array,
786 UInt32Array, UInt64Array,
787 };
788 use arrow::compute::{cast, max, min, sum, unary};
789 use arrow::datatypes::DataType;
790 use llkv_column_map::ColumnStore;
791 use llkv_column_map::store::GatherNullPolicy;
792 use llkv_expr::{BinaryOp, CompareOp, Filter, Operator, ScalarExpr};
793 use std::collections::HashMap;
794 use std::ops::Bound;
795
796 fn setup_test_table() -> Table {
797 let pager = Arc::new(MemPager::default());
798 setup_test_table_with_pager(&pager)
799 }
800
    /// Builds fixture table 1 on `pager`: four rows across five typed columns
    /// (u64, binary, i32, f64, f32) with field ids 10..=14.
    fn setup_test_table_with_pager(pager: &Arc<MemPager>) -> Table {
        let table = Table::from_id(1, Arc::clone(pager)).unwrap();
        const COL_A_U64: FieldId = 10;
        const COL_B_BIN: FieldId = 11;
        const COL_C_I32: FieldId = 12;
        const COL_D_F64: FieldId = 13;
        const COL_E_F32: FieldId = 14;

        // Every user column carries its field id in Arrow metadata, as
        // required by Table::append.
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_B_BIN.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_F64.to_string(),
            )])),
            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_F32.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
                Arc::new(UInt64Array::from(vec![100, 200, 300, 200])),
                Arc::new(BinaryArray::from(vec![
                    b"foo" as &[u8],
                    b"bar",
                    b"baz",
                    b"qux",
                ])),
                Arc::new(Int32Array::from(vec![10, 20, 30, 20])),
                Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5, 2.5])),
                Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 2.0])),
            ],
        )
        .unwrap();

        table.append(&batch).unwrap();
        table
    }
854
855 fn gather_single(
856 store: &ColumnStore<MemPager>,
857 field_id: LogicalFieldId,
858 row_ids: &[u64],
859 ) -> ArrayRef {
860 store
861 .gather_rows(&[field_id], row_ids, GatherNullPolicy::ErrorOnMissing)
862 .unwrap()
863 .column(0)
864 .clone()
865 }
866
    /// Wraps a single filter into a predicate expression tree.
    fn pred_expr<'a>(filter: Filter<'a, FieldId>) -> Expr<'a, FieldId> {
        Expr::Pred(filter)
    }
870
    /// Builds a projection for `field_id` scoped to `table`.
    fn proj(table: &Table, field_id: FieldId) -> Projection {
        Projection::from(LogicalFieldId::for_user(table.table_id, field_id))
    }
874
    /// Builds an aliased projection for `field_id` scoped to `table`.
    fn proj_alias<S: Into<String>>(table: &Table, field_id: FieldId, alias: S) -> Projection {
        Projection::with_alias(LogicalFieldId::for_user(table.table_id, field_id), alias)
    }
878
879 #[test]
880 fn table_new_rejects_reserved_table_id() {
881 let result = Table::from_id(CATALOG_TABLE_ID, Arc::new(MemPager::default()));
882 assert!(matches!(
883 result,
884 Err(Error::ReservedTableId(id)) if id == CATALOG_TABLE_ID
885 ));
886 }
887
    /// Appends must carry *user* field ids; a fully-qualified logical id in
    /// the metadata exceeds the user range and is rejected as Internal.
    #[test]
    fn test_append_rejects_logical_field_id_in_metadata() {
        let table = Table::from_id(7, Arc::new(MemPager::default())).unwrap();

        const USER_FID: FieldId = 42;
        let logical: LogicalFieldId = LogicalFieldId::for_user(table.table_id(), USER_FID);
        let logical_val: u64 = logical.into();

        // Deliberately stamp the namespaced logical id instead of USER_FID.
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("bad", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                logical_val.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2u64])),
                Arc::new(UInt64Array::from(vec![10u64, 20u64])),
            ],
        )
        .unwrap();

        let res = table.append(&batch);
        assert!(matches!(res, Err(Error::Internal(_))));
    }
920
    /// Equality filter on the u64 column projects the matching i32 values
    /// (rows 2 and 4 of the fixture both hold a_u64 == 200).
    #[test]
    fn test_scan_with_u64_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let expr = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::Equals(200.into()),
        });

        let mut vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(20), Some(20)]);
    }
948
    /// A case-sensitive starts_with("al") filter over a Utf8 column returns
    /// only "alice" and "albert", in row order.
    #[test]
    fn test_scan_with_string_filter() {
        let pager = Arc::new(MemPager::default());
        let table = Table::from_id(500, Arc::clone(&pager)).unwrap();

        const COL_STR: FieldId = 42;
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_STR.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
                Arc::new(StringArray::from(vec!["alice", "bob", "albert", "carol"])),
            ],
        )
        .unwrap();
        table.append(&batch).unwrap();

        let expr = pred_expr(Filter {
            field_id: COL_STR,
            op: Operator::starts_with("al", true),
        });

        let mut collected: Vec<Option<String>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_STR)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
                    collected.extend(arr.iter().map(|v| v.map(|s| s.to_string())));
                },
            )
            .unwrap();

        assert_eq!(
            collected,
            vec![Some("alice".to_string()), Some("albert".to_string())]
        );
    }
996
    /// Writes three tables through one shared pager, then reopens each table
    /// handle and verifies per-column dtypes and gathered values survive the
    /// round trip without cross-table interference.
    #[test]
    fn test_table_reopen_with_shared_pager() {
        const TABLE_ALPHA: TableId = 42;
        const TABLE_BETA: TableId = 43;
        const TABLE_GAMMA: TableId = 44;
        const COL_ALPHA_U64: FieldId = 100;
        const COL_ALPHA_I32: FieldId = 101;
        const COL_ALPHA_U32: FieldId = 102;
        const COL_ALPHA_I16: FieldId = 103;
        const COL_BETA_U64: FieldId = 200;
        const COL_BETA_U8: FieldId = 201;
        const COL_GAMMA_I16: FieldId = 300;

        let pager = Arc::new(MemPager::default());

        let alpha_rows: Vec<RowId> = vec![1, 2, 3, 4];
        let alpha_vals_u64: Vec<u64> = vec![10, 20, 30, 40];
        let alpha_vals_i32: Vec<i32> = vec![-5, 15, 25, 35];
        let alpha_vals_u32: Vec<u32> = vec![7, 11, 13, 17];
        let alpha_vals_i16: Vec<i16> = vec![-2, 4, -6, 8];

        let beta_rows: Vec<RowId> = vec![101, 102, 103];
        let beta_vals_u64: Vec<u64> = vec![900, 901, 902];
        let beta_vals_u8: Vec<u8> = vec![1, 2, 3];

        let gamma_rows: Vec<RowId> = vec![501, 502];
        let gamma_vals_i16: Vec<i16> = vec![123, -321];

        // Populate table ALPHA, dropping the handle afterwards.
        {
            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("alpha_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U64.to_string(),
                )])),
                Field::new("alpha_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I32.to_string(),
                )])),
                Field::new("alpha_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U32.to_string(),
                )])),
                Field::new("alpha_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(alpha_rows.clone())),
                    Arc::new(UInt64Array::from(alpha_vals_u64.clone())),
                    Arc::new(Int32Array::from(alpha_vals_i32.clone())),
                    Arc::new(UInt32Array::from(alpha_vals_u32.clone())),
                    Arc::new(Int16Array::from(alpha_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // Populate table BETA on the same pager.
        {
            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("beta_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U64.to_string(),
                )])),
                Field::new("beta_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U8.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(beta_rows.clone())),
                    Arc::new(UInt64Array::from(beta_vals_u64.clone())),
                    Arc::new(UInt8Array::from(beta_vals_u8.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // Populate table GAMMA on the same pager.
        {
            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("gamma_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_GAMMA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(gamma_rows.clone())),
                    Arc::new(Int16Array::from(gamma_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // Reopen ALPHA and verify every column's dtype and contents.
        {
            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let expectations: &[(FieldId, DataType)] = &[
                (COL_ALPHA_U64, DataType::UInt64),
                (COL_ALPHA_I32, DataType::Int32),
                (COL_ALPHA_U32, DataType::UInt32),
                (COL_ALPHA_I16, DataType::Int16),
            ];

            for &(col, ref ty) in expectations {
                let lfid = LogicalFieldId::for_user(TABLE_ALPHA, col);
                assert_eq!(store.data_type(lfid).unwrap(), *ty);
                let arr = gather_single(store, lfid, &alpha_rows);
                match ty {
                    DataType::UInt64 => {
                        let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u64.as_slice());
                    }
                    DataType::Int32 => {
                        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i32.as_slice());
                    }
                    DataType::UInt32 => {
                        let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u32.as_slice());
                    }
                    DataType::Int16 => {
                        let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i16.as_slice());
                    }
                    other => panic!("unexpected dtype {other:?}"),
                }
            }
        }

        // Reopen BETA and verify both columns.
        {
            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let lfid_u64 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U64);
            assert_eq!(store.data_type(lfid_u64).unwrap(), DataType::UInt64);
            let arr_u64 = gather_single(store, lfid_u64, &beta_rows);
            let arr_u64 = arr_u64.as_any().downcast_ref::<UInt64Array>().unwrap();
            assert_eq!(arr_u64.values(), beta_vals_u64.as_slice());

            let lfid_u8 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U8);
            assert_eq!(store.data_type(lfid_u8).unwrap(), DataType::UInt8);
            let arr_u8 = gather_single(store, lfid_u8, &beta_rows);
            let arr_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
            assert_eq!(arr_u8.values(), beta_vals_u8.as_slice());
        }

        // Reopen GAMMA and verify its single column.
        {
            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let store = table.store();
            let lfid = LogicalFieldId::for_user(TABLE_GAMMA, COL_GAMMA_I16);
            assert_eq!(store.data_type(lfid).unwrap(), DataType::Int16);
            let arr = gather_single(store, lfid, &gamma_rows);
            let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
            assert_eq!(arr.values(), gamma_vals_i16.as_slice());
        }
    }
1171
    /// Equality filter on the i32 column projects the matching u64 values
    /// (rows 2 and 4 of the fixture both hold c_i32 == 20).
    #[test]
    fn test_scan_with_i32_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(20.into()),
        });

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(200), Some(200)]);
    }
1199
    /// GreaterThan(15) on c_i32 matches rows 2-4 (values 20, 30, 20),
    /// projecting their a_u64 values in row order.
    #[test]
    fn test_scan_with_greater_than_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::GreaterThan(15.into()),
        });

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(200), Some(300), Some(200)]);
    }
1227
1228 #[test]
1229 fn test_scan_with_range_filter() {
1230 let table = setup_test_table();
1231 const COL_A_U64: FieldId = 10;
1232 const COL_C_I32: FieldId = 12;
1233
1234 let filter = pred_expr(Filter {
1235 field_id: COL_A_U64,
1236 op: Operator::Range {
1237 lower: Bound::Included(150.into()),
1238 upper: Bound::Excluded(300.into()),
1239 },
1240 });
1241
1242 let mut vals: Vec<Option<i32>> = Vec::new();
1243 table
1244 .scan_stream(
1245 &[proj(&table, COL_C_I32)],
1246 &filter,
1247 ScanStreamOptions::default(),
1248 |b| {
1249 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1250 vals.extend(
1251 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1252 );
1253 },
1254 )
1255 .unwrap();
1256 assert_eq!(vals, vec![Some(20), Some(20)]);
1257 }
1258
1259 #[test]
1260 fn test_filtered_scan_sum_kernel() {
1261 let table = setup_test_table();
1265 const COL_A_U64: FieldId = 10;
1266
1267 let filter = pred_expr(Filter {
1268 field_id: COL_A_U64,
1269 op: Operator::Range {
1270 lower: Bound::Included(150.into()),
1271 upper: Bound::Excluded(300.into()),
1272 },
1273 });
1274
1275 let mut total: u128 = 0;
1276 table
1277 .scan_stream(
1278 &[proj(&table, COL_A_U64)],
1279 &filter,
1280 ScanStreamOptions::default(),
1281 |b| {
1282 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1283 if let Some(part) = sum(a) {
1284 total += part as u128;
1285 }
1286 },
1287 )
1288 .unwrap();
1289
1290 assert_eq!(total, 400);
1291 }
1292
1293 #[test]
1294 fn test_filtered_scan_sum_i32_kernel() {
1295 let table = setup_test_table();
1300 const COL_A_U64: FieldId = 10;
1301 const COL_C_I32: FieldId = 12;
1302
1303 let candidates = [100.into(), 300.into()];
1304 let filter = pred_expr(Filter {
1305 field_id: COL_A_U64,
1306 op: Operator::In(&candidates),
1307 });
1308
1309 let mut total: i64 = 0;
1310 table
1311 .scan_stream(
1312 &[proj(&table, COL_C_I32)],
1313 &filter,
1314 ScanStreamOptions::default(),
1315 |b| {
1316 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1317 if let Some(part) = sum(a) {
1318 total += part as i64;
1319 }
1320 },
1321 )
1322 .unwrap();
1323 assert_eq!(total, 40);
1324 }
1325
1326 #[test]
1327 fn test_filtered_scan_min_max_kernel() {
1328 let table = setup_test_table();
1333 const COL_A_U64: FieldId = 10;
1334 const COL_C_I32: FieldId = 12;
1335
1336 let candidates = [100.into(), 300.into()];
1337 let filter = pred_expr(Filter {
1338 field_id: COL_A_U64,
1339 op: Operator::In(&candidates),
1340 });
1341
1342 let mut mn: Option<i32> = None;
1343 let mut mx: Option<i32> = None;
1344 table
1345 .scan_stream(
1346 &[proj(&table, COL_C_I32)],
1347 &filter,
1348 ScanStreamOptions::default(),
1349 |b| {
1350 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1351
1352 if let Some(part_min) = min(a) {
1353 mn = Some(mn.map_or(part_min, |m| m.min(part_min)));
1354 }
1355 if let Some(part_max) = max(a) {
1356 mx = Some(mx.map_or(part_max, |m| m.max(part_max)));
1357 }
1358 },
1359 )
1360 .unwrap();
1361 assert_eq!(mn, Some(10));
1362 assert_eq!(mx, Some(30));
1363 }
1364
1365 #[test]
1366 fn test_filtered_scan_float64_column() {
1367 let table = setup_test_table();
1368 const COL_D_F64: FieldId = 13;
1369
1370 let filter = pred_expr(Filter {
1371 field_id: COL_D_F64,
1372 op: Operator::GreaterThan(2.0_f64.into()),
1373 });
1374
1375 let mut got = Vec::new();
1376 table
1377 .scan_stream(
1378 &[proj(&table, COL_D_F64)],
1379 &filter,
1380 ScanStreamOptions::default(),
1381 |b| {
1382 let arr = b.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
1383 for i in 0..arr.len() {
1384 if arr.is_valid(i) {
1385 got.push(arr.value(i));
1386 }
1387 }
1388 },
1389 )
1390 .unwrap();
1391
1392 assert_eq!(got, vec![2.5, 3.5, 2.5]);
1393 }
1394
1395 #[test]
1396 fn test_filtered_scan_float32_in_operator() {
1397 let table = setup_test_table();
1398 const COL_E_F32: FieldId = 14;
1399
1400 let candidates = [2.0_f32.into(), 3.0_f32.into()];
1401 let filter = pred_expr(Filter {
1402 field_id: COL_E_F32,
1403 op: Operator::In(&candidates),
1404 });
1405
1406 let mut vals: Vec<Option<f32>> = Vec::new();
1407 table
1408 .scan_stream(
1409 &[proj(&table, COL_E_F32)],
1410 &filter,
1411 ScanStreamOptions::default(),
1412 |b| {
1413 let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
1414 vals.extend((0..arr.len()).map(|i| {
1415 if arr.is_null(i) {
1416 None
1417 } else {
1418 Some(arr.value(i))
1419 }
1420 }));
1421 },
1422 )
1423 .unwrap();
1424
1425 let collected: Vec<f32> = vals.into_iter().flatten().collect();
1426 assert_eq!(collected, vec![2.0, 3.0, 2.0]);
1427 }
1428
1429 #[test]
1430 fn test_scan_stream_and_expression() {
1431 let table = setup_test_table();
1432 const COL_A_U64: FieldId = 10;
1433 const COL_C_I32: FieldId = 12;
1434 const COL_E_F32: FieldId = 14;
1435
1436 let expr = Expr::all_of(vec![
1437 Filter {
1438 field_id: COL_C_I32,
1439 op: Operator::GreaterThan(15.into()),
1440 },
1441 Filter {
1442 field_id: COL_A_U64,
1443 op: Operator::LessThan(250.into()),
1444 },
1445 ]);
1446
1447 let mut vals: Vec<Option<f32>> = Vec::new();
1448 table
1449 .scan_stream(
1450 &[proj(&table, COL_E_F32)],
1451 &expr,
1452 ScanStreamOptions::default(),
1453 |b| {
1454 let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
1455 vals.extend((0..arr.len()).map(|i| {
1456 if arr.is_null(i) {
1457 None
1458 } else {
1459 Some(arr.value(i))
1460 }
1461 }));
1462 },
1463 )
1464 .unwrap();
1465
1466 assert_eq!(vals, vec![Some(2.0_f32), Some(2.0_f32)]);
1467 }
1468
1469 #[test]
1470 fn test_scan_stream_or_expression() {
1471 let table = setup_test_table();
1472 const COL_A_U64: FieldId = 10;
1473 const COL_C_I32: FieldId = 12;
1474
1475 let expr = Expr::any_of(vec![
1476 Filter {
1477 field_id: COL_C_I32,
1478 op: Operator::Equals(10.into()),
1479 },
1480 Filter {
1481 field_id: COL_C_I32,
1482 op: Operator::Equals(30.into()),
1483 },
1484 ]);
1485
1486 let mut vals: Vec<Option<u64>> = Vec::new();
1487 table
1488 .scan_stream(
1489 &[proj(&table, COL_A_U64)],
1490 &expr,
1491 ScanStreamOptions::default(),
1492 |b| {
1493 let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1494 vals.extend((0..arr.len()).map(|i| {
1495 if arr.is_null(i) {
1496 None
1497 } else {
1498 Some(arr.value(i))
1499 }
1500 }));
1501 },
1502 )
1503 .unwrap();
1504
1505 assert_eq!(vals, vec![Some(100), Some(300)]);
1506 }
1507
1508 #[test]
1509 fn test_scan_stream_not_predicate() {
1510 let table = setup_test_table();
1511 const COL_A_U64: FieldId = 10;
1512 const COL_C_I32: FieldId = 12;
1513
1514 let expr = Expr::not(pred_expr(Filter {
1515 field_id: COL_C_I32,
1516 op: Operator::Equals(20.into()),
1517 }));
1518
1519 let mut vals: Vec<Option<u64>> = Vec::new();
1520 table
1521 .scan_stream(
1522 &[proj(&table, COL_A_U64)],
1523 &expr,
1524 ScanStreamOptions::default(),
1525 |b| {
1526 let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1527 vals.extend((0..arr.len()).map(|i| {
1528 if arr.is_null(i) {
1529 None
1530 } else {
1531 Some(arr.value(i))
1532 }
1533 }));
1534 },
1535 )
1536 .unwrap();
1537
1538 assert_eq!(vals, vec![Some(100), Some(300)]);
1539 }
1540
1541 #[test]
1542 fn test_scan_stream_not_and_expression() {
1543 let table = setup_test_table();
1544 const COL_A_U64: FieldId = 10;
1545 const COL_C_I32: FieldId = 12;
1546
1547 let expr = Expr::not(Expr::all_of(vec![
1548 Filter {
1549 field_id: COL_A_U64,
1550 op: Operator::GreaterThan(150.into()),
1551 },
1552 Filter {
1553 field_id: COL_C_I32,
1554 op: Operator::LessThan(40.into()),
1555 },
1556 ]));
1557
1558 let mut vals: Vec<Option<u64>> = Vec::new();
1559 table
1560 .scan_stream(
1561 &[proj(&table, COL_A_U64)],
1562 &expr,
1563 ScanStreamOptions::default(),
1564 |b| {
1565 let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1566 vals.extend((0..arr.len()).map(|i| {
1567 if arr.is_null(i) {
1568 None
1569 } else {
1570 Some(arr.value(i))
1571 }
1572 }));
1573 },
1574 )
1575 .unwrap();
1576
1577 assert_eq!(vals, vec![Some(100)]);
1578 }
1579
1580 #[test]
1581 fn test_scan_stream_include_nulls_toggle() {
1582 let pager = Arc::new(MemPager::default());
1583 let table = setup_test_table_with_pager(&pager);
1584 const COL_A_U64: FieldId = 10;
1585 const COL_C_I32: FieldId = 12;
1586 const COL_B_BIN: FieldId = 11;
1587 const COL_D_F64: FieldId = 13;
1588 const COL_E_F32: FieldId = 14;
1589
1590 let schema = Arc::new(Schema::new(vec![
1591 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1592 Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1593 crate::constants::FIELD_ID_META_KEY.to_string(),
1594 COL_A_U64.to_string(),
1595 )])),
1596 Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
1597 crate::constants::FIELD_ID_META_KEY.to_string(),
1598 COL_B_BIN.to_string(),
1599 )])),
1600 Field::new("c_i32", DataType::Int32, true).with_metadata(HashMap::from([(
1601 crate::constants::FIELD_ID_META_KEY.to_string(),
1602 COL_C_I32.to_string(),
1603 )])),
1604 Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
1605 crate::constants::FIELD_ID_META_KEY.to_string(),
1606 COL_D_F64.to_string(),
1607 )])),
1608 Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
1609 crate::constants::FIELD_ID_META_KEY.to_string(),
1610 COL_E_F32.to_string(),
1611 )])),
1612 ]));
1613
1614 let batch = RecordBatch::try_new(
1615 schema,
1616 vec![
1617 Arc::new(UInt64Array::from(vec![5, 6])),
1618 Arc::new(UInt64Array::from(vec![500, 600])),
1619 Arc::new(BinaryArray::from(vec![
1620 Some(&b"new"[..]),
1621 Some(&b"alt"[..]),
1622 ])),
1623 Arc::new(Int32Array::from(vec![Some(40), None])),
1624 Arc::new(Float64Array::from(vec![5.5, 6.5])),
1625 Arc::new(Float32Array::from(vec![5.0, 6.0])),
1626 ],
1627 )
1628 .unwrap();
1629 table.append(&batch).unwrap();
1630
1631 let filter = pred_expr(Filter {
1632 field_id: COL_A_U64,
1633 op: Operator::GreaterThan(450.into()),
1634 });
1635
1636 let mut default_vals: Vec<Option<i32>> = Vec::new();
1637 table
1638 .scan_stream(
1639 &[proj(&table, COL_C_I32)],
1640 &filter,
1641 ScanStreamOptions::default(),
1642 |b| {
1643 let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1644 default_vals.extend((0..arr.len()).map(|i| {
1645 if arr.is_null(i) {
1646 None
1647 } else {
1648 Some(arr.value(i))
1649 }
1650 }));
1651 },
1652 )
1653 .unwrap();
1654 assert_eq!(default_vals, vec![Some(40)]);
1655
1656 let mut include_null_vals: Vec<Option<i32>> = Vec::new();
1657 table
1658 .scan_stream(
1659 &[proj(&table, COL_C_I32)],
1660 &filter,
1661 ScanStreamOptions {
1662 include_nulls: true,
1663 order: None,
1664 row_id_filter: None,
1665 },
1666 |b| {
1667 let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1668
1669 let mut paired_vals: Vec<(Option<i32>, Option<f64>)> = Vec::new();
1670 table
1671 .scan_stream(
1672 &[proj(&table, COL_C_I32), proj(&table, COL_D_F64)],
1673 &filter,
1674 ScanStreamOptions::default(),
1675 |b| {
1676 assert_eq!(b.num_columns(), 2);
1677 let c_arr =
1678 b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1679 let d_arr =
1680 b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
1681 for i in 0..b.num_rows() {
1682 let c_val = if c_arr.is_null(i) {
1683 None
1684 } else {
1685 Some(c_arr.value(i))
1686 };
1687 let d_val = if d_arr.is_null(i) {
1688 None
1689 } else {
1690 Some(d_arr.value(i))
1691 };
1692 paired_vals.push((c_val, d_val));
1693 }
1694 },
1695 )
1696 .unwrap();
1697 assert_eq!(paired_vals, vec![(Some(40), Some(5.5)), (None, Some(6.5))]);
1698 include_null_vals.extend((0..arr.len()).map(|i| {
1699 if arr.is_null(i) {
1700 None
1701 } else {
1702 Some(arr.value(i))
1703 }
1704 }));
1705 },
1706 )
1707 .unwrap();
1708 assert_eq!(include_null_vals, vec![Some(40), None]);
1709 }
1710
1711 #[test]
1712 fn test_filtered_scan_int_sqrt_float64() {
1713 let table = setup_test_table();
1719 const COL_A_U64: FieldId = 10;
1720 const COL_C_I32: FieldId = 12;
1721
1722 let filter = pred_expr(Filter {
1723 field_id: COL_C_I32,
1724 op: Operator::GreaterThan(15.into()),
1725 });
1726
1727 let mut got: Vec<f64> = Vec::new();
1728 table
1729 .scan_stream(
1730 &[proj(&table, COL_A_U64)],
1731 &filter,
1732 ScanStreamOptions::default(),
1733 |b| {
1734 let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
1735 let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
1736
1737 let sqrt_arr = unary::<
1739 arrow::datatypes::Float64Type,
1740 _,
1741 arrow::datatypes::Float64Type,
1742 >(f64_arr, |v: f64| v.sqrt());
1743
1744 for i in 0..sqrt_arr.len() {
1745 if !sqrt_arr.is_null(i) {
1746 got.push(sqrt_arr.value(i));
1747 }
1748 }
1749 },
1750 )
1751 .unwrap();
1752
1753 let expected = [200_f64.sqrt(), 300_f64.sqrt(), 200_f64.sqrt()];
1754 assert_eq!(got, expected);
1755 }
1756
1757 #[test]
1758 fn test_multi_field_kernels_with_filters() {
1759 use arrow::array::{Int16Array, UInt8Array, UInt32Array};
1763
1764 let table = Table::from_id(2, Arc::new(MemPager::default())).unwrap();
1765
1766 const COL_A_U64: FieldId = 20;
1767 const COL_D_U32: FieldId = 21;
1768 const COL_E_I16: FieldId = 22;
1769 const COL_F_U8: FieldId = 23;
1770 const COL_C_I32: FieldId = 24;
1771
1772 let schema = Arc::new(Schema::new(vec![
1773 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1774 Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1775 crate::constants::FIELD_ID_META_KEY.to_string(),
1776 COL_A_U64.to_string(),
1777 )])),
1778 Field::new("d_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
1779 crate::constants::FIELD_ID_META_KEY.to_string(),
1780 COL_D_U32.to_string(),
1781 )])),
1782 Field::new("e_i16", DataType::Int16, false).with_metadata(HashMap::from([(
1783 crate::constants::FIELD_ID_META_KEY.to_string(),
1784 COL_E_I16.to_string(),
1785 )])),
1786 Field::new("f_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
1787 crate::constants::FIELD_ID_META_KEY.to_string(),
1788 COL_F_U8.to_string(),
1789 )])),
1790 Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
1791 crate::constants::FIELD_ID_META_KEY.to_string(),
1792 COL_C_I32.to_string(),
1793 )])),
1794 ]));
1795
1796 let batch = RecordBatch::try_new(
1798 schema.clone(),
1799 vec![
1800 Arc::new(UInt64Array::from(vec![1, 2, 3, 4, 5])),
1801 Arc::new(UInt64Array::from(vec![100, 225, 400, 900, 1600])),
1802 Arc::new(UInt32Array::from(vec![7, 8, 9, 10, 11])),
1803 Arc::new(Int16Array::from(vec![-3, 0, 3, -6, 6])),
1804 Arc::new(UInt8Array::from(vec![2, 4, 6, 8, 10])),
1805 Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
1806 ],
1807 )
1808 .unwrap();
1809
1810 table.append(&batch).unwrap();
1811
1812 let filter = pred_expr(Filter {
1814 field_id: COL_C_I32,
1815 op: Operator::GreaterThanOrEquals(20.into()),
1816 });
1817
1818 let mut d_sum: u128 = 0;
1820 table
1821 .scan_stream(
1822 &[proj(&table, COL_D_U32)],
1823 &filter,
1824 ScanStreamOptions::default(),
1825 |b| {
1826 let a = b.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
1827 if let Some(part) = sum(a) {
1828 d_sum += part as u128;
1829 }
1830 },
1831 )
1832 .unwrap();
1833 assert_eq!(d_sum, (8 + 9 + 10 + 11) as u128);
1834
1835 let mut e_min: Option<i16> = None;
1837 table
1838 .scan_stream(
1839 &[proj(&table, COL_E_I16)],
1840 &filter,
1841 ScanStreamOptions::default(),
1842 |b| {
1843 let a = b.column(0).as_any().downcast_ref::<Int16Array>().unwrap();
1844 if let Some(part_min) = min(a) {
1845 e_min = Some(e_min.map_or(part_min, |m| m.min(part_min)));
1846 }
1847 },
1848 )
1849 .unwrap();
1850 assert_eq!(e_min, Some(-6));
1851
1852 let mut f_max: Option<u8> = None;
1854 table
1855 .scan_stream(
1856 &[proj(&table, COL_F_U8)],
1857 &filter,
1858 ScanStreamOptions::default(),
1859 |b| {
1860 let a = b
1861 .column(0)
1862 .as_any()
1863 .downcast_ref::<arrow::array::UInt8Array>()
1864 .unwrap();
1865 if let Some(part_max) = max(a) {
1866 f_max = Some(f_max.map_or(part_max, |m| m.max(part_max)));
1867 }
1868 },
1869 )
1870 .unwrap();
1871 assert_eq!(f_max, Some(10));
1872
1873 let mut got: Vec<f64> = Vec::new();
1875 table
1876 .scan_stream(
1877 &[proj(&table, COL_A_U64)],
1878 &filter,
1879 ScanStreamOptions::default(),
1880 |b| {
1881 let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
1882 let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
1883 let sqrt_arr = unary::<
1884 arrow::datatypes::Float64Type,
1885 _,
1886 arrow::datatypes::Float64Type,
1887 >(f64_arr, |v: f64| v.sqrt());
1888
1889 for i in 0..sqrt_arr.len() {
1890 if !sqrt_arr.is_null(i) {
1891 got.push(sqrt_arr.value(i));
1892 }
1893 }
1894 },
1895 )
1896 .unwrap();
1897 let expected = [15.0_f64, 20.0, 30.0, 40.0];
1898 assert_eq!(got, expected);
1899 }
1900
1901 #[test]
1902 fn test_scan_with_in_filter() {
1903 let table = setup_test_table();
1904 const COL_A_U64: FieldId = 10;
1905 const COL_C_I32: FieldId = 12;
1906
1907 let candidates = [10.into(), 30.into()];
1909 let filter = pred_expr(Filter {
1910 field_id: COL_C_I32,
1911 op: Operator::In(&candidates),
1912 });
1913
1914 let mut vals: Vec<Option<u64>> = Vec::new();
1915 table
1916 .scan_stream(
1917 &[proj(&table, COL_A_U64)],
1918 &filter,
1919 ScanStreamOptions::default(),
1920 |b| {
1921 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1922 vals.extend(
1923 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1924 );
1925 },
1926 )
1927 .unwrap();
1928 assert_eq!(vals, vec![Some(100), Some(300)]);
1929 }
1930
1931 #[test]
1932 fn test_scan_stream_single_column_batches() {
1933 let table = setup_test_table();
1934 const COL_A_U64: FieldId = 10;
1935 const COL_C_I32: FieldId = 12;
1936
1937 let filter = pred_expr(Filter {
1939 field_id: COL_C_I32,
1940 op: Operator::Equals(20.into()),
1941 });
1942
1943 let mut seen_cols = Vec::<u64>::new();
1944 table
1945 .scan_stream(
1946 &[proj(&table, COL_A_U64)],
1947 &filter,
1948 ScanStreamOptions::default(),
1949 |b| {
1950 assert_eq!(b.num_columns(), 1);
1951 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1952 for i in 0..a.len() {
1954 if !a.is_null(i) {
1955 seen_cols.push(a.value(i));
1956 }
1957 }
1958 },
1959 )
1960 .unwrap();
1961
1962 assert_eq!(seen_cols, vec![200, 200]);
1964 }
1965
1966 #[test]
1967 fn test_scan_with_multiple_projection_columns() {
1968 let table = setup_test_table();
1969 const COL_A_U64: FieldId = 10;
1970 const COL_C_I32: FieldId = 12;
1971
1972 let filter = pred_expr(Filter {
1973 field_id: COL_C_I32,
1974 op: Operator::Equals(20.into()),
1975 });
1976
1977 let expected_names = [COL_A_U64.to_string(), COL_C_I32.to_string()];
1978
1979 let mut combined: Vec<(Option<u64>, Option<i32>)> = Vec::new();
1980 table
1981 .scan_stream(
1982 &[proj(&table, COL_A_U64), proj(&table, COL_C_I32)],
1983 &filter,
1984 ScanStreamOptions::default(),
1985 |b| {
1986 assert_eq!(b.num_columns(), 2);
1987 assert_eq!(b.schema().field(0).name(), &expected_names[0]);
1988 assert_eq!(b.schema().field(1).name(), &expected_names[1]);
1989
1990 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1991 let c = b.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
1992 for i in 0..b.num_rows() {
1993 let left = if a.is_null(i) { None } else { Some(a.value(i)) };
1994 let right = if c.is_null(i) { None } else { Some(c.value(i)) };
1995 combined.push((left, right));
1996 }
1997 },
1998 )
1999 .unwrap();
2000
2001 assert_eq!(combined, vec![(Some(200), Some(20)), (Some(200), Some(20))]);
2002 }
2003
2004 #[test]
2005 fn test_scan_stream_projection_validation() {
2006 let table = setup_test_table();
2007 const COL_A_U64: FieldId = 10;
2008 const COL_C_I32: FieldId = 12;
2009
2010 let filter = pred_expr(Filter {
2011 field_id: COL_C_I32,
2012 op: Operator::Equals(20.into()),
2013 });
2014
2015 let empty: [Projection; 0] = [];
2016 let result = table.scan_stream(&empty, &filter, ScanStreamOptions::default(), |_batch| {});
2017 assert!(matches!(result, Err(Error::InvalidArgumentError(_))));
2018
2019 let duplicate = [
2024 proj(&table, COL_A_U64),
2025 proj_alias(&table, COL_A_U64, "alias_a"),
2026 ];
2027 let mut collected = Vec::<u64>::new();
2028 table
2029 .scan_stream(&duplicate, &filter, ScanStreamOptions::default(), |b| {
2030 assert_eq!(b.num_columns(), 2);
2031 assert_eq!(b.schema().field(0).name(), &COL_A_U64.to_string());
2032 assert_eq!(b.schema().field(1).name(), "alias_a");
2033 let a0 = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2034 let a1 = b.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
2035 for i in 0..b.num_rows() {
2036 if !a0.is_null(i) {
2037 collected.push(a0.value(i));
2038 }
2039 if !a1.is_null(i) {
2040 collected.push(a1.value(i));
2041 }
2042 }
2043 })
2044 .unwrap();
2045 assert_eq!(collected, vec![200, 200, 200, 200]);
2047 }
2048
2049 #[test]
2050 fn test_scan_stream_computed_projection() {
2051 let table = setup_test_table();
2052 const COL_A_U64: FieldId = 10;
2053
2054 let projections = [
2055 ScanProjection::column(proj(&table, COL_A_U64)),
2056 ScanProjection::computed(
2057 ScalarExpr::binary(
2058 ScalarExpr::column(COL_A_U64),
2059 BinaryOp::Multiply,
2060 ScalarExpr::literal(2),
2061 ),
2062 "a_times_two",
2063 ),
2064 ];
2065
2066 let filter = pred_expr(Filter {
2067 field_id: COL_A_U64,
2068 op: Operator::GreaterThanOrEquals(0.into()),
2069 });
2070
2071 let mut computed: Vec<(u64, f64)> = Vec::new();
2072 table
2073 .scan_stream_with_exprs(&projections, &filter, ScanStreamOptions::default(), |b| {
2074 assert_eq!(b.num_columns(), 2);
2075 let base = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2076 let comp = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
2077 for i in 0..b.num_rows() {
2078 if base.is_null(i) || comp.is_null(i) {
2079 continue;
2080 }
2081 computed.push((base.value(i), comp.value(i)));
2082 }
2083 })
2084 .unwrap();
2085
2086 let expected = vec![(100, 200.0), (200, 400.0), (300, 600.0), (200, 400.0)];
2087 assert_eq!(computed, expected);
2088 }
2089
2090 #[test]
2091 fn test_scan_stream_multi_column_filter_compare() {
2092 let table = setup_test_table();
2093 const COL_A_U64: FieldId = 10;
2094 const COL_C_I32: FieldId = 12;
2095
2096 let expr = Expr::Compare {
2097 left: ScalarExpr::binary(
2098 ScalarExpr::column(COL_A_U64),
2099 BinaryOp::Add,
2100 ScalarExpr::column(COL_C_I32),
2101 ),
2102 op: CompareOp::Gt,
2103 right: ScalarExpr::literal(220_i64),
2104 };
2105
2106 let mut vals: Vec<Option<u64>> = Vec::new();
2107 table
2108 .scan_stream(
2109 &[proj(&table, COL_A_U64)],
2110 &expr,
2111 ScanStreamOptions::default(),
2112 |b| {
2113 let col = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2114 for i in 0..b.num_rows() {
2115 vals.push(if col.is_null(i) {
2116 None
2117 } else {
2118 Some(col.value(i))
2119 });
2120 }
2121 },
2122 )
2123 .unwrap();
2124
2125 assert_eq!(vals.into_iter().flatten().collect::<Vec<_>>(), vec![300]);
2126 }
2127}