// llkv_table/table.rs

1use std::sync::Arc;
2
3use crate::planner::TablePlanner;
4use crate::types::TableId;
5
6use arrow::array::{ArrayRef, RecordBatch, StringArray, UInt32Array};
7use arrow::datatypes::{DataType, Field, Schema};
8use std::collections::HashMap;
9
10use llkv_column_map::store::{Projection, ROW_ID_COLUMN_NAME};
11use llkv_column_map::{ColumnStore, types::LogicalFieldId};
12use llkv_storage::pager::{MemPager, Pager};
13use simd_r_drive_entry_handle::EntryHandle;
14
15use crate::sys_catalog::{CATALOG_TID, ColMeta, SysCatalog, TableMeta};
16use crate::types::FieldId;
17use llkv_expr::{Expr, ScalarExpr};
18use llkv_result::{Error, Result as LlkvResult};
19
/// A logical table backed by a shared [`ColumnStore`].
///
/// Each `Table` pairs a column-store handle with the [`TableId`] used to
/// namespace its user columns (see `LogicalFieldId::for_user`). Multiple
/// tables may share one pager/store instance.
pub struct Table<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Underlying columnar storage; shared across tables opened on the
    // same pager.
    store: ColumnStore<P>,
    // Identifier used to namespace this table's fields within the store.
    table_id: TableId,
}
27
/// Options controlling row-level filtering behavior in `Table::scan_stream`.
///
/// Derives `PartialEq`/`Eq`/`Hash` so option sets can be compared and used
/// as map keys (e.g. when caching plans keyed by scan configuration).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct ScanStreamOptions {
    /// Preserve null rows emitted by the projected columns when `true`.
    /// When `false`, the scan gatherer drops rows where all projected
    /// columns are null or missing before yielding batches. This keeps
    /// the table scan column-oriented while delegating row-level
    /// filtering to the column-map layer.
    pub include_nulls: bool,
}
37
/// A projection requested from a table scan: either a stored column or a
/// computed scalar expression.
#[derive(Clone, Debug)]
pub enum ScanProjection {
    /// Project a stored column directly from the column store.
    Column(Projection),
    /// Project the result of evaluating a scalar expression over user
    /// fields; the result is exposed under `alias` in the output batch.
    Computed {
        expr: ScalarExpr<FieldId>,
        alias: String,
    },
}
46
47impl ScanProjection {
48    pub fn column<P: Into<Projection>>(proj: P) -> Self {
49        Self::Column(proj.into())
50    }
51
52    pub fn computed<S: Into<String>>(expr: ScalarExpr<FieldId>, alias: S) -> Self {
53        Self::Computed {
54            expr,
55            alias: alias.into(),
56        }
57    }
58}
59
60impl From<Projection> for ScanProjection {
61    fn from(value: Projection) -> Self {
62        ScanProjection::Column(value)
63    }
64}
65
66impl From<&Projection> for ScanProjection {
67    fn from(value: &Projection) -> Self {
68        ScanProjection::Column(value.clone())
69    }
70}
71
72impl From<&ScanProjection> for ScanProjection {
73    fn from(value: &ScanProjection) -> Self {
74        value.clone()
75    }
76}
77
impl<P> Table<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Open (or create) the table identified by `table_id` on `pager`.
    ///
    /// # Errors
    /// Returns `Error::reserved_table_id` when `table_id` equals the
    /// system catalog's reserved id (`CATALOG_TID`), and propagates any
    /// error from opening the underlying [`ColumnStore`].
    pub fn new(table_id: TableId, pager: Arc<P>) -> LlkvResult<Self> {
        if table_id == CATALOG_TID {
            return Err(Error::reserved_table_id(table_id));
        }

        let store = ColumnStore::open(pager)?;
        Ok(Self { store, table_id })
    }

    /// Append `batch` to this table's column store.
    ///
    /// Every non-row-id field must carry a parseable `field_id` entry in
    /// its Arrow metadata; that user-level id is rewritten into the
    /// table-namespaced [`LogicalFieldId`] before the batch is handed to
    /// the store. Column names are persisted into the catalog on first
    /// sight (see the inline note below).
    ///
    /// # Errors
    /// Returns `Error::Internal` when a field lacks a valid `field_id`,
    /// and propagates Arrow/store errors from batch construction and
    /// the underlying append.
    pub fn append(&self, batch: &RecordBatch) -> LlkvResult<()> {
        let mut new_fields = Vec::with_capacity(batch.schema().fields().len());
        for field in batch.schema().fields() {
            // The row-id column is passed through untouched; it is not a
            // user field and has no `field_id` metadata.
            if field.name() == ROW_ID_COLUMN_NAME {
                new_fields.push(field.as_ref().clone());
                continue;
            }

            let user_field_id: FieldId = field
                .metadata()
                .get("field_id")
                .and_then(|s| s.parse().ok())
                .ok_or_else(|| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' is missing a valid 'field_id' in its \
                         metadata.",
                        field.name()
                    ))
                })?;

            // Rewrite the user-level field id into the table-namespaced
            // logical id so different tables on the same store never
            // collide.
            let lfid = LogicalFieldId::for_user(self.table_id, user_field_id);
            let mut new_metadata = field.metadata().clone();
            let lfid_val: u64 = lfid.into();
            new_metadata.insert("field_id".to_string(), lfid_val.to_string());

            let new_field =
                Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                    .with_metadata(new_metadata);
            new_fields.push(new_field);

            // Ensure the catalog remembers the human-friendly column name for
            // this field so callers of `Table::schema()` (and other metadata
            // consumers) can recover it later. The CSV ingest path (and other
            // writers) may only supply the `field_id` metadata on the batch,
            // so defensively persist the column name when absent.
            let need_meta = match self
                .catalog()
                .get_cols_meta(self.table_id, &[user_field_id])
            {
                // No catalog entry at all for this column yet.
                metas if metas.is_empty() => true,
                // Entry exists but has no recorded name.
                metas => metas[0].as_ref().and_then(|m| m.name.as_ref()).is_none(),
            };

            if need_meta {
                let meta = ColMeta {
                    col_id: user_field_id,
                    name: Some(field.name().to_string()),
                    flags: 0,
                    default: None,
                };
                self.put_col_meta(&meta);
            }
        }

        // Re-wrap the original column data under the namespaced schema;
        // the arrays themselves are shared (Arc), not copied.
        let new_schema = Arc::new(Schema::new(new_fields));
        let namespaced_batch = RecordBatch::try_new(new_schema, batch.columns().to_vec())?;
        self.store.append(&namespaced_batch)
    }

    /// Stream one or more projected columns as a sequence of RecordBatches.
    ///
    /// - Avoids `concat` and large materializations.
    /// - Uses the same filter machinery as the old `scan` to produce
    ///   `row_ids`.
    /// - Splits `row_ids` into fixed-size windows and gathers rows per
    ///   window to form a small `RecordBatch` that is sent to `on_batch`.
    ///
    /// Convenience wrapper: accepts any iterable of items convertible into
    /// [`ScanProjection`], collects them, and delegates to
    /// [`Self::scan_stream_with_exprs`].
    pub fn scan_stream<'a, I, T, F>(
        &self,
        projections: I,
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        I: IntoIterator<Item = T>,
        T: Into<ScanProjection>,
        F: FnMut(RecordBatch),
    {
        let stream_projections: Vec<ScanProjection> =
            projections.into_iter().map(|p| p.into()).collect();
        self.scan_stream_with_exprs(&stream_projections, filter_expr, options, on_batch)
    }

    /// Like [`Self::scan_stream`], but takes an already-built slice of
    /// [`ScanProjection`]s (no conversion/collection step) and hands it
    /// straight to the [`TablePlanner`]. `scan_stream` is sugar over this.
    pub fn scan_stream_with_exprs<'a, F>(
        &self,
        projections: &[ScanProjection],
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(RecordBatch),
    {
        TablePlanner::new(self).scan_stream_with_exprs(projections, filter_expr, options, on_batch)
    }

    /// Borrow a [`SysCatalog`] view over this table's column store.
    #[inline]
    pub fn catalog(&self) -> SysCatalog<'_, P> {
        SysCatalog::new(&self.store)
    }

    /// Persist table-level metadata. Debug-asserts that `meta.table_id`
    /// matches this table.
    #[inline]
    pub fn put_table_meta(&self, meta: &TableMeta) {
        debug_assert_eq!(meta.table_id, self.table_id);
        self.catalog().put_table_meta(meta);
    }

    /// Fetch this table's metadata from the catalog, if any was stored.
    #[inline]
    pub fn get_table_meta(&self) -> Option<TableMeta> {
        self.catalog().get_table_meta(self.table_id)
    }

    /// Persist column-level metadata for this table.
    #[inline]
    pub fn put_col_meta(&self, meta: &ColMeta) {
        self.catalog().put_col_meta(self.table_id, meta);
    }

    /// Fetch metadata for the given column ids; each slot is `None` when
    /// the catalog has no entry for that column.
    #[inline]
    pub fn get_cols_meta(&self, col_ids: &[FieldId]) -> Vec<Option<ColMeta>> {
        self.catalog().get_cols_meta(self.table_id, col_ids)
    }

    /// Build and return an Arrow `Schema` that describes this table.
    ///
    /// The returned schema includes the `row_id` field first, followed by
    /// user fields. Each user field has its `field_id` stored in the field
    /// metadata (under the "field_id" key) and the name is taken from the
    /// catalog when available or falls back to `col_<id>`.
    pub fn schema(&self) -> LlkvResult<Arc<Schema>> {
        // Collect logical fields for this table and sort by field id.
        let mut logical_fields = self.store.user_field_ids_for_table(self.table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());

        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let metas = self.get_cols_meta(&field_ids);

        let mut fields: Vec<Field> = Vec::with_capacity(1 + field_ids.len());
        // Add row_id first
        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for (idx, lfid) in logical_fields.into_iter().enumerate() {
            let fid = lfid.field_id();
            let dtype = self.store.data_type(lfid)?;
            // Prefer the catalog's recorded name; fall back to `col_<id>`.
            let name = metas
                .get(idx)
                .and_then(|m| m.as_ref().and_then(|meta| meta.name.clone()))
                .unwrap_or_else(|| format!("col_{}", fid));

            let mut metadata: HashMap<String, String> = HashMap::new();
            metadata.insert("field_id".to_string(), fid.to_string());

            // NOTE(review): user fields are always reported as nullable
            // here, regardless of how they were appended — confirm this is
            // intentional for downstream consumers.
            fields.push(Field::new(&name, dtype.clone(), true).with_metadata(metadata));
        }

        Ok(Arc::new(Schema::new(fields)))
    }

    /// Return the table schema formatted as an Arrow RecordBatch suitable
    /// for pretty printing. The batch has three columns: `name` (Utf8),
    /// `field_id` (UInt32) and `data_type` (Utf8).
    pub fn schema_recordbatch(&self) -> LlkvResult<RecordBatch> {
        let schema = self.schema()?;
        let fields = schema.fields();

        let mut names: Vec<String> = Vec::with_capacity(fields.len());
        let mut fids: Vec<u32> = Vec::with_capacity(fields.len());
        let mut dtypes: Vec<String> = Vec::with_capacity(fields.len());

        for field in fields.iter() {
            names.push(field.name().to_string());
            // Fields without parseable `field_id` metadata (e.g. row_id)
            // are reported as 0.
            let fid = field
                .metadata()
                .get("field_id")
                .and_then(|s| s.parse::<u32>().ok())
                .unwrap_or(0u32);
            fids.push(fid);
            dtypes.push(format!("{:?}", field.data_type()));
        }

        // Build Arrow arrays
        let name_array: ArrayRef = Arc::new(StringArray::from(names));
        let fid_array: ArrayRef = Arc::new(UInt32Array::from(fids));
        let dtype_array: ArrayRef = Arc::new(StringArray::from(dtypes));

        let rb_schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("field_id", DataType::UInt32, false),
            Field::new("data_type", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(rb_schema, vec![name_array, fid_array, dtype_array])?;
        Ok(batch)
    }

    /// Borrow the underlying column store.
    pub fn store(&self) -> &ColumnStore<P> {
        &self.store
    }

    /// This table's identifier.
    #[inline]
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Return the total number of rows for a given user column id in this table.
    ///
    /// This delegates to the ColumnStore descriptor for the logical field that
    /// corresponds to (table_id, col_id) and returns the persisted total_row_count.
    pub fn total_rows_for_col(&self, col_id: FieldId) -> llkv_result::Result<u64> {
        let lfid = LogicalFieldId::for_user(self.table_id, col_id);
        self.store.total_rows_for_field(lfid)
    }

    /// Return the total number of rows for this table.
    ///
    /// Prefer reading the dedicated row-id shadow column if present; otherwise
    /// fall back to inspecting any persisted user column descriptor.
    pub fn total_rows(&self) -> llkv_result::Result<u64> {
        use llkv_column_map::store::rowid_fid;
        // The shadow column id is derived from any user lfid of this
        // table; field id 0 is used purely to name the table.
        let rid_lfid = rowid_fid(LogicalFieldId::for_user(self.table_id, 0));
        // Try the row-id shadow column first
        match self.store.total_rows_for_field(rid_lfid) {
            Ok(n) => Ok(n),
            Err(_) => {
                // Fall back to scanning the catalog for any user-data column
                self.store.total_rows_for_table(self.table_id)
            }
        }
    }
}
321
322#[cfg(test)]
323mod tests {
324    use super::*;
325    use crate::sys_catalog::CATALOG_TID;
326    use crate::types::RowId;
327    use arrow::array::Array;
328    use arrow::array::ArrayRef;
329    use arrow::array::{
330        BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt8Array,
331        UInt32Array, UInt64Array,
332    };
333    use arrow::compute::{cast, max, min, sum, unary};
334    use arrow::datatypes::DataType;
335    use llkv_column_map::ColumnStore;
336    use llkv_column_map::store::GatherNullPolicy;
337    use llkv_expr::{BinaryOp, CompareOp, Filter, Operator, ScalarExpr};
338    use std::collections::HashMap;
339    use std::ops::Bound;
340
341    fn setup_test_table() -> Table {
342        let pager = Arc::new(MemPager::default());
343        setup_test_table_with_pager(&pager)
344    }
345
346    fn setup_test_table_with_pager(pager: &Arc<MemPager>) -> Table {
347        let table = Table::new(1, Arc::clone(pager)).unwrap();
348        const COL_A_U64: FieldId = 10;
349        const COL_B_BIN: FieldId = 11;
350        const COL_C_I32: FieldId = 12;
351        const COL_D_F64: FieldId = 13;
352        const COL_E_F32: FieldId = 14;
353
354        let schema = Arc::new(Schema::new(vec![
355            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
356            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
357                "field_id".to_string(),
358                COL_A_U64.to_string(),
359            )])),
360            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
361                "field_id".to_string(),
362                COL_B_BIN.to_string(),
363            )])),
364            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
365                "field_id".to_string(),
366                COL_C_I32.to_string(),
367            )])),
368            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
369                "field_id".to_string(),
370                COL_D_F64.to_string(),
371            )])),
372            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
373                "field_id".to_string(),
374                COL_E_F32.to_string(),
375            )])),
376        ]));
377
378        let batch = RecordBatch::try_new(
379            schema.clone(),
380            vec![
381                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
382                Arc::new(UInt64Array::from(vec![100, 200, 300, 200])),
383                Arc::new(BinaryArray::from(vec![
384                    b"foo" as &[u8],
385                    b"bar",
386                    b"baz",
387                    b"qux",
388                ])),
389                Arc::new(Int32Array::from(vec![10, 20, 30, 20])),
390                Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5, 2.5])),
391                Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 2.0])),
392            ],
393        )
394        .unwrap();
395
396        table.append(&batch).unwrap();
397        table
398    }
399
400    fn gather_single(
401        store: &ColumnStore<MemPager>,
402        field_id: LogicalFieldId,
403        row_ids: &[u64],
404    ) -> ArrayRef {
405        store
406            .gather_rows(&[field_id], row_ids, GatherNullPolicy::ErrorOnMissing)
407            .unwrap()
408            .column(0)
409            .clone()
410    }
411
412    fn pred_expr<'a>(filter: Filter<'a, FieldId>) -> Expr<'a, FieldId> {
413        Expr::Pred(filter)
414    }
415
416    fn proj(table: &Table, field_id: FieldId) -> Projection {
417        Projection::from(LogicalFieldId::for_user(table.table_id, field_id))
418    }
419
420    fn proj_alias<S: Into<String>>(table: &Table, field_id: FieldId, alias: S) -> Projection {
421        Projection::with_alias(LogicalFieldId::for_user(table.table_id, field_id), alias)
422    }
423
424    #[test]
425    fn table_new_rejects_reserved_table_id() {
426        let result = Table::new(CATALOG_TID, Arc::new(MemPager::default()));
427        assert!(matches!(
428            result,
429            Err(Error::ReservedTableId(id)) if id == CATALOG_TID
430        ));
431    }
432
433    #[test]
434    fn test_scan_with_u64_filter() {
435        let table = setup_test_table();
436        const COL_A_U64: FieldId = 10;
437        const COL_C_I32: FieldId = 12;
438
439        let expr = pred_expr(Filter {
440            field_id: COL_A_U64,
441            op: Operator::Equals(200.into()),
442        });
443
444        let mut vals: Vec<Option<i32>> = Vec::new();
445        table
446            .scan_stream(
447                &[proj(&table, COL_C_I32)],
448                &expr,
449                ScanStreamOptions::default(),
450                |b| {
451                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
452                    vals.extend(
453                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
454                    );
455                },
456            )
457            .unwrap();
458        assert_eq!(vals, vec![Some(20), Some(20)]);
459    }
460
461    #[test]
462    fn test_scan_with_string_filter() {
463        let pager = Arc::new(MemPager::default());
464        let table = Table::new(500, Arc::clone(&pager)).unwrap();
465
466        const COL_STR: FieldId = 42;
467        let schema = Arc::new(Schema::new(vec![
468            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
469            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
470                "field_id".to_string(),
471                COL_STR.to_string(),
472            )])),
473        ]));
474
475        let batch = RecordBatch::try_new(
476            schema,
477            vec![
478                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
479                Arc::new(StringArray::from(vec!["alice", "bob", "albert", "carol"])),
480            ],
481        )
482        .unwrap();
483        table.append(&batch).unwrap();
484
485        let expr = pred_expr(Filter {
486            field_id: COL_STR,
487            op: Operator::starts_with("al", true),
488        });
489
490        let mut collected: Vec<Option<String>> = Vec::new();
491        table
492            .scan_stream(
493                &[proj(&table, COL_STR)],
494                &expr,
495                ScanStreamOptions::default(),
496                |b| {
497                    let arr = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
498                    collected.extend(arr.iter().map(|v| v.map(|s| s.to_string())));
499                },
500            )
501            .unwrap();
502
503        assert_eq!(
504            collected,
505            vec![Some("alice".to_string()), Some("albert".to_string())]
506        );
507    }
508
509    #[test]
510    fn test_table_reopen_with_shared_pager() {
511        const TABLE_ALPHA: TableId = 42;
512        const TABLE_BETA: TableId = 43;
513        const TABLE_GAMMA: TableId = 44;
514        const COL_ALPHA_U64: FieldId = 100;
515        const COL_ALPHA_I32: FieldId = 101;
516        const COL_ALPHA_U32: FieldId = 102;
517        const COL_ALPHA_I16: FieldId = 103;
518        const COL_BETA_U64: FieldId = 200;
519        const COL_BETA_U8: FieldId = 201;
520        const COL_GAMMA_I16: FieldId = 300;
521
522        let pager = Arc::new(MemPager::default());
523
524        let alpha_rows: Vec<RowId> = vec![1, 2, 3, 4];
525        let alpha_vals_u64: Vec<u64> = vec![10, 20, 30, 40];
526        let alpha_vals_i32: Vec<i32> = vec![-5, 15, 25, 35];
527        let alpha_vals_u32: Vec<u32> = vec![7, 11, 13, 17];
528        let alpha_vals_i16: Vec<i16> = vec![-2, 4, -6, 8];
529
530        let beta_rows: Vec<u64> = vec![101, 102, 103];
531        let beta_vals_u64: Vec<u64> = vec![900, 901, 902];
532        let beta_vals_u8: Vec<u8> = vec![1, 2, 3];
533
534        let gamma_rows: Vec<u64> = vec![501, 502];
535        let gamma_vals_i16: Vec<i16> = vec![123, -321];
536
537        // First session: create tables and write data.
538        {
539            let table = Table::new(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
540            let schema =
541                Arc::new(Schema::new(vec![
542                    Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
543                    Field::new("alpha_u64", DataType::UInt64, false).with_metadata(HashMap::from(
544                        [("field_id".to_string(), COL_ALPHA_U64.to_string())],
545                    )),
546                    Field::new("alpha_i32", DataType::Int32, false).with_metadata(HashMap::from([
547                        ("field_id".to_string(), COL_ALPHA_I32.to_string()),
548                    ])),
549                    Field::new("alpha_u32", DataType::UInt32, false).with_metadata(HashMap::from(
550                        [("field_id".to_string(), COL_ALPHA_U32.to_string())],
551                    )),
552                    Field::new("alpha_i16", DataType::Int16, false).with_metadata(HashMap::from([
553                        ("field_id".to_string(), COL_ALPHA_I16.to_string()),
554                    ])),
555                ]));
556            let batch = RecordBatch::try_new(
557                schema,
558                vec![
559                    Arc::new(UInt64Array::from(alpha_rows.clone())),
560                    Arc::new(UInt64Array::from(alpha_vals_u64.clone())),
561                    Arc::new(Int32Array::from(alpha_vals_i32.clone())),
562                    Arc::new(UInt32Array::from(alpha_vals_u32.clone())),
563                    Arc::new(Int16Array::from(alpha_vals_i16.clone())),
564                ],
565            )
566            .unwrap();
567            table.append(&batch).unwrap();
568        }
569
570        {
571            let table = Table::new(TABLE_BETA, Arc::clone(&pager)).unwrap();
572            let schema = Arc::new(Schema::new(vec![
573                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
574                Field::new("beta_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
575                    "field_id".to_string(),
576                    COL_BETA_U64.to_string(),
577                )])),
578                Field::new("beta_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
579                    "field_id".to_string(),
580                    COL_BETA_U8.to_string(),
581                )])),
582            ]));
583            let batch = RecordBatch::try_new(
584                schema,
585                vec![
586                    Arc::new(UInt64Array::from(beta_rows.clone())),
587                    Arc::new(UInt64Array::from(beta_vals_u64.clone())),
588                    Arc::new(UInt8Array::from(beta_vals_u8.clone())),
589                ],
590            )
591            .unwrap();
592            table.append(&batch).unwrap();
593        }
594
595        {
596            let table = Table::new(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
597            let schema = Arc::new(Schema::new(vec![
598                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
599                Field::new("gamma_i16", DataType::Int16, false).with_metadata(HashMap::from([(
600                    "field_id".to_string(),
601                    COL_GAMMA_I16.to_string(),
602                )])),
603            ]));
604            let batch = RecordBatch::try_new(
605                schema,
606                vec![
607                    Arc::new(UInt64Array::from(gamma_rows.clone())),
608                    Arc::new(Int16Array::from(gamma_vals_i16.clone())),
609                ],
610            )
611            .unwrap();
612            table.append(&batch).unwrap();
613        }
614
615        // Second session: reopen each table and ensure schema and values are intact.
616        {
617            let table = Table::new(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
618            let store = table.store();
619
620            let expectations: &[(FieldId, DataType)] = &[
621                (COL_ALPHA_U64, DataType::UInt64),
622                (COL_ALPHA_I32, DataType::Int32),
623                (COL_ALPHA_U32, DataType::UInt32),
624                (COL_ALPHA_I16, DataType::Int16),
625            ];
626
627            for &(col, ref ty) in expectations {
628                let lfid = LogicalFieldId::for_user(TABLE_ALPHA, col);
629                assert_eq!(store.data_type(lfid).unwrap(), *ty);
630                let arr = gather_single(store, lfid, &alpha_rows);
631                match ty {
632                    DataType::UInt64 => {
633                        let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
634                        assert_eq!(arr.values(), alpha_vals_u64.as_slice());
635                    }
636                    DataType::Int32 => {
637                        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
638                        assert_eq!(arr.values(), alpha_vals_i32.as_slice());
639                    }
640                    DataType::UInt32 => {
641                        let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
642                        assert_eq!(arr.values(), alpha_vals_u32.as_slice());
643                    }
644                    DataType::Int16 => {
645                        let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
646                        assert_eq!(arr.values(), alpha_vals_i16.as_slice());
647                    }
648                    other => panic!("unexpected dtype {other:?}"),
649                }
650            }
651        }
652
653        {
654            let table = Table::new(TABLE_BETA, Arc::clone(&pager)).unwrap();
655            let store = table.store();
656
657            let lfid_u64 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U64);
658            assert_eq!(store.data_type(lfid_u64).unwrap(), DataType::UInt64);
659            let arr_u64 = gather_single(store, lfid_u64, &beta_rows);
660            let arr_u64 = arr_u64.as_any().downcast_ref::<UInt64Array>().unwrap();
661            assert_eq!(arr_u64.values(), beta_vals_u64.as_slice());
662
663            let lfid_u8 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U8);
664            assert_eq!(store.data_type(lfid_u8).unwrap(), DataType::UInt8);
665            let arr_u8 = gather_single(store, lfid_u8, &beta_rows);
666            let arr_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
667            assert_eq!(arr_u8.values(), beta_vals_u8.as_slice());
668        }
669
670        {
671            let table = Table::new(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
672            let store = table.store();
673            let lfid = LogicalFieldId::for_user(TABLE_GAMMA, COL_GAMMA_I16);
674            assert_eq!(store.data_type(lfid).unwrap(), DataType::Int16);
675            let arr = gather_single(store, lfid, &gamma_rows);
676            let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
677            assert_eq!(arr.values(), gamma_vals_i16.as_slice());
678        }
679    }
680
681    #[test]
682    fn test_scan_with_i32_filter() {
683        let table = setup_test_table();
684        const COL_A_U64: FieldId = 10;
685        const COL_C_I32: FieldId = 12;
686
687        let filter = pred_expr(Filter {
688            field_id: COL_C_I32,
689            op: Operator::Equals(20.into()),
690        });
691
692        let mut vals: Vec<Option<u64>> = Vec::new();
693        table
694            .scan_stream(
695                &[proj(&table, COL_A_U64)],
696                &filter,
697                ScanStreamOptions::default(),
698                |b| {
699                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
700                    vals.extend(
701                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
702                    );
703                },
704            )
705            .unwrap();
706        assert_eq!(vals, vec![Some(200), Some(200)]);
707    }
708
709    #[test]
710    fn test_scan_with_greater_than_filter() {
711        let table = setup_test_table();
712        const COL_A_U64: FieldId = 10;
713        const COL_C_I32: FieldId = 12;
714
715        let filter = pred_expr(Filter {
716            field_id: COL_C_I32,
717            op: Operator::GreaterThan(15.into()),
718        });
719
720        let mut vals: Vec<Option<u64>> = Vec::new();
721        table
722            .scan_stream(
723                &[proj(&table, COL_A_U64)],
724                &filter,
725                ScanStreamOptions::default(),
726                |b| {
727                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
728                    vals.extend(
729                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
730                    );
731                },
732            )
733            .unwrap();
734        assert_eq!(vals, vec![Some(200), Some(300), Some(200)]);
735    }
736
737    #[test]
738    fn test_scan_with_range_filter() {
739        let table = setup_test_table();
740        const COL_A_U64: FieldId = 10;
741        const COL_C_I32: FieldId = 12;
742
743        let filter = pred_expr(Filter {
744            field_id: COL_A_U64,
745            op: Operator::Range {
746                lower: Bound::Included(150.into()),
747                upper: Bound::Excluded(300.into()),
748            },
749        });
750
751        let mut vals: Vec<Option<i32>> = Vec::new();
752        table
753            .scan_stream(
754                &[proj(&table, COL_C_I32)],
755                &filter,
756                ScanStreamOptions::default(),
757                |b| {
758                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
759                    vals.extend(
760                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
761                    );
762                },
763            )
764            .unwrap();
765        assert_eq!(vals, vec![Some(20), Some(20)]);
766    }
767
768    #[test]
769    fn test_filtered_scan_sum_kernel() {
770        // Trade-off note:
771        // - We use Arrow's sum kernel per batch, then add the partial sums.
772        // - This preserves Arrow null semantics and avoids concat.
773        let table = setup_test_table();
774        const COL_A_U64: FieldId = 10;
775
776        let filter = pred_expr(Filter {
777            field_id: COL_A_U64,
778            op: Operator::Range {
779                lower: Bound::Included(150.into()),
780                upper: Bound::Excluded(300.into()),
781            },
782        });
783
784        let mut total: u128 = 0;
785        table
786            .scan_stream(
787                &[proj(&table, COL_A_U64)],
788                &filter,
789                ScanStreamOptions::default(),
790                |b| {
791                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
792                    if let Some(part) = sum(a) {
793                        total += part as u128;
794                    }
795                },
796            )
797            .unwrap();
798
799        assert_eq!(total, 400);
800    }
801
802    #[test]
803    fn test_filtered_scan_sum_i32_kernel() {
804        // Trade-off note:
805        // - Per-batch sum + accumulate avoids building one big Array.
806        // - For tiny batches overhead may match manual loops, but keeps
807        //   Arrow semantics exact.
808        let table = setup_test_table();
809        const COL_A_U64: FieldId = 10;
810        const COL_C_I32: FieldId = 12;
811
812        let candidates = [100.into(), 300.into()];
813        let filter = pred_expr(Filter {
814            field_id: COL_A_U64,
815            op: Operator::In(&candidates),
816        });
817
818        let mut total: i64 = 0;
819        table
820            .scan_stream(
821                &[proj(&table, COL_C_I32)],
822                &filter,
823                ScanStreamOptions::default(),
824                |b| {
825                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
826                    if let Some(part) = sum(a) {
827                        total += part as i64;
828                    }
829                },
830            )
831            .unwrap();
832        assert_eq!(total, 40);
833    }
834
835    #[test]
836    fn test_filtered_scan_min_max_kernel() {
837        // Trade-off note:
838        // - min/max are computed per batch and folded. This preserves
839        //   Arrow's null behavior and avoids concat.
840        // - Be mindful of NaN semantics if extended to floats later.
841        let table = setup_test_table();
842        const COL_A_U64: FieldId = 10;
843        const COL_C_I32: FieldId = 12;
844
845        let candidates = [100.into(), 300.into()];
846        let filter = pred_expr(Filter {
847            field_id: COL_A_U64,
848            op: Operator::In(&candidates),
849        });
850
851        let mut mn: Option<i32> = None;
852        let mut mx: Option<i32> = None;
853        table
854            .scan_stream(
855                &[proj(&table, COL_C_I32)],
856                &filter,
857                ScanStreamOptions::default(),
858                |b| {
859                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
860
861                    if let Some(part_min) = min(a) {
862                        mn = Some(mn.map_or(part_min, |m| m.min(part_min)));
863                    }
864                    if let Some(part_max) = max(a) {
865                        mx = Some(mx.map_or(part_max, |m| m.max(part_max)));
866                    }
867                },
868            )
869            .unwrap();
870        assert_eq!(mn, Some(10));
871        assert_eq!(mx, Some(30));
872    }
873
874    #[test]
875    fn test_filtered_scan_float64_column() {
876        let table = setup_test_table();
877        const COL_D_F64: FieldId = 13;
878
879        let filter = pred_expr(Filter {
880            field_id: COL_D_F64,
881            op: Operator::GreaterThan(2.0_f64.into()),
882        });
883
884        let mut got = Vec::new();
885        table
886            .scan_stream(
887                &[proj(&table, COL_D_F64)],
888                &filter,
889                ScanStreamOptions::default(),
890                |b| {
891                    let arr = b.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
892                    for i in 0..arr.len() {
893                        if arr.is_valid(i) {
894                            got.push(arr.value(i));
895                        }
896                    }
897                },
898            )
899            .unwrap();
900
901        assert_eq!(got, vec![2.5, 3.5, 2.5]);
902    }
903
904    #[test]
905    fn test_filtered_scan_float32_in_operator() {
906        let table = setup_test_table();
907        const COL_E_F32: FieldId = 14;
908
909        let candidates = [2.0_f32.into(), 3.0_f32.into()];
910        let filter = pred_expr(Filter {
911            field_id: COL_E_F32,
912            op: Operator::In(&candidates),
913        });
914
915        let mut vals: Vec<Option<f32>> = Vec::new();
916        table
917            .scan_stream(
918                &[proj(&table, COL_E_F32)],
919                &filter,
920                ScanStreamOptions::default(),
921                |b| {
922                    let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
923                    vals.extend((0..arr.len()).map(|i| {
924                        if arr.is_null(i) {
925                            None
926                        } else {
927                            Some(arr.value(i))
928                        }
929                    }));
930                },
931            )
932            .unwrap();
933
934        let collected: Vec<f32> = vals.into_iter().flatten().collect();
935        assert_eq!(collected, vec![2.0, 3.0, 2.0]);
936    }
937
938    #[test]
939    fn test_scan_stream_and_expression() {
940        let table = setup_test_table();
941        const COL_A_U64: FieldId = 10;
942        const COL_C_I32: FieldId = 12;
943        const COL_E_F32: FieldId = 14;
944
945        let expr = Expr::all_of(vec![
946            Filter {
947                field_id: COL_C_I32,
948                op: Operator::GreaterThan(15.into()),
949            },
950            Filter {
951                field_id: COL_A_U64,
952                op: Operator::LessThan(250.into()),
953            },
954        ]);
955
956        let mut vals: Vec<Option<f32>> = Vec::new();
957        table
958            .scan_stream(
959                &[proj(&table, COL_E_F32)],
960                &expr,
961                ScanStreamOptions::default(),
962                |b| {
963                    let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
964                    vals.extend((0..arr.len()).map(|i| {
965                        if arr.is_null(i) {
966                            None
967                        } else {
968                            Some(arr.value(i))
969                        }
970                    }));
971                },
972            )
973            .unwrap();
974
975        assert_eq!(vals, vec![Some(2.0_f32), Some(2.0_f32)]);
976    }
977
978    #[test]
979    fn test_scan_stream_or_expression() {
980        let table = setup_test_table();
981        const COL_A_U64: FieldId = 10;
982        const COL_C_I32: FieldId = 12;
983
984        let expr = Expr::any_of(vec![
985            Filter {
986                field_id: COL_C_I32,
987                op: Operator::Equals(10.into()),
988            },
989            Filter {
990                field_id: COL_C_I32,
991                op: Operator::Equals(30.into()),
992            },
993        ]);
994
995        let mut vals: Vec<Option<u64>> = Vec::new();
996        table
997            .scan_stream(
998                &[proj(&table, COL_A_U64)],
999                &expr,
1000                ScanStreamOptions::default(),
1001                |b| {
1002                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1003                    vals.extend((0..arr.len()).map(|i| {
1004                        if arr.is_null(i) {
1005                            None
1006                        } else {
1007                            Some(arr.value(i))
1008                        }
1009                    }));
1010                },
1011            )
1012            .unwrap();
1013
1014        assert_eq!(vals, vec![Some(100), Some(300)]);
1015    }
1016
1017    #[test]
1018    fn test_scan_stream_not_predicate() {
1019        let table = setup_test_table();
1020        const COL_A_U64: FieldId = 10;
1021        const COL_C_I32: FieldId = 12;
1022
1023        let expr = Expr::not(pred_expr(Filter {
1024            field_id: COL_C_I32,
1025            op: Operator::Equals(20.into()),
1026        }));
1027
1028        let mut vals: Vec<Option<u64>> = Vec::new();
1029        table
1030            .scan_stream(
1031                &[proj(&table, COL_A_U64)],
1032                &expr,
1033                ScanStreamOptions::default(),
1034                |b| {
1035                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1036                    vals.extend((0..arr.len()).map(|i| {
1037                        if arr.is_null(i) {
1038                            None
1039                        } else {
1040                            Some(arr.value(i))
1041                        }
1042                    }));
1043                },
1044            )
1045            .unwrap();
1046
1047        assert_eq!(vals, vec![Some(100), Some(300)]);
1048    }
1049
1050    #[test]
1051    fn test_scan_stream_not_and_expression() {
1052        let table = setup_test_table();
1053        const COL_A_U64: FieldId = 10;
1054        const COL_C_I32: FieldId = 12;
1055
1056        let expr = Expr::not(Expr::all_of(vec![
1057            Filter {
1058                field_id: COL_A_U64,
1059                op: Operator::GreaterThan(150.into()),
1060            },
1061            Filter {
1062                field_id: COL_C_I32,
1063                op: Operator::LessThan(40.into()),
1064            },
1065        ]));
1066
1067        let mut vals: Vec<Option<u64>> = Vec::new();
1068        table
1069            .scan_stream(
1070                &[proj(&table, COL_A_U64)],
1071                &expr,
1072                ScanStreamOptions::default(),
1073                |b| {
1074                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1075                    vals.extend((0..arr.len()).map(|i| {
1076                        if arr.is_null(i) {
1077                            None
1078                        } else {
1079                            Some(arr.value(i))
1080                        }
1081                    }));
1082                },
1083            )
1084            .unwrap();
1085
1086        assert_eq!(vals, vec![Some(100)]);
1087    }
1088
1089    #[test]
1090    fn test_scan_stream_include_nulls_toggle() {
1091        let pager = Arc::new(MemPager::default());
1092        let table = setup_test_table_with_pager(&pager);
1093        const COL_A_U64: FieldId = 10;
1094        const COL_C_I32: FieldId = 12;
1095        const COL_B_BIN: FieldId = 11;
1096        const COL_D_F64: FieldId = 13;
1097        const COL_E_F32: FieldId = 14;
1098
1099        let schema = Arc::new(Schema::new(vec![
1100            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1101            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1102                "field_id".to_string(),
1103                COL_A_U64.to_string(),
1104            )])),
1105            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
1106                "field_id".to_string(),
1107                COL_B_BIN.to_string(),
1108            )])),
1109            Field::new("c_i32", DataType::Int32, true).with_metadata(HashMap::from([(
1110                "field_id".to_string(),
1111                COL_C_I32.to_string(),
1112            )])),
1113            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
1114                "field_id".to_string(),
1115                COL_D_F64.to_string(),
1116            )])),
1117            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
1118                "field_id".to_string(),
1119                COL_E_F32.to_string(),
1120            )])),
1121        ]));
1122
1123        let batch = RecordBatch::try_new(
1124            schema,
1125            vec![
1126                Arc::new(UInt64Array::from(vec![5, 6])),
1127                Arc::new(UInt64Array::from(vec![500, 600])),
1128                Arc::new(BinaryArray::from(vec![
1129                    Some(&b"new"[..]),
1130                    Some(&b"alt"[..]),
1131                ])),
1132                Arc::new(Int32Array::from(vec![Some(40), None])),
1133                Arc::new(Float64Array::from(vec![5.5, 6.5])),
1134                Arc::new(Float32Array::from(vec![5.0, 6.0])),
1135            ],
1136        )
1137        .unwrap();
1138        table.append(&batch).unwrap();
1139
1140        let filter = pred_expr(Filter {
1141            field_id: COL_A_U64,
1142            op: Operator::GreaterThan(450.into()),
1143        });
1144
1145        let mut default_vals: Vec<Option<i32>> = Vec::new();
1146        table
1147            .scan_stream(
1148                &[proj(&table, COL_C_I32)],
1149                &filter,
1150                ScanStreamOptions::default(),
1151                |b| {
1152                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1153                    default_vals.extend((0..arr.len()).map(|i| {
1154                        if arr.is_null(i) {
1155                            None
1156                        } else {
1157                            Some(arr.value(i))
1158                        }
1159                    }));
1160                },
1161            )
1162            .unwrap();
1163        assert_eq!(default_vals, vec![Some(40)]);
1164
1165        let mut include_null_vals: Vec<Option<i32>> = Vec::new();
1166        table
1167            .scan_stream(
1168                &[proj(&table, COL_C_I32)],
1169                &filter,
1170                ScanStreamOptions {
1171                    include_nulls: true,
1172                },
1173                |b| {
1174                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1175
1176                    let mut paired_vals: Vec<(Option<i32>, Option<f64>)> = Vec::new();
1177                    table
1178                        .scan_stream(
1179                            &[proj(&table, COL_C_I32), proj(&table, COL_D_F64)],
1180                            &filter,
1181                            ScanStreamOptions::default(),
1182                            |b| {
1183                                assert_eq!(b.num_columns(), 2);
1184                                let c_arr =
1185                                    b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1186                                let d_arr =
1187                                    b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
1188                                for i in 0..b.num_rows() {
1189                                    let c_val = if c_arr.is_null(i) {
1190                                        None
1191                                    } else {
1192                                        Some(c_arr.value(i))
1193                                    };
1194                                    let d_val = if d_arr.is_null(i) {
1195                                        None
1196                                    } else {
1197                                        Some(d_arr.value(i))
1198                                    };
1199                                    paired_vals.push((c_val, d_val));
1200                                }
1201                            },
1202                        )
1203                        .unwrap();
1204                    assert_eq!(paired_vals, vec![(Some(40), Some(5.5)), (None, Some(6.5))]);
1205                    include_null_vals.extend((0..arr.len()).map(|i| {
1206                        if arr.is_null(i) {
1207                            None
1208                        } else {
1209                            Some(arr.value(i))
1210                        }
1211                    }));
1212                },
1213            )
1214            .unwrap();
1215        assert_eq!(include_null_vals, vec![Some(40), None]);
1216    }
1217
1218    #[test]
1219    fn test_filtered_scan_int_sqrt_float64() {
1220        // Trade-off note:
1221        // - We cast per batch and apply a compute unary kernel for sqrt.
1222        // - This keeps processing streaming and avoids per-value loops.
1223        // - `unary` operates on `PrimitiveArray<T>`; cast and downcast to
1224        //   `Float64Array` first.
1225        let table = setup_test_table();
1226        const COL_A_U64: FieldId = 10;
1227        const COL_C_I32: FieldId = 12;
1228
1229        let filter = pred_expr(Filter {
1230            field_id: COL_C_I32,
1231            op: Operator::GreaterThan(15.into()),
1232        });
1233
1234        let mut got: Vec<f64> = Vec::new();
1235        table
1236            .scan_stream(
1237                &[proj(&table, COL_A_U64)],
1238                &filter,
1239                ScanStreamOptions::default(),
1240                |b| {
1241                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
1242                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
1243
1244                    // unary::<Float64Type, _, Float64Type>(...)
1245                    let sqrt_arr = unary::<
1246                        arrow::datatypes::Float64Type,
1247                        _,
1248                        arrow::datatypes::Float64Type,
1249                    >(f64_arr, |v: f64| v.sqrt());
1250
1251                    for i in 0..sqrt_arr.len() {
1252                        if !sqrt_arr.is_null(i) {
1253                            got.push(sqrt_arr.value(i));
1254                        }
1255                    }
1256                },
1257            )
1258            .unwrap();
1259
1260        let expected = [200_f64.sqrt(), 300_f64.sqrt(), 200_f64.sqrt()];
1261        assert_eq!(got, expected);
1262    }
1263
1264    #[test]
1265    fn test_multi_field_kernels_with_filters() {
1266        // Trade-off note:
1267        // - All reductions use per-batch kernels + accumulation to stay
1268        //   streaming. No concat or whole-column materialization.
1269        use arrow::array::{Int16Array, UInt8Array, UInt32Array};
1270
1271        let table = Table::new(2, Arc::new(MemPager::default())).unwrap();
1272
1273        const COL_A_U64: FieldId = 20;
1274        const COL_D_U32: FieldId = 21;
1275        const COL_E_I16: FieldId = 22;
1276        const COL_F_U8: FieldId = 23;
1277        const COL_C_I32: FieldId = 24;
1278
1279        let schema = Arc::new(Schema::new(vec![
1280            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1281            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1282                "field_id".to_string(),
1283                COL_A_U64.to_string(),
1284            )])),
1285            Field::new("d_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
1286                "field_id".to_string(),
1287                COL_D_U32.to_string(),
1288            )])),
1289            Field::new("e_i16", DataType::Int16, false).with_metadata(HashMap::from([(
1290                "field_id".to_string(),
1291                COL_E_I16.to_string(),
1292            )])),
1293            Field::new("f_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
1294                "field_id".to_string(),
1295                COL_F_U8.to_string(),
1296            )])),
1297            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
1298                "field_id".to_string(),
1299                COL_C_I32.to_string(),
1300            )])),
1301        ]));
1302
1303        // Data: 5 rows. We will filter c_i32 >= 20 -> keep rows 2..5.
1304        let batch = RecordBatch::try_new(
1305            schema.clone(),
1306            vec![
1307                Arc::new(UInt64Array::from(vec![1, 2, 3, 4, 5])),
1308                Arc::new(UInt64Array::from(vec![100, 225, 400, 900, 1600])),
1309                Arc::new(UInt32Array::from(vec![7, 8, 9, 10, 11])),
1310                Arc::new(Int16Array::from(vec![-3, 0, 3, -6, 6])),
1311                Arc::new(UInt8Array::from(vec![2, 4, 6, 8, 10])),
1312                Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
1313            ],
1314        )
1315        .unwrap();
1316
1317        table.append(&batch).unwrap();
1318
1319        // Filter: c_i32 >= 20.
1320        let filter = pred_expr(Filter {
1321            field_id: COL_C_I32,
1322            op: Operator::GreaterThanOrEquals(20.into()),
1323        });
1324
1325        // 1) SUM over d_u32 (per-batch sum + accumulate).
1326        let mut d_sum: u128 = 0;
1327        table
1328            .scan_stream(
1329                &[proj(&table, COL_D_U32)],
1330                &filter,
1331                ScanStreamOptions::default(),
1332                |b| {
1333                    let a = b.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
1334                    if let Some(part) = sum(a) {
1335                        d_sum += part as u128;
1336                    }
1337                },
1338            )
1339            .unwrap();
1340        assert_eq!(d_sum, (8 + 9 + 10 + 11) as u128);
1341
1342        // 2) MIN over e_i16 (per-batch min + fold).
1343        let mut e_min: Option<i16> = None;
1344        table
1345            .scan_stream(
1346                &[proj(&table, COL_E_I16)],
1347                &filter,
1348                ScanStreamOptions::default(),
1349                |b| {
1350                    let a = b.column(0).as_any().downcast_ref::<Int16Array>().unwrap();
1351                    if let Some(part_min) = min(a) {
1352                        e_min = Some(e_min.map_or(part_min, |m| m.min(part_min)));
1353                    }
1354                },
1355            )
1356            .unwrap();
1357        assert_eq!(e_min, Some(-6));
1358
1359        // 3) MAX over f_u8 (per-batch max + fold).
1360        let mut f_max: Option<u8> = None;
1361        table
1362            .scan_stream(
1363                &[proj(&table, COL_F_U8)],
1364                &filter,
1365                ScanStreamOptions::default(),
1366                |b| {
1367                    let a = b
1368                        .column(0)
1369                        .as_any()
1370                        .downcast_ref::<arrow::array::UInt8Array>()
1371                        .unwrap();
1372                    if let Some(part_max) = max(a) {
1373                        f_max = Some(f_max.map_or(part_max, |m| m.max(part_max)));
1374                    }
1375                },
1376            )
1377            .unwrap();
1378        assert_eq!(f_max, Some(10));
1379
1380        // 4) SQRT over a_u64 (cast to f64, then unary sqrt per batch).
1381        let mut got: Vec<f64> = Vec::new();
1382        table
1383            .scan_stream(
1384                &[proj(&table, COL_A_U64)],
1385                &filter,
1386                ScanStreamOptions::default(),
1387                |b| {
1388                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
1389                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
1390                    let sqrt_arr = unary::<
1391                        arrow::datatypes::Float64Type,
1392                        _,
1393                        arrow::datatypes::Float64Type,
1394                    >(f64_arr, |v: f64| v.sqrt());
1395
1396                    for i in 0..sqrt_arr.len() {
1397                        if !sqrt_arr.is_null(i) {
1398                            got.push(sqrt_arr.value(i));
1399                        }
1400                    }
1401                },
1402            )
1403            .unwrap();
1404        let expected = [15.0_f64, 20.0, 30.0, 40.0];
1405        assert_eq!(got, expected);
1406    }
1407
1408    #[test]
1409    fn test_scan_with_in_filter() {
1410        let table = setup_test_table();
1411        const COL_A_U64: FieldId = 10;
1412        const COL_C_I32: FieldId = 12;
1413
1414        // IN now uses untyped literals, too.
1415        let candidates = [10.into(), 30.into()];
1416        let filter = pred_expr(Filter {
1417            field_id: COL_C_I32,
1418            op: Operator::In(&candidates),
1419        });
1420
1421        let mut vals: Vec<Option<u64>> = Vec::new();
1422        table
1423            .scan_stream(
1424                &[proj(&table, COL_A_U64)],
1425                &filter,
1426                ScanStreamOptions::default(),
1427                |b| {
1428                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1429                    vals.extend(
1430                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1431                    );
1432                },
1433            )
1434            .unwrap();
1435        assert_eq!(vals, vec![Some(100), Some(300)]);
1436    }
1437
1438    #[test]
1439    fn test_scan_stream_single_column_batches() {
1440        let table = setup_test_table();
1441        const COL_A_U64: FieldId = 10;
1442        const COL_C_I32: FieldId = 12;
1443
1444        // Filter c_i32 == 20 -> two rows; stream a_u64 in batches of <= N.
1445        let filter = pred_expr(Filter {
1446            field_id: COL_C_I32,
1447            op: Operator::Equals(20.into()),
1448        });
1449
1450        let mut seen_cols = Vec::<u64>::new();
1451        table
1452            .scan_stream(
1453                &[proj(&table, COL_A_U64)],
1454                &filter,
1455                ScanStreamOptions::default(),
1456                |b| {
1457                    assert_eq!(b.num_columns(), 1);
1458                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1459                    // No kernel needed; just collect values for shape assertions.
1460                    for i in 0..a.len() {
1461                        if !a.is_null(i) {
1462                            seen_cols.push(a.value(i));
1463                        }
1464                    }
1465                },
1466            )
1467            .unwrap();
1468
1469        // In fixture, c_i32 == 20 corresponds to a_u64 values [200, 200].
1470        assert_eq!(seen_cols, vec![200, 200]);
1471    }
1472
1473    #[test]
1474    fn test_scan_with_multiple_projection_columns() {
1475        let table = setup_test_table();
1476        const COL_A_U64: FieldId = 10;
1477        const COL_C_I32: FieldId = 12;
1478
1479        let filter = pred_expr(Filter {
1480            field_id: COL_C_I32,
1481            op: Operator::Equals(20.into()),
1482        });
1483
1484        let expected_names = [COL_A_U64.to_string(), COL_C_I32.to_string()];
1485
1486        let mut combined: Vec<(Option<u64>, Option<i32>)> = Vec::new();
1487        table
1488            .scan_stream(
1489                &[proj(&table, COL_A_U64), proj(&table, COL_C_I32)],
1490                &filter,
1491                ScanStreamOptions::default(),
1492                |b| {
1493                    assert_eq!(b.num_columns(), 2);
1494                    assert_eq!(b.schema().field(0).name(), &expected_names[0]);
1495                    assert_eq!(b.schema().field(1).name(), &expected_names[1]);
1496
1497                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1498                    let c = b.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
1499                    for i in 0..b.num_rows() {
1500                        let left = if a.is_null(i) { None } else { Some(a.value(i)) };
1501                        let right = if c.is_null(i) { None } else { Some(c.value(i)) };
1502                        combined.push((left, right));
1503                    }
1504                },
1505            )
1506            .unwrap();
1507
1508        assert_eq!(combined, vec![(Some(200), Some(20)), (Some(200), Some(20))]);
1509    }
1510
1511    #[test]
1512    fn test_scan_stream_projection_validation() {
1513        let table = setup_test_table();
1514        const COL_A_U64: FieldId = 10;
1515        const COL_C_I32: FieldId = 12;
1516
1517        let filter = pred_expr(Filter {
1518            field_id: COL_C_I32,
1519            op: Operator::Equals(20.into()),
1520        });
1521
1522        let empty: [Projection; 0] = [];
1523        let result = table.scan_stream(&empty, &filter, ScanStreamOptions::default(), |_batch| {});
1524        assert!(matches!(result, Err(Error::InvalidArgumentError(_))));
1525
1526        // Duplicate projections are allowed: the same column will be
1527        // gathered once and duplicated in the output in the requested
1528        // order. Verify the call succeeds and produces two identical
1529        // columns per batch.
1530        let duplicate = [
1531            proj(&table, COL_A_U64),
1532            proj_alias(&table, COL_A_U64, "alias_a"),
1533        ];
1534        let mut collected = Vec::<u64>::new();
1535        table
1536            .scan_stream(&duplicate, &filter, ScanStreamOptions::default(), |b| {
1537                assert_eq!(b.num_columns(), 2);
1538                assert_eq!(b.schema().field(0).name(), &COL_A_U64.to_string());
1539                assert_eq!(b.schema().field(1).name(), "alias_a");
1540                let a0 = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1541                let a1 = b.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
1542                for i in 0..b.num_rows() {
1543                    if !a0.is_null(i) {
1544                        collected.push(a0.value(i));
1545                    }
1546                    if !a1.is_null(i) {
1547                        collected.push(a1.value(i));
1548                    }
1549                }
1550            })
1551            .unwrap();
1552        // Two matching rows, two columns per row -> four values.
1553        assert_eq!(collected, vec![200, 200, 200, 200]);
1554    }
1555
1556    #[test]
1557    fn test_scan_stream_computed_projection() {
1558        let table = setup_test_table();
1559        const COL_A_U64: FieldId = 10;
1560
1561        let projections = [
1562            ScanProjection::column(proj(&table, COL_A_U64)),
1563            ScanProjection::computed(
1564                ScalarExpr::binary(
1565                    ScalarExpr::column(COL_A_U64),
1566                    BinaryOp::Multiply,
1567                    ScalarExpr::literal(2),
1568                ),
1569                "a_times_two",
1570            ),
1571        ];
1572
1573        let filter = pred_expr(Filter {
1574            field_id: COL_A_U64,
1575            op: Operator::GreaterThanOrEquals(0.into()),
1576        });
1577
1578        let mut computed: Vec<(u64, f64)> = Vec::new();
1579        table
1580            .scan_stream_with_exprs(&projections, &filter, ScanStreamOptions::default(), |b| {
1581                assert_eq!(b.num_columns(), 2);
1582                let base = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1583                let comp = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
1584                for i in 0..b.num_rows() {
1585                    if base.is_null(i) || comp.is_null(i) {
1586                        continue;
1587                    }
1588                    computed.push((base.value(i), comp.value(i)));
1589                }
1590            })
1591            .unwrap();
1592
1593        let expected = vec![(100, 200.0), (200, 400.0), (300, 600.0), (200, 400.0)];
1594        assert_eq!(computed, expected);
1595    }
1596
1597    #[test]
1598    fn test_scan_stream_multi_column_filter_compare() {
1599        let table = setup_test_table();
1600        const COL_A_U64: FieldId = 10;
1601        const COL_C_I32: FieldId = 12;
1602
1603        let expr = Expr::Compare {
1604            left: ScalarExpr::binary(
1605                ScalarExpr::column(COL_A_U64),
1606                BinaryOp::Add,
1607                ScalarExpr::column(COL_C_I32),
1608            ),
1609            op: CompareOp::Gt,
1610            right: ScalarExpr::literal(220_i64),
1611        };
1612
1613        let mut vals: Vec<Option<u64>> = Vec::new();
1614        table
1615            .scan_stream(
1616                &[proj(&table, COL_A_U64)],
1617                &expr,
1618                ScanStreamOptions::default(),
1619                |b| {
1620                    let col = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1621                    for i in 0..b.num_rows() {
1622                        vals.push(if col.is_null(i) {
1623                            None
1624                        } else {
1625                            Some(col.value(i))
1626                        });
1627                    }
1628                },
1629            )
1630            .unwrap();
1631
1632        assert_eq!(vals.into_iter().flatten().collect::<Vec<_>>(), vec![300]);
1633    }
1634}