1use std::fmt;
2use std::sync::Arc;
3use std::sync::RwLock;
4
5use crate::planner::{TablePlanner, collect_row_ids_for_table};
6use crate::stream::ColumnStream;
7use crate::types::TableId;
8
9use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt32Array};
10use arrow::datatypes::{DataType, Field, Schema};
11use std::collections::HashMap;
12
13use crate::constants::STREAM_BATCH_ROWS;
14use llkv_column_map::store::{GatherNullPolicy, IndexKind, Projection, ROW_ID_COLUMN_NAME};
15use llkv_column_map::{ColumnStore, types::LogicalFieldId};
16use llkv_storage::pager::{MemPager, Pager};
17use simd_r_drive_entry_handle::EntryHandle;
18
19use crate::reserved::is_reserved_table_id;
20use crate::sys_catalog::{ColMeta, SysCatalog, TableMeta};
21use crate::types::{FieldId, RowId};
22use llkv_expr::{Expr, ScalarExpr};
23use llkv_result::{Error, Result as LlkvResult};
24
/// Cached result of probing an appended batch's schema for the MVCC
/// bookkeeping columns, so `append` does not re-scan field names on
/// every call.
#[derive(Debug, Clone, Copy)]
struct MvccColumnCache {
    // True if the schema already carries the `created_by` MVCC column.
    has_created_by: bool,
    // True if the schema already carries the `deleted_by` MVCC column.
    has_deleted_by: bool,
}
32
/// Handle to a single logical table whose columns live in a shared
/// [`ColumnStore`]. The store may be shared by many tables; fields are
/// namespaced by `table_id`.
pub struct Table<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Underlying column store (possibly shared across tables on one pager).
    store: Arc<ColumnStore<P>>,
    // Identifier used to build `LogicalFieldId`s for this table's columns.
    table_id: TableId,
    // Lazily populated on the first `append`; see `get_mvcc_cache`.
    mvcc_cache: RwLock<Option<MvccColumnCache>>,
}
45
/// Post-filter applied to the row ids a scan collects before gathering
/// column values (used via `ScanStreamOptions::row_id_filter`, e.g. for
/// visibility checks).
pub trait RowIdFilter<P>: Send + Sync
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Returns the subset of `row_ids` that should remain visible for
    /// `table`, or an error if visibility cannot be determined.
    fn filter(&self, table: &Table<P>, row_ids: Vec<RowId>) -> LlkvResult<Vec<RowId>>;
}
68
/// Options controlling [`Table::scan_stream`].
pub struct ScanStreamOptions<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // When true, rows whose projected values are null are kept in the
    // output batches (exact semantics live in the planner — see
    // `TablePlanner::scan_stream_with_exprs`).
    pub include_nulls: bool,
    // Optional ordering to apply to the streamed results.
    pub order: Option<ScanOrderSpec>,
    // Optional row-id post-filter (trait object; shared via `Arc` so the
    // options type stays cheaply cloneable).
    pub row_id_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
}
94
95impl<P> fmt::Debug for ScanStreamOptions<P>
96where
97 P: Pager<Blob = EntryHandle> + Send + Sync,
98{
99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 f.debug_struct("ScanStreamOptions")
101 .field("include_nulls", &self.include_nulls)
102 .field("order", &self.order)
103 .field(
104 "row_id_filter",
105 &self.row_id_filter.as_ref().map(|_| "<RowIdFilter>"),
106 )
107 .finish()
108 }
109}
110
111impl<P> Clone for ScanStreamOptions<P>
112where
113 P: Pager<Blob = EntryHandle> + Send + Sync,
114{
115 fn clone(&self) -> Self {
116 Self {
117 include_nulls: self.include_nulls,
118 order: self.order,
119 row_id_filter: self.row_id_filter.clone(),
120 }
121 }
122}
123
124impl<P> Default for ScanStreamOptions<P>
125where
126 P: Pager<Blob = EntryHandle> + Send + Sync,
127{
128 fn default() -> Self {
129 Self {
130 include_nulls: false,
131 order: None,
132 row_id_filter: None,
133 }
134 }
135}
136
/// Ordering specification for a streamed scan.
#[derive(Clone, Copy, Debug)]
pub struct ScanOrderSpec {
    // Column the output is ordered by.
    pub field_id: FieldId,
    // Ascending or descending.
    pub direction: ScanOrderDirection,
    // Whether null values sort before non-null values.
    pub nulls_first: bool,
    // Transformation applied to the key before comparison.
    pub transform: ScanOrderTransform,
}
151
/// Sort direction for [`ScanOrderSpec`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderDirection {
    Ascending,
    Descending,
}
160
/// How the sort key is interpreted before comparison.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderTransform {
    // Compare integer values as-is.
    IdentityInteger,
    // Compare UTF-8 string values as-is.
    IdentityUtf8,
    // Parse UTF-8 values as integers before comparing — TODO confirm
    // failure behavior for non-numeric strings (handled in the planner).
    CastUtf8ToInteger,
}
173
/// A projected output column of a scan: either a stored column or a
/// computed scalar expression.
#[derive(Clone, Debug)]
pub enum ScanProjection {
    // Project a stored column directly.
    Column(Projection),
    // Evaluate a scalar expression over the row's fields.
    Computed {
        // Expression referencing this table's field ids.
        expr: ScalarExpr<FieldId>,
        // Output column name for the computed value.
        alias: String,
    },
}
189
190impl ScanProjection {
191 pub fn column<P: Into<Projection>>(proj: P) -> Self {
193 Self::Column(proj.into())
194 }
195
196 pub fn computed<S: Into<String>>(expr: ScalarExpr<FieldId>, alias: S) -> Self {
198 Self::Computed {
199 expr,
200 alias: alias.into(),
201 }
202 }
203}
204
205impl From<Projection> for ScanProjection {
206 fn from(value: Projection) -> Self {
207 ScanProjection::Column(value)
208 }
209}
210
211impl From<&Projection> for ScanProjection {
212 fn from(value: &Projection) -> Self {
213 ScanProjection::Column(value.clone())
214 }
215}
216
217impl From<&ScanProjection> for ScanProjection {
218 fn from(value: &ScanProjection) -> Self {
219 value.clone()
220 }
221}
222
impl<P> Table<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Creates a new table from planner column specs by delegating to the
    /// catalog manager. Returns whatever `CreateTableResult` the catalog
    /// produces.
    pub fn create_from_columns(
        display_name: &str,
        canonical_name: &str,
        columns: &[llkv_plan::PlanColumnSpec],
        metadata: Arc<crate::metadata::MetadataManager<P>>,
        catalog: Arc<crate::catalog::TableCatalog>,
        store: Arc<ColumnStore<P>>,
    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
        service.create_table_from_columns(display_name, canonical_name, columns)
    }

    /// Creates a new table from an Arrow schema by delegating to the
    /// catalog manager.
    pub fn create_from_schema(
        display_name: &str,
        canonical_name: &str,
        schema: &arrow::datatypes::Schema,
        metadata: Arc<crate::metadata::MetadataManager<P>>,
        catalog: Arc<crate::catalog::TableCatalog>,
        store: Arc<ColumnStore<P>>,
    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
        service.create_table_from_schema(display_name, canonical_name, schema)
    }

    /// Opens a table handle by id, creating a fresh `ColumnStore` over
    /// `pager`. Rejects reserved (system) table ids.
    ///
    /// # Errors
    /// Returns `Error::ReservedTableId` for reserved ids, or any error
    /// from `ColumnStore::open`.
    #[doc(hidden)]
    pub fn from_id(table_id: TableId, pager: Arc<P>) -> LlkvResult<Self> {
        if is_reserved_table_id(table_id) {
            return Err(Error::ReservedTableId(table_id));
        }

        tracing::trace!(
            "Table::from_id: Opening table_id={} with pager at {:p}",
            table_id,
            &*pager
        );
        let store = ColumnStore::open(pager)?;
        Ok(Self {
            store: Arc::new(store),
            table_id,
            // MVCC column presence is discovered lazily on first append.
            mvcc_cache: RwLock::new(None),
        })
    }

    /// Opens a table handle over an already-opened, shared `ColumnStore`.
    /// Rejects reserved (system) table ids.
    #[doc(hidden)]
    pub fn from_id_and_store(table_id: TableId, store: Arc<ColumnStore<P>>) -> LlkvResult<Self> {
        if is_reserved_table_id(table_id) {
            return Err(Error::ReservedTableId(table_id));
        }

        Ok(Self {
            store,
            table_id,
            mvcc_cache: RwLock::new(None),
        })
    }

    /// Registers a persistent sort index for `field_id` on this table.
    pub fn register_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
        self.store
            .register_index(logical_field_id, IndexKind::Sort)?;
        Ok(())
    }

    /// Removes the sort index for `field_id`. Idempotent: a missing
    /// index (`Error::NotFound`) is treated as success.
    pub fn unregister_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
        match self
            .store
            .unregister_index(logical_field_id, IndexKind::Sort)
        {
            Ok(()) | Err(Error::NotFound) => Ok(()),
            Err(err) => Err(err),
        }
    }

    /// Lists the persisted index kinds registered for `field_id`.
    /// A column unknown to the store yields an empty list, not an error.
    pub fn list_registered_indexes(&self, field_id: FieldId) -> LlkvResult<Vec<IndexKind>> {
        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
        match self.store.list_persisted_indexes(logical_field_id) {
            Ok(kinds) => Ok(kinds),
            Err(Error::NotFound) => Ok(Vec::new()),
            Err(err) => Err(err),
        }
    }

    /// Returns (computing on first use) whether `schema` contains the
    /// MVCC `created_by` / `deleted_by` columns.
    ///
    /// NOTE(review): the cache is filled from the first schema seen and
    /// returned unchanged for all later calls, so every appended batch is
    /// assumed to have the same MVCC column layout — confirm callers
    /// guarantee this. Two racing writers may both compute the value;
    /// last write wins, which is benign since both compute the same data.
    fn get_mvcc_cache(&self, schema: &Arc<Schema>) -> MvccColumnCache {
        {
            // Fast path: cache already populated. Lock poisoning is
            // treated as a bug (unwrap).
            let cache_read = self.mvcc_cache.read().unwrap();
            if let Some(cache) = *cache_read {
                return cache;
            }
        }

        let has_created_by = schema
            .fields()
            .iter()
            .any(|f| f.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME);
        let has_deleted_by = schema
            .fields()
            .iter()
            .any(|f| f.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME);

        let cache = MvccColumnCache {
            has_created_by,
            has_deleted_by,
        };

        *self.mvcc_cache.write().unwrap() = Some(cache);

        cache
    }

    /// Appends a record batch to this table.
    ///
    /// Rewrites each user field's metadata so its field-id entry holds the
    /// table-namespaced `LogicalFieldId`, passes the row-id column through
    /// untouched, remaps any caller-supplied MVCC columns to their MVCC
    /// logical ids, and synthesizes `created_by` / `deleted_by` columns
    /// when the batch lacks them. Also ensures the sys-catalog has a
    /// `ColMeta` (with a name) for every user column.
    ///
    /// # Errors
    /// Fails if a user field is missing or carries an unparseable/oversized
    /// field id in its metadata, or if the underlying store append fails.
    pub fn append(&self, batch: &RecordBatch) -> LlkvResult<()> {
        use arrow::array::UInt64Builder;

        let cache = self.get_mvcc_cache(&batch.schema());
        let has_created_by = cache.has_created_by;
        let has_deleted_by = cache.has_deleted_by;

        // +2: room for the synthesized created_by/deleted_by columns.
        let mut new_fields = Vec::with_capacity(batch.schema().fields().len() + 2);
        let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.columns().len() + 2);

        for (idx, field) in batch.schema().fields().iter().enumerate() {
            let maybe_field_id = field.metadata().get(crate::constants::FIELD_ID_META_KEY);
            // Reserved columns (row id / MVCC) are allowed to arrive
            // without a field-id metadata entry.
            if maybe_field_id.is_none()
                && (field.name() == ROW_ID_COLUMN_NAME
                    || field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME
                    || field.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME)
            {
                if field.name() == ROW_ID_COLUMN_NAME {
                    // Row ids pass through unchanged.
                    new_fields.push(field.as_ref().clone());
                    new_columns.push(batch.column(idx).clone());
                } else {
                    // Caller-supplied MVCC column: stamp it with the
                    // table's MVCC logical field id.
                    let lfid = if field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME {
                        LogicalFieldId::for_mvcc_created_by(self.table_id)
                    } else {
                        LogicalFieldId::for_mvcc_deleted_by(self.table_id)
                    };

                    let mut metadata = field.metadata().clone();
                    let lfid_val: u64 = lfid.into();
                    metadata.insert(
                        crate::constants::FIELD_ID_META_KEY.to_string(),
                        lfid_val.to_string(),
                    );

                    let new_field =
                        Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                            .with_metadata(metadata);
                    new_fields.push(new_field);
                    new_columns.push(batch.column(idx).clone());
                }
                continue;
            }

            // User columns must carry a parseable field id.
            let raw_field_id = maybe_field_id
                .ok_or_else(|| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' is missing a valid '{}' in its metadata.",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY
                    ))
                })?
                .parse::<u64>()
                .map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' contains an invalid '{}': {}",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY,
                        err
                    ))
                })?;

            // Reject values outside the user FieldId range (e.g. a
            // fully-qualified logical id passed by mistake).
            if raw_field_id > FieldId::MAX as u64 {
                return Err(llkv_result::Error::Internal(format!(
                    "Field '{}' expected user FieldId (<= {}) but got logical id '{}'",
                    field.name(),
                    FieldId::MAX,
                    raw_field_id
                )));
            }

            let user_field_id = raw_field_id as FieldId;
            let logical_field_id = LogicalFieldId::for_user(self.table_id, user_field_id);

            // Replace the user-scoped id with the table-namespaced
            // logical id in the outgoing field metadata.
            let lfid = logical_field_id;
            let mut new_metadata = field.metadata().clone();
            let lfid_val: u64 = lfid.into();
            new_metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );

            let new_field =
                Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                    .with_metadata(new_metadata);
            new_fields.push(new_field);
            new_columns.push(batch.column(idx).clone());

            // Write a ColMeta only when none exists yet or the existing
            // one has no name, so a caller-provided name is not clobbered.
            let need_meta = match self
                .catalog()
                .get_cols_meta(self.table_id, &[user_field_id])
            {
                metas if metas.is_empty() => true,
                metas => metas[0].as_ref().and_then(|m| m.name.as_ref()).is_none(),
            };

            if need_meta {
                let meta = ColMeta {
                    col_id: user_field_id,
                    name: Some(field.name().to_string()),
                    flags: 0,
                    default: None,
                };
                self.catalog().put_col_meta(self.table_id, &meta);
            }
        }

        // Synthesized MVCC values: rows appended outside an explicit
        // transaction are created by txn 1 and not deleted (txn 0).
        const TXN_ID_AUTO_COMMIT: u64 = 1;
        const TXN_ID_NONE: u64 = 0;
        let row_count = batch.num_rows();

        if !has_created_by {
            let mut created_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                created_by_builder.append_value(TXN_ID_AUTO_COMMIT);
            }
            let created_by_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = created_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::CREATED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(created_by_builder.finish()));
        }

        if !has_deleted_by {
            let mut deleted_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                deleted_by_builder.append_value(TXN_ID_NONE);
            }
            let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = deleted_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::DELETED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(deleted_by_builder.finish()));
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let namespaced_batch = RecordBatch::try_new(new_schema, new_columns)?;

        tracing::trace!(
            table_id = self.table_id,
            num_columns = namespaced_batch.num_columns(),
            num_rows = namespaced_batch.num_rows(),
            "Attempting append to table"
        );

        if let Err(err) = self.store.append(&namespaced_batch) {
            // On failure, log which of the batch's logical fields the
            // store does not know about to aid debugging, then propagate.
            let batch_field_ids: Vec<LogicalFieldId> = namespaced_batch
                .schema()
                .fields()
                .iter()
                .filter_map(|f| f.metadata().get(crate::constants::FIELD_ID_META_KEY))
                .filter_map(|s| s.parse::<u64>().ok())
                .map(LogicalFieldId::from)
                .collect();

            let missing_fields: Vec<LogicalFieldId> = batch_field_ids
                .iter()
                .filter(|&&field_id| !self.store.has_field(field_id))
                .copied()
                .collect();

            tracing::error!(
                table_id = self.table_id,
                error = ?err,
                batch_field_ids = ?batch_field_ids,
                missing_from_catalog = ?missing_fields,
                "Append failed - some fields missing from catalog"
            );
            return Err(err);
        }
        Ok(())
    }

    /// Streams filtered, projected batches to `on_batch`. Convenience
    /// wrapper that collects `projections` into `ScanProjection`s and
    /// delegates to [`Self::scan_stream_with_exprs`].
    pub fn scan_stream<'a, I, T, F>(
        &self,
        projections: I,
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions<P>,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        I: IntoIterator<Item = T>,
        T: Into<ScanProjection>,
        F: FnMut(RecordBatch),
    {
        let stream_projections: Vec<ScanProjection> =
            projections.into_iter().map(|p| p.into()).collect();
        self.scan_stream_with_exprs(&stream_projections, filter_expr, options, on_batch)
    }

    /// Streams filtered, projected batches to `on_batch`, with planning
    /// and execution handled by [`TablePlanner`].
    pub fn scan_stream_with_exprs<'a, F>(
        &self,
        projections: &[ScanProjection],
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions<P>,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(RecordBatch),
    {
        TablePlanner::new(self).scan_stream_with_exprs(projections, filter_expr, options, on_batch)
    }

    /// Returns the row ids matching `filter_expr` for this table.
    pub fn filter_row_ids<'a>(&self, filter_expr: &Expr<'a, FieldId>) -> LlkvResult<Vec<RowId>> {
        collect_row_ids_for_table(self, filter_expr)
    }

    /// Returns a sys-catalog view over this table's column store.
    #[inline]
    pub fn catalog(&self) -> SysCatalog<'_, P> {
        SysCatalog::new(&self.store)
    }

    /// Fetches this table's metadata from the sys catalog, if present.
    #[inline]
    pub fn get_table_meta(&self) -> Option<TableMeta> {
        self.catalog().get_table_meta(self.table_id)
    }

    /// Fetches per-column metadata for `col_ids`; each slot is `None`
    /// when no metadata is stored for that column.
    #[inline]
    pub fn get_cols_meta(&self, col_ids: &[FieldId]) -> Vec<Option<ColMeta>> {
        self.catalog().get_cols_meta(self.table_id, col_ids)
    }

    /// Reconstructs an Arrow schema for this table from the store and
    /// sys-catalog: a leading non-nullable row-id column followed by the
    /// user columns sorted by field id. Columns without a catalog name
    /// fall back to `col_<id>`. User columns are emitted as nullable.
    pub fn schema(&self) -> LlkvResult<Arc<Schema>> {
        let mut logical_fields = self.store.user_field_ids_for_table(self.table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());

        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let metas = self.get_cols_meta(&field_ids);

        let mut fields: Vec<Field> = Vec::with_capacity(1 + field_ids.len());
        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for (idx, lfid) in logical_fields.into_iter().enumerate() {
            let fid = lfid.field_id();
            let dtype = self.store.data_type(lfid)?;
            let name = metas
                .get(idx)
                .and_then(|m| m.as_ref().and_then(|meta| meta.name.clone()))
                .unwrap_or_else(|| format!("col_{}", fid));

            // Each field carries its user field id in metadata so the
            // schema can round-trip through `append`.
            let mut metadata: HashMap<String, String> = HashMap::new();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                fid.to_string(),
            );

            fields.push(Field::new(&name, dtype.clone(), true).with_metadata(metadata));
        }

        Ok(Arc::new(Schema::new(fields)))
    }

    /// Renders the table schema as a `RecordBatch` with one row per
    /// column, carrying (name, field id, debug-formatted data type).
    pub fn schema_recordbatch(&self) -> LlkvResult<RecordBatch> {
        let schema = self.schema()?;
        let fields = schema.fields();

        let mut names: Vec<String> = Vec::with_capacity(fields.len());
        let mut fids: Vec<u32> = Vec::with_capacity(fields.len());
        let mut dtypes: Vec<String> = Vec::with_capacity(fields.len());

        for field in fields.iter() {
            names.push(field.name().to_string());
            // Fields without a parseable field id (e.g. row_id) report 0.
            let fid = field
                .metadata()
                .get(crate::constants::FIELD_ID_META_KEY)
                .and_then(|s| s.parse::<u32>().ok())
                .unwrap_or(0u32);
            fids.push(fid);
            dtypes.push(format!("{:?}", field.data_type()));
        }

        let name_array: ArrayRef = Arc::new(StringArray::from(names));
        let fid_array: ArrayRef = Arc::new(UInt32Array::from(fids));
        let dtype_array: ArrayRef = Arc::new(StringArray::from(dtypes));

        let rb_schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new(crate::constants::FIELD_ID_META_KEY, DataType::UInt32, false),
            Field::new("data_type", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(rb_schema, vec![name_array, fid_array, dtype_array])?;
        Ok(batch)
    }

    /// Creates a streaming gatherer over `logical_fields` for the given
    /// `row_ids`, yielding batches of `STREAM_BATCH_ROWS` rows.
    pub fn stream_columns(
        &self,
        logical_fields: impl Into<Arc<[LogicalFieldId]>>,
        row_ids: Vec<RowId>,
        policy: GatherNullPolicy,
    ) -> LlkvResult<ColumnStream<'_, P>> {
        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let ctx = self.store.prepare_gather_context(logical_fields.as_ref())?;
        Ok(ColumnStream::new(
            &self.store,
            ctx,
            row_ids,
            STREAM_BATCH_ROWS,
            policy,
            logical_fields,
        ))
    }

    /// Returns the underlying column store.
    pub fn store(&self) -> &ColumnStore<P> {
        &self.store
    }

    /// Returns this table's id.
    #[inline]
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Returns the total row count recorded for a single user column.
    pub fn total_rows_for_col(&self, col_id: FieldId) -> llkv_result::Result<u64> {
        let lfid = LogicalFieldId::for_user(self.table_id, col_id);
        self.store.total_rows_for_field(lfid)
    }

    /// Returns the table's total row count, preferring the row-id shadow
    /// column and falling back to a store-wide count for this table when
    /// that lookup fails.
    pub fn total_rows(&self) -> llkv_result::Result<u64> {
        use llkv_column_map::store::rowid_fid;
        // The row-id shadow field is derived from any user lfid of this
        // table; field id 0 is used purely to name the table.
        let rid_lfid = rowid_fid(LogicalFieldId::for_user(self.table_id, 0));
        match self.store.total_rows_for_field(rid_lfid) {
            Ok(n) => Ok(n),
            Err(_) => {
                self.store.total_rows_for_table(self.table_id)
            }
        }
    }
}
780
781#[cfg(test)]
782mod tests {
783 use super::*;
784 use crate::reserved::CATALOG_TABLE_ID;
785 use crate::types::RowId;
786 use arrow::array::Array;
787 use arrow::array::ArrayRef;
788 use arrow::array::{
789 BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt8Array,
790 UInt32Array, UInt64Array,
791 };
792 use arrow::compute::{cast, max, min, sum, unary};
793 use arrow::datatypes::DataType;
794 use llkv_column_map::ColumnStore;
795 use llkv_column_map::store::GatherNullPolicy;
796 use llkv_expr::{BinaryOp, CompareOp, Filter, Operator, ScalarExpr};
797 use std::collections::HashMap;
798 use std::ops::Bound;
799
800 fn setup_test_table() -> Table {
801 let pager = Arc::new(MemPager::default());
802 setup_test_table_with_pager(&pager)
803 }
804
805 fn setup_test_table_with_pager(pager: &Arc<MemPager>) -> Table {
806 let table = Table::from_id(1, Arc::clone(pager)).unwrap();
807 const COL_A_U64: FieldId = 10;
808 const COL_B_BIN: FieldId = 11;
809 const COL_C_I32: FieldId = 12;
810 const COL_D_F64: FieldId = 13;
811 const COL_E_F32: FieldId = 14;
812
813 let schema = Arc::new(Schema::new(vec![
814 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
815 Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
816 crate::constants::FIELD_ID_META_KEY.to_string(),
817 COL_A_U64.to_string(),
818 )])),
819 Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
820 crate::constants::FIELD_ID_META_KEY.to_string(),
821 COL_B_BIN.to_string(),
822 )])),
823 Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
824 crate::constants::FIELD_ID_META_KEY.to_string(),
825 COL_C_I32.to_string(),
826 )])),
827 Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
828 crate::constants::FIELD_ID_META_KEY.to_string(),
829 COL_D_F64.to_string(),
830 )])),
831 Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
832 crate::constants::FIELD_ID_META_KEY.to_string(),
833 COL_E_F32.to_string(),
834 )])),
835 ]));
836
837 let batch = RecordBatch::try_new(
838 schema.clone(),
839 vec![
840 Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
841 Arc::new(UInt64Array::from(vec![100, 200, 300, 200])),
842 Arc::new(BinaryArray::from(vec![
843 b"foo" as &[u8],
844 b"bar",
845 b"baz",
846 b"qux",
847 ])),
848 Arc::new(Int32Array::from(vec![10, 20, 30, 20])),
849 Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5, 2.5])),
850 Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 2.0])),
851 ],
852 )
853 .unwrap();
854
855 table.append(&batch).unwrap();
856 table
857 }
858
859 fn gather_single(
860 store: &ColumnStore<MemPager>,
861 field_id: LogicalFieldId,
862 row_ids: &[u64],
863 ) -> ArrayRef {
864 store
865 .gather_rows(&[field_id], row_ids, GatherNullPolicy::ErrorOnMissing)
866 .unwrap()
867 .column(0)
868 .clone()
869 }
870
871 fn pred_expr<'a>(filter: Filter<'a, FieldId>) -> Expr<'a, FieldId> {
872 Expr::Pred(filter)
873 }
874
875 fn proj(table: &Table, field_id: FieldId) -> Projection {
876 Projection::from(LogicalFieldId::for_user(table.table_id, field_id))
877 }
878
879 fn proj_alias<S: Into<String>>(table: &Table, field_id: FieldId, alias: S) -> Projection {
880 Projection::with_alias(LogicalFieldId::for_user(table.table_id, field_id), alias)
881 }
882
883 #[test]
884 fn table_new_rejects_reserved_table_id() {
885 let result = Table::from_id(CATALOG_TABLE_ID, Arc::new(MemPager::default()));
886 assert!(matches!(
887 result,
888 Err(Error::ReservedTableId(id)) if id == CATALOG_TABLE_ID
889 ));
890 }
891
892 #[test]
893 fn test_append_rejects_logical_field_id_in_metadata() {
894 let table = Table::from_id(7, Arc::new(MemPager::default())).unwrap();
898
899 const USER_FID: FieldId = 42;
900 let logical: LogicalFieldId = LogicalFieldId::for_user(table.table_id(), USER_FID);
902 let logical_val: u64 = logical.into();
903
904 let schema = Arc::new(Schema::new(vec![
905 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
906 Field::new("bad", DataType::UInt64, false).with_metadata(HashMap::from([(
907 crate::constants::FIELD_ID_META_KEY.to_string(),
908 logical_val.to_string(),
909 )])),
910 ]));
911
912 let batch = RecordBatch::try_new(
913 schema,
914 vec![
915 Arc::new(UInt64Array::from(vec![1u64, 2u64])),
916 Arc::new(UInt64Array::from(vec![10u64, 20u64])),
917 ],
918 )
919 .unwrap();
920
921 let res = table.append(&batch);
922 assert!(matches!(res, Err(Error::Internal(_))));
923 }
924
925 #[test]
926 fn test_scan_with_u64_filter() {
927 let table = setup_test_table();
928 const COL_A_U64: FieldId = 10;
929 const COL_C_I32: FieldId = 12;
930
931 let expr = pred_expr(Filter {
932 field_id: COL_A_U64,
933 op: Operator::Equals(200.into()),
934 });
935
936 let mut vals: Vec<Option<i32>> = Vec::new();
937 table
938 .scan_stream(
939 &[proj(&table, COL_C_I32)],
940 &expr,
941 ScanStreamOptions::default(),
942 |b| {
943 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
944 vals.extend(
945 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
946 );
947 },
948 )
949 .unwrap();
950 assert_eq!(vals, vec![Some(20), Some(20)]);
951 }
952
953 #[test]
954 fn test_scan_with_string_filter() {
955 let pager = Arc::new(MemPager::default());
956 let table = Table::from_id(500, Arc::clone(&pager)).unwrap();
957
958 const COL_STR: FieldId = 42;
959 let schema = Arc::new(Schema::new(vec![
960 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
961 Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
962 crate::constants::FIELD_ID_META_KEY.to_string(),
963 COL_STR.to_string(),
964 )])),
965 ]));
966
967 let batch = RecordBatch::try_new(
968 schema,
969 vec![
970 Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
971 Arc::new(StringArray::from(vec!["alice", "bob", "albert", "carol"])),
972 ],
973 )
974 .unwrap();
975 table.append(&batch).unwrap();
976
977 let expr = pred_expr(Filter {
978 field_id: COL_STR,
979 op: Operator::starts_with("al", true),
980 });
981
982 let mut collected: Vec<Option<String>> = Vec::new();
983 table
984 .scan_stream(
985 &[proj(&table, COL_STR)],
986 &expr,
987 ScanStreamOptions::default(),
988 |b| {
989 let arr = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
990 collected.extend(arr.iter().map(|v| v.map(|s| s.to_string())));
991 },
992 )
993 .unwrap();
994
995 assert_eq!(
996 collected,
997 vec![Some("alice".to_string()), Some("albert".to_string())]
998 );
999 }
1000
1001 #[test]
1002 fn test_table_reopen_with_shared_pager() {
1003 const TABLE_ALPHA: TableId = 42;
1004 const TABLE_BETA: TableId = 43;
1005 const TABLE_GAMMA: TableId = 44;
1006 const COL_ALPHA_U64: FieldId = 100;
1007 const COL_ALPHA_I32: FieldId = 101;
1008 const COL_ALPHA_U32: FieldId = 102;
1009 const COL_ALPHA_I16: FieldId = 103;
1010 const COL_BETA_U64: FieldId = 200;
1011 const COL_BETA_U8: FieldId = 201;
1012 const COL_GAMMA_I16: FieldId = 300;
1013
1014 let pager = Arc::new(MemPager::default());
1015
1016 let alpha_rows: Vec<RowId> = vec![1, 2, 3, 4];
1017 let alpha_vals_u64: Vec<u64> = vec![10, 20, 30, 40];
1018 let alpha_vals_i32: Vec<i32> = vec![-5, 15, 25, 35];
1019 let alpha_vals_u32: Vec<u32> = vec![7, 11, 13, 17];
1020 let alpha_vals_i16: Vec<i16> = vec![-2, 4, -6, 8];
1021
1022 let beta_rows: Vec<RowId> = vec![101, 102, 103];
1023 let beta_vals_u64: Vec<u64> = vec![900, 901, 902];
1024 let beta_vals_u8: Vec<u8> = vec![1, 2, 3];
1025
1026 let gamma_rows: Vec<RowId> = vec![501, 502];
1027 let gamma_vals_i16: Vec<i16> = vec![123, -321];
1028
1029 {
1031 let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
1032 let schema = Arc::new(Schema::new(vec![
1033 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1034 Field::new("alpha_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1035 crate::constants::FIELD_ID_META_KEY.to_string(),
1036 COL_ALPHA_U64.to_string(),
1037 )])),
1038 Field::new("alpha_i32", DataType::Int32, false).with_metadata(HashMap::from([(
1039 crate::constants::FIELD_ID_META_KEY.to_string(),
1040 COL_ALPHA_I32.to_string(),
1041 )])),
1042 Field::new("alpha_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
1043 crate::constants::FIELD_ID_META_KEY.to_string(),
1044 COL_ALPHA_U32.to_string(),
1045 )])),
1046 Field::new("alpha_i16", DataType::Int16, false).with_metadata(HashMap::from([(
1047 crate::constants::FIELD_ID_META_KEY.to_string(),
1048 COL_ALPHA_I16.to_string(),
1049 )])),
1050 ]));
1051 let batch = RecordBatch::try_new(
1052 schema,
1053 vec![
1054 Arc::new(UInt64Array::from(alpha_rows.clone())),
1055 Arc::new(UInt64Array::from(alpha_vals_u64.clone())),
1056 Arc::new(Int32Array::from(alpha_vals_i32.clone())),
1057 Arc::new(UInt32Array::from(alpha_vals_u32.clone())),
1058 Arc::new(Int16Array::from(alpha_vals_i16.clone())),
1059 ],
1060 )
1061 .unwrap();
1062 table.append(&batch).unwrap();
1063 }
1064
1065 {
1066 let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
1067 let schema = Arc::new(Schema::new(vec![
1068 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1069 Field::new("beta_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1070 crate::constants::FIELD_ID_META_KEY.to_string(),
1071 COL_BETA_U64.to_string(),
1072 )])),
1073 Field::new("beta_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
1074 crate::constants::FIELD_ID_META_KEY.to_string(),
1075 COL_BETA_U8.to_string(),
1076 )])),
1077 ]));
1078 let batch = RecordBatch::try_new(
1079 schema,
1080 vec![
1081 Arc::new(UInt64Array::from(beta_rows.clone())),
1082 Arc::new(UInt64Array::from(beta_vals_u64.clone())),
1083 Arc::new(UInt8Array::from(beta_vals_u8.clone())),
1084 ],
1085 )
1086 .unwrap();
1087 table.append(&batch).unwrap();
1088 }
1089
1090 {
1091 let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
1092 let schema = Arc::new(Schema::new(vec![
1093 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1094 Field::new("gamma_i16", DataType::Int16, false).with_metadata(HashMap::from([(
1095 crate::constants::FIELD_ID_META_KEY.to_string(),
1096 COL_GAMMA_I16.to_string(),
1097 )])),
1098 ]));
1099 let batch = RecordBatch::try_new(
1100 schema,
1101 vec![
1102 Arc::new(UInt64Array::from(gamma_rows.clone())),
1103 Arc::new(Int16Array::from(gamma_vals_i16.clone())),
1104 ],
1105 )
1106 .unwrap();
1107 table.append(&batch).unwrap();
1108 }
1109
1110 {
1112 let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
1113 let store = table.store();
1114
1115 let expectations: &[(FieldId, DataType)] = &[
1116 (COL_ALPHA_U64, DataType::UInt64),
1117 (COL_ALPHA_I32, DataType::Int32),
1118 (COL_ALPHA_U32, DataType::UInt32),
1119 (COL_ALPHA_I16, DataType::Int16),
1120 ];
1121
1122 for &(col, ref ty) in expectations {
1123 let lfid = LogicalFieldId::for_user(TABLE_ALPHA, col);
1124 assert_eq!(store.data_type(lfid).unwrap(), *ty);
1125 let arr = gather_single(store, lfid, &alpha_rows);
1126 match ty {
1127 DataType::UInt64 => {
1128 let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
1129 assert_eq!(arr.values(), alpha_vals_u64.as_slice());
1130 }
1131 DataType::Int32 => {
1132 let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
1133 assert_eq!(arr.values(), alpha_vals_i32.as_slice());
1134 }
1135 DataType::UInt32 => {
1136 let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
1137 assert_eq!(arr.values(), alpha_vals_u32.as_slice());
1138 }
1139 DataType::Int16 => {
1140 let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
1141 assert_eq!(arr.values(), alpha_vals_i16.as_slice());
1142 }
1143 other => panic!("unexpected dtype {other:?}"),
1144 }
1145 }
1146 }
1147
1148 {
1149 let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
1150 let store = table.store();
1151
1152 let lfid_u64 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U64);
1153 assert_eq!(store.data_type(lfid_u64).unwrap(), DataType::UInt64);
1154 let arr_u64 = gather_single(store, lfid_u64, &beta_rows);
1155 let arr_u64 = arr_u64.as_any().downcast_ref::<UInt64Array>().unwrap();
1156 assert_eq!(arr_u64.values(), beta_vals_u64.as_slice());
1157
1158 let lfid_u8 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U8);
1159 assert_eq!(store.data_type(lfid_u8).unwrap(), DataType::UInt8);
1160 let arr_u8 = gather_single(store, lfid_u8, &beta_rows);
1161 let arr_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
1162 assert_eq!(arr_u8.values(), beta_vals_u8.as_slice());
1163 }
1164
1165 {
1166 let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
1167 let store = table.store();
1168 let lfid = LogicalFieldId::for_user(TABLE_GAMMA, COL_GAMMA_I16);
1169 assert_eq!(store.data_type(lfid).unwrap(), DataType::Int16);
1170 let arr = gather_single(store, lfid, &gamma_rows);
1171 let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
1172 assert_eq!(arr.values(), gamma_vals_i16.as_slice());
1173 }
1174 }
1175
1176 #[test]
1177 fn test_scan_with_i32_filter() {
1178 let table = setup_test_table();
1179 const COL_A_U64: FieldId = 10;
1180 const COL_C_I32: FieldId = 12;
1181
1182 let filter = pred_expr(Filter {
1183 field_id: COL_C_I32,
1184 op: Operator::Equals(20.into()),
1185 });
1186
1187 let mut vals: Vec<Option<u64>> = Vec::new();
1188 table
1189 .scan_stream(
1190 &[proj(&table, COL_A_U64)],
1191 &filter,
1192 ScanStreamOptions::default(),
1193 |b| {
1194 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1195 vals.extend(
1196 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1197 );
1198 },
1199 )
1200 .unwrap();
1201 assert_eq!(vals, vec![Some(200), Some(200)]);
1202 }
1203
1204 #[test]
1205 fn test_scan_with_greater_than_filter() {
1206 let table = setup_test_table();
1207 const COL_A_U64: FieldId = 10;
1208 const COL_C_I32: FieldId = 12;
1209
1210 let filter = pred_expr(Filter {
1211 field_id: COL_C_I32,
1212 op: Operator::GreaterThan(15.into()),
1213 });
1214
1215 let mut vals: Vec<Option<u64>> = Vec::new();
1216 table
1217 .scan_stream(
1218 &[proj(&table, COL_A_U64)],
1219 &filter,
1220 ScanStreamOptions::default(),
1221 |b| {
1222 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1223 vals.extend(
1224 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1225 );
1226 },
1227 )
1228 .unwrap();
1229 assert_eq!(vals, vec![Some(200), Some(300), Some(200)]);
1230 }
1231
1232 #[test]
1233 fn test_scan_with_range_filter() {
1234 let table = setup_test_table();
1235 const COL_A_U64: FieldId = 10;
1236 const COL_C_I32: FieldId = 12;
1237
1238 let filter = pred_expr(Filter {
1239 field_id: COL_A_U64,
1240 op: Operator::Range {
1241 lower: Bound::Included(150.into()),
1242 upper: Bound::Excluded(300.into()),
1243 },
1244 });
1245
1246 let mut vals: Vec<Option<i32>> = Vec::new();
1247 table
1248 .scan_stream(
1249 &[proj(&table, COL_C_I32)],
1250 &filter,
1251 ScanStreamOptions::default(),
1252 |b| {
1253 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1254 vals.extend(
1255 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1256 );
1257 },
1258 )
1259 .unwrap();
1260 assert_eq!(vals, vec![Some(20), Some(20)]);
1261 }
1262
1263 #[test]
1264 fn test_filtered_scan_sum_kernel() {
1265 let table = setup_test_table();
1269 const COL_A_U64: FieldId = 10;
1270
1271 let filter = pred_expr(Filter {
1272 field_id: COL_A_U64,
1273 op: Operator::Range {
1274 lower: Bound::Included(150.into()),
1275 upper: Bound::Excluded(300.into()),
1276 },
1277 });
1278
1279 let mut total: u128 = 0;
1280 table
1281 .scan_stream(
1282 &[proj(&table, COL_A_U64)],
1283 &filter,
1284 ScanStreamOptions::default(),
1285 |b| {
1286 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1287 if let Some(part) = sum(a) {
1288 total += part as u128;
1289 }
1290 },
1291 )
1292 .unwrap();
1293
1294 assert_eq!(total, 400);
1295 }
1296
1297 #[test]
1298 fn test_filtered_scan_sum_i32_kernel() {
1299 let table = setup_test_table();
1304 const COL_A_U64: FieldId = 10;
1305 const COL_C_I32: FieldId = 12;
1306
1307 let candidates = [100.into(), 300.into()];
1308 let filter = pred_expr(Filter {
1309 field_id: COL_A_U64,
1310 op: Operator::In(&candidates),
1311 });
1312
1313 let mut total: i64 = 0;
1314 table
1315 .scan_stream(
1316 &[proj(&table, COL_C_I32)],
1317 &filter,
1318 ScanStreamOptions::default(),
1319 |b| {
1320 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1321 if let Some(part) = sum(a) {
1322 total += part as i64;
1323 }
1324 },
1325 )
1326 .unwrap();
1327 assert_eq!(total, 40);
1328 }
1329
1330 #[test]
1331 fn test_filtered_scan_min_max_kernel() {
1332 let table = setup_test_table();
1337 const COL_A_U64: FieldId = 10;
1338 const COL_C_I32: FieldId = 12;
1339
1340 let candidates = [100.into(), 300.into()];
1341 let filter = pred_expr(Filter {
1342 field_id: COL_A_U64,
1343 op: Operator::In(&candidates),
1344 });
1345
1346 let mut mn: Option<i32> = None;
1347 let mut mx: Option<i32> = None;
1348 table
1349 .scan_stream(
1350 &[proj(&table, COL_C_I32)],
1351 &filter,
1352 ScanStreamOptions::default(),
1353 |b| {
1354 let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1355
1356 if let Some(part_min) = min(a) {
1357 mn = Some(mn.map_or(part_min, |m| m.min(part_min)));
1358 }
1359 if let Some(part_max) = max(a) {
1360 mx = Some(mx.map_or(part_max, |m| m.max(part_max)));
1361 }
1362 },
1363 )
1364 .unwrap();
1365 assert_eq!(mn, Some(10));
1366 assert_eq!(mx, Some(30));
1367 }
1368
1369 #[test]
1370 fn test_filtered_scan_float64_column() {
1371 let table = setup_test_table();
1372 const COL_D_F64: FieldId = 13;
1373
1374 let filter = pred_expr(Filter {
1375 field_id: COL_D_F64,
1376 op: Operator::GreaterThan(2.0_f64.into()),
1377 });
1378
1379 let mut got = Vec::new();
1380 table
1381 .scan_stream(
1382 &[proj(&table, COL_D_F64)],
1383 &filter,
1384 ScanStreamOptions::default(),
1385 |b| {
1386 let arr = b.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
1387 for i in 0..arr.len() {
1388 if arr.is_valid(i) {
1389 got.push(arr.value(i));
1390 }
1391 }
1392 },
1393 )
1394 .unwrap();
1395
1396 assert_eq!(got, vec![2.5, 3.5, 2.5]);
1397 }
1398
1399 #[test]
1400 fn test_filtered_scan_float32_in_operator() {
1401 let table = setup_test_table();
1402 const COL_E_F32: FieldId = 14;
1403
1404 let candidates = [2.0_f32.into(), 3.0_f32.into()];
1405 let filter = pred_expr(Filter {
1406 field_id: COL_E_F32,
1407 op: Operator::In(&candidates),
1408 });
1409
1410 let mut vals: Vec<Option<f32>> = Vec::new();
1411 table
1412 .scan_stream(
1413 &[proj(&table, COL_E_F32)],
1414 &filter,
1415 ScanStreamOptions::default(),
1416 |b| {
1417 let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
1418 vals.extend((0..arr.len()).map(|i| {
1419 if arr.is_null(i) {
1420 None
1421 } else {
1422 Some(arr.value(i))
1423 }
1424 }));
1425 },
1426 )
1427 .unwrap();
1428
1429 let collected: Vec<f32> = vals.into_iter().flatten().collect();
1430 assert_eq!(collected, vec![2.0, 3.0, 2.0]);
1431 }
1432
1433 #[test]
1434 fn test_scan_stream_and_expression() {
1435 let table = setup_test_table();
1436 const COL_A_U64: FieldId = 10;
1437 const COL_C_I32: FieldId = 12;
1438 const COL_E_F32: FieldId = 14;
1439
1440 let expr = Expr::all_of(vec![
1441 Filter {
1442 field_id: COL_C_I32,
1443 op: Operator::GreaterThan(15.into()),
1444 },
1445 Filter {
1446 field_id: COL_A_U64,
1447 op: Operator::LessThan(250.into()),
1448 },
1449 ]);
1450
1451 let mut vals: Vec<Option<f32>> = Vec::new();
1452 table
1453 .scan_stream(
1454 &[proj(&table, COL_E_F32)],
1455 &expr,
1456 ScanStreamOptions::default(),
1457 |b| {
1458 let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
1459 vals.extend((0..arr.len()).map(|i| {
1460 if arr.is_null(i) {
1461 None
1462 } else {
1463 Some(arr.value(i))
1464 }
1465 }));
1466 },
1467 )
1468 .unwrap();
1469
1470 assert_eq!(vals, vec![Some(2.0_f32), Some(2.0_f32)]);
1471 }
1472
1473 #[test]
1474 fn test_scan_stream_or_expression() {
1475 let table = setup_test_table();
1476 const COL_A_U64: FieldId = 10;
1477 const COL_C_I32: FieldId = 12;
1478
1479 let expr = Expr::any_of(vec![
1480 Filter {
1481 field_id: COL_C_I32,
1482 op: Operator::Equals(10.into()),
1483 },
1484 Filter {
1485 field_id: COL_C_I32,
1486 op: Operator::Equals(30.into()),
1487 },
1488 ]);
1489
1490 let mut vals: Vec<Option<u64>> = Vec::new();
1491 table
1492 .scan_stream(
1493 &[proj(&table, COL_A_U64)],
1494 &expr,
1495 ScanStreamOptions::default(),
1496 |b| {
1497 let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1498 vals.extend((0..arr.len()).map(|i| {
1499 if arr.is_null(i) {
1500 None
1501 } else {
1502 Some(arr.value(i))
1503 }
1504 }));
1505 },
1506 )
1507 .unwrap();
1508
1509 assert_eq!(vals, vec![Some(100), Some(300)]);
1510 }
1511
1512 #[test]
1513 fn test_scan_stream_not_predicate() {
1514 let table = setup_test_table();
1515 const COL_A_U64: FieldId = 10;
1516 const COL_C_I32: FieldId = 12;
1517
1518 let expr = Expr::not(pred_expr(Filter {
1519 field_id: COL_C_I32,
1520 op: Operator::Equals(20.into()),
1521 }));
1522
1523 let mut vals: Vec<Option<u64>> = Vec::new();
1524 table
1525 .scan_stream(
1526 &[proj(&table, COL_A_U64)],
1527 &expr,
1528 ScanStreamOptions::default(),
1529 |b| {
1530 let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1531 vals.extend((0..arr.len()).map(|i| {
1532 if arr.is_null(i) {
1533 None
1534 } else {
1535 Some(arr.value(i))
1536 }
1537 }));
1538 },
1539 )
1540 .unwrap();
1541
1542 assert_eq!(vals, vec![Some(100), Some(300)]);
1543 }
1544
1545 #[test]
1546 fn test_scan_stream_not_and_expression() {
1547 let table = setup_test_table();
1548 const COL_A_U64: FieldId = 10;
1549 const COL_C_I32: FieldId = 12;
1550
1551 let expr = Expr::not(Expr::all_of(vec![
1552 Filter {
1553 field_id: COL_A_U64,
1554 op: Operator::GreaterThan(150.into()),
1555 },
1556 Filter {
1557 field_id: COL_C_I32,
1558 op: Operator::LessThan(40.into()),
1559 },
1560 ]));
1561
1562 let mut vals: Vec<Option<u64>> = Vec::new();
1563 table
1564 .scan_stream(
1565 &[proj(&table, COL_A_U64)],
1566 &expr,
1567 ScanStreamOptions::default(),
1568 |b| {
1569 let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1570 vals.extend((0..arr.len()).map(|i| {
1571 if arr.is_null(i) {
1572 None
1573 } else {
1574 Some(arr.value(i))
1575 }
1576 }));
1577 },
1578 )
1579 .unwrap();
1580
1581 assert_eq!(vals, vec![Some(100)]);
1582 }
1583
1584 #[test]
1585 fn test_scan_stream_include_nulls_toggle() {
1586 let pager = Arc::new(MemPager::default());
1587 let table = setup_test_table_with_pager(&pager);
1588 const COL_A_U64: FieldId = 10;
1589 const COL_C_I32: FieldId = 12;
1590 const COL_B_BIN: FieldId = 11;
1591 const COL_D_F64: FieldId = 13;
1592 const COL_E_F32: FieldId = 14;
1593
1594 let schema = Arc::new(Schema::new(vec![
1595 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1596 Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1597 crate::constants::FIELD_ID_META_KEY.to_string(),
1598 COL_A_U64.to_string(),
1599 )])),
1600 Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
1601 crate::constants::FIELD_ID_META_KEY.to_string(),
1602 COL_B_BIN.to_string(),
1603 )])),
1604 Field::new("c_i32", DataType::Int32, true).with_metadata(HashMap::from([(
1605 crate::constants::FIELD_ID_META_KEY.to_string(),
1606 COL_C_I32.to_string(),
1607 )])),
1608 Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
1609 crate::constants::FIELD_ID_META_KEY.to_string(),
1610 COL_D_F64.to_string(),
1611 )])),
1612 Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
1613 crate::constants::FIELD_ID_META_KEY.to_string(),
1614 COL_E_F32.to_string(),
1615 )])),
1616 ]));
1617
1618 let batch = RecordBatch::try_new(
1619 schema,
1620 vec![
1621 Arc::new(UInt64Array::from(vec![5, 6])),
1622 Arc::new(UInt64Array::from(vec![500, 600])),
1623 Arc::new(BinaryArray::from(vec![
1624 Some(&b"new"[..]),
1625 Some(&b"alt"[..]),
1626 ])),
1627 Arc::new(Int32Array::from(vec![Some(40), None])),
1628 Arc::new(Float64Array::from(vec![5.5, 6.5])),
1629 Arc::new(Float32Array::from(vec![5.0, 6.0])),
1630 ],
1631 )
1632 .unwrap();
1633 table.append(&batch).unwrap();
1634
1635 let filter = pred_expr(Filter {
1636 field_id: COL_A_U64,
1637 op: Operator::GreaterThan(450.into()),
1638 });
1639
1640 let mut default_vals: Vec<Option<i32>> = Vec::new();
1641 table
1642 .scan_stream(
1643 &[proj(&table, COL_C_I32)],
1644 &filter,
1645 ScanStreamOptions::default(),
1646 |b| {
1647 let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1648 default_vals.extend((0..arr.len()).map(|i| {
1649 if arr.is_null(i) {
1650 None
1651 } else {
1652 Some(arr.value(i))
1653 }
1654 }));
1655 },
1656 )
1657 .unwrap();
1658 assert_eq!(default_vals, vec![Some(40)]);
1659
1660 let mut include_null_vals: Vec<Option<i32>> = Vec::new();
1661 table
1662 .scan_stream(
1663 &[proj(&table, COL_C_I32)],
1664 &filter,
1665 ScanStreamOptions {
1666 include_nulls: true,
1667 order: None,
1668 row_id_filter: None,
1669 },
1670 |b| {
1671 let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1672
1673 let mut paired_vals: Vec<(Option<i32>, Option<f64>)> = Vec::new();
1674 table
1675 .scan_stream(
1676 &[proj(&table, COL_C_I32), proj(&table, COL_D_F64)],
1677 &filter,
1678 ScanStreamOptions::default(),
1679 |b| {
1680 assert_eq!(b.num_columns(), 2);
1681 let c_arr =
1682 b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1683 let d_arr =
1684 b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
1685 for i in 0..b.num_rows() {
1686 let c_val = if c_arr.is_null(i) {
1687 None
1688 } else {
1689 Some(c_arr.value(i))
1690 };
1691 let d_val = if d_arr.is_null(i) {
1692 None
1693 } else {
1694 Some(d_arr.value(i))
1695 };
1696 paired_vals.push((c_val, d_val));
1697 }
1698 },
1699 )
1700 .unwrap();
1701 assert_eq!(paired_vals, vec![(Some(40), Some(5.5)), (None, Some(6.5))]);
1702 include_null_vals.extend((0..arr.len()).map(|i| {
1703 if arr.is_null(i) {
1704 None
1705 } else {
1706 Some(arr.value(i))
1707 }
1708 }));
1709 },
1710 )
1711 .unwrap();
1712 assert_eq!(include_null_vals, vec![Some(40), None]);
1713 }
1714
1715 #[test]
1716 fn test_filtered_scan_int_sqrt_float64() {
1717 let table = setup_test_table();
1723 const COL_A_U64: FieldId = 10;
1724 const COL_C_I32: FieldId = 12;
1725
1726 let filter = pred_expr(Filter {
1727 field_id: COL_C_I32,
1728 op: Operator::GreaterThan(15.into()),
1729 });
1730
1731 let mut got: Vec<f64> = Vec::new();
1732 table
1733 .scan_stream(
1734 &[proj(&table, COL_A_U64)],
1735 &filter,
1736 ScanStreamOptions::default(),
1737 |b| {
1738 let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
1739 let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
1740
1741 let sqrt_arr = unary::<
1743 arrow::datatypes::Float64Type,
1744 _,
1745 arrow::datatypes::Float64Type,
1746 >(f64_arr, |v: f64| v.sqrt());
1747
1748 for i in 0..sqrt_arr.len() {
1749 if !sqrt_arr.is_null(i) {
1750 got.push(sqrt_arr.value(i));
1751 }
1752 }
1753 },
1754 )
1755 .unwrap();
1756
1757 let expected = [200_f64.sqrt(), 300_f64.sqrt(), 200_f64.sqrt()];
1758 assert_eq!(got, expected);
1759 }
1760
    #[test]
    fn test_multi_field_kernels_with_filters() {
        // End-to-end check that sum/min/max kernels and a cast+sqrt pipeline
        // all honor the same row filter across columns of different types.
        use arrow::array::{Int16Array, UInt8Array, UInt32Array};

        let table = Table::from_id(2, Arc::new(MemPager::default())).unwrap();

        const COL_A_U64: FieldId = 20;
        const COL_D_U32: FieldId = 21;
        const COL_E_I16: FieldId = 22;
        const COL_F_U8: FieldId = 23;
        const COL_C_I32: FieldId = 24;

        // Schema: row-id column plus five typed user columns, each tagged with
        // its field id via metadata. Array order below must match this order.
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("d_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_U32.to_string(),
            )])),
            Field::new("e_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_I16.to_string(),
            )])),
            Field::new("f_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_F_U8.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
        ]));

        // Five rows. a_u64 holds perfect squares so the sqrt check below yields
        // exact integers; c_i32 is the filter column (10..=50 step 10).
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(UInt64Array::from(vec![100, 225, 400, 900, 1600])),
                Arc::new(UInt32Array::from(vec![7, 8, 9, 10, 11])),
                Arc::new(Int16Array::from(vec![-3, 0, 3, -6, 6])),
                Arc::new(UInt8Array::from(vec![2, 4, 6, 8, 10])),
                Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
            ],
        )
        .unwrap();

        table.append(&batch).unwrap();

        // Shared filter: keeps rows 2..=5 (c_i32 >= 20), reused by every scan.
        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::GreaterThanOrEquals(20.into()),
        });

        // Sum kernel over the filtered u32 column.
        let mut d_sum: u128 = 0;
        table
            .scan_stream(
                &[proj(&table, COL_D_U32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
                    if let Some(part) = sum(a) {
                        d_sum += part as u128;
                    }
                },
            )
            .unwrap();
        assert_eq!(d_sum, (8 + 9 + 10 + 11) as u128);

        // Min kernel over the filtered i16 column.
        let mut e_min: Option<i16> = None;
        table
            .scan_stream(
                &[proj(&table, COL_E_I16)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<Int16Array>().unwrap();
                    if let Some(part_min) = min(a) {
                        e_min = Some(e_min.map_or(part_min, |m| m.min(part_min)));
                    }
                },
            )
            .unwrap();
        assert_eq!(e_min, Some(-6));

        // Max kernel over the filtered u8 column.
        let mut f_max: Option<u8> = None;
        table
            .scan_stream(
                &[proj(&table, COL_F_U8)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b
                        .column(0)
                        .as_any()
                        .downcast_ref::<arrow::array::UInt8Array>()
                        .unwrap();
                    if let Some(part_max) = max(a) {
                        f_max = Some(f_max.map_or(part_max, |m| m.max(part_max)));
                    }
                },
            )
            .unwrap();
        assert_eq!(f_max, Some(10));

        // Cast the filtered u64 column to f64 and take element-wise sqrt.
        let mut got: Vec<f64> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
                    let sqrt_arr = unary::<
                        arrow::datatypes::Float64Type,
                        _,
                        arrow::datatypes::Float64Type,
                    >(f64_arr, |v: f64| v.sqrt());

                    for i in 0..sqrt_arr.len() {
                        if !sqrt_arr.is_null(i) {
                            got.push(sqrt_arr.value(i));
                        }
                    }
                },
            )
            .unwrap();
        // sqrt of 225, 400, 900, 1600 for the four filtered rows.
        let expected = [15.0_f64, 20.0, 30.0, 40.0];
        assert_eq!(got, expected);
    }
