llkv_table/
table.rs

1use std::fmt;
2use std::sync::Arc;
3use std::sync::RwLock;
4
5use crate::planner::{TablePlanner, collect_row_ids_for_table};
6use crate::types::TableId;
7
8use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt32Array};
9use arrow::datatypes::{DataType, Field, Schema};
10use std::collections::HashMap;
11
12use llkv_column_map::store::{Projection, ROW_ID_COLUMN_NAME};
13use llkv_column_map::{ColumnStore, types::LogicalFieldId};
14use llkv_storage::pager::{MemPager, Pager};
15use simd_r_drive_entry_handle::EntryHandle;
16
17use crate::reserved::is_reserved_table_id;
18use crate::sys_catalog::{ColMeta, SysCatalog, TableMeta};
19use crate::types::FieldId;
20use llkv_expr::{Expr, ScalarExpr};
21use llkv_result::{Error, Result as LlkvResult};
22
/// Cached information about which system columns exist in the table schema.
/// This avoids repeated string comparisons in hot paths like append().
///
/// Populated lazily by `Table::get_mvcc_cache` and stored behind an
/// `RwLock<Option<MvccColumnCache>>` on `Table`.
#[derive(Debug, Clone, Copy)]
struct MvccColumnCache {
    // True when the schema already contains the MVCC `created_by` column.
    has_created_by: bool,
    // True when the schema already contains the MVCC `deleted_by` column.
    has_deleted_by: bool,
}
30
/// A schema-aware table built on top of columnar storage.
///
/// `Table` provides a higher-level interface than [`ColumnStore`], adding:
///
/// - **Schema management**: Validates field IDs and data types on append
/// - **MVCC integration**: Automatically manages `created_by` and `deleted_by` columns
/// - **Scan operations**: Supports projection, filtering, ordering, and computed columns
/// - **Row ID filtering**: Allows transaction visibility enforcement via [`RowIdFilter`]
///
/// # Table IDs
///
/// Each table is identified by a [`TableId`]. Table 0 is reserved for the system catalog.
/// User tables start at ID 1.
///
/// # Field IDs
///
/// Each column is assigned a [`FieldId`] stored in the Arrow field metadata. This maps
/// to a [`LogicalFieldId`] in the underlying storage layer, combining the table ID,
/// namespace, and field ID.
///
/// # Thread Safety
///
/// `Table` is `Send + Sync` and can be safely shared via `Arc`.
pub struct Table<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Underlying columnar storage; all reads and writes go through this.
    store: ColumnStore<P>,
    // Identity of this table; combined with user field IDs to form
    // `LogicalFieldId`s for the store.
    table_id: TableId,
    /// Cache of MVCC column presence. Initialized lazily on first schema() call.
    /// None means not yet initialized.
    mvcc_cache: RwLock<Option<MvccColumnCache>>,
}
64
/// Filter row IDs before they are materialized into batches.
///
/// This trait allows implementations to enforce transaction visibility (MVCC),
/// access control, or other row-level filtering policies. The filter is applied
/// after column-level predicates but before data is gathered into Arrow batches.
///
/// # Example Use Case
///
/// MVCC implementations use this to hide rows that were:
/// - Created after the transaction's snapshot timestamp
/// - Deleted before the transaction's snapshot timestamp
pub trait RowIdFilter<P>: Send + Sync
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Filter a list of row IDs, returning only those that should be visible.
    ///
    /// The returned vector is expected to be a subset of `row_ids`; the
    /// `table` argument gives implementations access to this table's store
    /// and metadata when computing visibility.
    ///
    /// # Errors
    ///
    /// Returns an error if visibility metadata cannot be loaded or is corrupted.
    fn filter(&self, table: &Table<P>, row_ids: Vec<u64>) -> LlkvResult<Vec<u64>>;
}
87
/// Options for configuring table scans.
///
/// These options control how rows are filtered, ordered, and materialized during
/// scan operations. `ScanStreamOptions::default()` yields no ordering, no
/// row-id filter, and `include_nulls == false`.
pub struct ScanStreamOptions<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Whether to include rows where all projected columns are null.
    ///
    /// When `false` (default), rows with all-null projections are dropped before
    /// batches are yielded. This is useful for sparse data where many rows may not
    /// have values for the selected columns.
    pub include_nulls: bool,
    /// Optional ordering to apply to results.
    ///
    /// If specified, row IDs are sorted according to this specification before
    /// data is gathered into batches.
    pub order: Option<ScanOrderSpec>,
    /// Optional filter for row-level visibility (e.g., MVCC).
    ///
    /// Applied after column-level predicates but before data is materialized.
    /// Used to enforce transaction isolation. Stored as a shared trait-object
    /// handle so the same filter can be reused across scans.
    pub row_id_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
}
113
114impl<P> fmt::Debug for ScanStreamOptions<P>
115where
116    P: Pager<Blob = EntryHandle> + Send + Sync,
117{
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        f.debug_struct("ScanStreamOptions")
120            .field("include_nulls", &self.include_nulls)
121            .field("order", &self.order)
122            .field(
123                "row_id_filter",
124                &self.row_id_filter.as_ref().map(|_| "<RowIdFilter>"),
125            )
126            .finish()
127    }
128}
129
130impl<P> Clone for ScanStreamOptions<P>
131where
132    P: Pager<Blob = EntryHandle> + Send + Sync,
133{
134    fn clone(&self) -> Self {
135        Self {
136            include_nulls: self.include_nulls,
137            order: self.order,
138            row_id_filter: self.row_id_filter.clone(),
139        }
140    }
141}
142
143impl<P> Default for ScanStreamOptions<P>
144where
145    P: Pager<Blob = EntryHandle> + Send + Sync,
146{
147    fn default() -> Self {
148        Self {
149            include_nulls: false,
150            order: None,
151            row_id_filter: None,
152        }
153    }
154}
155
/// Specification for ordering scan results.
///
/// Defines how to sort rows based on a single column's values. `Copy` so it
/// can be freely duplicated inside [`ScanStreamOptions`].
#[derive(Clone, Copy, Debug)]
pub struct ScanOrderSpec {
    /// The field to sort by.
    pub field_id: FieldId,
    /// Sort direction (ascending or descending).
    pub direction: ScanOrderDirection,
    /// Whether null values appear first or last.
    pub nulls_first: bool,
    /// Optional transformation to apply before sorting.
    pub transform: ScanOrderTransform,
}
170
/// Sort direction for scan ordering.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderDirection {
    /// Sort from smallest to largest.
    Ascending,
    /// Sort from largest to smallest.
    Descending,
}
179
/// Value transformation to apply before sorting.
///
/// Used to enable sorting on columns that need type coercion or conversion
/// before comparison (see [`ScanOrderSpec::transform`]).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderTransform {
    /// Sort integers as-is.
    IdentityInteger,
    /// Sort strings lexicographically.
    IdentityUtf8,
    /// Parse strings as integers, then sort numerically.
    CastUtf8ToInteger,
}
192
/// A column or computed expression to include in scan results.
///
/// Scans can project either stored columns or expressions computed from them.
/// Anything convertible into a [`Projection`] also converts into the
/// `Column` variant via the `From` impls below.
#[derive(Clone, Debug)]
pub enum ScanProjection {
    /// Project a stored column directly.
    Column(Projection),
    /// Compute a value from an expression and return it with an alias.
    Computed {
        /// The expression to evaluate (can reference column field IDs).
        expr: ScalarExpr<FieldId>,
        /// The name to give the computed column in results.
        alias: String,
    },
}
208
209impl ScanProjection {
210    /// Create a projection for a stored column.
211    pub fn column<P: Into<Projection>>(proj: P) -> Self {
212        Self::Column(proj.into())
213    }
214
215    /// Create a projection for a computed expression.
216    pub fn computed<S: Into<String>>(expr: ScalarExpr<FieldId>, alias: S) -> Self {
217        Self::Computed {
218            expr,
219            alias: alias.into(),
220        }
221    }
222}
223
224impl From<Projection> for ScanProjection {
225    fn from(value: Projection) -> Self {
226        ScanProjection::Column(value)
227    }
228}
229
230impl From<&Projection> for ScanProjection {
231    fn from(value: &Projection) -> Self {
232        ScanProjection::Column(value.clone())
233    }
234}
235
236impl From<&ScanProjection> for ScanProjection {
237    fn from(value: &ScanProjection) -> Self {
238        value.clone()
239    }
240}
241
impl<P> Table<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Create a new `Table` instance for the given table ID.
    ///
    /// This opens the underlying [`ColumnStore`] and initializes table-specific state.
    /// The table does not need to already exist; this method works for both new and
    /// existing tables.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The table ID is reserved (e.g., table 0)
    /// - The column store cannot be opened
    pub fn new(table_id: TableId, pager: Arc<P>) -> LlkvResult<Self> {
        // Reject reserved IDs (e.g. the system catalog) up front.
        if is_reserved_table_id(table_id) {
            return Err(Error::ReservedTableId(table_id));
        }

        tracing::trace!(
            "!!! Table::new: Creating table table_id={} with pager at {:p}",
            table_id,
            &*pager
        );
        let store = ColumnStore::open(pager)?;
        Ok(Self {
            store,
            table_id,
            // MVCC column cache starts uninitialized; filled on first use.
            mvcc_cache: RwLock::new(None),
        })
    }

    /// Get or initialize the MVCC column cache from the provided schema.
    /// This is an optimization to avoid repeated string comparisons in append().
    ///
    /// NOTE(review): the cache is write-once — it is populated from the first
    /// schema passed in and never invalidated, so it assumes every batch
    /// appended to this table has the same MVCC column layout (either all
    /// batches carry `created_by`/`deleted_by` or none do). TODO confirm
    /// callers uphold this.
    fn get_mvcc_cache(&self, schema: &Arc<Schema>) -> MvccColumnCache {
        // Fast path: check if cache is already initialized
        {
            let cache_read = self.mvcc_cache.read().unwrap();
            if let Some(cache) = *cache_read {
                return cache;
            }
        }

        // Slow path: initialize cache from schema
        let has_created_by = schema
            .fields()
            .iter()
            .any(|f| f.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME);
        let has_deleted_by = schema
            .fields()
            .iter()
            .any(|f| f.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME);

        let cache = MvccColumnCache {
            has_created_by,
            has_deleted_by,
        };

        // Store in cache for future calls
        *self.mvcc_cache.write().unwrap() = Some(cache);

        cache
    }

    /// Append a [`RecordBatch`] to the table.
    ///
    /// The batch must include:
    /// - A `row_id` column (type `UInt64`) with unique row identifiers
    /// - `field_id` metadata for each user column, mapping to this table's field IDs
    ///
    /// ## MVCC Columns
    ///
    /// If the batch includes `created_by` or `deleted_by` columns, they are automatically
    /// assigned the correct [`LogicalFieldId`] for this table's MVCC metadata.
    /// If absent, they are injected here: `created_by` is filled with the
    /// auto-commit transaction ID (1) and `deleted_by` with 0 ("not deleted").
    ///
    /// ## Field ID Mapping
    ///
    /// Each column's `field_id` metadata is converted to a [`LogicalFieldId`] by combining
    /// it with this table's ID. This ensures columns from different tables don't collide
    /// in the underlying storage.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The batch is missing the `row_id` column
    /// - Any user column is missing `field_id` metadata
    /// - Field IDs are invalid or malformed
    /// - The underlying storage operation fails
    pub fn append(&self, batch: &RecordBatch) -> LlkvResult<()> {
        use arrow::array::UInt64Builder;

        // Check if MVCC columns already exist in the batch using cache
        // This avoids repeated string comparisons on every append
        let cache = self.get_mvcc_cache(&batch.schema());
        let has_created_by = cache.has_created_by;
        let has_deleted_by = cache.has_deleted_by;

        // Rebuilt schema/columns; +2 leaves room for injected MVCC columns.
        let mut new_fields = Vec::with_capacity(batch.schema().fields().len() + 2);
        let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.columns().len() + 2);

        for (idx, field) in batch.schema().fields().iter().enumerate() {
            let maybe_field_id = field.metadata().get(crate::constants::FIELD_ID_META_KEY);
            // System columns (row_id, MVCC columns) don't need field_id metadata
            if maybe_field_id.is_none()
                && (field.name() == ROW_ID_COLUMN_NAME
                    || field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME
                    || field.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME)
            {
                if field.name() == ROW_ID_COLUMN_NAME {
                    // row_id passes through untouched.
                    new_fields.push(field.as_ref().clone());
                    new_columns.push(batch.column(idx).clone());
                } else {
                    // Caller-supplied MVCC column: stamp it with this table's
                    // MVCC logical field id so the store files it correctly.
                    let lfid = if field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME {
                        LogicalFieldId::for_mvcc_created_by(self.table_id)
                    } else {
                        LogicalFieldId::for_mvcc_deleted_by(self.table_id)
                    };

                    let mut metadata = field.metadata().clone();
                    let lfid_val: u64 = lfid.into();
                    metadata.insert(
                        crate::constants::FIELD_ID_META_KEY.to_string(),
                        lfid_val.to_string(),
                    );

                    let new_field =
                        Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                            .with_metadata(metadata);
                    new_fields.push(new_field);
                    new_columns.push(batch.column(idx).clone());
                }
                continue;
            }

            // User column: the metadata must contain a plain user FieldId.
            let raw_field_id = maybe_field_id
                .ok_or_else(|| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' is missing a valid '{}' in its metadata.",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY
                    ))
                })?
                .parse::<u64>()
                .map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' contains an invalid '{}': {}",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY,
                        err
                    ))
                })?;

            // Reject values that are already fully-qualified LogicalFieldIds
            // (those exceed the user FieldId range).
            if raw_field_id > FieldId::MAX as u64 {
                return Err(llkv_result::Error::Internal(format!(
                    "Field '{}' expected user FieldId (<= {}) but got logical id '{}'",
                    field.name(),
                    FieldId::MAX,
                    raw_field_id
                )));
            }

            let user_field_id = raw_field_id as FieldId;
            let logical_field_id = LogicalFieldId::for_user(self.table_id, user_field_id);

            // Store the fully-qualified logical field id in the metadata we hand to the
            // column store so descriptors are registered under the correct table id.
            let lfid = logical_field_id;
            let mut new_metadata = field.metadata().clone();
            let lfid_val: u64 = lfid.into();
            new_metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );

            let new_field =
                Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                    .with_metadata(new_metadata);
            new_fields.push(new_field);
            new_columns.push(batch.column(idx).clone());

            // Ensure the catalog remembers the human-friendly column name for
            // this field so callers of `Table::schema()` (and other metadata
            // consumers) can recover it later. The CSV ingest path (and other
            // writers) may only supply the `field_id` metadata on the batch,
            // so defensively persist the column name when absent.
            let need_meta = match self
                .catalog()
                .get_cols_meta(self.table_id, &[user_field_id])
            {
                metas if metas.is_empty() => true,
                metas => metas[0].as_ref().and_then(|m| m.name.as_ref()).is_none(),
            };

            if need_meta {
                let meta = ColMeta {
                    col_id: user_field_id,
                    name: Some(field.name().to_string()),
                    flags: 0,
                    default: None,
                };
                self.put_col_meta(&meta);
            }
        }

        // Inject MVCC columns if they don't exist
        // For non-transactional appends (e.g., CSV ingest), we use TXN_ID_AUTO_COMMIT (1)
        // which is treated as "committed by system" and always visible.
        // Use TXN_ID_NONE (0) for deleted_by to indicate "not deleted".
        const TXN_ID_AUTO_COMMIT: u64 = 1;
        const TXN_ID_NONE: u64 = 0;
        let row_count = batch.num_rows();

        if !has_created_by {
            // Every row in this batch is stamped as committed by the system.
            let mut created_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                created_by_builder.append_value(TXN_ID_AUTO_COMMIT);
            }
            let created_by_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = created_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::CREATED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(created_by_builder.finish()));
        }

        if !has_deleted_by {
            // Every row starts out live (deleted_by = 0, "not deleted").
            let mut deleted_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                deleted_by_builder.append_value(TXN_ID_NONE);
            }
            let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = deleted_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::DELETED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(deleted_by_builder.finish()));
        }

        // Hand the fully-namespaced batch to the column store.
        let new_schema = Arc::new(Schema::new(new_fields));
        let namespaced_batch = RecordBatch::try_new(new_schema, new_columns)?;
        self.store.append(&namespaced_batch)
    }

    /// Stream one or more projected columns as a sequence of RecordBatches.
    ///
    /// - Avoids `concat` and large materializations.
    /// - Uses the same filter machinery as the old `scan` to produce
    ///   `row_ids`.
    /// - Splits `row_ids` into fixed-size windows and gathers rows per
    ///   window to form a small `RecordBatch` that is sent to `on_batch`.
    pub fn scan_stream<'a, I, T, F>(
        &self,
        projections: I,
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions<P>,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        I: IntoIterator<Item = T>,
        T: Into<ScanProjection>,
        F: FnMut(RecordBatch),
    {
        // Normalize the heterogeneous projection inputs, then delegate.
        let stream_projections: Vec<ScanProjection> =
            projections.into_iter().map(|p| p.into()).collect();
        self.scan_stream_with_exprs(&stream_projections, filter_expr, options, on_batch)
    }

    /// Like [`Table::scan_stream`], but takes an already-materialized slice
    /// of [`ScanProjection`]s, skipping the per-item conversion that
    /// `scan_stream` performs. Planning and execution are delegated to
    /// [`TablePlanner`].
    pub fn scan_stream_with_exprs<'a, F>(
        &self,
        projections: &[ScanProjection],
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions<P>,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(RecordBatch),
    {
        TablePlanner::new(self).scan_stream_with_exprs(projections, filter_expr, options, on_batch)
    }

    /// Evaluate `filter_expr` against this table and return the matching
    /// row IDs (unfiltered by any MVCC/visibility policy).
    // TODO: Return `LlkvResult<Vec<RowId>>`
    pub fn filter_row_ids<'a>(&self, filter_expr: &Expr<'a, FieldId>) -> LlkvResult<Vec<u64>> {
        collect_row_ids_for_table(self, filter_expr)
    }

    /// Return a [`SysCatalog`] handle backed by this table's column store.
    #[inline]
    pub fn catalog(&self) -> SysCatalog<'_, P> {
        SysCatalog::new(&self.store)
    }

    /// Persist table-level metadata. `meta.table_id` must match this table
    /// (checked by `debug_assert` in debug builds).
    #[inline]
    pub fn put_table_meta(&self, meta: &TableMeta) {
        debug_assert_eq!(meta.table_id, self.table_id);
        self.catalog().put_table_meta(meta);
    }

    /// Fetch this table's metadata from the catalog, if any was stored.
    #[inline]
    pub fn get_table_meta(&self) -> Option<TableMeta> {
        self.catalog().get_table_meta(self.table_id)
    }

    /// Persist column metadata for this table.
    #[inline]
    pub fn put_col_meta(&self, meta: &ColMeta) {
        self.catalog().put_col_meta(self.table_id, meta);
    }

    /// Fetch metadata for the given column ids; entries are `None` for
    /// columns with no stored metadata.
    #[inline]
    pub fn get_cols_meta(&self, col_ids: &[FieldId]) -> Vec<Option<ColMeta>> {
        self.catalog().get_cols_meta(self.table_id, col_ids)
    }

    /// Build and return an Arrow `Schema` that describes this table.
    ///
    /// The returned schema includes the `row_id` field first, followed by
    /// user fields. Each user field has its `field_id` stored in the field
    /// metadata (under the "field_id" key) and the name is taken from the
    /// catalog when available or falls back to `col_<id>`.
    pub fn schema(&self) -> LlkvResult<Arc<Schema>> {
        // Collect logical fields for this table and sort by field id.
        let mut logical_fields = self.store.user_field_ids_for_table(self.table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());

        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let metas = self.get_cols_meta(&field_ids);

        let mut fields: Vec<Field> = Vec::with_capacity(1 + field_ids.len());
        // Add row_id first
        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for (idx, lfid) in logical_fields.into_iter().enumerate() {
            let fid = lfid.field_id();
            let dtype = self.store.data_type(lfid)?;
            // Prefer the catalog-stored name; fall back to `col_<id>`.
            let name = metas
                .get(idx)
                .and_then(|m| m.as_ref().and_then(|meta| meta.name.clone()))
                .unwrap_or_else(|| format!("col_{}", fid));

            let mut metadata: HashMap<String, String> = HashMap::new();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                fid.to_string(),
            );

            fields.push(Field::new(&name, dtype.clone(), true).with_metadata(metadata));
        }

        Ok(Arc::new(Schema::new(fields)))
    }

    /// Return the table schema formatted as an Arrow RecordBatch suitable
    /// for pretty printing. The batch has three columns: `name` (Utf8),
    /// `field_id` (UInt32) and `data_type` (Utf8).
    pub fn schema_recordbatch(&self) -> LlkvResult<RecordBatch> {
        let schema = self.schema()?;
        let fields = schema.fields();

        let mut names: Vec<String> = Vec::with_capacity(fields.len());
        let mut fids: Vec<u32> = Vec::with_capacity(fields.len());
        let mut dtypes: Vec<String> = Vec::with_capacity(fields.len());

        for field in fields.iter() {
            names.push(field.name().to_string());
            // Fields without parsable field_id metadata (e.g. row_id) get 0.
            let fid = field
                .metadata()
                .get(crate::constants::FIELD_ID_META_KEY)
                .and_then(|s| s.parse::<u32>().ok())
                .unwrap_or(0u32);
            fids.push(fid);
            dtypes.push(format!("{:?}", field.data_type()));
        }

        // Build Arrow arrays
        let name_array: ArrayRef = Arc::new(StringArray::from(names));
        let fid_array: ArrayRef = Arc::new(UInt32Array::from(fids));
        let dtype_array: ArrayRef = Arc::new(StringArray::from(dtypes));

        let rb_schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new(crate::constants::FIELD_ID_META_KEY, DataType::UInt32, false),
            Field::new("data_type", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(rb_schema, vec![name_array, fid_array, dtype_array])?;
        Ok(batch)
    }

    /// Direct access to the underlying [`ColumnStore`].
    pub fn store(&self) -> &ColumnStore<P> {
        &self.store
    }

    /// The ID of this table.
    #[inline]
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Return the total number of rows for a given user column id in this table.
    ///
    /// This delegates to the ColumnStore descriptor for the logical field that
    /// corresponds to (table_id, col_id) and returns the persisted total_row_count.
    pub fn total_rows_for_col(&self, col_id: FieldId) -> llkv_result::Result<u64> {
        let lfid = LogicalFieldId::for_user(self.table_id, col_id);
        self.store.total_rows_for_field(lfid)
    }

    /// Return the total number of rows for this table.
    ///
    /// Prefer reading the dedicated row-id shadow column if present; otherwise
    /// fall back to inspecting any persisted user column descriptor.
    pub fn total_rows(&self) -> llkv_result::Result<u64> {
        use llkv_column_map::store::rowid_fid;
        let rid_lfid = rowid_fid(LogicalFieldId::for_user(self.table_id, 0));
        // Try the row-id shadow column first
        match self.store.total_rows_for_field(rid_lfid) {
            Ok(n) => Ok(n),
            Err(_) => {
                // Fall back to scanning the catalog for any user-data column
                self.store.total_rows_for_table(self.table_id)
            }
        }
    }
}
685
686#[cfg(test)]
687mod tests {
688    use super::*;
689    use crate::reserved::CATALOG_TABLE_ID;
690    use crate::types::RowId;
691    use arrow::array::Array;
692    use arrow::array::ArrayRef;
693    use arrow::array::{
694        BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt8Array,
695        UInt32Array, UInt64Array,
696    };
697    use arrow::compute::{cast, max, min, sum, unary};
698    use arrow::datatypes::DataType;
699    use llkv_column_map::ColumnStore;
700    use llkv_column_map::store::GatherNullPolicy;
701    use llkv_expr::{BinaryOp, CompareOp, Filter, Operator, ScalarExpr};
702    use std::collections::HashMap;
703    use std::ops::Bound;
704
705    fn setup_test_table() -> Table {
706        let pager = Arc::new(MemPager::default());
707        setup_test_table_with_pager(&pager)
708    }
709
710    fn setup_test_table_with_pager(pager: &Arc<MemPager>) -> Table {
711        let table = Table::new(1, Arc::clone(pager)).unwrap();
712        const COL_A_U64: FieldId = 10;
713        const COL_B_BIN: FieldId = 11;
714        const COL_C_I32: FieldId = 12;
715        const COL_D_F64: FieldId = 13;
716        const COL_E_F32: FieldId = 14;
717
718        let schema = Arc::new(Schema::new(vec![
719            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
720            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
721                crate::constants::FIELD_ID_META_KEY.to_string(),
722                COL_A_U64.to_string(),
723            )])),
724            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
725                crate::constants::FIELD_ID_META_KEY.to_string(),
726                COL_B_BIN.to_string(),
727            )])),
728            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
729                crate::constants::FIELD_ID_META_KEY.to_string(),
730                COL_C_I32.to_string(),
731            )])),
732            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
733                crate::constants::FIELD_ID_META_KEY.to_string(),
734                COL_D_F64.to_string(),
735            )])),
736            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
737                crate::constants::FIELD_ID_META_KEY.to_string(),
738                COL_E_F32.to_string(),
739            )])),
740        ]));
741
742        let batch = RecordBatch::try_new(
743            schema.clone(),
744            vec![
745                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
746                Arc::new(UInt64Array::from(vec![100, 200, 300, 200])),
747                Arc::new(BinaryArray::from(vec![
748                    b"foo" as &[u8],
749                    b"bar",
750                    b"baz",
751                    b"qux",
752                ])),
753                Arc::new(Int32Array::from(vec![10, 20, 30, 20])),
754                Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5, 2.5])),
755                Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 2.0])),
756            ],
757        )
758        .unwrap();
759
760        table.append(&batch).unwrap();
761        table
762    }
763
764    fn gather_single(
765        store: &ColumnStore<MemPager>,
766        field_id: LogicalFieldId,
767        row_ids: &[u64],
768    ) -> ArrayRef {
769        store
770            .gather_rows(&[field_id], row_ids, GatherNullPolicy::ErrorOnMissing)
771            .unwrap()
772            .column(0)
773            .clone()
774    }
775
776    fn pred_expr<'a>(filter: Filter<'a, FieldId>) -> Expr<'a, FieldId> {
777        Expr::Pred(filter)
778    }
779
780    fn proj(table: &Table, field_id: FieldId) -> Projection {
781        Projection::from(LogicalFieldId::for_user(table.table_id, field_id))
782    }
783
784    fn proj_alias<S: Into<String>>(table: &Table, field_id: FieldId, alias: S) -> Projection {
785        Projection::with_alias(LogicalFieldId::for_user(table.table_id, field_id), alias)
786    }
787
788    #[test]
789    fn table_new_rejects_reserved_table_id() {
790        let result = Table::new(CATALOG_TABLE_ID, Arc::new(MemPager::default()));
791        assert!(matches!(
792            result,
793            Err(Error::ReservedTableId(id)) if id == CATALOG_TABLE_ID
794        ));
795    }
796
    #[test]
    fn test_append_rejects_logical_field_id_in_metadata() {
        // Create a table and build a schema where the column's metadata
        // contains a fully-qualified LogicalFieldId (u64). Append should
        // reject this and require a plain user FieldId instead.
        let table = Table::new(7, Arc::new(MemPager::default())).unwrap();

        const USER_FID: FieldId = 42;
        // Build a logical id (namespaced) and put its numeric value into metadata
        let logical: LogicalFieldId = LogicalFieldId::for_user(table.table_id(), USER_FID);
        let logical_val: u64 = logical.into();

        // The "bad" column advertises the full 64-bit logical id where the
        // metadata key expects the small user-level field id.
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("bad", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                logical_val.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2u64])),
                Arc::new(UInt64Array::from(vec![10u64, 20u64])),
            ],
        )
        .unwrap();

        // Append must surface the malformed metadata as an internal error.
        let res = table.append(&batch);
        assert!(matches!(res, Err(Error::Internal(_))));
    }
