1use std::sync::Arc;
2
3use crate::planner::TablePlanner;
4use crate::types::TableId;
5
6use arrow::array::{ArrayRef, RecordBatch, StringArray, UInt32Array};
7use arrow::datatypes::{DataType, Field, Schema};
8use std::collections::HashMap;
9
10use llkv_column_map::store::{Projection, ROW_ID_COLUMN_NAME};
11use llkv_column_map::{ColumnStore, types::LogicalFieldId};
12use llkv_storage::pager::{MemPager, Pager};
13use simd_r_drive_entry_handle::EntryHandle;
14
15use crate::sys_catalog::{CATALOG_TID, ColMeta, SysCatalog, TableMeta};
16use crate::types::FieldId;
17use llkv_expr::{Expr, ScalarExpr};
18use llkv_result::{Error, Result as LlkvResult};
19
/// Handle to one logical table stored inside a [`ColumnStore`].
///
/// Pairs the column store with the id of the table it represents. `P` is the
/// pager type backing the store and defaults to [`MemPager`].
pub struct Table<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Column store opened from the pager passed to `Table::new`.
    store: ColumnStore<P>,
    // Table id used when building `LogicalFieldId`s and catalog lookups.
    table_id: TableId,
}
27
/// Options passed to `Table::scan_stream` / `Table::scan_stream_with_exprs`.
///
/// The `Default` value has `include_nulls == false`.
#[derive(Clone, Copy, Debug, Default)]
pub struct ScanStreamOptions {
    /// When `true`, the scan also reports null values for matching rows.
    /// NOTE(review): exact semantics live in the planner; the unit test
    /// `test_scan_stream_include_nulls_toggle` shows a null entry appearing
    /// only when this flag is set for a single-column projection.
    pub include_nulls: bool,
}
37
/// One output column requested from a scan.
#[derive(Clone, Debug)]
pub enum ScanProjection {
    /// A stored column, described by a column-store [`Projection`].
    Column(Projection),
    /// A scalar expression over table fields, emitted under `alias`.
    Computed {
        /// Expression whose leaves reference user field ids.
        expr: ScalarExpr<FieldId>,
        /// Name given to the computed output.
        alias: String,
    },
}
46
47impl ScanProjection {
48 pub fn column<P: Into<Projection>>(proj: P) -> Self {
49 Self::Column(proj.into())
50 }
51
52 pub fn computed<S: Into<String>>(expr: ScalarExpr<FieldId>, alias: S) -> Self {
53 Self::Computed {
54 expr,
55 alias: alias.into(),
56 }
57 }
58}
59
60impl From<Projection> for ScanProjection {
61 fn from(value: Projection) -> Self {
62 ScanProjection::Column(value)
63 }
64}
65
66impl From<&Projection> for ScanProjection {
67 fn from(value: &Projection) -> Self {
68 ScanProjection::Column(value.clone())
69 }
70}
71
72impl From<&ScanProjection> for ScanProjection {
73 fn from(value: &ScanProjection) -> Self {
74 value.clone()
75 }
76}
77
78impl<P> Table<P>
79where
80 P: Pager<Blob = EntryHandle> + Send + Sync,
81{
82 pub fn new(table_id: TableId, pager: Arc<P>) -> LlkvResult<Self> {
83 if table_id == CATALOG_TID {
84 return Err(Error::reserved_table_id(table_id));
85 }
86
87 let store = ColumnStore::open(pager)?;
88 Ok(Self { store, table_id })
89 }
90
91 pub fn append(&self, batch: &RecordBatch) -> LlkvResult<()> {
92 let mut new_fields = Vec::with_capacity(batch.schema().fields().len());
93 for field in batch.schema().fields() {
94 if field.name() == ROW_ID_COLUMN_NAME {
95 new_fields.push(field.as_ref().clone());
96 continue;
97 }
98
99 let user_field_id: FieldId = field
100 .metadata()
101 .get("field_id")
102 .and_then(|s| s.parse().ok())
103 .ok_or_else(|| {
104 llkv_result::Error::Internal(format!(
105 "Field '{}' is missing a valid 'field_id' in its \
106 metadata.",
107 field.name()
108 ))
109 })?;
110
111 let lfid = LogicalFieldId::for_user(self.table_id, user_field_id);
112 let mut new_metadata = field.metadata().clone();
113 let lfid_val: u64 = lfid.into();
114 new_metadata.insert("field_id".to_string(), lfid_val.to_string());
115
116 let new_field =
117 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
118 .with_metadata(new_metadata);
119 new_fields.push(new_field);
120
121 let need_meta = match self
127 .catalog()
128 .get_cols_meta(self.table_id, &[user_field_id])
129 {
130 metas if metas.is_empty() => true,
131 metas => metas[0].as_ref().and_then(|m| m.name.as_ref()).is_none(),
132 };
133
134 if need_meta {
135 let meta = ColMeta {
136 col_id: user_field_id,
137 name: Some(field.name().to_string()),
138 flags: 0,
139 default: None,
140 };
141 self.put_col_meta(&meta);
142 }
143 }
144
145 let new_schema = Arc::new(Schema::new(new_fields));
146 let namespaced_batch = RecordBatch::try_new(new_schema, batch.columns().to_vec())?;
147 self.store.append(&namespaced_batch)
148 }
149
150 pub fn scan_stream<'a, I, T, F>(
158 &self,
159 projections: I,
160 filter_expr: &Expr<'a, FieldId>,
161 options: ScanStreamOptions,
162 on_batch: F,
163 ) -> LlkvResult<()>
164 where
165 I: IntoIterator<Item = T>,
166 T: Into<ScanProjection>,
167 F: FnMut(RecordBatch),
168 {
169 let stream_projections: Vec<ScanProjection> =
170 projections.into_iter().map(|p| p.into()).collect();
171 self.scan_stream_with_exprs(&stream_projections, filter_expr, options, on_batch)
172 }
173
174 pub fn scan_stream_with_exprs<'a, F>(
176 &self,
177 projections: &[ScanProjection],
178 filter_expr: &Expr<'a, FieldId>,
179 options: ScanStreamOptions,
180 on_batch: F,
181 ) -> LlkvResult<()>
182 where
183 F: FnMut(RecordBatch),
184 {
185 TablePlanner::new(self).scan_stream_with_exprs(projections, filter_expr, options, on_batch)
186 }
187
188 #[inline]
189 pub fn catalog(&self) -> SysCatalog<'_, P> {
190 SysCatalog::new(&self.store)
191 }
192
193 #[inline]
194 pub fn put_table_meta(&self, meta: &TableMeta) {
195 debug_assert_eq!(meta.table_id, self.table_id);
196 self.catalog().put_table_meta(meta);
197 }
198
199 #[inline]
200 pub fn get_table_meta(&self) -> Option<TableMeta> {
201 self.catalog().get_table_meta(self.table_id)
202 }
203
204 #[inline]
205 pub fn put_col_meta(&self, meta: &ColMeta) {
206 self.catalog().put_col_meta(self.table_id, meta);
207 }
208
209 #[inline]
210 pub fn get_cols_meta(&self, col_ids: &[FieldId]) -> Vec<Option<ColMeta>> {
211 self.catalog().get_cols_meta(self.table_id, col_ids)
212 }
213
214 pub fn schema(&self) -> LlkvResult<Arc<Schema>> {
221 let mut logical_fields = self.store.user_field_ids_for_table(self.table_id);
223 logical_fields.sort_by_key(|lfid| lfid.field_id());
224
225 let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
226 let metas = self.get_cols_meta(&field_ids);
227
228 let mut fields: Vec<Field> = Vec::with_capacity(1 + field_ids.len());
229 fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));
231
232 for (idx, lfid) in logical_fields.into_iter().enumerate() {
233 let fid = lfid.field_id();
234 let dtype = self.store.data_type(lfid)?;
235 let name = metas
236 .get(idx)
237 .and_then(|m| m.as_ref().and_then(|meta| meta.name.clone()))
238 .unwrap_or_else(|| format!("col_{}", fid));
239
240 let mut metadata: HashMap<String, String> = HashMap::new();
241 metadata.insert("field_id".to_string(), fid.to_string());
242
243 fields.push(Field::new(&name, dtype.clone(), true).with_metadata(metadata));
244 }
245
246 Ok(Arc::new(Schema::new(fields)))
247 }
248
249 pub fn schema_recordbatch(&self) -> LlkvResult<RecordBatch> {
253 let schema = self.schema()?;
254 let fields = schema.fields();
255
256 let mut names: Vec<String> = Vec::with_capacity(fields.len());
257 let mut fids: Vec<u32> = Vec::with_capacity(fields.len());
258 let mut dtypes: Vec<String> = Vec::with_capacity(fields.len());
259
260 for field in fields.iter() {
261 names.push(field.name().to_string());
262 let fid = field
263 .metadata()
264 .get("field_id")
265 .and_then(|s| s.parse::<u32>().ok())
266 .unwrap_or(0u32);
267 fids.push(fid);
268 dtypes.push(format!("{:?}", field.data_type()));
269 }
270
271 let name_array: ArrayRef = Arc::new(StringArray::from(names));
273 let fid_array: ArrayRef = Arc::new(UInt32Array::from(fids));
274 let dtype_array: ArrayRef = Arc::new(StringArray::from(dtypes));
275
276 let rb_schema = Arc::new(Schema::new(vec![
277 Field::new("name", DataType::Utf8, false),
278 Field::new("field_id", DataType::UInt32, false),
279 Field::new("data_type", DataType::Utf8, false),
280 ]));
281
282 let batch = RecordBatch::try_new(rb_schema, vec![name_array, fid_array, dtype_array])?;
283 Ok(batch)
284 }
285
286 pub fn store(&self) -> &ColumnStore<P> {
287 &self.store
288 }
289
290 #[inline]
291 pub fn table_id(&self) -> TableId {
292 self.table_id
293 }
294
295 pub fn total_rows_for_col(&self, col_id: FieldId) -> llkv_result::Result<u64> {
300 let lfid = LogicalFieldId::for_user(self.table_id, col_id);
301 self.store.total_rows_for_field(lfid)
302 }
303
304 pub fn total_rows(&self) -> llkv_result::Result<u64> {
309 use llkv_column_map::store::rowid_fid;
310 let rid_lfid = rowid_fid(LogicalFieldId::for_user(self.table_id, 0));
311 match self.store.total_rows_for_field(rid_lfid) {
313 Ok(n) => Ok(n),
314 Err(_) => {
315 self.store.total_rows_for_table(self.table_id)
317 }
318 }
319 }
320}
321
322#[cfg(test)]
323mod tests {
324 use super::*;
325 use crate::sys_catalog::CATALOG_TID;
326 use crate::types::RowId;
327 use arrow::array::Array;
328 use arrow::array::ArrayRef;
329 use arrow::array::{
330 BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt8Array,
331 UInt32Array, UInt64Array,
332 };
333 use arrow::compute::{cast, max, min, sum, unary};
334 use arrow::datatypes::DataType;
335 use llkv_column_map::ColumnStore;
336 use llkv_column_map::store::GatherNullPolicy;
337 use llkv_expr::{BinaryOp, CompareOp, Filter, Operator, ScalarExpr};
338 use std::collections::HashMap;
339 use std::ops::Bound;
340
341 fn setup_test_table() -> Table {
342 let pager = Arc::new(MemPager::default());
343 setup_test_table_with_pager(&pager)
344 }
345
346 fn setup_test_table_with_pager(pager: &Arc<MemPager>) -> Table {
347 let table = Table::new(1, Arc::clone(pager)).unwrap();
348 const COL_A_U64: FieldId = 10;
349 const COL_B_BIN: FieldId = 11;
350 const COL_C_I32: FieldId = 12;
351 const COL_D_F64: FieldId = 13;
352 const COL_E_F32: FieldId = 14;
353
354 let schema = Arc::new(Schema::new(vec![
355 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
356 Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
357 "field_id".to_string(),
358 COL_A_U64.to_string(),
359 )])),
360 Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
361 "field_id".to_string(),
362 COL_B_BIN.to_string(),
363 )])),
364 Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
365 "field_id".to_string(),
366 COL_C_I32.to_string(),
367 )])),
368 Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
369 "field_id".to_string(),
370 COL_D_F64.to_string(),
371 )])),
372 Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
373 "field_id".to_string(),
374 COL_E_F32.to_string(),
375 )])),
376 ]));
377
378 let batch = RecordBatch::try_new(
379 schema.clone(),
380 vec![
381 Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
382 Arc::new(UInt64Array::from(vec![100, 200, 300, 200])),
383 Arc::new(BinaryArray::from(vec![
384 b"foo" as &[u8],
385 b"bar",
386 b"baz",
387 b"qux",
388 ])),
389 Arc::new(Int32Array::from(vec![10, 20, 30, 20])),
390 Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5, 2.5])),
391 Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 2.0])),
392 ],
393 )
394 .unwrap();
395
396 table.append(&batch).unwrap();
397 table
398 }
399
400 fn gather_single(
401 store: &ColumnStore<MemPager>,
402 field_id: LogicalFieldId,
403 row_ids: &[u64],
404 ) -> ArrayRef {
405 store
406 .gather_rows(&[field_id], row_ids, GatherNullPolicy::ErrorOnMissing)
407 .unwrap()
408 .column(0)
409 .clone()
410 }
411
/// Wraps a single [`Filter`] in an [`Expr::Pred`] node.
fn pred_expr<'a>(filter: Filter<'a, FieldId>) -> Expr<'a, FieldId> {
    Expr::Pred(filter)
}
415
416 fn proj(table: &Table, field_id: FieldId) -> Projection {
417 Projection::from(LogicalFieldId::for_user(table.table_id, field_id))
418 }
419
420 fn proj_alias<S: Into<String>>(table: &Table, field_id: FieldId, alias: S) -> Projection {
421 Projection::with_alias(LogicalFieldId::for_user(table.table_id, field_id), alias)
422 }
423
/// `Table::new` must refuse the catalog's reserved table id.
#[test]
fn table_new_rejects_reserved_table_id() {
    let pager = Arc::new(MemPager::default());
    match Table::new(CATALOG_TID, pager) {
        Err(Error::ReservedTableId(id)) => assert!(id == CATALOG_TID),
        _ => panic!("expected ReservedTableId error for the catalog table id"),
    }
}
432
/// Equality filter on the `u64` column; projects the `i32` column.
#[test]
fn test_scan_with_u64_filter() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();
    let a_is_200 = pred_expr(Filter {
        field_id: COL_A_U64,
        op: Operator::Equals(200.into()),
    });

    let mut seen: Vec<Option<i32>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_C_I32)],
            &a_is_200,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap();
                // The array iterator yields `None` for null slots.
                seen.extend(col.iter());
            },
        )
        .unwrap();

    // Fixture rows 2 and 4 have a == 200, both with c == 20.
    assert_eq!(seen, vec![Some(20), Some(20)]);
}
460
/// `starts_with` filter on a UTF-8 column returns only the matching names.
#[test]
fn test_scan_with_string_filter() {
    const COL_STR: FieldId = 42;

    let pager = Arc::new(MemPager::default());
    let table = Table::new(500, Arc::clone(&pager)).unwrap();

    let name_field = Field::new("name", DataType::Utf8, false)
        .with_metadata(HashMap::from([("field_id".to_string(), COL_STR.to_string())]));
    let schema = Arc::new(Schema::new(vec![
        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
        name_field,
    ]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
            Arc::new(StringArray::from(vec!["alice", "bob", "albert", "carol"])),
        ],
    )
    .unwrap();
    table.append(&batch).unwrap();

    let starts_with_al = pred_expr(Filter {
        field_id: COL_STR,
        op: Operator::starts_with("al", true),
    });

    let mut names: Vec<Option<String>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_STR)],
            &starts_with_al,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                for value in col.iter() {
                    names.push(value.map(str::to_owned));
                }
            },
        )
        .unwrap();

    let expected = vec![Some("alice".to_owned()), Some("albert".to_owned())];
    assert_eq!(names, expected);
}
508
/// Writes three tables through one shared pager, drops each handle, then
/// reopens every table and checks that data types and values are intact.
#[test]
fn test_table_reopen_with_shared_pager() {
    const TABLE_ALPHA: TableId = 42;
    const TABLE_BETA: TableId = 43;
    const TABLE_GAMMA: TableId = 44;
    const COL_ALPHA_U64: FieldId = 100;
    const COL_ALPHA_I32: FieldId = 101;
    const COL_ALPHA_U32: FieldId = 102;
    const COL_ALPHA_I16: FieldId = 103;
    const COL_BETA_U64: FieldId = 200;
    const COL_BETA_U8: FieldId = 201;
    const COL_GAMMA_I16: FieldId = 300;

    // Non-nullable field carrying its user field id in the metadata.
    let tagged = |name: &str, dtype: DataType, fid: FieldId| {
        Field::new(name, dtype, false)
            .with_metadata(HashMap::from([("field_id".to_string(), fid.to_string())]))
    };
    let row_id_field = || Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false);

    let pager = Arc::new(MemPager::default());

    let alpha_rows: Vec<RowId> = vec![1, 2, 3, 4];
    let alpha_vals_u64: Vec<u64> = vec![10, 20, 30, 40];
    let alpha_vals_i32: Vec<i32> = vec![-5, 15, 25, 35];
    let alpha_vals_u32: Vec<u32> = vec![7, 11, 13, 17];
    let alpha_vals_i16: Vec<i16> = vec![-2, 4, -6, 8];

    let beta_rows: Vec<u64> = vec![101, 102, 103];
    let beta_vals_u64: Vec<u64> = vec![900, 901, 902];
    let beta_vals_u8: Vec<u8> = vec![1, 2, 3];

    let gamma_rows: Vec<u64> = vec![501, 502];
    let gamma_vals_i16: Vec<i16> = vec![123, -321];

    // ---- write phase: each table handle is dropped at the end of its scope ----
    {
        let alpha = Table::new(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
        let schema = Arc::new(Schema::new(vec![
            row_id_field(),
            tagged("alpha_u64", DataType::UInt64, COL_ALPHA_U64),
            tagged("alpha_i32", DataType::Int32, COL_ALPHA_I32),
            tagged("alpha_u32", DataType::UInt32, COL_ALPHA_U32),
            tagged("alpha_i16", DataType::Int16, COL_ALPHA_I16),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(alpha_rows.clone())),
                Arc::new(UInt64Array::from(alpha_vals_u64.clone())),
                Arc::new(Int32Array::from(alpha_vals_i32.clone())),
                Arc::new(UInt32Array::from(alpha_vals_u32.clone())),
                Arc::new(Int16Array::from(alpha_vals_i16.clone())),
            ],
        )
        .unwrap();
        alpha.append(&batch).unwrap();
    }

    {
        let beta = Table::new(TABLE_BETA, Arc::clone(&pager)).unwrap();
        let schema = Arc::new(Schema::new(vec![
            row_id_field(),
            tagged("beta_u64", DataType::UInt64, COL_BETA_U64),
            tagged("beta_u8", DataType::UInt8, COL_BETA_U8),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(beta_rows.clone())),
                Arc::new(UInt64Array::from(beta_vals_u64.clone())),
                Arc::new(UInt8Array::from(beta_vals_u8.clone())),
            ],
        )
        .unwrap();
        beta.append(&batch).unwrap();
    }

    {
        let gamma = Table::new(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
        let schema = Arc::new(Schema::new(vec![
            row_id_field(),
            tagged("gamma_i16", DataType::Int16, COL_GAMMA_I16),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(gamma_rows.clone())),
                Arc::new(Int16Array::from(gamma_vals_i16.clone())),
            ],
        )
        .unwrap();
        gamma.append(&batch).unwrap();
    }

    // ---- read phase: reopen each table on the same pager and verify ----
    {
        let alpha = Table::new(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
        let store = alpha.store();

        let expected_types = [
            (COL_ALPHA_U64, DataType::UInt64),
            (COL_ALPHA_I32, DataType::Int32),
            (COL_ALPHA_U32, DataType::UInt32),
            (COL_ALPHA_I16, DataType::Int16),
        ];

        for (col, ty) in expected_types.iter() {
            let lfid = LogicalFieldId::for_user(TABLE_ALPHA, *col);
            assert_eq!(store.data_type(lfid).unwrap(), *ty);

            let gathered = gather_single(store, lfid, &alpha_rows);
            let any = gathered.as_any();
            match ty {
                DataType::UInt64 => {
                    let typed = any.downcast_ref::<UInt64Array>().unwrap();
                    assert_eq!(typed.values(), alpha_vals_u64.as_slice());
                }
                DataType::Int32 => {
                    let typed = any.downcast_ref::<Int32Array>().unwrap();
                    assert_eq!(typed.values(), alpha_vals_i32.as_slice());
                }
                DataType::UInt32 => {
                    let typed = any.downcast_ref::<UInt32Array>().unwrap();
                    assert_eq!(typed.values(), alpha_vals_u32.as_slice());
                }
                DataType::Int16 => {
                    let typed = any.downcast_ref::<Int16Array>().unwrap();
                    assert_eq!(typed.values(), alpha_vals_i16.as_slice());
                }
                other => panic!("unexpected dtype {other:?}"),
            }
        }
    }

    {
        let beta = Table::new(TABLE_BETA, Arc::clone(&pager)).unwrap();
        let store = beta.store();

        let lfid_u64 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U64);
        assert_eq!(store.data_type(lfid_u64).unwrap(), DataType::UInt64);
        let gathered_u64 = gather_single(store, lfid_u64, &beta_rows);
        let typed_u64 = gathered_u64
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(typed_u64.values(), beta_vals_u64.as_slice());

        let lfid_u8 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U8);
        assert_eq!(store.data_type(lfid_u8).unwrap(), DataType::UInt8);
        let gathered_u8 = gather_single(store, lfid_u8, &beta_rows);
        let typed_u8 = gathered_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
        assert_eq!(typed_u8.values(), beta_vals_u8.as_slice());
    }

    {
        let gamma = Table::new(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
        let store = gamma.store();

        let lfid = LogicalFieldId::for_user(TABLE_GAMMA, COL_GAMMA_I16);
        assert_eq!(store.data_type(lfid).unwrap(), DataType::Int16);
        let gathered = gather_single(store, lfid, &gamma_rows);
        let typed = gathered.as_any().downcast_ref::<Int16Array>().unwrap();
        assert_eq!(typed.values(), gamma_vals_i16.as_slice());
    }
}
680
/// Equality filter on the `i32` column; projects the `u64` column.
#[test]
fn test_scan_with_i32_filter() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();
    let c_is_20 = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::Equals(20.into()),
    });

    let mut seen: Vec<Option<u64>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_A_U64)],
            &c_is_20,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .unwrap();
                seen.extend(col.iter());
            },
        )
        .unwrap();

    assert_eq!(seen, vec![Some(200), Some(200)]);
}
708
/// `GreaterThan` filter on the `i32` column; projects the `u64` column.
#[test]
fn test_scan_with_greater_than_filter() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();
    let c_above_15 = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::GreaterThan(15.into()),
    });

    let mut seen: Vec<Option<u64>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_A_U64)],
            &c_above_15,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .unwrap();
                seen.extend(col.iter());
            },
        )
        .unwrap();

    // Only the first fixture row (c == 10) is filtered out.
    assert_eq!(seen, vec![Some(200), Some(300), Some(200)]);
}
736
/// Half-open range filter `[150, 300)` on the `u64` column; projects the
/// `i32` column.
#[test]
fn test_scan_with_range_filter() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();
    let a_in_range = pred_expr(Filter {
        field_id: COL_A_U64,
        op: Operator::Range {
            lower: Bound::Included(150.into()),
            upper: Bound::Excluded(300.into()),
        },
    });

    let mut seen: Vec<Option<i32>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_C_I32)],
            &a_in_range,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap();
                seen.extend(col.iter());
            },
        )
        .unwrap();

    assert_eq!(seen, vec![Some(20), Some(20)]);
}
767
/// Sums the `u64` column with Arrow's `sum` kernel over a range-filtered scan.
#[test]
fn test_filtered_scan_sum_kernel() {
    const COL_A_U64: FieldId = 10;

    let table = setup_test_table();
    let a_in_range = pred_expr(Filter {
        field_id: COL_A_U64,
        op: Operator::Range {
            lower: Bound::Included(150.into()),
            upper: Bound::Excluded(300.into()),
        },
    });

    let mut total: u128 = 0;
    table
        .scan_stream(
            &[proj(&table, COL_A_U64)],
            &a_in_range,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .unwrap();
                // `sum` yields `None` when there is nothing to add.
                total += sum(col).map_or(0, u128::from);
            },
        )
        .unwrap();

    assert_eq!(total, 400);
}
801
/// Sums the `i32` column for rows whose `u64` column is in `{100, 300}`.
#[test]
fn test_filtered_scan_sum_i32_kernel() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();
    let candidates = [100.into(), 300.into()];
    let a_in_set = pred_expr(Filter {
        field_id: COL_A_U64,
        op: Operator::In(&candidates),
    });

    let mut total: i64 = 0;
    table
        .scan_stream(
            &[proj(&table, COL_C_I32)],
            &a_in_set,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap();
                total += sum(col).map_or(0, i64::from);
            },
        )
        .unwrap();

    assert_eq!(total, 40);
}
834
/// Tracks running min/max of the `i32` column across batches using Arrow's
/// `min` / `max` kernels on an `In`-filtered scan.
#[test]
fn test_filtered_scan_min_max_kernel() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();
    let candidates = [100.into(), 300.into()];
    let a_in_set = pred_expr(Filter {
        field_id: COL_A_U64,
        op: Operator::In(&candidates),
    });

    let mut mn: Option<i32> = None;
    let mut mx: Option<i32> = None;
    table
        .scan_stream(
            &[proj(&table, COL_C_I32)],
            &a_in_set,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap();

                if let Some(batch_min) = min(col) {
                    mn = Some(match mn {
                        Some(current) => current.min(batch_min),
                        None => batch_min,
                    });
                }
                if let Some(batch_max) = max(col) {
                    mx = Some(match mx {
                        Some(current) => current.max(batch_max),
                        None => batch_max,
                    });
                }
            },
        )
        .unwrap();

    assert_eq!(mn, Some(10));
    assert_eq!(mx, Some(30));
}
873
/// `GreaterThan` filter applied directly to the `f64` column.
#[test]
fn test_filtered_scan_float64_column() {
    const COL_D_F64: FieldId = 13;

    let table = setup_test_table();
    let d_above_2 = pred_expr(Filter {
        field_id: COL_D_F64,
        op: Operator::GreaterThan(2.0_f64.into()),
    });

    let mut got: Vec<f64> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_D_F64)],
            &d_above_2,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap();
                // Keep valid slots only; nulls are skipped.
                got.extend(col.iter().flatten());
            },
        )
        .unwrap();

    assert_eq!(got, vec![2.5, 3.5, 2.5]);
}
903
/// `In` filter with `f32` candidates on the `f32` column.
#[test]
fn test_filtered_scan_float32_in_operator() {
    const COL_E_F32: FieldId = 14;

    let table = setup_test_table();
    let candidates = [2.0_f32.into(), 3.0_f32.into()];
    let e_in_set = pred_expr(Filter {
        field_id: COL_E_F32,
        op: Operator::In(&candidates),
    });

    let mut seen: Vec<Option<f32>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_E_F32)],
            &e_in_set,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap();
                seen.extend(col.iter());
            },
        )
        .unwrap();

    // Null slots, if any, are dropped before comparing.
    let mut collected: Vec<f32> = Vec::with_capacity(seen.len());
    for value in seen.into_iter().flatten() {
        collected.push(value);
    }
    assert_eq!(collected, vec![2.0, 3.0, 2.0]);
}
937
/// Conjunction of two predicates (`c > 15` AND `a < 250`); projects the `f32`
/// column.
#[test]
fn test_scan_stream_and_expression() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;
    const COL_E_F32: FieldId = 14;

    let table = setup_test_table();

    let c_above_15 = Filter {
        field_id: COL_C_I32,
        op: Operator::GreaterThan(15.into()),
    };
    let a_below_250 = Filter {
        field_id: COL_A_U64,
        op: Operator::LessThan(250.into()),
    };
    let both = Expr::all_of(vec![c_above_15, a_below_250]);

    let mut seen: Vec<Option<f32>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_E_F32)],
            &both,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap();
                seen.extend(col.iter());
            },
        )
        .unwrap();

    assert_eq!(seen, vec![Some(2.0_f32), Some(2.0_f32)]);
}
977
/// Disjunction of two equality predicates on the `i32` column; projects the
/// `u64` column.
#[test]
fn test_scan_stream_or_expression() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();

    let c_is_10 = Filter {
        field_id: COL_C_I32,
        op: Operator::Equals(10.into()),
    };
    let c_is_30 = Filter {
        field_id: COL_C_I32,
        op: Operator::Equals(30.into()),
    };
    let either = Expr::any_of(vec![c_is_10, c_is_30]);

    let mut seen: Vec<Option<u64>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_A_U64)],
            &either,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .unwrap();
                seen.extend(col.iter());
            },
        )
        .unwrap();

    assert_eq!(seen, vec![Some(100), Some(300)]);
}
1016
/// Negated equality predicate (`NOT c == 20`); projects the `u64` column.
#[test]
fn test_scan_stream_not_predicate() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();

    let c_is_20 = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::Equals(20.into()),
    });
    let negated = Expr::not(c_is_20);

    let mut seen: Vec<Option<u64>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_A_U64)],
            &negated,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .unwrap();
                seen.extend(col.iter());
            },
        )
        .unwrap();

    assert_eq!(seen, vec![Some(100), Some(300)]);
}
1049
/// Negated conjunction (`NOT (a > 150 AND c < 40)`); projects the `u64`
/// column.
#[test]
fn test_scan_stream_not_and_expression() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();

    let a_above_150 = Filter {
        field_id: COL_A_U64,
        op: Operator::GreaterThan(150.into()),
    };
    let c_below_40 = Filter {
        field_id: COL_C_I32,
        op: Operator::LessThan(40.into()),
    };
    let negated = Expr::not(Expr::all_of(vec![a_above_150, c_below_40]));

    let mut seen: Vec<Option<u64>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_A_U64)],
            &negated,
            ScanStreamOptions::default(),
            |batch| {
                let col = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .unwrap();
                seen.extend(col.iter());
            },
        )
        .unwrap();

    // Only the first fixture row (a == 100) fails the inner conjunction.
    assert_eq!(seen, vec![Some(100)]);
}
1088
/// Checks how `ScanStreamOptions::include_nulls` affects scan output.
///
/// Two rows are appended to the shared fixture, the second with a null in
/// the `i32` column. Three independent scans over `a > 450` are then run:
///
/// 1. single-column projection with default options: the null is omitted;
/// 2. two-column projection (`i32`, `f64`) with default options: both rows
///    are reported, the `i32` value of the second being null;
/// 3. single-column projection with `include_nulls: true`: the null appears.
///
/// The scans run one after another, so each assertion runs exactly once
/// and none depends on another scan having emitted a batch.
#[test]
fn test_scan_stream_include_nulls_toggle() {
    let pager = Arc::new(MemPager::default());
    let table = setup_test_table_with_pager(&pager);
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;
    const COL_B_BIN: FieldId = 11;
    const COL_D_F64: FieldId = 13;
    const COL_E_F32: FieldId = 14;

    let schema = Arc::new(Schema::new(vec![
        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
        Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
            "field_id".to_string(),
            COL_A_U64.to_string(),
        )])),
        Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
            "field_id".to_string(),
            COL_B_BIN.to_string(),
        )])),
        // Nullable here so the batch may carry a null `c_i32` value.
        Field::new("c_i32", DataType::Int32, true).with_metadata(HashMap::from([(
            "field_id".to_string(),
            COL_C_I32.to_string(),
        )])),
        Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
            "field_id".to_string(),
            COL_D_F64.to_string(),
        )])),
        Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
            "field_id".to_string(),
            COL_E_F32.to_string(),
        )])),
    ]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(UInt64Array::from(vec![5, 6])),
            Arc::new(UInt64Array::from(vec![500, 600])),
            Arc::new(BinaryArray::from(vec![
                Some(&b"new"[..]),
                Some(&b"alt"[..]),
            ])),
            Arc::new(Int32Array::from(vec![Some(40), None])),
            Arc::new(Float64Array::from(vec![5.5, 6.5])),
            Arc::new(Float32Array::from(vec![5.0, 6.0])),
        ],
    )
    .unwrap();
    table.append(&batch).unwrap();

    // Matches only the two rows appended above (a == 500 and a == 600).
    let filter = pred_expr(Filter {
        field_id: COL_A_U64,
        op: Operator::GreaterThan(450.into()),
    });

    // 1. Default options, single column: the null row is not reported.
    let mut default_vals: Vec<Option<i32>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_C_I32)],
            &filter,
            ScanStreamOptions::default(),
            |b| {
                let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                default_vals.extend(arr.iter());
            },
        )
        .unwrap();
    assert_eq!(default_vals, vec![Some(40)]);

    // 2. Default options, two columns: both rows are reported.
    let mut paired_vals: Vec<(Option<i32>, Option<f64>)> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_C_I32), proj(&table, COL_D_F64)],
            &filter,
            ScanStreamOptions::default(),
            |b| {
                assert_eq!(b.num_columns(), 2);
                let c_arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                let d_arr = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
                paired_vals.extend(c_arr.iter().zip(d_arr.iter()));
            },
        )
        .unwrap();
    assert_eq!(paired_vals, vec![(Some(40), Some(5.5)), (None, Some(6.5))]);

    // 3. include_nulls = true, single column: the null row is reported.
    let mut include_null_vals: Vec<Option<i32>> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_C_I32)],
            &filter,
            ScanStreamOptions {
                include_nulls: true,
            },
            |b| {
                let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                include_null_vals.extend(arr.iter());
            },
        )
        .unwrap();
    assert_eq!(include_null_vals, vec![Some(40), None]);
}
1217
/// Scans field 10 under a `> 15` predicate on field 12, casts every batch to
/// `Float64`, applies an element-wise square root, and compares the non-null
/// results with the square roots of 200, 300 and 200.
///
/// The expected values depend on the rows produced by `setup_test_table`,
/// which is defined elsewhere in this module.
#[test]
fn test_filtered_scan_int_sqrt_float64() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();

    let filter = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::GreaterThan(15.into()),
    });

    let mut roots = Vec::<f64>::new();
    table
        .scan_stream(
            &[proj(&table, COL_A_U64)],
            &filter,
            ScanStreamOptions::default(),
            |batch| {
                // Widen the projected integers so the float kernel can run on them.
                let widened =
                    cast(batch.column(0), &arrow::datatypes::DataType::Float64).unwrap();
                let floats = widened.as_any().downcast_ref::<Float64Array>().unwrap();

                let rooted = unary::<
                    arrow::datatypes::Float64Type,
                    _,
                    arrow::datatypes::Float64Type,
                >(floats, f64::sqrt);

                // Null slots are skipped; only concrete values are collected.
                roots.extend(rooted.iter().flatten());
            },
        )
        .unwrap();

    let wanted = [200_f64.sqrt(), 300_f64.sqrt(), 200_f64.sqrt()];
    assert_eq!(roots, wanted);
}
1263
/// Builds a fresh table holding five rows across several integer widths and
/// runs arrow kernels over scans restricted by `c_i32 >= 20`.
///
/// The predicate keeps the last four appended rows, so the test expects:
/// * `sum` over `d_u32` to be `8 + 9 + 10 + 11`,
/// * `min` over `e_i16` to be `-6`,
/// * `max` over `f_u8` to be `10`,
/// * the square roots of `a_u64` to be `15, 20, 30, 40`.
#[test]
fn test_multi_field_kernels_with_filters() {
    use arrow::array::{Int16Array, UInt8Array, UInt32Array};

    const COL_A_U64: FieldId = 20;
    const COL_D_U32: FieldId = 21;
    const COL_E_I16: FieldId = 22;
    const COL_F_U8: FieldId = 23;
    const COL_C_I32: FieldId = 24;

    let table = Table::new(2, Arc::new(MemPager::default())).unwrap();

    // Every user column carries its field id in the `field_id` metadata entry.
    let schema = Arc::new(Schema::new(vec![
        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
        Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
            "field_id".to_string(),
            COL_A_U64.to_string(),
        )])),
        Field::new("d_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
            "field_id".to_string(),
            COL_D_U32.to_string(),
        )])),
        Field::new("e_i16", DataType::Int16, false).with_metadata(HashMap::from([(
            "field_id".to_string(),
            COL_E_I16.to_string(),
        )])),
        Field::new("f_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
            "field_id".to_string(),
            COL_F_U8.to_string(),
        )])),
        Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
            "field_id".to_string(),
            COL_C_I32.to_string(),
        )])),
    ]));

    // The schema is not needed after this point, so it is moved rather than cloned.
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(UInt64Array::from(vec![1, 2, 3, 4, 5])),
            Arc::new(UInt64Array::from(vec![100, 225, 400, 900, 1600])),
            Arc::new(UInt32Array::from(vec![7, 8, 9, 10, 11])),
            Arc::new(Int16Array::from(vec![-3, 0, 3, -6, 6])),
            Arc::new(UInt8Array::from(vec![2, 4, 6, 8, 10])),
            Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
        ],
    )
    .unwrap();

    table.append(&batch).unwrap();

    let filter = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::GreaterThanOrEquals(20.into()),
    });

    // SUM over d_u32, accumulated across batches in a wider integer.
    let mut d_sum: u128 = 0;
    table
        .scan_stream(
            &[proj(&table, COL_D_U32)],
            &filter,
            ScanStreamOptions::default(),
            |b| {
                let a = b.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
                if let Some(part) = sum(a) {
                    // Lossless widening; `From` documents that nothing is truncated.
                    d_sum += u128::from(part);
                }
            },
        )
        .unwrap();
    assert_eq!(d_sum, 8 + 9 + 10 + 11);

    // MIN over e_i16, folded across batches.
    let mut e_min: Option<i16> = None;
    table
        .scan_stream(
            &[proj(&table, COL_E_I16)],
            &filter,
            ScanStreamOptions::default(),
            |b| {
                let a = b.column(0).as_any().downcast_ref::<Int16Array>().unwrap();
                if let Some(part_min) = min(a) {
                    e_min = Some(e_min.map_or(part_min, |m| m.min(part_min)));
                }
            },
        )
        .unwrap();
    assert_eq!(e_min, Some(-6));

    // MAX over f_u8, folded across batches.
    let mut f_max: Option<u8> = None;
    table
        .scan_stream(
            &[proj(&table, COL_F_U8)],
            &filter,
            ScanStreamOptions::default(),
            |b| {
                let a = b.column(0).as_any().downcast_ref::<UInt8Array>().unwrap();
                if let Some(part_max) = max(a) {
                    f_max = Some(f_max.map_or(part_max, |m| m.max(part_max)));
                }
            },
        )
        .unwrap();
    assert_eq!(f_max, Some(10));

    // Element-wise sqrt over a_u64 after casting to Float64.
    let mut got: Vec<f64> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_A_U64)],
            &filter,
            ScanStreamOptions::default(),
            |b| {
                let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
                let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
                let sqrt_arr = unary::<
                    arrow::datatypes::Float64Type,
                    _,
                    arrow::datatypes::Float64Type,
                >(f64_arr, |v: f64| v.sqrt());

                // Only non-null slots contribute to the collected output.
                got.extend(sqrt_arr.iter().flatten());
            },
        )
        .unwrap();
    let expected = [15.0_f64, 20.0, 30.0, 40.0];
    assert_eq!(got, expected);
}
1407
/// Scans field 10 using an `In` predicate on field 12 with the candidates
/// `10` and `30`, and expects the projected values `100` and `300`.
///
/// The expected values depend on the rows produced by `setup_test_table`,
/// which is defined elsewhere in this module.
#[test]
fn test_scan_with_in_filter() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();

    let candidates = [10.into(), 30.into()];
    let filter = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::In(&candidates),
    });

    let mut seen = Vec::<Option<u64>>::new();
    table
        .scan_stream(
            &[proj(&table, COL_A_U64)],
            &filter,
            ScanStreamOptions::default(),
            |batch| {
                let column = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .unwrap();
                // The array iterator yields `None` for null slots, `Some(v)` otherwise.
                seen.extend(column.iter());
            },
        )
        .unwrap();

    assert_eq!(seen, vec![Some(100), Some(300)]);
}
1437
/// Checks that a single-projection scan delivers one-column batches and that
/// filtering field 12 on `== 20` yields the value `200` twice from field 10.
///
/// The expected values depend on the rows produced by `setup_test_table`,
/// which is defined elsewhere in this module.
#[test]
fn test_scan_stream_single_column_batches() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();

    let filter = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::Equals(20.into()),
    });

    let mut observed: Vec<u64> = Vec::new();
    table
        .scan_stream(
            &[proj(&table, COL_A_U64)],
            &filter,
            ScanStreamOptions::default(),
            |batch| {
                assert_eq!(batch.num_columns(), 1);
                let column = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .unwrap();
                // Null slots are dropped; only concrete values are recorded.
                observed.extend(column.iter().flatten());
            },
        )
        .unwrap();

    assert_eq!(observed, vec![200, 200]);
}
1472
/// Projects fields 10 and 12 together under a `== 20` predicate on field 12.
///
/// Verifies that each batch has two columns named after the stringified field
/// ids, and that the paired values are `(200, 20)` twice.
///
/// The expected values depend on the rows produced by `setup_test_table`,
/// which is defined elsewhere in this module.
#[test]
fn test_scan_with_multiple_projection_columns() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();

    let filter = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::Equals(20.into()),
    });

    let expected_names = [COL_A_U64.to_string(), COL_C_I32.to_string()];

    let mut pairs = Vec::<(Option<u64>, Option<i32>)>::new();
    table
        .scan_stream(
            &[proj(&table, COL_A_U64), proj(&table, COL_C_I32)],
            &filter,
            ScanStreamOptions::default(),
            |batch| {
                assert_eq!(batch.num_columns(), 2);
                for (idx, wanted) in expected_names.iter().enumerate() {
                    assert_eq!(batch.schema().field(idx).name(), wanted);
                }

                let left = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .unwrap();
                let right = batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap();
                // Both columns belong to the same batch, so they zip row-for-row.
                pairs.extend(left.iter().zip(right.iter()));
            },
        )
        .unwrap();

    assert_eq!(pairs, vec![(Some(200), Some(20)), (Some(200), Some(20))]);
}
1510
/// Exercises projection-list validation in `scan_stream`.
///
/// An empty projection list must be rejected with `Error::InvalidArgumentError`.
/// Projecting the same field twice (once under the alias `alias_a`) must
/// succeed and produce two columns carrying the same values.
#[test]
fn test_scan_stream_projection_validation() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();

    let filter = pred_expr(Filter {
        field_id: COL_C_I32,
        op: Operator::Equals(20.into()),
    });

    // Case 1: no projections at all.
    let no_projections: [Projection; 0] = [];
    let outcome = table.scan_stream(
        &no_projections,
        &filter,
        ScanStreamOptions::default(),
        |_batch| {},
    );
    assert!(matches!(outcome, Err(Error::InvalidArgumentError(_))));

    // Case 2: the same field projected twice, the second time with an alias.
    let repeated = [
        proj(&table, COL_A_U64),
        proj_alias(&table, COL_A_U64, "alias_a"),
    ];
    let mut gathered: Vec<u64> = Vec::new();
    table
        .scan_stream(&repeated, &filter, ScanStreamOptions::default(), |batch| {
            assert_eq!(batch.num_columns(), 2);
            assert_eq!(batch.schema().field(0).name(), &COL_A_U64.to_string());
            assert_eq!(batch.schema().field(1).name(), "alias_a");

            let first = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();
            let second = batch
                .column(1)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();
            // Row by row: the first column's value, then the second's, nulls skipped.
            gathered.extend(
                first
                    .iter()
                    .zip(second.iter())
                    .flat_map(|(x, y)| [x, y])
                    .flatten(),
            );
        })
        .unwrap();

    assert_eq!(gathered, vec![200, 200, 200, 200]);
}
1555
/// Runs `scan_stream_with_exprs` with a plain column projection of field 10
/// next to a computed projection `field 10 * 2` aliased `a_times_two`.
///
/// The test downcasts the computed column to `Float64Array` and expects each
/// emitted pair to be `(value, value * 2)`.
///
/// The expected rows depend on the fixture from `setup_test_table`, which is
/// defined elsewhere in this module.
#[test]
fn test_scan_stream_computed_projection() {
    const COL_A_U64: FieldId = 10;

    let table = setup_test_table();

    let doubled = ScalarExpr::binary(
        ScalarExpr::column(COL_A_U64),
        BinaryOp::Multiply,
        ScalarExpr::literal(2),
    );
    let projections = [
        ScanProjection::column(proj(&table, COL_A_U64)),
        ScanProjection::computed(doubled, "a_times_two"),
    ];

    let filter = pred_expr(Filter {
        field_id: COL_A_U64,
        op: Operator::GreaterThanOrEquals(0.into()),
    });

    let mut rows = Vec::<(u64, f64)>::new();
    table
        .scan_stream_with_exprs(&projections, &filter, ScanStreamOptions::default(), |batch| {
            assert_eq!(batch.num_columns(), 2);
            let source = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();
            let derived = batch
                .column(1)
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap();
            // A row is kept only when both the source and the derived value are non-null.
            rows.extend(
                source
                    .iter()
                    .zip(derived.iter())
                    .filter_map(|(raw, scaled)| raw.zip(scaled)),
            );
        })
        .unwrap();

    let expected = vec![(100, 200.0), (200, 400.0), (300, 600.0), (200, 400.0)];
    assert_eq!(rows, expected);
}
1596
/// Filters with a comparison expression spanning two columns,
/// `field 10 + field 12 > 220`, and expects the only non-null projected value
/// of field 10 to be `300`.
///
/// The expected value depends on the rows produced by `setup_test_table`,
/// which is defined elsewhere in this module.
#[test]
fn test_scan_stream_multi_column_filter_compare() {
    const COL_A_U64: FieldId = 10;
    const COL_C_I32: FieldId = 12;

    let table = setup_test_table();

    let combined_sum = ScalarExpr::binary(
        ScalarExpr::column(COL_A_U64),
        BinaryOp::Add,
        ScalarExpr::column(COL_C_I32),
    );
    let expr = Expr::Compare {
        left: combined_sum,
        op: CompareOp::Gt,
        right: ScalarExpr::literal(220_i64),
    };

    let mut slots = Vec::<Option<u64>>::new();
    table
        .scan_stream(
            &[proj(&table, COL_A_U64)],
            &expr,
            ScanStreamOptions::default(),
            |batch| {
                let column = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .unwrap();
                // Nulls are kept as `None` here and discarded only at assertion time.
                slots.extend(column.iter());
            },
        )
        .unwrap();

    let present: Vec<u64> = slots.into_iter().flatten().collect();
    assert_eq!(present, vec![300]);
}
1634}