1904
1905 #[test]
1906 fn test_scan_with_in_filter() {
1907 let table = setup_test_table();
1908 const COL_A_U64: FieldId = 10;
1909 const COL_C_I32: FieldId = 12;
1910
1911 let candidates = [10.into(), 30.into()];
1913 let filter = pred_expr(Filter {
1914 field_id: COL_C_I32,
1915 op: Operator::In(&candidates),
1916 });
1917
1918 let mut vals: Vec<Option<u64>> = Vec::new();
1919 table
1920 .scan_stream(
1921 &[proj(&table, COL_A_U64)],
1922 &filter,
1923 ScanStreamOptions::default(),
1924 |b| {
1925 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1926 vals.extend(
1927 (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1928 );
1929 },
1930 )
1931 .unwrap();
1932 assert_eq!(vals, vec![Some(100), Some(300)]);
1933 }
1934
1935 #[test]
1936 fn test_scan_stream_single_column_batches() {
1937 let table = setup_test_table();
1938 const COL_A_U64: FieldId = 10;
1939 const COL_C_I32: FieldId = 12;
1940
1941 let filter = pred_expr(Filter {
1943 field_id: COL_C_I32,
1944 op: Operator::Equals(20.into()),
1945 });
1946
1947 let mut seen_cols = Vec::<u64>::new();
1948 table
1949 .scan_stream(
1950 &[proj(&table, COL_A_U64)],
1951 &filter,
1952 ScanStreamOptions::default(),
1953 |b| {
1954 assert_eq!(b.num_columns(), 1);
1955 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1956 for i in 0..a.len() {
1958 if !a.is_null(i) {
1959 seen_cols.push(a.value(i));
1960 }
1961 }
1962 },
1963 )
1964 .unwrap();
1965
1966 assert_eq!(seen_cols, vec![200, 200]);
1968 }
1969
1970 #[test]
1971 fn test_scan_with_multiple_projection_columns() {
1972 let table = setup_test_table();
1973 const COL_A_U64: FieldId = 10;
1974 const COL_C_I32: FieldId = 12;
1975
1976 let filter = pred_expr(Filter {
1977 field_id: COL_C_I32,
1978 op: Operator::Equals(20.into()),
1979 });
1980
1981 let expected_names = [COL_A_U64.to_string(), COL_C_I32.to_string()];
1982
1983 let mut combined: Vec<(Option<u64>, Option<i32>)> = Vec::new();
1984 table
1985 .scan_stream(
1986 &[proj(&table, COL_A_U64), proj(&table, COL_C_I32)],
1987 &filter,
1988 ScanStreamOptions::default(),
1989 |b| {
1990 assert_eq!(b.num_columns(), 2);
1991 assert_eq!(b.schema().field(0).name(), &expected_names[0]);
1992 assert_eq!(b.schema().field(1).name(), &expected_names[1]);
1993
1994 let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1995 let c = b.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
1996 for i in 0..b.num_rows() {
1997 let left = if a.is_null(i) { None } else { Some(a.value(i)) };
1998 let right = if c.is_null(i) { None } else { Some(c.value(i)) };
1999 combined.push((left, right));
2000 }
2001 },
2002 )
2003 .unwrap();
2004
2005 assert_eq!(combined, vec![(Some(200), Some(20)), (Some(200), Some(20))]);
2006 }
2007
2008 #[test]
2009 fn test_scan_stream_projection_validation() {
2010 let table = setup_test_table();
2011 const COL_A_U64: FieldId = 10;
2012 const COL_C_I32: FieldId = 12;
2013
2014 let filter = pred_expr(Filter {
2015 field_id: COL_C_I32,
2016 op: Operator::Equals(20.into()),
2017 });
2018
2019 let empty: [Projection; 0] = [];
2020 let result = table.scan_stream(&empty, &filter, ScanStreamOptions::default(), |_batch| {});
2021 assert!(matches!(result, Err(Error::InvalidArgumentError(_))));
2022
2023 let duplicate = [
2028 proj(&table, COL_A_U64),
2029 proj_alias(&table, COL_A_U64, "alias_a"),
2030 ];
2031 let mut collected = Vec::<u64>::new();
2032 table
2033 .scan_stream(&duplicate, &filter, ScanStreamOptions::default(), |b| {
2034 assert_eq!(b.num_columns(), 2);
2035 assert_eq!(b.schema().field(0).name(), &COL_A_U64.to_string());
2036 assert_eq!(b.schema().field(1).name(), "alias_a");
2037 let a0 = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2038 let a1 = b.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
2039 for i in 0..b.num_rows() {
2040 if !a0.is_null(i) {
2041 collected.push(a0.value(i));
2042 }
2043 if !a1.is_null(i) {
2044 collected.push(a1.value(i));
2045 }
2046 }
2047 })
2048 .unwrap();
2049 assert_eq!(collected, vec![200, 200, 200, 200]);
2051 }
2052
2053 #[test]
2054 fn test_scan_stream_computed_projection() {
2055 let table = setup_test_table();
2056 const COL_A_U64: FieldId = 10;
2057
2058 let projections = [
2059 ScanProjection::column(proj(&table, COL_A_U64)),
2060 ScanProjection::computed(
2061 ScalarExpr::binary(
2062 ScalarExpr::column(COL_A_U64),
2063 BinaryOp::Multiply,
2064 ScalarExpr::literal(2),
2065 ),
2066 "a_times_two",
2067 ),
2068 ];
2069
2070 let filter = pred_expr(Filter {
2071 field_id: COL_A_U64,
2072 op: Operator::GreaterThanOrEquals(0.into()),
2073 });
2074
2075 let mut computed: Vec<(u64, f64)> = Vec::new();
2076 table
2077 .scan_stream_with_exprs(&projections, &filter, ScanStreamOptions::default(), |b| {
2078 assert_eq!(b.num_columns(), 2);
2079 let base = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2080 let comp = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
2081 for i in 0..b.num_rows() {
2082 if base.is_null(i) || comp.is_null(i) {
2083 continue;
2084 }
2085 computed.push((base.value(i), comp.value(i)));
2086 }
2087 })
2088 .unwrap();
2089
2090 let expected = vec![(100, 200.0), (200, 400.0), (300, 600.0), (200, 400.0)];
2091 assert_eq!(computed, expected);
2092 }
2093
2094 #[test]
2095 fn test_scan_stream_multi_column_filter_compare() {
2096 let table = setup_test_table();
2097 const COL_A_U64: FieldId = 10;
2098 const COL_C_I32: FieldId = 12;
2099
2100 let expr = Expr::Compare {
2101 left: ScalarExpr::binary(
2102 ScalarExpr::column(COL_A_U64),
2103 BinaryOp::Add,
2104 ScalarExpr::column(COL_C_I32),
2105 ),
2106 op: CompareOp::Gt,
2107 right: ScalarExpr::literal(220_i64),
2108 };
2109
2110 let mut vals: Vec<Option<u64>> = Vec::new();
2111 table
2112 .scan_stream(
2113 &[proj(&table, COL_A_U64)],
2114 &expr,
2115 ScanStreamOptions::default(),
2116 |b| {
2117 let col = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2118 for i in 0..b.num_rows() {
2119 vals.push(if col.is_null(i) {
2120 None
2121 } else {
2122 Some(col.value(i))
2123 });
2124 }
2125 },
2126 )
2127 .unwrap();
2128
2129 assert_eq!(vals.into_iter().flatten().collect::<Vec<_>>(), vec![300]);
2130 }
2131}