829
830    #[test]
831    fn test_scan_with_u64_filter() {
832        let table = setup_test_table();
833        const COL_A_U64: FieldId = 10;
834        const COL_C_I32: FieldId = 12;
835
836        let expr = pred_expr(Filter {
837            field_id: COL_A_U64,
838            op: Operator::Equals(200.into()),
839        });
840
841        let mut vals: Vec<Option<i32>> = Vec::new();
842        table
843            .scan_stream(
844                &[proj(&table, COL_C_I32)],
845                &expr,
846                ScanStreamOptions::default(),
847                |b| {
848                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
849                    vals.extend(
850                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
851                    );
852                },
853            )
854            .unwrap();
855        assert_eq!(vals, vec![Some(20), Some(20)]);
856    }
857
858    #[test]
859    fn test_scan_with_string_filter() {
860        let pager = Arc::new(MemPager::default());
861        let table = Table::new(500, Arc::clone(&pager)).unwrap();
862
863        const COL_STR: FieldId = 42;
864        let schema = Arc::new(Schema::new(vec![
865            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
866            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
867                crate::constants::FIELD_ID_META_KEY.to_string(),
868                COL_STR.to_string(),
869            )])),
870        ]));
871
872        let batch = RecordBatch::try_new(
873            schema,
874            vec![
875                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
876                Arc::new(StringArray::from(vec!["alice", "bob", "albert", "carol"])),
877            ],
878        )
879        .unwrap();
880        table.append(&batch).unwrap();
881
882        let expr = pred_expr(Filter {
883            field_id: COL_STR,
884            op: Operator::starts_with("al", true),
885        });
886
887        let mut collected: Vec<Option<String>> = Vec::new();
888        table
889            .scan_stream(
890                &[proj(&table, COL_STR)],
891                &expr,
892                ScanStreamOptions::default(),
893                |b| {
894                    let arr = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
895                    collected.extend(arr.iter().map(|v| v.map(|s| s.to_string())));
896                },
897            )
898            .unwrap();
899
900        assert_eq!(
901            collected,
902            vec![Some("alice".to_string()), Some("albert".to_string())]
903        );
904    }
905
    /// Reopening tables over a shared pager must recover both schema (data
    /// types) and previously appended values for every column of every table.
    #[test]
    fn test_table_reopen_with_shared_pager() {
        const TABLE_ALPHA: TableId = 42;
        const TABLE_BETA: TableId = 43;
        const TABLE_GAMMA: TableId = 44;
        const COL_ALPHA_U64: FieldId = 100;
        const COL_ALPHA_I32: FieldId = 101;
        const COL_ALPHA_U32: FieldId = 102;
        const COL_ALPHA_I16: FieldId = 103;
        const COL_BETA_U64: FieldId = 200;
        const COL_BETA_U8: FieldId = 201;
        const COL_GAMMA_I16: FieldId = 300;

        // A single pager is shared by all three tables and across "sessions";
        // dropping a Table must leave the persisted state readable.
        let pager = Arc::new(MemPager::default());

        let alpha_rows: Vec<RowId> = vec![1, 2, 3, 4];
        let alpha_vals_u64: Vec<u64> = vec![10, 20, 30, 40];
        let alpha_vals_i32: Vec<i32> = vec![-5, 15, 25, 35];
        let alpha_vals_u32: Vec<u32> = vec![7, 11, 13, 17];
        let alpha_vals_i16: Vec<i16> = vec![-2, 4, -6, 8];

        let beta_rows: Vec<u64> = vec![101, 102, 103];
        let beta_vals_u64: Vec<u64> = vec![900, 901, 902];
        let beta_vals_u8: Vec<u8> = vec![1, 2, 3];

        let gamma_rows: Vec<u64> = vec![501, 502];
        let gamma_vals_i16: Vec<i16> = vec![123, -321];

        // First session: create tables and write data.
        {
            let table = Table::new(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("alpha_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U64.to_string(),
                )])),
                Field::new("alpha_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I32.to_string(),
                )])),
                Field::new("alpha_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U32.to_string(),
                )])),
                Field::new("alpha_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(alpha_rows.clone())),
                    Arc::new(UInt64Array::from(alpha_vals_u64.clone())),
                    Arc::new(Int32Array::from(alpha_vals_i32.clone())),
                    Arc::new(UInt32Array::from(alpha_vals_u32.clone())),
                    Arc::new(Int16Array::from(alpha_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        {
            let table = Table::new(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("beta_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U64.to_string(),
                )])),
                Field::new("beta_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U8.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(beta_rows.clone())),
                    Arc::new(UInt64Array::from(beta_vals_u64.clone())),
                    Arc::new(UInt8Array::from(beta_vals_u8.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        {
            let table = Table::new(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("gamma_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_GAMMA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(gamma_rows.clone())),
                    Arc::new(Int16Array::from(gamma_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // Second session: reopen each table and ensure schema and values are intact.
        {
            let table = Table::new(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            // Expected dtype per alpha column; the matching value vectors are
            // checked in the loop below.
            let expectations: &[(FieldId, DataType)] = &[
                (COL_ALPHA_U64, DataType::UInt64),
                (COL_ALPHA_I32, DataType::Int32),
                (COL_ALPHA_U32, DataType::UInt32),
                (COL_ALPHA_I16, DataType::Int16),
            ];

            for &(col, ref ty) in expectations {
                let lfid = LogicalFieldId::for_user(TABLE_ALPHA, col);
                assert_eq!(store.data_type(lfid).unwrap(), *ty);
                let arr = gather_single(store, lfid, &alpha_rows);
                match ty {
                    DataType::UInt64 => {
                        let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u64.as_slice());
                    }
                    DataType::Int32 => {
                        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i32.as_slice());
                    }
                    DataType::UInt32 => {
                        let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u32.as_slice());
                    }
                    DataType::Int16 => {
                        let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i16.as_slice());
                    }
                    other => panic!("unexpected dtype {other:?}"),
                }
            }
        }

        {
            let table = Table::new(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let lfid_u64 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U64);
            assert_eq!(store.data_type(lfid_u64).unwrap(), DataType::UInt64);
            let arr_u64 = gather_single(store, lfid_u64, &beta_rows);
            let arr_u64 = arr_u64.as_any().downcast_ref::<UInt64Array>().unwrap();
            assert_eq!(arr_u64.values(), beta_vals_u64.as_slice());

            let lfid_u8 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U8);
            assert_eq!(store.data_type(lfid_u8).unwrap(), DataType::UInt8);
            let arr_u8 = gather_single(store, lfid_u8, &beta_rows);
            let arr_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
            assert_eq!(arr_u8.values(), beta_vals_u8.as_slice());
        }

        {
            let table = Table::new(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let store = table.store();
            let lfid = LogicalFieldId::for_user(TABLE_GAMMA, COL_GAMMA_I16);
            assert_eq!(store.data_type(lfid).unwrap(), DataType::Int16);
            let arr = gather_single(store, lfid, &gamma_rows);
            let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
            assert_eq!(arr.values(), gamma_vals_i16.as_slice());
        }
    }
1080
1081    #[test]
1082    fn test_scan_with_i32_filter() {
1083        let table = setup_test_table();
1084        const COL_A_U64: FieldId = 10;
1085        const COL_C_I32: FieldId = 12;
1086
1087        let filter = pred_expr(Filter {
1088            field_id: COL_C_I32,
1089            op: Operator::Equals(20.into()),
1090        });
1091
1092        let mut vals: Vec<Option<u64>> = Vec::new();
1093        table
1094            .scan_stream(
1095                &[proj(&table, COL_A_U64)],
1096                &filter,
1097                ScanStreamOptions::default(),
1098                |b| {
1099                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1100                    vals.extend(
1101                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1102                    );
1103                },
1104            )
1105            .unwrap();
1106        assert_eq!(vals, vec![Some(200), Some(200)]);
1107    }
1108
1109    #[test]
1110    fn test_scan_with_greater_than_filter() {
1111        let table = setup_test_table();
1112        const COL_A_U64: FieldId = 10;
1113        const COL_C_I32: FieldId = 12;
1114
1115        let filter = pred_expr(Filter {
1116            field_id: COL_C_I32,
1117            op: Operator::GreaterThan(15.into()),
1118        });
1119
1120        let mut vals: Vec<Option<u64>> = Vec::new();
1121        table
1122            .scan_stream(
1123                &[proj(&table, COL_A_U64)],
1124                &filter,
1125                ScanStreamOptions::default(),
1126                |b| {
1127                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1128                    vals.extend(
1129                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1130                    );
1131                },
1132            )
1133            .unwrap();
1134        assert_eq!(vals, vec![Some(200), Some(300), Some(200)]);
1135    }
1136
1137    #[test]
1138    fn test_scan_with_range_filter() {
1139        let table = setup_test_table();
1140        const COL_A_U64: FieldId = 10;
1141        const COL_C_I32: FieldId = 12;
1142
1143        let filter = pred_expr(Filter {
1144            field_id: COL_A_U64,
1145            op: Operator::Range {
1146                lower: Bound::Included(150.into()),
1147                upper: Bound::Excluded(300.into()),
1148            },
1149        });
1150
1151        let mut vals: Vec<Option<i32>> = Vec::new();
1152        table
1153            .scan_stream(
1154                &[proj(&table, COL_C_I32)],
1155                &filter,
1156                ScanStreamOptions::default(),
1157                |b| {
1158                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1159                    vals.extend(
1160                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1161                    );
1162                },
1163            )
1164            .unwrap();
1165        assert_eq!(vals, vec![Some(20), Some(20)]);
1166    }
1167
1168    #[test]
1169    fn test_filtered_scan_sum_kernel() {
1170        // Trade-off note:
1171        // - We use Arrow's sum kernel per batch, then add the partial sums.
1172        // - This preserves Arrow null semantics and avoids concat.
1173        let table = setup_test_table();
1174        const COL_A_U64: FieldId = 10;
1175
1176        let filter = pred_expr(Filter {
1177            field_id: COL_A_U64,
1178            op: Operator::Range {
1179                lower: Bound::Included(150.into()),
1180                upper: Bound::Excluded(300.into()),
1181            },
1182        });
1183
1184        let mut total: u128 = 0;
1185        table
1186            .scan_stream(
1187                &[proj(&table, COL_A_U64)],
1188                &filter,
1189                ScanStreamOptions::default(),
1190                |b| {
1191                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1192                    if let Some(part) = sum(a) {
1193                        total += part as u128;
1194                    }
1195                },
1196            )
1197            .unwrap();
1198
1199        assert_eq!(total, 400);
1200    }
1201
1202    #[test]
1203    fn test_filtered_scan_sum_i32_kernel() {
1204        // Trade-off note:
1205        // - Per-batch sum + accumulate avoids building one big Array.
1206        // - For tiny batches overhead may match manual loops, but keeps
1207        //   Arrow semantics exact.
1208        let table = setup_test_table();
1209        const COL_A_U64: FieldId = 10;
1210        const COL_C_I32: FieldId = 12;
1211
1212        let candidates = [100.into(), 300.into()];
1213        let filter = pred_expr(Filter {
1214            field_id: COL_A_U64,
1215            op: Operator::In(&candidates),
1216        });
1217
1218        let mut total: i64 = 0;
1219        table
1220            .scan_stream(
1221                &[proj(&table, COL_C_I32)],
1222                &filter,
1223                ScanStreamOptions::default(),
1224                |b| {
1225                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1226                    if let Some(part) = sum(a) {
1227                        total += part as i64;
1228                    }
1229                },
1230            )
1231            .unwrap();
1232        assert_eq!(total, 40);
1233    }
1234
1235    #[test]
1236    fn test_filtered_scan_min_max_kernel() {
1237        // Trade-off note:
1238        // - min/max are computed per batch and folded. This preserves
1239        //   Arrow's null behavior and avoids concat.
1240        // - Be mindful of NaN semantics if extended to floats later.
1241        let table = setup_test_table();
1242        const COL_A_U64: FieldId = 10;
1243        const COL_C_I32: FieldId = 12;
1244
1245        let candidates = [100.into(), 300.into()];
1246        let filter = pred_expr(Filter {
1247            field_id: COL_A_U64,
1248            op: Operator::In(&candidates),
1249        });
1250
1251        let mut mn: Option<i32> = None;
1252        let mut mx: Option<i32> = None;
1253        table
1254            .scan_stream(
1255                &[proj(&table, COL_C_I32)],
1256                &filter,
1257                ScanStreamOptions::default(),
1258                |b| {
1259                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1260
1261                    if let Some(part_min) = min(a) {
1262                        mn = Some(mn.map_or(part_min, |m| m.min(part_min)));
1263                    }
1264                    if let Some(part_max) = max(a) {
1265                        mx = Some(mx.map_or(part_max, |m| m.max(part_max)));
1266                    }
1267                },
1268            )
1269            .unwrap();
1270        assert_eq!(mn, Some(10));
1271        assert_eq!(mx, Some(30));
1272    }
1273
1274    #[test]
1275    fn test_filtered_scan_float64_column() {
1276        let table = setup_test_table();
1277        const COL_D_F64: FieldId = 13;
1278
1279        let filter = pred_expr(Filter {
1280            field_id: COL_D_F64,
1281            op: Operator::GreaterThan(2.0_f64.into()),
1282        });
1283
1284        let mut got = Vec::new();
1285        table
1286            .scan_stream(
1287                &[proj(&table, COL_D_F64)],
1288                &filter,
1289                ScanStreamOptions::default(),
1290                |b| {
1291                    let arr = b.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
1292                    for i in 0..arr.len() {
1293                        if arr.is_valid(i) {
1294                            got.push(arr.value(i));
1295                        }
1296                    }
1297                },
1298            )
1299            .unwrap();
1300
1301        assert_eq!(got, vec![2.5, 3.5, 2.5]);
1302    }
1303
1304    #[test]
1305    fn test_filtered_scan_float32_in_operator() {
1306        let table = setup_test_table();
1307        const COL_E_F32: FieldId = 14;
1308
1309        let candidates = [2.0_f32.into(), 3.0_f32.into()];
1310        let filter = pred_expr(Filter {
1311            field_id: COL_E_F32,
1312            op: Operator::In(&candidates),
1313        });
1314
1315        let mut vals: Vec<Option<f32>> = Vec::new();
1316        table
1317            .scan_stream(
1318                &[proj(&table, COL_E_F32)],
1319                &filter,
1320                ScanStreamOptions::default(),
1321                |b| {
1322                    let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
1323                    vals.extend((0..arr.len()).map(|i| {
1324                        if arr.is_null(i) {
1325                            None
1326                        } else {
1327                            Some(arr.value(i))
1328                        }
1329                    }));
1330                },
1331            )
1332            .unwrap();
1333
1334        let collected: Vec<f32> = vals.into_iter().flatten().collect();
1335        assert_eq!(collected, vec![2.0, 3.0, 2.0]);
1336    }
1337
1338    #[test]
1339    fn test_scan_stream_and_expression() {
1340        let table = setup_test_table();
1341        const COL_A_U64: FieldId = 10;
1342        const COL_C_I32: FieldId = 12;
1343        const COL_E_F32: FieldId = 14;
1344
1345        let expr = Expr::all_of(vec![
1346            Filter {
1347                field_id: COL_C_I32,
1348                op: Operator::GreaterThan(15.into()),
1349            },
1350            Filter {
1351                field_id: COL_A_U64,
1352                op: Operator::LessThan(250.into()),
1353            },
1354        ]);
1355
1356        let mut vals: Vec<Option<f32>> = Vec::new();
1357        table
1358            .scan_stream(
1359                &[proj(&table, COL_E_F32)],
1360                &expr,
1361                ScanStreamOptions::default(),
1362                |b| {
1363                    let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
1364                    vals.extend((0..arr.len()).map(|i| {
1365                        if arr.is_null(i) {
1366                            None
1367                        } else {
1368                            Some(arr.value(i))
1369                        }
1370                    }));
1371                },
1372            )
1373            .unwrap();
1374
1375        assert_eq!(vals, vec![Some(2.0_f32), Some(2.0_f32)]);
1376    }
1377
1378    #[test]
1379    fn test_scan_stream_or_expression() {
1380        let table = setup_test_table();
1381        const COL_A_U64: FieldId = 10;
1382        const COL_C_I32: FieldId = 12;
1383
1384        let expr = Expr::any_of(vec![
1385            Filter {
1386                field_id: COL_C_I32,
1387                op: Operator::Equals(10.into()),
1388            },
1389            Filter {
1390                field_id: COL_C_I32,
1391                op: Operator::Equals(30.into()),
1392            },
1393        ]);
1394
1395        let mut vals: Vec<Option<u64>> = Vec::new();
1396        table
1397            .scan_stream(
1398                &[proj(&table, COL_A_U64)],
1399                &expr,
1400                ScanStreamOptions::default(),
1401                |b| {
1402                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1403                    vals.extend((0..arr.len()).map(|i| {
1404                        if arr.is_null(i) {
1405                            None
1406                        } else {
1407                            Some(arr.value(i))
1408                        }
1409                    }));
1410                },
1411            )
1412            .unwrap();
1413
1414        assert_eq!(vals, vec![Some(100), Some(300)]);
1415    }
1416
1417    #[test]
1418    fn test_scan_stream_not_predicate() {
1419        let table = setup_test_table();
1420        const COL_A_U64: FieldId = 10;
1421        const COL_C_I32: FieldId = 12;
1422
1423        let expr = Expr::not(pred_expr(Filter {
1424            field_id: COL_C_I32,
1425            op: Operator::Equals(20.into()),
1426        }));
1427
1428        let mut vals: Vec<Option<u64>> = Vec::new();
1429        table
1430            .scan_stream(
1431                &[proj(&table, COL_A_U64)],
1432                &expr,
1433                ScanStreamOptions::default(),
1434                |b| {
1435                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1436                    vals.extend((0..arr.len()).map(|i| {
1437                        if arr.is_null(i) {
1438                            None
1439                        } else {
1440                            Some(arr.value(i))
1441                        }
1442                    }));
1443                },
1444            )
1445            .unwrap();
1446
1447        assert_eq!(vals, vec![Some(100), Some(300)]);
1448    }
1449
1450    #[test]
1451    fn test_scan_stream_not_and_expression() {
1452        let table = setup_test_table();
1453        const COL_A_U64: FieldId = 10;
1454        const COL_C_I32: FieldId = 12;
1455
1456        let expr = Expr::not(Expr::all_of(vec![
1457            Filter {
1458                field_id: COL_A_U64,
1459                op: Operator::GreaterThan(150.into()),
1460            },
1461            Filter {
1462                field_id: COL_C_I32,
1463                op: Operator::LessThan(40.into()),
1464            },
1465        ]));
1466
1467        let mut vals: Vec<Option<u64>> = Vec::new();
1468        table
1469            .scan_stream(
1470                &[proj(&table, COL_A_U64)],
1471                &expr,
1472                ScanStreamOptions::default(),
1473                |b| {
1474                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1475                    vals.extend((0..arr.len()).map(|i| {
1476                        if arr.is_null(i) {
1477                            None
1478                        } else {
1479                            Some(arr.value(i))
1480                        }
1481                    }));
1482                },
1483            )
1484            .unwrap();
1485
1486        assert_eq!(vals, vec![Some(100)]);
1487    }
1488
    /// Default scans drop rows whose projected value is NULL; with
    /// `include_nulls: true` those rows are emitted as nulls instead.
    #[test]
    fn test_scan_stream_include_nulls_toggle() {
        let pager = Arc::new(MemPager::default());
        let table = setup_test_table_with_pager(&pager);
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;
        const COL_B_BIN: FieldId = 11;
        const COL_D_F64: FieldId = 13;
        const COL_E_F32: FieldId = 14;

        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_B_BIN.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, true).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_F64.to_string(),
            )])),
            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_F32.to_string(),
            )])),
        ]));

        // Rows 5 and 6: row 6 carries a NULL in the nullable c_i32 column.
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![5, 6])),
                Arc::new(UInt64Array::from(vec![500, 600])),
                Arc::new(BinaryArray::from(vec![
                    Some(&b"new"[..]),
                    Some(&b"alt"[..]),
                ])),
                Arc::new(Int32Array::from(vec![Some(40), None])),
                Arc::new(Float64Array::from(vec![5.5, 6.5])),
                Arc::new(Float32Array::from(vec![5.0, 6.0])),
            ],
        )
        .unwrap();
        table.append(&batch).unwrap();

        // Both appended rows satisfy a_u64 > 450.
        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::GreaterThan(450.into()),
        });

        // Default options: the NULL c_i32 of row 6 is filtered out.
        let mut default_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    default_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(default_vals, vec![Some(40)]);

        // include_nulls: true — row 6 is emitted with a null c_i32.
        let mut include_null_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions {
                    include_nulls: true,
                    order: None,
                    row_id_filter: None,
                },
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();

                    // NOTE(review): this nested scan_stream runs inside the
                    // outer scan's callback — presumably intentional to
                    // exercise reentrant scans over the same table; confirm.
                    // It checks multi-column projection: the default-options
                    // path keeps row 6 here because d_f64 is non-null.
                    let mut paired_vals: Vec<(Option<i32>, Option<f64>)> = Vec::new();
                    table
                        .scan_stream(
                            &[proj(&table, COL_C_I32), proj(&table, COL_D_F64)],
                            &filter,
                            ScanStreamOptions::default(),
                            |b| {
                                assert_eq!(b.num_columns(), 2);
                                let c_arr =
                                    b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                                let d_arr =
                                    b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
                                for i in 0..b.num_rows() {
                                    let c_val = if c_arr.is_null(i) {
                                        None
                                    } else {
                                        Some(c_arr.value(i))
                                    };
                                    let d_val = if d_arr.is_null(i) {
                                        None
                                    } else {
                                        Some(d_arr.value(i))
                                    };
                                    paired_vals.push((c_val, d_val));
                                }
                            },
                        )
                        .unwrap();
                    assert_eq!(paired_vals, vec![(Some(40), Some(5.5)), (None, Some(6.5))]);
                    include_null_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(include_null_vals, vec![Some(40), None]);
    }
