llkv_table/table.rs

use std::fmt;
use std::sync::Arc;
use std::sync::RwLock;

use crate::planner::{TablePlanner, collect_row_ids_for_table};
use crate::stream::ColumnStream;
use crate::types::TableId;

use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use std::collections::HashMap;

use crate::constants::STREAM_BATCH_ROWS;
use llkv_column_map::store::{GatherNullPolicy, IndexKind, Projection, ROW_ID_COLUMN_NAME};
use llkv_column_map::{ColumnStore, types::LogicalFieldId};
use llkv_storage::pager::{MemPager, Pager};
use simd_r_drive_entry_handle::EntryHandle;

use crate::reserved::is_reserved_table_id;
use crate::sys_catalog::{ColMeta, SysCatalog, TableMeta};
use crate::types::{FieldId, RowId};
use llkv_expr::{Expr, ScalarExpr};
use llkv_result::{Error, Result as LlkvResult};

/// Cached information about which system columns exist in the table schema.
/// This avoids repeated string comparisons in hot paths like append().
#[derive(Debug, Clone, Copy)]
struct MvccColumnCache {
    has_created_by: bool,
    has_deleted_by: bool,
}

/// Handle for data operations on a table.
///
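/// # Example
///
/// A minimal sketch using the test-oriented constructor with an in-memory
/// pager (mirroring this crate's tests):
///
/// ```ignore
/// let pager = Arc::new(MemPager::default());
/// let table = Table::from_id(1, pager)?;
/// ```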
pub struct Table<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    store: Arc<ColumnStore<P>>,
    table_id: TableId,
    /// Cache of MVCC column presence. Initialized lazily on the first
    /// `append()` call; `None` means not yet initialized.
    mvcc_cache: RwLock<Option<MvccColumnCache>>,
}

/// Filter row IDs before they are materialized into batches.
///
/// This trait allows implementations to enforce transaction visibility (MVCC),
/// access control, or other row-level filtering policies. The filter is applied
/// after column-level predicates but before data is gathered into Arrow batches.
///
/// # Example Use Case
///
/// MVCC implementations use this to hide rows that were:
/// - Created after the transaction's snapshot timestamp
/// - Deleted before the transaction's snapshot timestamp
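///
/// # Example
///
/// A minimal sketch of a snapshot-based filter. The `SnapshotFilter` type and
/// its `is_visible` helper are illustrative assumptions, not part of this
/// crate:
///
/// ```ignore
/// struct SnapshotFilter {
///     // Hypothetical snapshot timestamp captured at transaction start.
///     snapshot_ts: u64,
/// }
///
/// impl<P> RowIdFilter<P> for SnapshotFilter
/// where
///     P: Pager<Blob = EntryHandle> + Send + Sync,
/// {
///     fn filter(&self, table: &Table<P>, row_ids: Vec<RowId>) -> LlkvResult<Vec<RowId>> {
///         // Keep only rows whose MVCC metadata is visible at `snapshot_ts`;
///         // `is_visible` stands in for a real visibility lookup.
///         Ok(row_ids
///             .into_iter()
///             .filter(|rid| is_visible(table, *rid, self.snapshot_ts))
///             .collect())
///     }
/// }
/// ```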
pub trait RowIdFilter<P>: Send + Sync
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Filter a list of row IDs, returning only those that should be visible.
    ///
    /// # Errors
    ///
    /// Returns an error if visibility metadata cannot be loaded or is corrupted.
    fn filter(&self, table: &Table<P>, row_ids: Vec<RowId>) -> LlkvResult<Vec<RowId>>;
}

/// Options for configuring table scans.
///
/// These options control how rows are filtered, ordered, and materialized during
/// scan operations.
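///
/// # Example
///
/// A minimal sketch using struct-update syntax over the defaults:
///
/// ```ignore
/// let options = ScanStreamOptions::<MemPager> {
///     include_nulls: true,
///     ..Default::default()
/// };
/// ```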
pub struct ScanStreamOptions<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Whether to include rows where all projected columns are null.
    ///
    /// When `false` (default), rows with all-null projections are dropped before
    /// batches are yielded. This is useful for sparse data where many rows may not
    /// have values for the selected columns.
    pub include_nulls: bool,
    /// Optional ordering to apply to results.
    ///
    /// If specified, row IDs are sorted according to this specification before
    /// data is gathered into batches.
    pub order: Option<ScanOrderSpec>,
    /// Optional filter for row-level visibility (e.g., MVCC).
    ///
    /// Applied after column-level predicates but before data is materialized.
    /// Used to enforce transaction isolation.
    pub row_id_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
}

impl<P> fmt::Debug for ScanStreamOptions<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ScanStreamOptions")
            .field("include_nulls", &self.include_nulls)
            .field("order", &self.order)
            .field(
                "row_id_filter",
                &self.row_id_filter.as_ref().map(|_| "<RowIdFilter>"),
            )
            .finish()
    }
}

impl<P> Clone for ScanStreamOptions<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn clone(&self) -> Self {
        Self {
            include_nulls: self.include_nulls,
            order: self.order,
            row_id_filter: self.row_id_filter.clone(),
        }
    }
}

impl<P> Default for ScanStreamOptions<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn default() -> Self {
        Self {
            include_nulls: false,
            order: None,
            row_id_filter: None,
        }
    }
}

/// Specification for ordering scan results.
///
/// Defines how to sort rows based on a single column's values.
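///
/// # Example
///
/// A minimal sketch ordering by a hypothetical field id `10`, descending with
/// nulls last:
///
/// ```ignore
/// let order = ScanOrderSpec {
///     field_id: 10,
///     direction: ScanOrderDirection::Descending,
///     nulls_first: false,
///     transform: ScanOrderTransform::IdentityInteger,
/// };
/// ```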
#[derive(Clone, Copy, Debug)]
pub struct ScanOrderSpec {
    /// The field to sort by.
    pub field_id: FieldId,
    /// Sort direction (ascending or descending).
    pub direction: ScanOrderDirection,
    /// Whether null values appear first or last.
    pub nulls_first: bool,
    /// Optional transformation to apply before sorting.
    pub transform: ScanOrderTransform,
}

/// Sort direction for scan ordering.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderDirection {
    /// Sort from smallest to largest.
    Ascending,
    /// Sort from largest to smallest.
    Descending,
}

/// Value transformation to apply before sorting.
///
/// Used to enable sorting on columns that need type coercion or conversion.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderTransform {
    /// Sort integers as-is.
    IdentityInteger,
    /// Sort strings lexicographically.
    IdentityUtf8,
    /// Parse strings as integers, then sort numerically.
    CastUtf8ToInteger,
}

/// A column or computed expression to include in scan results.
///
/// Scans can project either stored columns or expressions computed from them.
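///
/// # Example
///
/// A minimal sketch; `lfid` and `expr` are assumed to be built elsewhere:
///
/// ```ignore
/// // Project a stored column by its logical field id.
/// let stored = ScanProjection::column(Projection::from(lfid));
/// // Project a computed expression under an alias.
/// let computed = ScanProjection::computed(expr, "total");
/// ```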
#[derive(Clone, Debug)]
pub enum ScanProjection {
    /// Project a stored column directly.
    Column(Projection),
    /// Compute a value from an expression and return it with an alias.
    Computed {
        /// The expression to evaluate (can reference column field IDs).
        expr: ScalarExpr<FieldId>,
        /// The name to give the computed column in results.
        alias: String,
    },
}

impl ScanProjection {
    /// Create a projection for a stored column.
    pub fn column<P: Into<Projection>>(proj: P) -> Self {
        Self::Column(proj.into())
    }

    /// Create a projection for a computed expression.
    pub fn computed<S: Into<String>>(expr: ScalarExpr<FieldId>, alias: S) -> Self {
        Self::Computed {
            expr,
            alias: alias.into(),
        }
    }
}

impl From<Projection> for ScanProjection {
    fn from(value: Projection) -> Self {
        ScanProjection::Column(value)
    }
}

impl From<&Projection> for ScanProjection {
    fn from(value: &Projection) -> Self {
        ScanProjection::Column(value.clone())
    }
}

impl From<&ScanProjection> for ScanProjection {
    fn from(value: &ScanProjection) -> Self {
        value.clone()
    }
}

impl<P> Table<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Create a new table from column specifications.
    ///
    /// Coordinates metadata persistence, catalog registration, and storage initialization.
    pub fn create_from_columns(
        display_name: &str,
        canonical_name: &str,
        columns: &[llkv_plan::PlanColumnSpec],
        metadata: Arc<crate::metadata::MetadataManager<P>>,
        catalog: Arc<crate::catalog::TableCatalog>,
        store: Arc<ColumnStore<P>>,
    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
        service.create_table_from_columns(display_name, canonical_name, columns)
    }

    /// Create a new table from an Arrow schema (for CREATE TABLE AS SELECT).
    pub fn create_from_schema(
        display_name: &str,
        canonical_name: &str,
        schema: &arrow::datatypes::Schema,
        metadata: Arc<crate::metadata::MetadataManager<P>>,
        catalog: Arc<crate::catalog::TableCatalog>,
        store: Arc<ColumnStore<P>>,
    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
        service.create_table_from_schema(display_name, canonical_name, schema)
    }

    /// Internal constructor: wrap a table ID with column store access.
    ///
    /// **This is for internal crate use only.** User code should create tables via
    /// [`Table::create_from_columns`] or [`Table::create_from_schema`]. Tests may
    /// call this constructor directly.
    #[doc(hidden)]
    pub fn from_id(table_id: TableId, pager: Arc<P>) -> LlkvResult<Self> {
        if is_reserved_table_id(table_id) {
            return Err(Error::ReservedTableId(table_id));
        }

        tracing::trace!(
            "Table::from_id: Opening table_id={} with pager at {:p}",
            table_id,
            &*pager
        );
        let store = ColumnStore::open(pager)?;
        Ok(Self {
            store: Arc::new(store),
            table_id,
            mvcc_cache: RwLock::new(None),
        })
    }

    /// Internal constructor: wrap a table ID with a shared column store.
    ///
    /// **This is for internal crate use only.** Preferred over [`Self::from_id`] when
    /// multiple tables share the same store. Tests may call this constructor directly.
    #[doc(hidden)]
    pub fn from_id_and_store(table_id: TableId, store: Arc<ColumnStore<P>>) -> LlkvResult<Self> {
        if is_reserved_table_id(table_id) {
            return Err(Error::ReservedTableId(table_id));
        }

        Ok(Self {
            store,
            table_id,
            mvcc_cache: RwLock::new(None),
        })
    }

    /// Register a persisted sort index for the specified user column.
    pub fn register_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
        self.store
            .register_index(logical_field_id, IndexKind::Sort)?;
        Ok(())
    }

    /// Remove a persisted sort index for the specified user column if it exists.
    pub fn unregister_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
        match self
            .store
            .unregister_index(logical_field_id, IndexKind::Sort)
        {
            Ok(()) | Err(Error::NotFound) => Ok(()),
            Err(err) => Err(err),
        }
    }

    /// List the persisted index kinds registered for the given user column.
    pub fn list_registered_indexes(&self, field_id: FieldId) -> LlkvResult<Vec<IndexKind>> {
        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
        match self.store.list_persisted_indexes(logical_field_id) {
            Ok(kinds) => Ok(kinds),
            Err(Error::NotFound) => Ok(Vec::new()),
            Err(err) => Err(err),
        }
    }

    /// Get or initialize the MVCC column cache from the provided schema.
    /// This is an optimization to avoid repeated string comparisons in append().
    fn get_mvcc_cache(&self, schema: &Arc<Schema>) -> MvccColumnCache {
        // Fast path: check if cache is already initialized
        {
            let cache_read = self.mvcc_cache.read().unwrap();
            if let Some(cache) = *cache_read {
                return cache;
            }
        }

        // Slow path: initialize cache from schema
        let has_created_by = schema
            .fields()
            .iter()
            .any(|f| f.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME);
        let has_deleted_by = schema
            .fields()
            .iter()
            .any(|f| f.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME);

        let cache = MvccColumnCache {
            has_created_by,
            has_deleted_by,
        };

        // Store in cache for future calls
        *self.mvcc_cache.write().unwrap() = Some(cache);

        cache
    }

    /// Append a [`RecordBatch`] to the table.
    ///
    /// The batch must include:
    /// - A `row_id` column (type `UInt64`) with unique row identifiers
    /// - `field_id` metadata for each user column, mapping to this table's field IDs
    ///
    /// # MVCC Columns
    ///
    /// If the batch includes `created_by` or `deleted_by` columns, they are automatically
    /// assigned the correct [`LogicalFieldId`] for this table's MVCC metadata.
    ///
    /// # Field ID Mapping
    ///
    /// Each column's `field_id` metadata is converted to a [`LogicalFieldId`] by combining
    /// it with this table's ID. This ensures columns from different tables don't collide
    /// in the underlying storage.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The batch is missing the `row_id` column
    /// - Any user column is missing `field_id` metadata
    /// - Field IDs are invalid or malformed
    /// - The underlying storage operation fails
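    ///
    /// # Example
    ///
    /// A minimal sketch mirroring this crate's tests; the column name and
    /// field id are illustrative:
    ///
    /// ```ignore
    /// const VALUE_COL: FieldId = 10;
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
    ///     // User columns carry their FieldId under the "field_id" metadata key.
    ///     Field::new("value", DataType::UInt64, false).with_metadata(HashMap::from([(
    ///         "field_id".to_string(),
    ///         VALUE_COL.to_string(),
    ///     )])),
    /// ]));
    /// let batch = RecordBatch::try_new(
    ///     schema,
    ///     vec![
    ///         Arc::new(UInt64Array::from(vec![1u64, 2])),
    ///         Arc::new(UInt64Array::from(vec![100u64, 200])),
    ///     ],
    /// )?;
    /// table.append(&batch)?;
    /// ```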
    pub fn append(&self, batch: &RecordBatch) -> LlkvResult<()> {
        use arrow::array::UInt64Builder;

        // Check if MVCC columns already exist in the batch using the cache.
        // This avoids repeated string comparisons on every append.
        let cache = self.get_mvcc_cache(&batch.schema());
        let has_created_by = cache.has_created_by;
        let has_deleted_by = cache.has_deleted_by;

        let mut new_fields = Vec::with_capacity(batch.schema().fields().len() + 2);
        let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.columns().len() + 2);

        for (idx, field) in batch.schema().fields().iter().enumerate() {
            let maybe_field_id = field.metadata().get(crate::constants::FIELD_ID_META_KEY);
            // System columns (row_id, MVCC columns) don't need field_id metadata
            if maybe_field_id.is_none()
                && (field.name() == ROW_ID_COLUMN_NAME
                    || field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME
                    || field.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME)
            {
                if field.name() == ROW_ID_COLUMN_NAME {
                    new_fields.push(field.as_ref().clone());
                    new_columns.push(batch.column(idx).clone());
                } else {
                    let lfid = if field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME {
                        LogicalFieldId::for_mvcc_created_by(self.table_id)
                    } else {
                        LogicalFieldId::for_mvcc_deleted_by(self.table_id)
                    };

                    let mut metadata = field.metadata().clone();
                    let lfid_val: u64 = lfid.into();
                    metadata.insert(
                        crate::constants::FIELD_ID_META_KEY.to_string(),
                        lfid_val.to_string(),
                    );

                    let new_field =
                        Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                            .with_metadata(metadata);
                    new_fields.push(new_field);
                    new_columns.push(batch.column(idx).clone());
                }
                continue;
            }

            let raw_field_id = maybe_field_id
                .ok_or_else(|| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' is missing a valid '{}' in its metadata.",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY
                    ))
                })?
                .parse::<u64>()
                .map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' contains an invalid '{}': {}",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY,
                        err
                    ))
                })?;

            if raw_field_id > FieldId::MAX as u64 {
                return Err(llkv_result::Error::Internal(format!(
                    "Field '{}' expected user FieldId (<= {}) but got logical id '{}'",
                    field.name(),
                    FieldId::MAX,
                    raw_field_id
                )));
            }

            let user_field_id = raw_field_id as FieldId;
            let logical_field_id = LogicalFieldId::for_user(self.table_id, user_field_id);

            // Store the fully-qualified logical field id in the metadata we hand to the
            // column store so descriptors are registered under the correct table id.
            let lfid = logical_field_id;
            let mut new_metadata = field.metadata().clone();
            let lfid_val: u64 = lfid.into();
            new_metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );

            let new_field =
                Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                    .with_metadata(new_metadata);
            new_fields.push(new_field);
            new_columns.push(batch.column(idx).clone());

            // Ensure the catalog remembers the human-friendly column name for
            // this field so callers of `Table::schema()` (and other metadata
            // consumers) can recover it later. The CSV ingest path (and other
            // writers) may only supply the `field_id` metadata on the batch,
            // so defensively persist the column name when absent.
            let need_meta = match self
                .catalog()
                .get_cols_meta(self.table_id, &[user_field_id])
            {
                metas if metas.is_empty() => true,
                metas => metas[0].as_ref().and_then(|m| m.name.as_ref()).is_none(),
            };

            if need_meta {
                let meta = ColMeta {
                    col_id: user_field_id,
                    name: Some(field.name().to_string()),
                    flags: 0,
                    default: None,
                };
                self.catalog().put_col_meta(self.table_id, &meta);
            }
        }

        // Inject MVCC columns if they don't exist
        // For non-transactional appends (e.g., CSV ingest), we use TXN_ID_AUTO_COMMIT (1)
        // which is treated as "committed by system" and always visible.
        // Use TXN_ID_NONE (0) for deleted_by to indicate "not deleted".
        const TXN_ID_AUTO_COMMIT: u64 = 1;
        const TXN_ID_NONE: u64 = 0;
        let row_count = batch.num_rows();

        if !has_created_by {
            let mut created_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                created_by_builder.append_value(TXN_ID_AUTO_COMMIT);
            }
            let created_by_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = created_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::CREATED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(created_by_builder.finish()));
        }

        if !has_deleted_by {
            let mut deleted_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                deleted_by_builder.append_value(TXN_ID_NONE);
            }
            let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = deleted_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::DELETED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(deleted_by_builder.finish()));
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let namespaced_batch = RecordBatch::try_new(new_schema, new_columns)?;

        tracing::trace!(
            table_id = self.table_id,
            num_columns = namespaced_batch.num_columns(),
            num_rows = namespaced_batch.num_rows(),
            "Attempting append to table"
        );

        if let Err(err) = self.store.append(&namespaced_batch) {
            let batch_field_ids: Vec<LogicalFieldId> = namespaced_batch
                .schema()
                .fields()
                .iter()
                .filter_map(|f| f.metadata().get(crate::constants::FIELD_ID_META_KEY))
                .filter_map(|s| s.parse::<u64>().ok())
                .map(LogicalFieldId::from)
                .collect();

            // Check which fields are missing from the catalog
            let missing_fields: Vec<LogicalFieldId> = batch_field_ids
                .iter()
                .filter(|&&field_id| !self.store.has_field(field_id))
                .copied()
                .collect();

            tracing::error!(
                table_id = self.table_id,
                error = ?err,
                batch_field_ids = ?batch_field_ids,
                missing_from_catalog = ?missing_fields,
                "Append failed - some fields missing from catalog"
            );
            return Err(err);
        }
        Ok(())
    }

    /// Stream one or more projected columns as a sequence of RecordBatches.
    ///
    /// - Avoids `concat` and large materializations.
    /// - Uses the same filter machinery as the old `scan` to produce
    ///   `row_ids`.
    /// - Splits `row_ids` into fixed-size windows and gathers rows per
    ///   window to form a small `RecordBatch` that is sent to `on_batch`.
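    ///
    /// # Example
    ///
    /// A minimal sketch collecting one projected column; `COL_A` is an
    /// illustrative field id:
    ///
    /// ```ignore
    /// let expr = Expr::Pred(Filter {
    ///     field_id: COL_A,
    ///     op: Operator::GreaterThan(100.into()),
    /// });
    /// table.scan_stream(
    ///     &[ScanProjection::column(Projection::from(
    ///         LogicalFieldId::for_user(table.table_id(), COL_A),
    ///     ))],
    ///     &expr,
    ///     ScanStreamOptions::default(),
    ///     |batch| {
    ///         // Each callback receives one bounded-size RecordBatch.
    ///         println!("rows: {}", batch.num_rows());
    ///     },
    /// )?;
    /// ```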
    pub fn scan_stream<'a, I, T, F>(
        &self,
        projections: I,
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions<P>,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        I: IntoIterator<Item = T>,
        T: Into<ScanProjection>,
        F: FnMut(RecordBatch),
    {
        let stream_projections: Vec<ScanProjection> =
            projections.into_iter().map(|p| p.into()).collect();
        self.scan_stream_with_exprs(&stream_projections, filter_expr, options, on_batch)
    }

    /// Stream projections using fully resolved expression inputs.
    ///
    /// Callers that already parsed expressions into [`ScanProjection`] values can
    /// use this entry point to skip the iterator conversion performed by
    /// [`Self::scan_stream`]. The execution semantics and callbacks are identical.
    pub fn scan_stream_with_exprs<'a, F>(
        &self,
        projections: &[ScanProjection],
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions<P>,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(RecordBatch),
    {
        TablePlanner::new(self).scan_stream_with_exprs(projections, filter_expr, options, on_batch)
    }

    /// Collect the row IDs that match `filter_expr` for this table.
    pub fn filter_row_ids<'a>(&self, filter_expr: &Expr<'a, FieldId>) -> LlkvResult<Vec<RowId>> {
        collect_row_ids_for_table(self, filter_expr)
    }

    /// Access the system catalog backed by this table's column store.
    #[inline]
    pub fn catalog(&self) -> SysCatalog<'_, P> {
        SysCatalog::new(&self.store)
    }

    /// Fetch this table's metadata from the system catalog, if present.
    #[inline]
    pub fn get_table_meta(&self) -> Option<TableMeta> {
        self.catalog().get_table_meta(self.table_id)
    }

    /// Fetch per-column metadata for the given user column IDs.
    #[inline]
    pub fn get_cols_meta(&self, col_ids: &[FieldId]) -> Vec<Option<ColMeta>> {
        self.catalog().get_cols_meta(self.table_id, col_ids)
    }

    /// Build and return an Arrow `Schema` that describes this table.
    ///
    /// The returned schema includes the `row_id` field first, followed by
    /// user fields. Each user field has its `field_id` stored in the field
    /// metadata (under the "field_id" key) and the name is taken from the
    /// catalog when available or falls back to `col_<id>`.
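    ///
    /// # Example
    ///
    /// A minimal sketch; `row_id` is always the first field:
    ///
    /// ```ignore
    /// let schema = table.schema()?;
    /// assert_eq!(schema.field(0).name(), ROW_ID_COLUMN_NAME);
    /// ```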
    pub fn schema(&self) -> LlkvResult<Arc<Schema>> {
        // Collect logical fields for this table and sort by field id.
        let mut logical_fields = self.store.user_field_ids_for_table(self.table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());

        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let metas = self.get_cols_meta(&field_ids);

        let mut fields: Vec<Field> = Vec::with_capacity(1 + field_ids.len());
        // Add row_id first
        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for (idx, lfid) in logical_fields.into_iter().enumerate() {
            let fid = lfid.field_id();
            let dtype = self.store.data_type(lfid)?;
            let name = metas
                .get(idx)
                .and_then(|m| m.as_ref().and_then(|meta| meta.name.clone()))
                .unwrap_or_else(|| format!("col_{}", fid));

            let mut metadata: HashMap<String, String> = HashMap::new();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                fid.to_string(),
            );

            fields.push(Field::new(&name, dtype.clone(), true).with_metadata(metadata));
        }

        Ok(Arc::new(Schema::new(fields)))
    }

    /// Return the table schema formatted as an Arrow RecordBatch suitable
    /// for pretty printing. The batch has three columns: `name` (Utf8),
    /// `field_id` (UInt32) and `data_type` (Utf8).
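    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming arrow's `prettyprint` feature is
    /// enabled:
    ///
    /// ```ignore
    /// let batch = table.schema_recordbatch()?;
    /// println!("{}", arrow::util::pretty::pretty_format_batches(&[batch])?);
    /// ```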
    pub fn schema_recordbatch(&self) -> LlkvResult<RecordBatch> {
        let schema = self.schema()?;
        let fields = schema.fields();

        let mut names: Vec<String> = Vec::with_capacity(fields.len());
        let mut fids: Vec<u32> = Vec::with_capacity(fields.len());
        let mut dtypes: Vec<String> = Vec::with_capacity(fields.len());

        for field in fields.iter() {
            names.push(field.name().to_string());
            let fid = field
                .metadata()
                .get(crate::constants::FIELD_ID_META_KEY)
                .and_then(|s| s.parse::<u32>().ok())
                .unwrap_or(0u32);
            fids.push(fid);
            dtypes.push(format!("{:?}", field.data_type()));
        }

        // Build Arrow arrays
        let name_array: ArrayRef = Arc::new(StringArray::from(names));
        let fid_array: ArrayRef = Arc::new(UInt32Array::from(fids));
        let dtype_array: ArrayRef = Arc::new(StringArray::from(dtypes));

        let rb_schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new(crate::constants::FIELD_ID_META_KEY, DataType::UInt32, false),
            Field::new("data_type", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(rb_schema, vec![name_array, fid_array, dtype_array])?;
        Ok(batch)
    }

    /// Create a streaming view over the provided row IDs for the specified logical fields.
    pub fn stream_columns(
        &self,
        logical_fields: impl Into<Arc<[LogicalFieldId]>>,
        row_ids: Vec<RowId>,
        policy: GatherNullPolicy,
    ) -> LlkvResult<ColumnStream<'_, P>> {
        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let ctx = self.store.prepare_gather_context(logical_fields.as_ref())?;
        Ok(ColumnStream::new(
            &self.store,
            ctx,
            row_ids,
            STREAM_BATCH_ROWS,
            policy,
            logical_fields,
        ))
    }

    /// Access the underlying column store.
    pub fn store(&self) -> &ColumnStore<P> {
        &self.store
    }

    /// Return this table's identifier.
    #[inline]
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Return the total number of rows for a given user column id in this table.
    ///
    /// This delegates to the ColumnStore descriptor for the logical field that
    /// corresponds to (table_id, col_id) and returns the persisted total_row_count.
    pub fn total_rows_for_col(&self, col_id: FieldId) -> llkv_result::Result<u64> {
        let lfid = LogicalFieldId::for_user(self.table_id, col_id);
        self.store.total_rows_for_field(lfid)
    }

    /// Return the total number of rows for this table.
    ///
    /// Prefer reading the dedicated row-id shadow column if present; otherwise
    /// fall back to inspecting any persisted user column descriptor.
    pub fn total_rows(&self) -> llkv_result::Result<u64> {
        use llkv_column_map::store::rowid_fid;
        let rid_lfid = rowid_fid(LogicalFieldId::for_user(self.table_id, 0));
        // Try the row-id shadow column first
        match self.store.total_rows_for_field(rid_lfid) {
            Ok(n) => Ok(n),
            Err(_) => {
                // Fall back to scanning the catalog for any user-data column
                self.store.total_rows_for_table(self.table_id)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::reserved::CATALOG_TABLE_ID;
    use crate::types::RowId;
    use arrow::array::Array;
    use arrow::array::ArrayRef;
    use arrow::array::{
        BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt8Array,
        UInt32Array, UInt64Array,
    };
    use arrow::compute::{cast, max, min, sum, unary};
    use arrow::datatypes::DataType;
    use llkv_column_map::ColumnStore;
    use llkv_column_map::store::GatherNullPolicy;
    use llkv_expr::{BinaryOp, CompareOp, Filter, Operator, ScalarExpr};
    use std::collections::HashMap;
    use std::ops::Bound;

    fn setup_test_table() -> Table {
        let pager = Arc::new(MemPager::default());
        setup_test_table_with_pager(&pager)
    }

    fn setup_test_table_with_pager(pager: &Arc<MemPager>) -> Table {
        let table = Table::from_id(1, Arc::clone(pager)).unwrap();
        const COL_A_U64: FieldId = 10;
        const COL_B_BIN: FieldId = 11;
        const COL_C_I32: FieldId = 12;
        const COL_D_F64: FieldId = 13;
        const COL_E_F32: FieldId = 14;

        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_B_BIN.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_F64.to_string(),
            )])),
            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_F32.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
                Arc::new(UInt64Array::from(vec![100, 200, 300, 200])),
                Arc::new(BinaryArray::from(vec![
                    b"foo" as &[u8],
                    b"bar",
                    b"baz",
                    b"qux",
                ])),
                Arc::new(Int32Array::from(vec![10, 20, 30, 20])),
                Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5, 2.5])),
                Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 2.0])),
            ],
        )
        .unwrap();

        table.append(&batch).unwrap();
        table
    }

    fn gather_single(
        store: &ColumnStore<MemPager>,
        field_id: LogicalFieldId,
        row_ids: &[u64],
    ) -> ArrayRef {
        store
            .gather_rows(&[field_id], row_ids, GatherNullPolicy::ErrorOnMissing)
            .unwrap()
            .column(0)
            .clone()
    }

    fn pred_expr<'a>(filter: Filter<'a, FieldId>) -> Expr<'a, FieldId> {
        Expr::Pred(filter)
    }

    fn proj(table: &Table, field_id: FieldId) -> Projection {
        Projection::from(LogicalFieldId::for_user(table.table_id, field_id))
    }

    fn proj_alias<S: Into<String>>(table: &Table, field_id: FieldId, alias: S) -> Projection {
        Projection::with_alias(LogicalFieldId::for_user(table.table_id, field_id), alias)
    }

    #[test]
    fn table_new_rejects_reserved_table_id() {
        let result = Table::from_id(CATALOG_TABLE_ID, Arc::new(MemPager::default()));
        assert!(matches!(
            result,
            Err(Error::ReservedTableId(id)) if id == CATALOG_TABLE_ID
        ));
    }

    #[test]
    fn test_append_rejects_logical_field_id_in_metadata() {
        // Create a table and build a schema where the column's metadata
        // contains a fully-qualified LogicalFieldId (u64). Append should
        // reject this and require a plain user FieldId instead.
        let table = Table::from_id(7, Arc::new(MemPager::default())).unwrap();

        const USER_FID: FieldId = 42;
        // Build a logical id (namespaced) and put its numeric value into metadata
        let logical: LogicalFieldId = LogicalFieldId::for_user(table.table_id(), USER_FID);
        let logical_val: u64 = logical.into();

        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("bad", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                logical_val.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2u64])),
                Arc::new(UInt64Array::from(vec![10u64, 20u64])),
            ],
        )
        .unwrap();

        let res = table.append(&batch);
        assert!(matches!(res, Err(Error::Internal(_))));
    }

    #[test]
    fn test_scan_with_u64_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let expr = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::Equals(200.into()),
        });

        let mut vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(20), Some(20)]);
    }

    #[test]
    fn test_scan_with_string_filter() {
        let pager = Arc::new(MemPager::default());
        let table = Table::from_id(500, Arc::clone(&pager)).unwrap();

        const COL_STR: FieldId = 42;
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_STR.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
                Arc::new(StringArray::from(vec!["alice", "bob", "albert", "carol"])),
            ],
        )
        .unwrap();
        table.append(&batch).unwrap();

        let expr = pred_expr(Filter {
            field_id: COL_STR,
            op: Operator::starts_with("al", true),
        });

        let mut collected: Vec<Option<String>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_STR)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
                    collected.extend(arr.iter().map(|v| v.map(|s| s.to_string())));
                },
            )
            .unwrap();

        assert_eq!(
            collected,
            vec![Some("alice".to_string()), Some("albert".to_string())]
        );
    }

    #[test]
    fn test_table_reopen_with_shared_pager() {
        const TABLE_ALPHA: TableId = 42;
        const TABLE_BETA: TableId = 43;
        const TABLE_GAMMA: TableId = 44;
        const COL_ALPHA_U64: FieldId = 100;
        const COL_ALPHA_I32: FieldId = 101;
        const COL_ALPHA_U32: FieldId = 102;
        const COL_ALPHA_I16: FieldId = 103;
        const COL_BETA_U64: FieldId = 200;
        const COL_BETA_U8: FieldId = 201;
        const COL_GAMMA_I16: FieldId = 300;

        let pager = Arc::new(MemPager::default());

        let alpha_rows: Vec<RowId> = vec![1, 2, 3, 4];
        let alpha_vals_u64: Vec<u64> = vec![10, 20, 30, 40];
        let alpha_vals_i32: Vec<i32> = vec![-5, 15, 25, 35];
        let alpha_vals_u32: Vec<u32> = vec![7, 11, 13, 17];
        let alpha_vals_i16: Vec<i16> = vec![-2, 4, -6, 8];

        let beta_rows: Vec<RowId> = vec![101, 102, 103];
        let beta_vals_u64: Vec<u64> = vec![900, 901, 902];
        let beta_vals_u8: Vec<u8> = vec![1, 2, 3];

        let gamma_rows: Vec<RowId> = vec![501, 502];
        let gamma_vals_i16: Vec<i16> = vec![123, -321];

        // First session: create tables and write data.
        {
            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("alpha_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U64.to_string(),
                )])),
                Field::new("alpha_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I32.to_string(),
                )])),
                Field::new("alpha_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U32.to_string(),
                )])),
                Field::new("alpha_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(alpha_rows.clone())),
                    Arc::new(UInt64Array::from(alpha_vals_u64.clone())),
                    Arc::new(Int32Array::from(alpha_vals_i32.clone())),
                    Arc::new(UInt32Array::from(alpha_vals_u32.clone())),
                    Arc::new(Int16Array::from(alpha_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        {
            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("beta_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U64.to_string(),
                )])),
                Field::new("beta_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U8.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(beta_rows.clone())),
                    Arc::new(UInt64Array::from(beta_vals_u64.clone())),
                    Arc::new(UInt8Array::from(beta_vals_u8.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        {
            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("gamma_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_GAMMA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(gamma_rows.clone())),
                    Arc::new(Int16Array::from(gamma_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // Second session: reopen each table and ensure schema and values are intact.
        {
            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let expectations: &[(FieldId, DataType)] = &[
                (COL_ALPHA_U64, DataType::UInt64),
                (COL_ALPHA_I32, DataType::Int32),
                (COL_ALPHA_U32, DataType::UInt32),
                (COL_ALPHA_I16, DataType::Int16),
            ];

            for &(col, ref ty) in expectations {
                let lfid = LogicalFieldId::for_user(TABLE_ALPHA, col);
                assert_eq!(store.data_type(lfid).unwrap(), *ty);
                let arr = gather_single(store, lfid, &alpha_rows);
                match ty {
                    DataType::UInt64 => {
                        let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u64.as_slice());
                    }
                    DataType::Int32 => {
                        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i32.as_slice());
                    }
                    DataType::UInt32 => {
                        let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u32.as_slice());
                    }
                    DataType::Int16 => {
                        let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i16.as_slice());
                    }
                    other => panic!("unexpected dtype {other:?}"),
                }
            }
        }

        {
            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let lfid_u64 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U64);
            assert_eq!(store.data_type(lfid_u64).unwrap(), DataType::UInt64);
            let arr_u64 = gather_single(store, lfid_u64, &beta_rows);
            let arr_u64 = arr_u64.as_any().downcast_ref::<UInt64Array>().unwrap();
            assert_eq!(arr_u64.values(), beta_vals_u64.as_slice());

            let lfid_u8 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U8);
            assert_eq!(store.data_type(lfid_u8).unwrap(), DataType::UInt8);
            let arr_u8 = gather_single(store, lfid_u8, &beta_rows);
            let arr_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
            assert_eq!(arr_u8.values(), beta_vals_u8.as_slice());
        }

        {
            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let store = table.store();
            let lfid = LogicalFieldId::for_user(TABLE_GAMMA, COL_GAMMA_I16);
            assert_eq!(store.data_type(lfid).unwrap(), DataType::Int16);
            let arr = gather_single(store, lfid, &gamma_rows);
            let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
            assert_eq!(arr.values(), gamma_vals_i16.as_slice());
        }
    }

    #[test]
    fn test_scan_with_i32_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(20.into()),
        });

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(200), Some(200)]);
    }

    #[test]
    fn test_scan_with_greater_than_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::GreaterThan(15.into()),
        });

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(200), Some(300), Some(200)]);
    }

    #[test]
    fn test_scan_with_range_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::Range {
                lower: Bound::Included(150.into()),
                upper: Bound::Excluded(300.into()),
            },
        });

        let mut vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(20), Some(20)]);
    }

    #[test]
    fn test_filtered_scan_sum_kernel() {
        // Trade-off note:
        // - We use Arrow's sum kernel per batch, then add the partial sums.
        // - This preserves Arrow null semantics and avoids concat.
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;

        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::Range {
                lower: Bound::Included(150.into()),
                upper: Bound::Excluded(300.into()),
            },
        });

        let mut total: u128 = 0;
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    if let Some(part) = sum(a) {
                        total += part as u128;
                    }
                },
            )
            .unwrap();

        assert_eq!(total, 400);
    }

1297    #[test]
1298    fn test_filtered_scan_sum_i32_kernel() {
1299        // Trade-off note:
1300        // - Per-batch sum + accumulate avoids building one big Array.
1301        // - For tiny batches overhead may match manual loops, but keeps
1302        //   Arrow semantics exact.
1303        let table = setup_test_table();
1304        const COL_A_U64: FieldId = 10;
1305        const COL_C_I32: FieldId = 12;
1306
1307        let candidates = [100.into(), 300.into()];
1308        let filter = pred_expr(Filter {
1309            field_id: COL_A_U64,
1310            op: Operator::In(&candidates),
1311        });
1312
1313        let mut total: i64 = 0;
1314        table
1315            .scan_stream(
1316                &[proj(&table, COL_C_I32)],
1317                &filter,
1318                ScanStreamOptions::default(),
1319                |b| {
1320                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1321                    if let Some(part) = sum(a) {
1322                        total += part as i64;
1323                    }
1324                },
1325            )
1326            .unwrap();
1327        assert_eq!(total, 40);
1328    }
1329
1330    #[test]
1331    fn test_filtered_scan_min_max_kernel() {
1332        // Trade-off note:
1333        // - min/max are computed per batch and folded. This preserves
1334        //   Arrow's null behavior and avoids concat.
1335        // - Be mindful of NaN semantics if extended to floats later.
1336        let table = setup_test_table();
1337        const COL_A_U64: FieldId = 10;
1338        const COL_C_I32: FieldId = 12;
1339
1340        let candidates = [100.into(), 300.into()];
1341        let filter = pred_expr(Filter {
1342            field_id: COL_A_U64,
1343            op: Operator::In(&candidates),
1344        });
1345
1346        let mut mn: Option<i32> = None;
1347        let mut mx: Option<i32> = None;
1348        table
1349            .scan_stream(
1350                &[proj(&table, COL_C_I32)],
1351                &filter,
1352                ScanStreamOptions::default(),
1353                |b| {
1354                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1355
1356                    if let Some(part_min) = min(a) {
1357                        mn = Some(mn.map_or(part_min, |m| m.min(part_min)));
1358                    }
1359                    if let Some(part_max) = max(a) {
1360                        mx = Some(mx.map_or(part_max, |m| m.max(part_max)));
1361                    }
1362                },
1363            )
1364            .unwrap();
1365        assert_eq!(mn, Some(10));
1366        assert_eq!(mx, Some(30));
1367    }
1368
    #[test]
    fn test_filtered_scan_float64_column() {
        let table = setup_test_table();
        const COL_D_F64: FieldId = 13;

        let filter = pred_expr(Filter {
            field_id: COL_D_F64,
            op: Operator::GreaterThan(2.0_f64.into()),
        });

        let mut got = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_D_F64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
                    for i in 0..arr.len() {
                        if arr.is_valid(i) {
                            got.push(arr.value(i));
                        }
                    }
                },
            )
            .unwrap();

        assert_eq!(got, vec![2.5, 3.5, 2.5]);
    }

    #[test]
    fn test_filtered_scan_float32_in_operator() {
        let table = setup_test_table();
        const COL_E_F32: FieldId = 14;

        let candidates = [2.0_f32.into(), 3.0_f32.into()];
        let filter = pred_expr(Filter {
            field_id: COL_E_F32,
            op: Operator::In(&candidates),
        });

        let mut vals: Vec<Option<f32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_E_F32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
                    vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();

        let collected: Vec<f32> = vals.into_iter().flatten().collect();
        assert_eq!(collected, vec![2.0, 3.0, 2.0]);
    }

    #[test]
    fn test_scan_stream_and_expression() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;
        const COL_E_F32: FieldId = 14;

        let expr = Expr::all_of(vec![
            Filter {
                field_id: COL_C_I32,
                op: Operator::GreaterThan(15.into()),
            },
            Filter {
                field_id: COL_A_U64,
                op: Operator::LessThan(250.into()),
            },
        ]);

        let mut vals: Vec<Option<f32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_E_F32)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
                    vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();

        assert_eq!(vals, vec![Some(2.0_f32), Some(2.0_f32)]);
    }

    #[test]
    fn test_scan_stream_or_expression() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let expr = Expr::any_of(vec![
            Filter {
                field_id: COL_C_I32,
                op: Operator::Equals(10.into()),
            },
            Filter {
                field_id: COL_C_I32,
                op: Operator::Equals(30.into()),
            },
        ]);

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();

        assert_eq!(vals, vec![Some(100), Some(300)]);
    }

    #[test]
    fn test_scan_stream_not_predicate() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let expr = Expr::not(pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(20.into()),
        }));

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();

        assert_eq!(vals, vec![Some(100), Some(300)]);
    }

    #[test]
    fn test_scan_stream_not_and_expression() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let expr = Expr::not(Expr::all_of(vec![
            Filter {
                field_id: COL_A_U64,
                op: Operator::GreaterThan(150.into()),
            },
            Filter {
                field_id: COL_C_I32,
                op: Operator::LessThan(40.into()),
            },
        ]));

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();

        assert_eq!(vals, vec![Some(100)]);
    }

    #[test]
    fn test_scan_stream_include_nulls_toggle() {
        let pager = Arc::new(MemPager::default());
        let table = setup_test_table_with_pager(&pager);
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;
        const COL_B_BIN: FieldId = 11;
        const COL_D_F64: FieldId = 13;
        const COL_E_F32: FieldId = 14;

        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_B_BIN.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, true).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_F64.to_string(),
            )])),
            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_F32.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![5, 6])),
                Arc::new(UInt64Array::from(vec![500, 600])),
                Arc::new(BinaryArray::from(vec![
                    Some(&b"new"[..]),
                    Some(&b"alt"[..]),
                ])),
                Arc::new(Int32Array::from(vec![Some(40), None])),
                Arc::new(Float64Array::from(vec![5.5, 6.5])),
                Arc::new(Float32Array::from(vec![5.0, 6.0])),
            ],
        )
        .unwrap();
        table.append(&batch).unwrap();

        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::GreaterThan(450.into()),
        });

        let mut default_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    default_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(default_vals, vec![Some(40)]);

        let mut include_null_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions {
                    include_nulls: true,
                    order: None,
                    row_id_filter: None,
                },
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    include_null_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(include_null_vals, vec![Some(40), None]);

        // With multiple projections, the default options drop a row only
        // when *every* projected column is null, so the row with a null
        // c_i32 but a populated d_f64 survives even without include_nulls.
        let mut paired_vals: Vec<(Option<i32>, Option<f64>)> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32), proj(&table, COL_D_F64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    assert_eq!(b.num_columns(), 2);
                    let c_arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    let d_arr = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
                    for i in 0..b.num_rows() {
                        let c_val = if c_arr.is_null(i) {
                            None
                        } else {
                            Some(c_arr.value(i))
                        };
                        let d_val = if d_arr.is_null(i) {
                            None
                        } else {
                            Some(d_arr.value(i))
                        };
                        paired_vals.push((c_val, d_val));
                    }
                },
            )
            .unwrap();
        assert_eq!(paired_vals, vec![(Some(40), Some(5.5)), (None, Some(6.5))]);
    }

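    // A minimal sketch, assuming `ScanStreamOptions` keeps its `Default`
    // impl (the tests above rely on it): struct-update syntax spells the
    // include-nulls variant without naming every field, so it stays
    // compilable if more options are added later. Illustrative only.
    #[allow(dead_code)]
    fn include_nulls_options() -> ScanStreamOptions<MemPager> {
        ScanStreamOptions {
            include_nulls: true,
            ..ScanStreamOptions::default()
        }
    }
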
    #[test]
    fn test_filtered_scan_int_sqrt_float64() {
        // Trade-off note:
        // - We cast per batch and apply a compute unary kernel for sqrt.
        // - This keeps processing streaming and avoids per-value loops.
        // - `unary` operates on `PrimitiveArray<T>`; cast and downcast to
        //   `Float64Array` first.
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::GreaterThan(15.into()),
        });

        let mut got: Vec<f64> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();

                    // unary::<Float64Type, _, Float64Type>(...)
                    let sqrt_arr = unary::<
                        arrow::datatypes::Float64Type,
                        _,
                        arrow::datatypes::Float64Type,
                    >(f64_arr, |v: f64| v.sqrt());

                    for i in 0..sqrt_arr.len() {
                        if !sqrt_arr.is_null(i) {
                            got.push(sqrt_arr.value(i));
                        }
                    }
                },
            )
            .unwrap();

        let expected = [200_f64.sqrt(), 300_f64.sqrt(), 200_f64.sqrt()];
        assert_eq!(got, expected);
    }

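    // A minimal sketch of the same transform without the `unary` kernel:
    // `PrimitiveArray::iter` yields `Option<f64>`, and collecting from an
    // iterator of options rebuilds the validity bitmap. The kernel above
    // is the streaming-friendly choice; this form reads more simply when
    // per-value logic grows. Illustrative only; not part of the table API.
    #[allow(dead_code)]
    fn sqrt_batch(arr: &Float64Array) -> Float64Array {
        // Nulls map to nulls; valid values map to their square roots.
        arr.iter().map(|v| v.map(f64::sqrt)).collect()
    }
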
    #[test]
    fn test_multi_field_kernels_with_filters() {
        // Trade-off note:
        // - All reductions use per-batch kernels + accumulation to stay
        //   streaming. No concat or whole-column materialization.
        use arrow::array::{Int16Array, UInt8Array, UInt32Array};

        let table = Table::from_id(2, Arc::new(MemPager::default())).unwrap();

        const COL_A_U64: FieldId = 20;
        const COL_D_U32: FieldId = 21;
        const COL_E_I16: FieldId = 22;
        const COL_F_U8: FieldId = 23;
        const COL_C_I32: FieldId = 24;

        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("d_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_U32.to_string(),
            )])),
            Field::new("e_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_I16.to_string(),
            )])),
            Field::new("f_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_F_U8.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
        ]));

        // Data: 5 rows. Filtering on c_i32 >= 20 keeps rows 2 through 5.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(UInt64Array::from(vec![100, 225, 400, 900, 1600])),
                Arc::new(UInt32Array::from(vec![7, 8, 9, 10, 11])),
                Arc::new(Int16Array::from(vec![-3, 0, 3, -6, 6])),
                Arc::new(UInt8Array::from(vec![2, 4, 6, 8, 10])),
                Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
            ],
        )
        .unwrap();

        table.append(&batch).unwrap();

        // Filter: c_i32 >= 20.
        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::GreaterThanOrEquals(20.into()),
        });

        // 1) SUM over d_u32 (per-batch sum + accumulate).
        let mut d_sum: u128 = 0;
        table
            .scan_stream(
                &[proj(&table, COL_D_U32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
                    if let Some(part) = sum(a) {
                        d_sum += part as u128;
                    }
                },
            )
            .unwrap();
        assert_eq!(d_sum, (8 + 9 + 10 + 11) as u128);

        // 2) MIN over e_i16 (per-batch min + fold).
        let mut e_min: Option<i16> = None;
        table
            .scan_stream(
                &[proj(&table, COL_E_I16)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<Int16Array>().unwrap();
                    if let Some(part_min) = min(a) {
                        e_min = Some(e_min.map_or(part_min, |m| m.min(part_min)));
                    }
                },
            )
            .unwrap();
        assert_eq!(e_min, Some(-6));

        // 3) MAX over f_u8 (per-batch max + fold).
        let mut f_max: Option<u8> = None;
        table
            .scan_stream(
                &[proj(&table, COL_F_U8)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b
                        .column(0)
                        .as_any()
                        .downcast_ref::<arrow::array::UInt8Array>()
                        .unwrap();
                    if let Some(part_max) = max(a) {
                        f_max = Some(f_max.map_or(part_max, |m| m.max(part_max)));
                    }
                },
            )
            .unwrap();
        assert_eq!(f_max, Some(10));

        // 4) SQRT over a_u64 (cast to f64, then unary sqrt per batch).
        let mut got: Vec<f64> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
                    let sqrt_arr = unary::<
                        arrow::datatypes::Float64Type,
                        _,
                        arrow::datatypes::Float64Type,
                    >(f64_arr, |v: f64| v.sqrt());

                    for i in 0..sqrt_arr.len() {
                        if !sqrt_arr.is_null(i) {
                            got.push(sqrt_arr.value(i));
                        }
                    }
                },
            )
            .unwrap();
        let expected = [15.0_f64, 20.0, 30.0, 40.0];
        assert_eq!(got, expected);
    }

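    // The integer folds above repeat the same `map_or` pattern for every
    // width. A minimal sketch of a shared helper, assuming only
    // `Ord + Copy` on the per-batch result type; floats would need the
    // NaN-aware fold sketched earlier. Illustrative only; not part of the
    // table API.
    #[allow(dead_code)]
    fn fold_min_ord<T: Ord + Copy>(acc: &mut Option<T>, part: Option<T>) {
        if let Some(p) = part {
            // Fold the per-batch minimum into the running accumulator.
            *acc = Some(acc.map_or(p, |m| m.min(p)));
        }
    }
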
    #[test]
    fn test_scan_with_in_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        // IN now uses untyped literals, too.
        let candidates = [10.into(), 30.into()];
        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::In(&candidates),
        });

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(100), Some(300)]);
    }

    #[test]
    fn test_scan_stream_single_column_batches() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        // Filter c_i32 == 20 -> two rows; stream a_u64 in bounded batches
        // (at most STREAM_BATCH_ROWS rows each).
        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(20.into()),
        });

        let mut seen_cols = Vec::<u64>::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    assert_eq!(b.num_columns(), 1);
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    // No kernel needed; just collect values for shape assertions.
                    for i in 0..a.len() {
                        if !a.is_null(i) {
                            seen_cols.push(a.value(i));
                        }
                    }
                },
            )
            .unwrap();

        // In fixture, c_i32 == 20 corresponds to a_u64 values [200, 200].
        assert_eq!(seen_cols, vec![200, 200]);
    }

    #[test]
    fn test_scan_with_multiple_projection_columns() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(20.into()),
        });

        let expected_names = [COL_A_U64.to_string(), COL_C_I32.to_string()];

        let mut combined: Vec<(Option<u64>, Option<i32>)> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64), proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    assert_eq!(b.num_columns(), 2);
                    assert_eq!(b.schema().field(0).name(), &expected_names[0]);
                    assert_eq!(b.schema().field(1).name(), &expected_names[1]);

                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    let c = b.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
                    for i in 0..b.num_rows() {
                        let left = if a.is_null(i) { None } else { Some(a.value(i)) };
                        let right = if c.is_null(i) { None } else { Some(c.value(i)) };
                        combined.push((left, right));
                    }
                },
            )
            .unwrap();

        assert_eq!(combined, vec![(Some(200), Some(20)), (Some(200), Some(20))]);
    }

    #[test]
    fn test_scan_stream_projection_validation() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(20.into()),
        });

        let empty: [Projection; 0] = [];
        let result = table.scan_stream(&empty, &filter, ScanStreamOptions::default(), |_batch| {});
        assert!(matches!(result, Err(Error::InvalidArgumentError(_))));

        // Duplicate projections are allowed: the same column will be
        // gathered once and duplicated in the output in the requested
        // order. Verify the call succeeds and produces two identical
        // columns per batch.
        let duplicate = [
            proj(&table, COL_A_U64),
            proj_alias(&table, COL_A_U64, "alias_a"),
        ];
        let mut collected = Vec::<u64>::new();
        table
            .scan_stream(&duplicate, &filter, ScanStreamOptions::default(), |b| {
                assert_eq!(b.num_columns(), 2);
                assert_eq!(b.schema().field(0).name(), &COL_A_U64.to_string());
                assert_eq!(b.schema().field(1).name(), "alias_a");
                let a0 = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                let a1 = b.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
                for i in 0..b.num_rows() {
                    if !a0.is_null(i) {
                        collected.push(a0.value(i));
                    }
                    if !a1.is_null(i) {
                        collected.push(a1.value(i));
                    }
                }
            })
            .unwrap();
        // Two matching rows, two columns per row -> four values.
        assert_eq!(collected, vec![200, 200, 200, 200]);
    }

    #[test]
    fn test_scan_stream_computed_projection() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;

        let projections = [
            ScanProjection::column(proj(&table, COL_A_U64)),
            ScanProjection::computed(
                ScalarExpr::binary(
                    ScalarExpr::column(COL_A_U64),
                    BinaryOp::Multiply,
                    ScalarExpr::literal(2),
                ),
                "a_times_two",
            ),
        ];

        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::GreaterThanOrEquals(0.into()),
        });

        let mut computed: Vec<(u64, f64)> = Vec::new();
        table
            .scan_stream_with_exprs(&projections, &filter, ScanStreamOptions::default(), |b| {
                assert_eq!(b.num_columns(), 2);
                let base = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                let comp = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
                for i in 0..b.num_rows() {
                    if base.is_null(i) || comp.is_null(i) {
                        continue;
                    }
                    computed.push((base.value(i), comp.value(i)));
                }
            })
            .unwrap();

        let expected = vec![(100, 200.0), (200, 400.0), (300, 600.0), (200, 400.0)];
        assert_eq!(computed, expected);
    }

    #[test]
    fn test_scan_stream_multi_column_filter_compare() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let expr = Expr::Compare {
            left: ScalarExpr::binary(
                ScalarExpr::column(COL_A_U64),
                BinaryOp::Add,
                ScalarExpr::column(COL_C_I32),
            ),
            op: CompareOp::Gt,
            right: ScalarExpr::literal(220_i64),
        };

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let col = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    for i in 0..b.num_rows() {
                        vals.push(if col.is_null(i) {
                            None
                        } else {
                            Some(col.value(i))
                        });
                    }
                },
            )
            .unwrap();

        assert_eq!(vals.into_iter().flatten().collect::<Vec<_>>(), vec![300]);
    }
}