1619
1620    #[test]
1621    fn test_filtered_scan_int_sqrt_float64() {
1622        // Trade-off note:
1623        // - We cast per batch and apply a compute unary kernel for sqrt.
1624        // - This keeps processing streaming and avoids per-value loops.
1625        // - `unary` operates on `PrimitiveArray<T>`; cast and downcast to
1626        //   `Float64Array` first.
1627        let table = setup_test_table();
1628        const COL_A_U64: FieldId = 10;
1629        const COL_C_I32: FieldId = 12;
1630
1631        let filter = pred_expr(Filter {
1632            field_id: COL_C_I32,
1633            op: Operator::GreaterThan(15.into()),
1634        });
1635
1636        let mut got: Vec<f64> = Vec::new();
1637        table
1638            .scan_stream(
1639                &[proj(&table, COL_A_U64)],
1640                &filter,
1641                ScanStreamOptions::default(),
1642                |b| {
1643                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
1644                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
1645
1646                    // unary::<Float64Type, _, Float64Type>(...)
1647                    let sqrt_arr = unary::<
1648                        arrow::datatypes::Float64Type,
1649                        _,
1650                        arrow::datatypes::Float64Type,
1651                    >(f64_arr, |v: f64| v.sqrt());
1652
1653                    for i in 0..sqrt_arr.len() {
1654                        if !sqrt_arr.is_null(i) {
1655                            got.push(sqrt_arr.value(i));
1656                        }
1657                    }
1658                },
1659            )
1660            .unwrap();
1661
1662        let expected = [200_f64.sqrt(), 300_f64.sqrt(), 200_f64.sqrt()];
1663        assert_eq!(got, expected);
1664    }
1665
1666    #[test]
1667    fn test_multi_field_kernels_with_filters() {
1668        // Trade-off note:
1669        // - All reductions use per-batch kernels + accumulation to stay
1670        //   streaming. No concat or whole-column materialization.
1671        use arrow::array::{Int16Array, UInt8Array, UInt32Array};
1672
1673        let table = Table::new(2, Arc::new(MemPager::default())).unwrap();
1674
1675        const COL_A_U64: FieldId = 20;
1676        const COL_D_U32: FieldId = 21;
1677        const COL_E_I16: FieldId = 22;
1678        const COL_F_U8: FieldId = 23;
1679        const COL_C_I32: FieldId = 24;
1680
1681        let schema = Arc::new(Schema::new(vec![
1682            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1683            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1684                crate::constants::FIELD_ID_META_KEY.to_string(),
1685                COL_A_U64.to_string(),
1686            )])),
1687            Field::new("d_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
1688                crate::constants::FIELD_ID_META_KEY.to_string(),
1689                COL_D_U32.to_string(),
1690            )])),
1691            Field::new("e_i16", DataType::Int16, false).with_metadata(HashMap::from([(
1692                crate::constants::FIELD_ID_META_KEY.to_string(),
1693                COL_E_I16.to_string(),
1694            )])),
1695            Field::new("f_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
1696                crate::constants::FIELD_ID_META_KEY.to_string(),
1697                COL_F_U8.to_string(),
1698            )])),
1699            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
1700                crate::constants::FIELD_ID_META_KEY.to_string(),
1701                COL_C_I32.to_string(),
1702            )])),
1703        ]));
1704
1705        // Data: 5 rows. We will filter c_i32 >= 20 -> keep rows 2..5.
1706        let batch = RecordBatch::try_new(
1707            schema.clone(),
1708            vec![
1709                Arc::new(UInt64Array::from(vec![1, 2, 3, 4, 5])),
1710                Arc::new(UInt64Array::from(vec![100, 225, 400, 900, 1600])),
1711                Arc::new(UInt32Array::from(vec![7, 8, 9, 10, 11])),
1712                Arc::new(Int16Array::from(vec![-3, 0, 3, -6, 6])),
1713                Arc::new(UInt8Array::from(vec![2, 4, 6, 8, 10])),
1714                Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
1715            ],
1716        )
1717        .unwrap();
1718
1719        table.append(&batch).unwrap();
1720
1721        // Filter: c_i32 >= 20.
1722        let filter = pred_expr(Filter {
1723            field_id: COL_C_I32,
1724            op: Operator::GreaterThanOrEquals(20.into()),
1725        });
1726
1727        // 1) SUM over d_u32 (per-batch sum + accumulate).
1728        let mut d_sum: u128 = 0;
1729        table
1730            .scan_stream(
1731                &[proj(&table, COL_D_U32)],
1732                &filter,
1733                ScanStreamOptions::default(),
1734                |b| {
1735                    let a = b.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
1736                    if let Some(part) = sum(a) {
1737                        d_sum += part as u128;
1738                    }
1739                },
1740            )
1741            .unwrap();
1742        assert_eq!(d_sum, (8 + 9 + 10 + 11) as u128);
1743
1744        // 2) MIN over e_i16 (per-batch min + fold).
1745        let mut e_min: Option<i16> = None;
1746        table
1747            .scan_stream(
1748                &[proj(&table, COL_E_I16)],
1749                &filter,
1750                ScanStreamOptions::default(),
1751                |b| {
1752                    let a = b.column(0).as_any().downcast_ref::<Int16Array>().unwrap();
1753                    if let Some(part_min) = min(a) {
1754                        e_min = Some(e_min.map_or(part_min, |m| m.min(part_min)));
1755                    }
1756                },
1757            )
1758            .unwrap();
1759        assert_eq!(e_min, Some(-6));
1760
1761        // 3) MAX over f_u8 (per-batch max + fold).
1762        let mut f_max: Option<u8> = None;
1763        table
1764            .scan_stream(
1765                &[proj(&table, COL_F_U8)],
1766                &filter,
1767                ScanStreamOptions::default(),
1768                |b| {
1769                    let a = b
1770                        .column(0)
1771                        .as_any()
1772                        .downcast_ref::<arrow::array::UInt8Array>()
1773                        .unwrap();
1774                    if let Some(part_max) = max(a) {
1775                        f_max = Some(f_max.map_or(part_max, |m| m.max(part_max)));
1776                    }
1777                },
1778            )
1779            .unwrap();
1780        assert_eq!(f_max, Some(10));
1781
1782        // 4) SQRT over a_u64 (cast to f64, then unary sqrt per batch).
1783        let mut got: Vec<f64> = Vec::new();
1784        table
1785            .scan_stream(
1786                &[proj(&table, COL_A_U64)],
1787                &filter,
1788                ScanStreamOptions::default(),
1789                |b| {
1790                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
1791                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
1792                    let sqrt_arr = unary::<
1793                        arrow::datatypes::Float64Type,
1794                        _,
1795                        arrow::datatypes::Float64Type,
1796                    >(f64_arr, |v: f64| v.sqrt());
1797
1798                    for i in 0..sqrt_arr.len() {
1799                        if !sqrt_arr.is_null(i) {
1800                            got.push(sqrt_arr.value(i));
1801                        }
1802                    }
1803                },
1804            )
1805            .unwrap();
1806        let expected = [15.0_f64, 20.0, 30.0, 40.0];
1807        assert_eq!(got, expected);
1808    }
1809
1810    #[test]
1811    fn test_scan_with_in_filter() {
1812        let table = setup_test_table();
1813        const COL_A_U64: FieldId = 10;
1814        const COL_C_I32: FieldId = 12;
1815
1816        // IN now uses untyped literals, too.
1817        let candidates = [10.into(), 30.into()];
1818        let filter = pred_expr(Filter {
1819            field_id: COL_C_I32,
1820            op: Operator::In(&candidates),
1821        });
1822
1823        let mut vals: Vec<Option<u64>> = Vec::new();
1824        table
1825            .scan_stream(
1826                &[proj(&table, COL_A_U64)],
1827                &filter,
1828                ScanStreamOptions::default(),
1829                |b| {
1830                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1831                    vals.extend(
1832                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1833                    );
1834                },
1835            )
1836            .unwrap();
1837        assert_eq!(vals, vec![Some(100), Some(300)]);
1838    }
1839
1840    #[test]
1841    fn test_scan_stream_single_column_batches() {
1842        let table = setup_test_table();
1843        const COL_A_U64: FieldId = 10;
1844        const COL_C_I32: FieldId = 12;
1845
1846        // Filter c_i32 == 20 -> two rows; stream a_u64 in batches of <= N.
1847        let filter = pred_expr(Filter {
1848            field_id: COL_C_I32,
1849            op: Operator::Equals(20.into()),
1850        });
1851
1852        let mut seen_cols = Vec::<u64>::new();
1853        table
1854            .scan_stream(
1855                &[proj(&table, COL_A_U64)],
1856                &filter,
1857                ScanStreamOptions::default(),
1858                |b| {
1859                    assert_eq!(b.num_columns(), 1);
1860                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1861                    // No kernel needed; just collect values for shape assertions.
1862                    for i in 0..a.len() {
1863                        if !a.is_null(i) {
1864                            seen_cols.push(a.value(i));
1865                        }
1866                    }
1867                },
1868            )
1869            .unwrap();
1870
1871        // In fixture, c_i32 == 20 corresponds to a_u64 values [200, 200].
1872        assert_eq!(seen_cols, vec![200, 200]);
1873    }
1874
1875    #[test]
1876    fn test_scan_with_multiple_projection_columns() {
1877        let table = setup_test_table();
1878        const COL_A_U64: FieldId = 10;
1879        const COL_C_I32: FieldId = 12;
1880
1881        let filter = pred_expr(Filter {
1882            field_id: COL_C_I32,
1883            op: Operator::Equals(20.into()),
1884        });
1885
1886        let expected_names = [COL_A_U64.to_string(), COL_C_I32.to_string()];
1887
1888        let mut combined: Vec<(Option<u64>, Option<i32>)> = Vec::new();
1889        table
1890            .scan_stream(
1891                &[proj(&table, COL_A_U64), proj(&table, COL_C_I32)],
1892                &filter,
1893                ScanStreamOptions::default(),
1894                |b| {
1895                    assert_eq!(b.num_columns(), 2);
1896                    assert_eq!(b.schema().field(0).name(), &expected_names[0]);
1897                    assert_eq!(b.schema().field(1).name(), &expected_names[1]);
1898
1899                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1900                    let c = b.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
1901                    for i in 0..b.num_rows() {
1902                        let left = if a.is_null(i) { None } else { Some(a.value(i)) };
1903                        let right = if c.is_null(i) { None } else { Some(c.value(i)) };
1904                        combined.push((left, right));
1905                    }
1906                },
1907            )
1908            .unwrap();
1909
1910        assert_eq!(combined, vec![(Some(200), Some(20)), (Some(200), Some(20))]);
1911    }
1912
1913    #[test]
1914    fn test_scan_stream_projection_validation() {
1915        let table = setup_test_table();
1916        const COL_A_U64: FieldId = 10;
1917        const COL_C_I32: FieldId = 12;
1918
1919        let filter = pred_expr(Filter {
1920            field_id: COL_C_I32,
1921            op: Operator::Equals(20.into()),
1922        });
1923
1924        let empty: [Projection; 0] = [];
1925        let result = table.scan_stream(&empty, &filter, ScanStreamOptions::default(), |_batch| {});
1926        assert!(matches!(result, Err(Error::InvalidArgumentError(_))));
1927
1928        // Duplicate projections are allowed: the same column will be
1929        // gathered once and duplicated in the output in the requested
1930        // order. Verify the call succeeds and produces two identical
1931        // columns per batch.
1932        let duplicate = [
1933            proj(&table, COL_A_U64),
1934            proj_alias(&table, COL_A_U64, "alias_a"),
1935        ];
1936        let mut collected = Vec::<u64>::new();
1937        table
1938            .scan_stream(&duplicate, &filter, ScanStreamOptions::default(), |b| {
1939                assert_eq!(b.num_columns(), 2);
1940                assert_eq!(b.schema().field(0).name(), &COL_A_U64.to_string());
1941                assert_eq!(b.schema().field(1).name(), "alias_a");
1942                let a0 = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1943                let a1 = b.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
1944                for i in 0..b.num_rows() {
1945                    if !a0.is_null(i) {
1946                        collected.push(a0.value(i));
1947                    }
1948                    if !a1.is_null(i) {
1949                        collected.push(a1.value(i));
1950                    }
1951                }
1952            })
1953            .unwrap();
1954        // Two matching rows, two columns per row -> four values.
1955        assert_eq!(collected, vec![200, 200, 200, 200]);
1956    }
1957
1958    #[test]
1959    fn test_scan_stream_computed_projection() {
1960        let table = setup_test_table();
1961        const COL_A_U64: FieldId = 10;
1962
1963        let projections = [
1964            ScanProjection::column(proj(&table, COL_A_U64)),
1965            ScanProjection::computed(
1966                ScalarExpr::binary(
1967                    ScalarExpr::column(COL_A_U64),
1968                    BinaryOp::Multiply,
1969                    ScalarExpr::literal(2),
1970                ),
1971                "a_times_two",
1972            ),
1973        ];
1974
1975        let filter = pred_expr(Filter {
1976            field_id: COL_A_U64,
1977            op: Operator::GreaterThanOrEquals(0.into()),
1978        });
1979
1980        let mut computed: Vec<(u64, f64)> = Vec::new();
1981        table
1982            .scan_stream_with_exprs(&projections, &filter, ScanStreamOptions::default(), |b| {
1983                assert_eq!(b.num_columns(), 2);
1984                let base = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1985                let comp = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
1986                for i in 0..b.num_rows() {
1987                    if base.is_null(i) || comp.is_null(i) {
1988                        continue;
1989                    }
1990                    computed.push((base.value(i), comp.value(i)));
1991                }
1992            })
1993            .unwrap();
1994
1995        let expected = vec![(100, 200.0), (200, 400.0), (300, 600.0), (200, 400.0)];
1996        assert_eq!(computed, expected);
1997    }
1998
1999    #[test]
2000    fn test_scan_stream_multi_column_filter_compare() {
2001        let table = setup_test_table();
2002        const COL_A_U64: FieldId = 10;
2003        const COL_C_I32: FieldId = 12;
2004
2005        let expr = Expr::Compare {
2006            left: ScalarExpr::binary(
2007                ScalarExpr::column(COL_A_U64),
2008                BinaryOp::Add,
2009                ScalarExpr::column(COL_C_I32),
2010            ),
2011            op: CompareOp::Gt,
2012            right: ScalarExpr::literal(220_i64),
2013        };
2014
2015        let mut vals: Vec<Option<u64>> = Vec::new();
2016        table
2017            .scan_stream(
2018                &[proj(&table, COL_A_U64)],
2019                &expr,
2020                ScanStreamOptions::default(),
2021                |b| {
2022                    let col = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2023                    for i in 0..b.num_rows() {
2024                        vals.push(if col.is_null(i) {
2025                            None
2026                        } else {
2027                            Some(col.value(i))
2028                        });
2029                    }
2030                },
2031            )
2032            .unwrap();
2033
2034        assert_eq!(vals.into_iter().flatten().collect::<Vec<_>>(), vec![300]);
2035    }
2036}