// llkv_table/table.rs

1use croaring::Treemap;
2use std::fmt;
3use std::sync::Arc;
4use std::sync::RwLock;
5
6use crate::planner::{TablePlanner, collect_row_ids_for_table};
7use crate::stream::ColumnStream;
8use crate::stream::RowIdSource;
9
10use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt32Array};
11use arrow::datatypes::{DataType, Field, Schema};
12use std::collections::HashMap;
13
14use crate::constants::STREAM_BATCH_ROWS;
15use llkv_column_map::ColumnStore;
16use llkv_column_map::store::{GatherNullPolicy, IndexKind, Projection, ROW_ID_COLUMN_NAME};
17use llkv_storage::pager::{MemPager, Pager};
18use llkv_types::ids::{LogicalFieldId, TableId};
19use simd_r_drive_entry_handle::EntryHandle;
20
21use crate::reserved::is_reserved_table_id;
22use crate::sys_catalog::{ColMeta, SysCatalog, TableMeta};
23use crate::types::FieldId;
24use llkv_expr::{Expr, ScalarExpr};
25use llkv_result::{Error, Result as LlkvResult};
26
/// Cached information about which system columns exist in the table schema.
/// This avoids repeated string comparisons in hot paths like append().
#[derive(Debug, Clone, Copy)]
struct MvccColumnCache {
    /// True when the schema contains the MVCC `created_by` column.
    has_created_by: bool,
    /// True when the schema contains the MVCC `deleted_by` column.
    has_deleted_by: bool,
}
34
/// Handle for data operations on a table.
///
/// Wraps a shared [`ColumnStore`] plus this table's [`TableId`]; column data is
/// namespaced by combining the table id with per-column field ids (see `append`).
pub struct Table<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Shared column store that physically holds this table's columns.
    store: Arc<ColumnStore<P>>,
    /// Identifier used to namespace logical field ids for this table.
    table_id: TableId,
    /// Cache of MVCC column presence. Initialized lazily by `get_mvcc_cache`
    /// (invoked from `append` with the batch schema). None means not yet
    /// initialized.
    mvcc_cache: RwLock<Option<MvccColumnCache>>,
}
47
/// Filter row IDs before they are materialized into batches.
///
/// This trait allows implementations to enforce transaction visibility (MVCC),
/// access control, or other row-level filtering policies. The filter is applied
/// after column-level predicates but before data is gathered into Arrow batches.
///
/// # Example Use Case
///
/// MVCC implementations use this to hide rows that were:
/// - Created after the transaction's snapshot timestamp
/// - Deleted before the transaction's snapshot timestamp
pub trait RowIdFilter<P>: Send + Sync
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Filter a list of row IDs, returning only those that should be visible.
    ///
    /// Takes ownership of the candidate bitmap and returns the visible subset;
    /// `table` gives access to the store for loading visibility metadata.
    ///
    /// # Errors
    ///
    /// Returns an error if visibility metadata cannot be loaded or is corrupted.
    fn filter(&self, table: &Table<P>, row_ids: Treemap) -> LlkvResult<Treemap>;
}
70
/// Options for configuring table scans.
///
/// These options control how rows are filtered, ordered, and materialized during
/// scan operations.
pub struct ScanStreamOptions<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Whether to include rows where all projected columns are null.
    ///
    /// When `false` (default), rows with all-null projections are dropped before
    /// batches are yielded. This is useful for sparse data where many rows may not
    /// have values for the selected columns.
    pub include_nulls: bool,
    /// Optional ordering to apply to results.
    ///
    /// If specified, row IDs are sorted according to this specification before
    /// data is gathered into batches.
    pub order: Option<ScanOrderSpec>,
    /// Optional filter for row-level visibility (e.g., MVCC).
    ///
    /// Applied after column-level predicates but before data is materialized.
    /// Used to enforce transaction isolation. Shared via `Arc` so cloned
    /// option sets reuse the same filter.
    pub row_id_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
}
96
97impl<P> fmt::Debug for ScanStreamOptions<P>
98where
99    P: Pager<Blob = EntryHandle> + Send + Sync,
100{
101    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102        f.debug_struct("ScanStreamOptions")
103            .field("include_nulls", &self.include_nulls)
104            .field("order", &self.order)
105            .field(
106                "row_id_filter",
107                &self.row_id_filter.as_ref().map(|_| "<RowIdFilter>"),
108            )
109            .finish()
110    }
111}
112
113impl<P> Clone for ScanStreamOptions<P>
114where
115    P: Pager<Blob = EntryHandle> + Send + Sync,
116{
117    fn clone(&self) -> Self {
118        Self {
119            include_nulls: self.include_nulls,
120            order: self.order,
121            row_id_filter: self.row_id_filter.clone(),
122        }
123    }
124}
125
126impl<P> Default for ScanStreamOptions<P>
127where
128    P: Pager<Blob = EntryHandle> + Send + Sync,
129{
130    fn default() -> Self {
131        Self {
132            include_nulls: false,
133            order: None,
134            row_id_filter: None,
135        }
136    }
137}
138
/// Specification for ordering scan results.
///
/// Defines how to sort rows based on a single column's values.
#[derive(Clone, Copy, Debug)]
pub struct ScanOrderSpec {
    /// The field to sort by.
    pub field_id: FieldId,
    /// Sort direction (ascending or descending).
    pub direction: ScanOrderDirection,
    /// Whether null values appear first or last in the sorted output.
    pub nulls_first: bool,
    /// Optional transformation to apply to values before sorting
    /// (see [`ScanOrderTransform`]).
    pub transform: ScanOrderTransform,
}
153
/// Sort direction for scan ordering.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderDirection {
    /// Sort from smallest to largest value.
    Ascending,
    /// Sort from largest to smallest value.
    Descending,
}
162
/// Value transformation to apply before sorting.
///
/// Used to enable sorting on columns that need type coercion or conversion.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderTransform {
    /// Sort 64-bit integers as-is (no conversion).
    IdentityInt64,
    /// Sort 32-bit integers as-is (no conversion).
    IdentityInt32,
    /// Sort strings lexicographically.
    IdentityUtf8,
    /// Parse strings as integers, then sort numerically.
    CastUtf8ToInteger,
}
177
/// A column or computed expression to include in scan results.
///
/// Scans can project either stored columns or expressions computed from them.
#[derive(Clone, Debug)]
pub enum ScanProjection {
    /// Project a stored column directly.
    Column(Projection),
    /// Compute a value from an expression and return it with an alias.
    Computed {
        /// The expression to evaluate (can reference column field IDs).
        expr: ScalarExpr<FieldId>,
        /// The name to give the computed column in results.
        alias: String,
    },
}
193
194impl ScanProjection {
195    /// Create a projection for a stored column.
196    pub fn column<P: Into<Projection>>(proj: P) -> Self {
197        Self::Column(proj.into())
198    }
199
200    /// Create a projection for a computed expression.
201    pub fn computed<S: Into<String>>(expr: ScalarExpr<FieldId>, alias: S) -> Self {
202        Self::Computed {
203            expr,
204            alias: alias.into(),
205        }
206    }
207}
208
209impl From<Projection> for ScanProjection {
210    fn from(value: Projection) -> Self {
211        ScanProjection::Column(value)
212    }
213}
214
215impl From<&Projection> for ScanProjection {
216    fn from(value: &Projection) -> Self {
217        ScanProjection::Column(value.clone())
218    }
219}
220
221impl From<&ScanProjection> for ScanProjection {
222    fn from(value: &ScanProjection) -> Self {
223        value.clone()
224    }
225}
226
227impl<P> Table<P>
228where
229    P: Pager<Blob = EntryHandle> + Send + Sync,
230{
231    /// Create a new table from column specifications.
232    ///
233    /// Coordinates metadata persistence, catalog registration, and storage initialization.
234    pub fn create_from_columns(
235        display_name: &str,
236        canonical_name: &str,
237        columns: &[llkv_plan::PlanColumnSpec],
238        metadata: Arc<crate::metadata::MetadataManager<P>>,
239        catalog: Arc<crate::catalog::TableCatalog>,
240        store: Arc<ColumnStore<P>>,
241    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
242        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
243        service.create_table_from_columns(display_name, canonical_name, columns)
244    }
245
246    /// Create a new table from an Arrow schema (for CREATE TABLE AS SELECT).
247    pub fn create_from_schema(
248        display_name: &str,
249        canonical_name: &str,
250        schema: &arrow::datatypes::Schema,
251        metadata: Arc<crate::metadata::MetadataManager<P>>,
252        catalog: Arc<crate::catalog::TableCatalog>,
253        store: Arc<ColumnStore<P>>,
254    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
255        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
256        service.create_table_from_schema(display_name, canonical_name, schema)
257    }
258
259    /// Internal constructor: wrap a table ID with column store access.
260    ///
261    /// **This is for internal crate use only.** User code should create tables via
262    /// `CatalogService::create_table_*()`. For tests, use `Table::from_id()`.
263    #[doc(hidden)]
264    pub fn from_id(table_id: TableId, pager: Arc<P>) -> LlkvResult<Self> {
265        if is_reserved_table_id(table_id) {
266            return Err(Error::ReservedTableId(table_id));
267        }
268
269        tracing::trace!(
270            "Table::from_id: Opening table_id={} with pager at {:p}",
271            table_id,
272            &*pager
273        );
274        let store = ColumnStore::open(pager)?;
275        Ok(Self {
276            store: Arc::new(store),
277            table_id,
278            mvcc_cache: RwLock::new(None),
279        })
280    }
281
282    /// Internal constructor: wrap a table ID with a shared column store.
283    ///
284    /// **This is for internal crate use only.** Preferred over `from_id()` when
285    /// multiple tables share the same store. For tests, use `Table::from_id_with_store()`.
286    #[doc(hidden)]
287    pub fn from_id_and_store(table_id: TableId, store: Arc<ColumnStore<P>>) -> LlkvResult<Self> {
288        if is_reserved_table_id(table_id) {
289            return Err(Error::ReservedTableId(table_id));
290        }
291
292        Ok(Self {
293            store,
294            table_id,
295            mvcc_cache: RwLock::new(None),
296        })
297    }
298
299    /// Register a persisted sort index for the specified user column.
300    pub fn register_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
301        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
302        self.store
303            .register_index(logical_field_id, IndexKind::Sort)?;
304        Ok(())
305    }
306
307    /// Remove a persisted sort index for the specified user column if it exists.
308    pub fn unregister_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
309        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
310        match self
311            .store
312            .unregister_index(logical_field_id, IndexKind::Sort)
313        {
314            Ok(()) | Err(Error::NotFound) => Ok(()),
315            Err(err) => Err(err),
316        }
317    }
318
319    /// List the persisted index kinds registered for the given user column.
320    pub fn list_registered_indexes(&self, field_id: FieldId) -> LlkvResult<Vec<IndexKind>> {
321        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
322        match self.store.list_persisted_indexes(logical_field_id) {
323            Ok(kinds) => Ok(kinds),
324            Err(Error::NotFound) => Ok(Vec::new()),
325            Err(err) => Err(err),
326        }
327    }
328
    /// Get or initialize the MVCC column cache from the provided schema.
    /// This is an optimization to avoid repeated string comparisons in append().
    ///
    /// NOTE(review): the cache is stored per table but computed from the first
    /// schema passed in. If a later batch differs in MVCC column presence, the
    /// cached answer would be stale — this assumes all appended batches agree
    /// on MVCC columns; confirm with callers.
    fn get_mvcc_cache(&self, schema: &Arc<Schema>) -> MvccColumnCache {
        // Fast path: check if cache is already initialized
        {
            let cache_read = self.mvcc_cache.read().unwrap();
            if let Some(cache) = *cache_read {
                return cache;
            }
        }

        // Slow path: initialize cache from schema.
        // Two threads may race past the read above and both compute a value;
        // last writer wins, which is harmless when their schemas agree.
        let has_created_by = schema
            .fields()
            .iter()
            .any(|f| f.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME);
        let has_deleted_by = schema
            .fields()
            .iter()
            .any(|f| f.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME);

        let cache = MvccColumnCache {
            has_created_by,
            has_deleted_by,
        };

        // Store in cache for future calls
        *self.mvcc_cache.write().unwrap() = Some(cache);

        cache
    }
360
    /// Append a [`RecordBatch`] to the table.
    ///
    /// The batch must include:
    /// - A `row_id` column (type `UInt64`) with unique row identifiers
    /// - `field_id` metadata for each user column, mapping to this table's field IDs
    ///
    /// # MVCC Columns
    ///
    /// If the batch includes `created_by` or `deleted_by` columns, they are automatically
    /// assigned the correct [`LogicalFieldId`] for this table's MVCC metadata.
    ///
    /// # Field ID Mapping
    ///
    /// Each column's `field_id` metadata is converted to a [`LogicalFieldId`] by combining
    /// it with this table's ID. This ensures columns from different tables don't collide
    /// in the underlying storage.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The batch is missing the `row_id` column
    /// - Any user column is missing `field_id` metadata
    /// - Field IDs are invalid or malformed
    /// - The underlying storage operation fails
    pub fn append(&self, batch: &RecordBatch) -> LlkvResult<()> {
        use arrow::array::UInt64Builder;

        // Check if MVCC columns already exist in the batch using cache
        // This avoids repeated string comparisons on every append
        let cache = self.get_mvcc_cache(&batch.schema());
        let has_created_by = cache.has_created_by;
        let has_deleted_by = cache.has_deleted_by;

        // Rebuild the schema and column list with table-qualified field ids.
        // Reserve two extra slots for MVCC columns we may inject below.
        let mut new_fields = Vec::with_capacity(batch.schema().fields().len() + 2);
        let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.columns().len() + 2);

        for (idx, field) in batch.schema().fields().iter().enumerate() {
            let maybe_field_id = field.metadata().get(crate::constants::FIELD_ID_META_KEY);
            // System columns (row_id, MVCC columns) don't need field_id metadata
            if maybe_field_id.is_none()
                && (field.name() == ROW_ID_COLUMN_NAME
                    || field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME
                    || field.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME)
            {
                if field.name() == ROW_ID_COLUMN_NAME {
                    // row_id passes through unchanged.
                    new_fields.push(field.as_ref().clone());
                    new_columns.push(batch.column(idx).clone());
                } else {
                    // Caller-supplied MVCC column: stamp it with this table's
                    // MVCC logical field id so it lands in the right namespace.
                    let lfid = if field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME {
                        LogicalFieldId::for_mvcc_created_by(self.table_id)
                    } else {
                        LogicalFieldId::for_mvcc_deleted_by(self.table_id)
                    };

                    let mut metadata = field.metadata().clone();
                    let lfid_val: u64 = lfid.into();
                    metadata.insert(
                        crate::constants::FIELD_ID_META_KEY.to_string(),
                        lfid_val.to_string(),
                    );

                    let new_field =
                        Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                            .with_metadata(metadata);
                    new_fields.push(new_field);
                    new_columns.push(batch.column(idx).clone());
                }
                continue;
            }

            // User column: field_id metadata is mandatory and must parse as u64.
            let raw_field_id = maybe_field_id
                .ok_or_else(|| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' is missing a valid '{}' in its metadata.",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY
                    ))
                })?
                .parse::<u64>()
                .map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' contains an invalid '{}': {}",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY,
                        err
                    ))
                })?;

            // Reject values that are already fully-qualified logical ids.
            if raw_field_id > FieldId::MAX as u64 {
                return Err(llkv_result::Error::Internal(format!(
                    "Field '{}' expected user FieldId (<= {}) but got logical id '{}'",
                    field.name(),
                    FieldId::MAX,
                    raw_field_id
                )));
            }

            let user_field_id = raw_field_id as FieldId;
            let logical_field_id = LogicalFieldId::for_user(self.table_id, user_field_id);

            // Store the fully-qualified logical field id in the metadata we hand to the
            // column store so descriptors are registered under the correct table id.
            let lfid = logical_field_id;
            let mut new_metadata = field.metadata().clone();
            let lfid_val: u64 = lfid.into();
            new_metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );

            let new_field =
                Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                    .with_metadata(new_metadata);
            new_fields.push(new_field);
            new_columns.push(batch.column(idx).clone());

            // Ensure the catalog remembers the human-friendly column name for
            // this field so callers of `Table::schema()` (and other metadata
            // consumers) can recover it later. The CSV ingest path (and other
            // writers) may only supply the `field_id` metadata on the batch,
            // so defensively persist the column name when absent.
            let need_meta = match self
                .catalog()
                .get_cols_meta(self.table_id, &[user_field_id])
            {
                metas if metas.is_empty() => true,
                metas => metas[0].as_ref().and_then(|m| m.name.as_ref()).is_none(),
            };

            if need_meta {
                let meta = ColMeta {
                    col_id: user_field_id,
                    name: Some(field.name().to_string()),
                    flags: 0,
                    default: None,
                };
                self.catalog().put_col_meta(self.table_id, &meta);
            }
        }

        // Inject MVCC columns if they don't exist
        // For non-transactional appends (e.g., CSV ingest), we use TXN_ID_AUTO_COMMIT (1)
        // which is treated as "committed by system" and always visible.
        // Use TXN_ID_NONE (0) for deleted_by to indicate "not deleted".
        const TXN_ID_AUTO_COMMIT: u64 = 1;
        const TXN_ID_NONE: u64 = 0;
        let row_count = batch.num_rows();

        if !has_created_by {
            let mut created_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                created_by_builder.append_value(TXN_ID_AUTO_COMMIT);
            }
            let created_by_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = created_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::CREATED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(created_by_builder.finish()));
        }

        if !has_deleted_by {
            let mut deleted_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                deleted_by_builder.append_value(TXN_ID_NONE);
            }
            let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = deleted_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::DELETED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(deleted_by_builder.finish()));
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let namespaced_batch = RecordBatch::try_new(new_schema, new_columns)?;

        tracing::trace!(
            table_id = self.table_id,
            num_columns = namespaced_batch.num_columns(),
            num_rows = namespaced_batch.num_rows(),
            "Attempting append to table"
        );

        if let Err(err) = self.store.append(&namespaced_batch) {
            // On failure, collect diagnostics: every logical field id the batch
            // referenced, and the subset the store doesn't know about.
            let batch_field_ids: Vec<LogicalFieldId> = namespaced_batch
                .schema()
                .fields()
                .iter()
                .filter_map(|f| f.metadata().get(crate::constants::FIELD_ID_META_KEY))
                .filter_map(|s| s.parse::<u64>().ok())
                .map(LogicalFieldId::from)
                .collect();

            // Check which fields are missing from the catalog
            let missing_fields: Vec<LogicalFieldId> = batch_field_ids
                .iter()
                .filter(|&&field_id| !self.store.has_field(field_id))
                .copied()
                .collect();

            tracing::error!(
                table_id = self.table_id,
                error = ?err,
                batch_field_ids = ?batch_field_ids,
                missing_from_catalog = ?missing_fields,
                "Append failed - some fields missing from catalog"
            );
            return Err(err);
        }
        Ok(())
    }
593
594    /// Stream one or more projected columns as a sequence of RecordBatches.
595    ///
596    /// - Avoids `concat` and large materializations.
597    /// - Uses the same filter machinery as the old `scan` to produce
598    ///   `row_ids`.
599    /// - Splits `row_ids` into fixed-size windows and gathers rows per
600    ///   window to form a small `RecordBatch` that is sent to `on_batch`.
601    pub fn scan_stream<'a, I, T, F>(
602        &self,
603        projections: I,
604        filter_expr: &Expr<'a, FieldId>,
605        options: ScanStreamOptions<P>,
606        on_batch: F,
607    ) -> LlkvResult<()>
608    where
609        I: IntoIterator<Item = T>,
610        T: Into<ScanProjection>,
611        F: FnMut(RecordBatch),
612    {
613        let stream_projections: Vec<ScanProjection> =
614            projections.into_iter().map(|p| p.into()).collect();
615        self.scan_stream_with_exprs(&stream_projections, filter_expr, options, on_batch)
616    }
617
618    /// Stream projections using fully resolved expression inputs.
619    ///
620    /// Callers that already parsed expressions into [`ScanProjection`] values can
621    /// use this entry point to skip the iterator conversion performed by
622    /// [`Self::scan_stream`]. The execution semantics and callbacks are identical.
623    pub fn scan_stream_with_exprs<'a, F>(
624        &self,
625        projections: &[ScanProjection],
626        filter_expr: &Expr<'a, FieldId>,
627        options: ScanStreamOptions<P>,
628        on_batch: F,
629    ) -> LlkvResult<()>
630    where
631        F: FnMut(RecordBatch),
632    {
633        TablePlanner::new(self).scan_stream_with_exprs(projections, filter_expr, options, on_batch)
634    }
635
636    pub fn filter_row_ids<'a>(&self, filter_expr: &Expr<'a, FieldId>) -> LlkvResult<Treemap> {
637        let source = collect_row_ids_for_table(self, filter_expr)?;
638        Ok(match source {
639            RowIdSource::Bitmap(b) => b,
640            RowIdSource::Vector(v) => Treemap::from_iter(v),
641        })
642    }
643
    /// Borrow a [`SysCatalog`] view over this table's store for metadata access.
    #[inline]
    pub fn catalog(&self) -> SysCatalog<'_, P> {
        SysCatalog::new(&self.store)
    }
648
    /// Fetch this table's persisted metadata from the system catalog, if any.
    #[inline]
    pub fn get_table_meta(&self) -> Option<TableMeta> {
        self.catalog().get_table_meta(self.table_id)
    }
653
    /// Fetch persisted column metadata for the given ids; entries are `None`
    /// for columns with no catalog record.
    #[inline]
    pub fn get_cols_meta(&self, col_ids: &[FieldId]) -> Vec<Option<ColMeta>> {
        self.catalog().get_cols_meta(self.table_id, col_ids)
    }
658
659    /// Build and return an Arrow `Schema` that describes this table.
660    ///
661    /// The returned schema includes the `row_id` field first, followed by
662    /// user fields. Each user field has its `field_id` stored in the field
663    /// metadata (under the "field_id" key) and the name is taken from the
664    /// catalog when available or falls back to `col_<id>`.
665    pub fn schema(&self) -> LlkvResult<Arc<Schema>> {
666        // Collect logical fields for this table and sort by field id.
667        let mut logical_fields = self.store.user_field_ids_for_table(self.table_id);
668        logical_fields.sort_by_key(|lfid| lfid.field_id());
669
670        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
671        let metas = self.get_cols_meta(&field_ids);
672
673        let mut fields: Vec<Field> = Vec::with_capacity(1 + field_ids.len());
674        // Add row_id first
675        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));
676
677        for (idx, lfid) in logical_fields.into_iter().enumerate() {
678            let fid = lfid.field_id();
679            let dtype = self.store.data_type(lfid)?;
680            let name = metas
681                .get(idx)
682                .and_then(|m| m.as_ref().and_then(|meta| meta.name.clone()))
683                .unwrap_or_else(|| format!("col_{}", fid));
684
685            let mut metadata: HashMap<String, String> = HashMap::new();
686            metadata.insert(
687                crate::constants::FIELD_ID_META_KEY.to_string(),
688                fid.to_string(),
689            );
690
691            fields.push(Field::new(&name, dtype.clone(), true).with_metadata(metadata));
692        }
693
694        Ok(Arc::new(Schema::new(fields)))
695    }
696
697    /// Return the table schema formatted as an Arrow RecordBatch suitable
698    /// for pretty printing. The batch has three columns: `name` (Utf8),
699    /// `field_id` (UInt32) and `data_type` (Utf8).
700    pub fn schema_recordbatch(&self) -> LlkvResult<RecordBatch> {
701        let schema = self.schema()?;
702        let fields = schema.fields();
703
704        let mut names: Vec<String> = Vec::with_capacity(fields.len());
705        let mut fids: Vec<u32> = Vec::with_capacity(fields.len());
706        let mut dtypes: Vec<String> = Vec::with_capacity(fields.len());
707
708        for field in fields.iter() {
709            names.push(field.name().to_string());
710            let fid = field
711                .metadata()
712                .get(crate::constants::FIELD_ID_META_KEY)
713                .and_then(|s| s.parse::<u32>().ok())
714                .unwrap_or(0u32);
715            fids.push(fid);
716            dtypes.push(format!("{:?}", field.data_type()));
717        }
718
719        // Build Arrow arrays
720        let name_array: ArrayRef = Arc::new(StringArray::from(names));
721        let fid_array: ArrayRef = Arc::new(UInt32Array::from(fids));
722        let dtype_array: ArrayRef = Arc::new(StringArray::from(dtypes));
723
724        let rb_schema = Arc::new(Schema::new(vec![
725            Field::new("name", DataType::Utf8, false),
726            Field::new(crate::constants::FIELD_ID_META_KEY, DataType::UInt32, false),
727            Field::new("data_type", DataType::Utf8, false),
728        ]));
729
730        let batch = RecordBatch::try_new(rb_schema, vec![name_array, fid_array, dtype_array])?;
731        Ok(batch)
732    }
733
    /// Create a streaming view over the provided row IDs for the specified logical fields.
    ///
    /// Batches are produced in `STREAM_BATCH_ROWS`-sized windows; `policy`
    /// controls null handling during the gather.
    pub fn stream_columns<'table, 'a>(
        &'table self,
        logical_fields: impl Into<Arc<[LogicalFieldId]>>,
        row_ids: impl crate::stream::RowIdStreamSource<'a>,
        policy: GatherNullPolicy,
    ) -> LlkvResult<ColumnStream<'table, 'a, P>> {
        // Count first; the source is consumed by into_iter_source() below.
        let total_rows = row_ids.count() as usize;
        let iter = row_ids.into_iter_source();
        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        // Resolve gather state for all requested fields once, up front.
        let ctx = self.store.prepare_gather_context(logical_fields.as_ref())?;
        Ok(ColumnStream::new(
            &self.store,
            ctx,
            iter,
            total_rows,
            STREAM_BATCH_ROWS,
            policy,
            logical_fields,
        ))
    }
755
    /// Borrow the underlying column store shared by this table.
    pub fn store(&self) -> &ColumnStore<P> {
        &self.store
    }
759
    /// The identifier used to namespace this table's columns in the store.
    #[inline]
    pub fn table_id(&self) -> TableId {
        self.table_id
    }
764
765    /// Return the total number of rows for a given user column id in this table.
766    ///
767    /// This delegates to the ColumnStore descriptor for the logical field that
768    /// corresponds to (table_id, col_id) and returns the persisted total_row_count.
769    pub fn total_rows_for_col(&self, col_id: FieldId) -> llkv_result::Result<u64> {
770        let lfid = LogicalFieldId::for_user(self.table_id, col_id);
771        self.store.total_rows_for_field(lfid)
772    }
773
774    /// Return the total number of rows for this table.
775    ///
776    /// Prefer reading the dedicated row-id shadow column if present; otherwise
777    /// fall back to inspecting any persisted user column descriptor.
778    pub fn total_rows(&self) -> llkv_result::Result<u64> {
779        use llkv_column_map::store::rowid_fid;
780        let rid_lfid = rowid_fid(LogicalFieldId::for_user(self.table_id, 0));
781        // Try the row-id shadow column first
782        match self.store.total_rows_for_field(rid_lfid) {
783            Ok(n) => Ok(n),
784            Err(_) => {
785                // Fall back to scanning the catalog for any user-data column
786                self.store.total_rows_for_table(self.table_id)
787            }
788        }
789    }
790}
791
792#[cfg(test)]
793mod tests {
794    use super::*;
795    use crate::reserved::CATALOG_TABLE_ID;
796    use crate::types::RowId;
797    use arrow::array::Array;
798    use arrow::array::ArrayRef;
799    use arrow::array::{
800        BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt8Array,
801        UInt32Array, UInt64Array,
802    };
803    use arrow::compute::{cast, max, min, sum, unary};
804    use arrow::datatypes::DataType;
805    use llkv_column_map::ColumnStore;
806    use llkv_column_map::store::GatherNullPolicy;
807    use llkv_expr::{BinaryOp, CompareOp, Filter, Operator, ScalarExpr};
808    use std::collections::HashMap;
809    use std::ops::Bound;
810
811    fn setup_test_table() -> Table {
812        let pager = Arc::new(MemPager::default());
813        setup_test_table_with_pager(&pager)
814    }
815
816    fn setup_test_table_with_pager(pager: &Arc<MemPager>) -> Table {
817        let table = Table::from_id(1, Arc::clone(pager)).unwrap();
818        const COL_A_U64: FieldId = 10;
819        const COL_B_BIN: FieldId = 11;
820        const COL_C_I32: FieldId = 12;
821        const COL_D_F64: FieldId = 13;
822        const COL_E_F32: FieldId = 14;
823
824        let schema = Arc::new(Schema::new(vec![
825            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
826            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
827                crate::constants::FIELD_ID_META_KEY.to_string(),
828                COL_A_U64.to_string(),
829            )])),
830            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
831                crate::constants::FIELD_ID_META_KEY.to_string(),
832                COL_B_BIN.to_string(),
833            )])),
834            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
835                crate::constants::FIELD_ID_META_KEY.to_string(),
836                COL_C_I32.to_string(),
837            )])),
838            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
839                crate::constants::FIELD_ID_META_KEY.to_string(),
840                COL_D_F64.to_string(),
841            )])),
842            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
843                crate::constants::FIELD_ID_META_KEY.to_string(),
844                COL_E_F32.to_string(),
845            )])),
846        ]));
847
848        let batch = RecordBatch::try_new(
849            schema.clone(),
850            vec![
851                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
852                Arc::new(UInt64Array::from(vec![100, 200, 300, 200])),
853                Arc::new(BinaryArray::from(vec![
854                    b"foo" as &[u8],
855                    b"bar",
856                    b"baz",
857                    b"qux",
858                ])),
859                Arc::new(Int32Array::from(vec![10, 20, 30, 20])),
860                Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5, 2.5])),
861                Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 2.0])),
862            ],
863        )
864        .unwrap();
865
866        table.append(&batch).unwrap();
867        table
868    }
869
870    fn gather_single(
871        store: &ColumnStore<MemPager>,
872        field_id: LogicalFieldId,
873        row_ids: &[u64],
874    ) -> ArrayRef {
875        store
876            .gather_rows(&[field_id], row_ids, GatherNullPolicy::ErrorOnMissing)
877            .unwrap()
878            .column(0)
879            .clone()
880    }
881
    /// Wrap a single [`Filter`] in an [`Expr::Pred`] leaf node.
    fn pred_expr<'a>(filter: Filter<'a, FieldId>) -> Expr<'a, FieldId> {
        Expr::Pred(filter)
    }
885
    /// Build a [`Projection`] for a user column of `table` identified by `field_id`.
    fn proj(table: &Table, field_id: FieldId) -> Projection {
        Projection::from(LogicalFieldId::for_user(table.table_id, field_id))
    }
889
    /// Like [`proj`], but attach an output alias to the projection.
    fn proj_alias<S: Into<String>>(table: &Table, field_id: FieldId, alias: S) -> Projection {
        Projection::with_alias(LogicalFieldId::for_user(table.table_id, field_id), alias)
    }
893
894    #[test]
895    fn table_new_rejects_reserved_table_id() {
896        let result = Table::from_id(CATALOG_TABLE_ID, Arc::new(MemPager::default()));
897        assert!(matches!(
898            result,
899            Err(Error::ReservedTableId(id)) if id == CATALOG_TABLE_ID
900        ));
901    }
902
    #[test]
    fn test_append_rejects_logical_field_id_in_metadata() {
        // Create a table and build a schema where the column's metadata
        // contains a fully-qualified LogicalFieldId (u64). Append should
        // reject this and require a plain user FieldId instead.
        let table = Table::from_id(7, Arc::new(MemPager::default())).unwrap();

        const USER_FID: FieldId = 42;
        // Build a logical id (namespaced) and put its numeric value into metadata
        let logical: LogicalFieldId = LogicalFieldId::for_user(table.table_id(), USER_FID);
        let logical_val: u64 = logical.into();

        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            // The metadata value below is the full namespaced id, not a plain
            // user FieldId — exactly what append() must detect and reject.
            Field::new("bad", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                logical_val.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2u64])),
                Arc::new(UInt64Array::from(vec![10u64, 20u64])),
            ],
        )
        .unwrap();

        // The namespaced value cannot be a valid user FieldId, so append
        // surfaces it as an internal error rather than silently remapping.
        let res = table.append(&batch);
        assert!(matches!(res, Err(Error::Internal(_))));
    }
935
936    #[test]
937    fn test_scan_with_u64_filter() {
938        let table = setup_test_table();
939        const COL_A_U64: FieldId = 10;
940        const COL_C_I32: FieldId = 12;
941
942        let expr = pred_expr(Filter {
943            field_id: COL_A_U64,
944            op: Operator::Equals(200.into()),
945        });
946
947        let mut vals: Vec<Option<i32>> = Vec::new();
948        table
949            .scan_stream(
950                &[proj(&table, COL_C_I32)],
951                &expr,
952                ScanStreamOptions::default(),
953                |b| {
954                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
955                    vals.extend(
956                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
957                    );
958                },
959            )
960            .unwrap();
961        assert_eq!(vals, vec![Some(20), Some(20)]);
962    }
963
964    #[test]
965    fn test_scan_with_string_filter() {
966        let pager = Arc::new(MemPager::default());
967        let table = Table::from_id(500, Arc::clone(&pager)).unwrap();
968
969        const COL_STR: FieldId = 42;
970        let schema = Arc::new(Schema::new(vec![
971            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
972            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
973                crate::constants::FIELD_ID_META_KEY.to_string(),
974                COL_STR.to_string(),
975            )])),
976        ]));
977
978        let batch = RecordBatch::try_new(
979            schema,
980            vec![
981                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
982                Arc::new(StringArray::from(vec!["alice", "bob", "albert", "carol"])),
983            ],
984        )
985        .unwrap();
986        table.append(&batch).unwrap();
987
988        let expr = pred_expr(Filter {
989            field_id: COL_STR,
990            op: Operator::starts_with("al".to_string(), true),
991        });
992
993        let mut collected: Vec<Option<String>> = Vec::new();
994        table
995            .scan_stream(
996                &[proj(&table, COL_STR)],
997                &expr,
998                ScanStreamOptions::default(),
999                |b| {
1000                    let arr = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
1001                    collected.extend(arr.iter().map(|v| v.map(|s| s.to_string())));
1002                },
1003            )
1004            .unwrap();
1005
1006        assert_eq!(
1007            collected,
1008            vec![Some("alice".to_string()), Some("albert".to_string())]
1009        );
1010    }
1011
    #[test]
    fn test_table_reopen_with_shared_pager() {
        // Persistence round-trip: three tables with distinct ids and dtypes are
        // written through one shared pager, then reopened in fresh Table
        // handles and checked for schema and value fidelity.
        const TABLE_ALPHA: TableId = 42;
        const TABLE_BETA: TableId = 43;
        const TABLE_GAMMA: TableId = 44;
        const COL_ALPHA_U64: FieldId = 100;
        const COL_ALPHA_I32: FieldId = 101;
        const COL_ALPHA_U32: FieldId = 102;
        const COL_ALPHA_I16: FieldId = 103;
        const COL_BETA_U64: FieldId = 200;
        const COL_BETA_U8: FieldId = 201;
        const COL_GAMMA_I16: FieldId = 300;

        let pager = Arc::new(MemPager::default());

        let alpha_rows: Vec<RowId> = vec![1, 2, 3, 4];
        let alpha_vals_u64: Vec<u64> = vec![10, 20, 30, 40];
        let alpha_vals_i32: Vec<i32> = vec![-5, 15, 25, 35];
        let alpha_vals_u32: Vec<u32> = vec![7, 11, 13, 17];
        let alpha_vals_i16: Vec<i16> = vec![-2, 4, -6, 8];

        let beta_rows: Vec<RowId> = vec![101, 102, 103];
        let beta_vals_u64: Vec<u64> = vec![900, 901, 902];
        let beta_vals_u8: Vec<u8> = vec![1, 2, 3];

        let gamma_rows: Vec<RowId> = vec![501, 502];
        let gamma_vals_i16: Vec<i16> = vec![123, -321];

        // First session: create tables and write data.
        // Each table handle lives in its own scope so it is dropped before the
        // reopen below — only the shared pager carries state across sessions.
        {
            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("alpha_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U64.to_string(),
                )])),
                Field::new("alpha_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I32.to_string(),
                )])),
                Field::new("alpha_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U32.to_string(),
                )])),
                Field::new("alpha_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(alpha_rows.clone())),
                    Arc::new(UInt64Array::from(alpha_vals_u64.clone())),
                    Arc::new(Int32Array::from(alpha_vals_i32.clone())),
                    Arc::new(UInt32Array::from(alpha_vals_u32.clone())),
                    Arc::new(Int16Array::from(alpha_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // Beta: two columns, disjoint row-id range from alpha.
        {
            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("beta_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U64.to_string(),
                )])),
                Field::new("beta_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U8.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(beta_rows.clone())),
                    Arc::new(UInt64Array::from(beta_vals_u64.clone())),
                    Arc::new(UInt8Array::from(beta_vals_u8.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // Gamma: minimal single-column table.
        {
            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("gamma_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_GAMMA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(gamma_rows.clone())),
                    Arc::new(Int16Array::from(gamma_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // Second session: reopen each table and ensure schema and values are intact.
        {
            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let expectations: &[(FieldId, DataType)] = &[
                (COL_ALPHA_U64, DataType::UInt64),
                (COL_ALPHA_I32, DataType::Int32),
                (COL_ALPHA_U32, DataType::UInt32),
                (COL_ALPHA_I16, DataType::Int16),
            ];

            for &(col, ref ty) in expectations {
                let lfid = LogicalFieldId::for_user(TABLE_ALPHA, col);
                assert_eq!(store.data_type(lfid).unwrap(), *ty);
                let arr = gather_single(store, lfid, &alpha_rows);
                // Downcast per expected dtype and compare raw value buffers.
                match ty {
                    DataType::UInt64 => {
                        let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u64.as_slice());
                    }
                    DataType::Int32 => {
                        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i32.as_slice());
                    }
                    DataType::UInt32 => {
                        let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u32.as_slice());
                    }
                    DataType::Int16 => {
                        let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i16.as_slice());
                    }
                    other => panic!("unexpected dtype {other:?}"),
                }
            }
        }

        {
            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let lfid_u64 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U64);
            assert_eq!(store.data_type(lfid_u64).unwrap(), DataType::UInt64);
            let arr_u64 = gather_single(store, lfid_u64, &beta_rows);
            let arr_u64 = arr_u64.as_any().downcast_ref::<UInt64Array>().unwrap();
            assert_eq!(arr_u64.values(), beta_vals_u64.as_slice());

            let lfid_u8 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U8);
            assert_eq!(store.data_type(lfid_u8).unwrap(), DataType::UInt8);
            let arr_u8 = gather_single(store, lfid_u8, &beta_rows);
            let arr_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
            assert_eq!(arr_u8.values(), beta_vals_u8.as_slice());
        }

        {
            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let store = table.store();
            let lfid = LogicalFieldId::for_user(TABLE_GAMMA, COL_GAMMA_I16);
            assert_eq!(store.data_type(lfid).unwrap(), DataType::Int16);
            let arr = gather_single(store, lfid, &gamma_rows);
            let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
            assert_eq!(arr.values(), gamma_vals_i16.as_slice());
        }
    }
1186
1187    #[test]
1188    fn test_scan_with_i32_filter() {
1189        let table = setup_test_table();
1190        const COL_A_U64: FieldId = 10;
1191        const COL_C_I32: FieldId = 12;
1192
1193        let filter = pred_expr(Filter {
1194            field_id: COL_C_I32,
1195            op: Operator::Equals(20.into()),
1196        });
1197
1198        let mut vals: Vec<Option<u64>> = Vec::new();
1199        table
1200            .scan_stream(
1201                &[proj(&table, COL_A_U64)],
1202                &filter,
1203                ScanStreamOptions::default(),
1204                |b| {
1205                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1206                    vals.extend(
1207                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1208                    );
1209                },
1210            )
1211            .unwrap();
1212        assert_eq!(vals, vec![Some(200), Some(200)]);
1213    }
1214
1215    #[test]
1216    fn test_scan_with_greater_than_filter() {
1217        let table = setup_test_table();
1218        const COL_A_U64: FieldId = 10;
1219        const COL_C_I32: FieldId = 12;
1220
1221        let filter = pred_expr(Filter {
1222            field_id: COL_C_I32,
1223            op: Operator::GreaterThan(15.into()),
1224        });
1225
1226        let mut vals: Vec<Option<u64>> = Vec::new();
1227        table
1228            .scan_stream(
1229                &[proj(&table, COL_A_U64)],
1230                &filter,
1231                ScanStreamOptions::default(),
1232                |b| {
1233                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1234                    vals.extend(
1235                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1236                    );
1237                },
1238            )
1239            .unwrap();
1240        assert_eq!(vals, vec![Some(200), Some(300), Some(200)]);
1241    }
1242
1243    #[test]
1244    fn test_scan_with_range_filter() {
1245        let table = setup_test_table();
1246        const COL_A_U64: FieldId = 10;
1247        const COL_C_I32: FieldId = 12;
1248
1249        let filter = pred_expr(Filter {
1250            field_id: COL_A_U64,
1251            op: Operator::Range {
1252                lower: Bound::Included(150.into()),
1253                upper: Bound::Excluded(300.into()),
1254            },
1255        });
1256
1257        let mut vals: Vec<Option<i32>> = Vec::new();
1258        table
1259            .scan_stream(
1260                &[proj(&table, COL_C_I32)],
1261                &filter,
1262                ScanStreamOptions::default(),
1263                |b| {
1264                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1265                    vals.extend(
1266                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1267                    );
1268                },
1269            )
1270            .unwrap();
1271        assert_eq!(vals, vec![Some(20), Some(20)]);
1272    }
1273
1274    #[test]
1275    fn test_filtered_scan_sum_kernel() {
1276        // Trade-off note:
1277        // - We use Arrow's sum kernel per batch, then add the partial sums.
1278        // - This preserves Arrow null semantics and avoids concat.
1279        let table = setup_test_table();
1280        const COL_A_U64: FieldId = 10;
1281
1282        let filter = pred_expr(Filter {
1283            field_id: COL_A_U64,
1284            op: Operator::Range {
1285                lower: Bound::Included(150.into()),
1286                upper: Bound::Excluded(300.into()),
1287            },
1288        });
1289
1290        let mut total: u128 = 0;
1291        table
1292            .scan_stream(
1293                &[proj(&table, COL_A_U64)],
1294                &filter,
1295                ScanStreamOptions::default(),
1296                |b| {
1297                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1298                    if let Some(part) = sum(a) {
1299                        total += part as u128;
1300                    }
1301                },
1302            )
1303            .unwrap();
1304
1305        assert_eq!(total, 400);
1306    }
1307
1308    #[test]
1309    fn test_filtered_scan_sum_i32_kernel() {
1310        // Trade-off note:
1311        // - Per-batch sum + accumulate avoids building one big Array.
1312        // - For tiny batches overhead may match manual loops, but keeps
1313        //   Arrow semantics exact.
1314        let table = setup_test_table();
1315        const COL_A_U64: FieldId = 10;
1316        const COL_C_I32: FieldId = 12;
1317
1318        let candidates = [100.into(), 300.into()];
1319        let filter = pred_expr(Filter {
1320            field_id: COL_A_U64,
1321            op: Operator::In(&candidates),
1322        });
1323
1324        let mut total: i64 = 0;
1325        table
1326            .scan_stream(
1327                &[proj(&table, COL_C_I32)],
1328                &filter,
1329                ScanStreamOptions::default(),
1330                |b| {
1331                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1332                    if let Some(part) = sum(a) {
1333                        total += part as i64;
1334                    }
1335                },
1336            )
1337            .unwrap();
1338        assert_eq!(total, 40);
1339    }
1340
1341    #[test]
1342    fn test_filtered_scan_min_max_kernel() {
1343        // Trade-off note:
1344        // - min/max are computed per batch and folded. This preserves
1345        //   Arrow's null behavior and avoids concat.
1346        // - Be mindful of NaN semantics if extended to floats later.
1347        let table = setup_test_table();
1348        const COL_A_U64: FieldId = 10;
1349        const COL_C_I32: FieldId = 12;
1350
1351        let candidates = [100.into(), 300.into()];
1352        let filter = pred_expr(Filter {
1353            field_id: COL_A_U64,
1354            op: Operator::In(&candidates),
1355        });
1356
1357        let mut mn: Option<i32> = None;
1358        let mut mx: Option<i32> = None;
1359        table
1360            .scan_stream(
1361                &[proj(&table, COL_C_I32)],
1362                &filter,
1363                ScanStreamOptions::default(),
1364                |b| {
1365                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1366
1367                    if let Some(part_min) = min(a) {
1368                        mn = Some(mn.map_or(part_min, |m| m.min(part_min)));
1369                    }
1370                    if let Some(part_max) = max(a) {
1371                        mx = Some(mx.map_or(part_max, |m| m.max(part_max)));
1372                    }
1373                },
1374            )
1375            .unwrap();
1376        assert_eq!(mn, Some(10));
1377        assert_eq!(mx, Some(30));
1378    }
1379
1380    #[test]
1381    fn test_filtered_scan_float64_column() {
1382        let table = setup_test_table();
1383        const COL_D_F64: FieldId = 13;
1384
1385        let filter = pred_expr(Filter {
1386            field_id: COL_D_F64,
1387            op: Operator::GreaterThan(2.0_f64.into()),
1388        });
1389
1390        let mut got = Vec::new();
1391        table
1392            .scan_stream(
1393                &[proj(&table, COL_D_F64)],
1394                &filter,
1395                ScanStreamOptions::default(),
1396                |b| {
1397                    let arr = b.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
1398                    for i in 0..arr.len() {
1399                        if arr.is_valid(i) {
1400                            got.push(arr.value(i));
1401                        }
1402                    }
1403                },
1404            )
1405            .unwrap();
1406
1407        assert_eq!(got, vec![2.5, 3.5, 2.5]);
1408    }
1409
1410    #[test]
1411    fn test_filtered_scan_float32_in_operator() {
1412        let table = setup_test_table();
1413        const COL_E_F32: FieldId = 14;
1414
1415        let candidates = [2.0_f32.into(), 3.0_f32.into()];
1416        let filter = pred_expr(Filter {
1417            field_id: COL_E_F32,
1418            op: Operator::In(&candidates),
1419        });
1420
1421        let mut vals: Vec<Option<f32>> = Vec::new();
1422        table
1423            .scan_stream(
1424                &[proj(&table, COL_E_F32)],
1425                &filter,
1426                ScanStreamOptions::default(),
1427                |b| {
1428                    let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
1429                    vals.extend((0..arr.len()).map(|i| {
1430                        if arr.is_null(i) {
1431                            None
1432                        } else {
1433                            Some(arr.value(i))
1434                        }
1435                    }));
1436                },
1437            )
1438            .unwrap();
1439
1440        let collected: Vec<f32> = vals.into_iter().flatten().collect();
1441        assert_eq!(collected, vec![2.0, 3.0, 2.0]);
1442    }
1443
1444    #[test]
1445    fn test_scan_stream_and_expression() {
1446        let table = setup_test_table();
1447        const COL_A_U64: FieldId = 10;
1448        const COL_C_I32: FieldId = 12;
1449        const COL_E_F32: FieldId = 14;
1450
1451        let expr = Expr::all_of(vec![
1452            Filter {
1453                field_id: COL_C_I32,
1454                op: Operator::GreaterThan(15.into()),
1455            },
1456            Filter {
1457                field_id: COL_A_U64,
1458                op: Operator::LessThan(250.into()),
1459            },
1460        ]);
1461
1462        let mut vals: Vec<Option<f32>> = Vec::new();
1463        table
1464            .scan_stream(
1465                &[proj(&table, COL_E_F32)],
1466                &expr,
1467                ScanStreamOptions::default(),
1468                |b| {
1469                    let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
1470                    vals.extend((0..arr.len()).map(|i| {
1471                        if arr.is_null(i) {
1472                            None
1473                        } else {
1474                            Some(arr.value(i))
1475                        }
1476                    }));
1477                },
1478            )
1479            .unwrap();
1480
1481        assert_eq!(vals, vec![Some(2.0_f32), Some(2.0_f32)]);
1482    }
1483
1484    #[test]
1485    fn test_scan_stream_or_expression() {
1486        let table = setup_test_table();
1487        const COL_A_U64: FieldId = 10;
1488        const COL_C_I32: FieldId = 12;
1489
1490        let expr = Expr::any_of(vec![
1491            Filter {
1492                field_id: COL_C_I32,
1493                op: Operator::Equals(10.into()),
1494            },
1495            Filter {
1496                field_id: COL_C_I32,
1497                op: Operator::Equals(30.into()),
1498            },
1499        ]);
1500
1501        let mut vals: Vec<Option<u64>> = Vec::new();
1502        table
1503            .scan_stream(
1504                &[proj(&table, COL_A_U64)],
1505                &expr,
1506                ScanStreamOptions::default(),
1507                |b| {
1508                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1509                    vals.extend((0..arr.len()).map(|i| {
1510                        if arr.is_null(i) {
1511                            None
1512                        } else {
1513                            Some(arr.value(i))
1514                        }
1515                    }));
1516                },
1517            )
1518            .unwrap();
1519
1520        assert_eq!(vals, vec![Some(100), Some(300)]);
1521    }
1522
1523    #[test]
1524    fn test_scan_stream_not_predicate() {
1525        let table = setup_test_table();
1526        const COL_A_U64: FieldId = 10;
1527        const COL_C_I32: FieldId = 12;
1528
1529        let expr = Expr::not(pred_expr(Filter {
1530            field_id: COL_C_I32,
1531            op: Operator::Equals(20.into()),
1532        }));
1533
1534        let mut vals: Vec<Option<u64>> = Vec::new();
1535        table
1536            .scan_stream(
1537                &[proj(&table, COL_A_U64)],
1538                &expr,
1539                ScanStreamOptions::default(),
1540                |b| {
1541                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1542                    vals.extend((0..arr.len()).map(|i| {
1543                        if arr.is_null(i) {
1544                            None
1545                        } else {
1546                            Some(arr.value(i))
1547                        }
1548                    }));
1549                },
1550            )
1551            .unwrap();
1552
1553        assert_eq!(vals, vec![Some(100), Some(300)]);
1554    }
1555
1556    #[test]
1557    fn test_scan_stream_not_and_expression() {
1558        let table = setup_test_table();
1559        const COL_A_U64: FieldId = 10;
1560        const COL_C_I32: FieldId = 12;
1561
1562        let expr = Expr::not(Expr::all_of(vec![
1563            Filter {
1564                field_id: COL_A_U64,
1565                op: Operator::GreaterThan(150.into()),
1566            },
1567            Filter {
1568                field_id: COL_C_I32,
1569                op: Operator::LessThan(40.into()),
1570            },
1571        ]));
1572
1573        let mut vals: Vec<Option<u64>> = Vec::new();
1574        table
1575            .scan_stream(
1576                &[proj(&table, COL_A_U64)],
1577                &expr,
1578                ScanStreamOptions::default(),
1579                |b| {
1580                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1581                    vals.extend((0..arr.len()).map(|i| {
1582                        if arr.is_null(i) {
1583                            None
1584                        } else {
1585                            Some(arr.value(i))
1586                        }
1587                    }));
1588                },
1589            )
1590            .unwrap();
1591
1592        assert_eq!(vals, vec![Some(100)]);
1593    }
1594
    #[test]
    fn test_scan_stream_include_nulls_toggle() {
        // Verifies the `include_nulls` option: with the default options a row
        // whose projected column is NULL is dropped from the output; with
        // `include_nulls: true` that row is kept and surfaces as a null slot.
        let pager = Arc::new(MemPager::default());
        let table = setup_test_table_with_pager(&pager);
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;
        const COL_B_BIN: FieldId = 11;
        const COL_D_F64: FieldId = 13;
        const COL_E_F32: FieldId = 14;

        // Schema with one nullable column (c_i32) among non-nullable ones;
        // each data field carries its logical field id in metadata.
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_B_BIN.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, true).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_F64.to_string(),
            )])),
            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_F32.to_string(),
            )])),
        ]));

        // Two rows (row ids 5 and 6); c_i32 is Some(40) for row 5 and NULL
        // for row 6. Both rows pass the a_u64 > 450 filter below.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![5, 6])),
                Arc::new(UInt64Array::from(vec![500, 600])),
                Arc::new(BinaryArray::from(vec![
                    Some(&b"new"[..]),
                    Some(&b"alt"[..]),
                ])),
                Arc::new(Int32Array::from(vec![Some(40), None])),
                Arc::new(Float64Array::from(vec![5.5, 6.5])),
                Arc::new(Float32Array::from(vec![5.0, 6.0])),
            ],
        )
        .unwrap();
        table.append(&batch).unwrap();

        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::GreaterThan(450.into()),
        });

        // Default options: the NULL c_i32 row is dropped entirely.
        let mut default_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    default_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(default_vals, vec![Some(40)]);

        // include_nulls: true — the NULL row is kept as a None slot.
        let mut include_null_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions {
                    include_nulls: true,
                    order: None,
                    row_id_filter: None,
                },
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();

                    // NOTE(review): this second scan runs *inside* the outer
                    // streaming callback on the same table, which also
                    // exercises reentrant scans — presumably intentional,
                    // but worth confirming it wasn't meant to sit after the
                    // outer scan. With default options and a multi-column
                    // projection, a row is kept when at least one projected
                    // column is non-null, so the (None, Some(6.5)) pair
                    // survives here.
                    let mut paired_vals: Vec<(Option<i32>, Option<f64>)> = Vec::new();
                    table
                        .scan_stream(
                            &[proj(&table, COL_C_I32), proj(&table, COL_D_F64)],
                            &filter,
                            ScanStreamOptions::default(),
                            |b| {
                                assert_eq!(b.num_columns(), 2);
                                let c_arr =
                                    b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                                let d_arr =
                                    b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
                                for i in 0..b.num_rows() {
                                    let c_val = if c_arr.is_null(i) {
                                        None
                                    } else {
                                        Some(c_arr.value(i))
                                    };
                                    let d_val = if d_arr.is_null(i) {
                                        None
                                    } else {
                                        Some(d_arr.value(i))
                                    };
                                    paired_vals.push((c_val, d_val));
                                }
                            },
                        )
                        .unwrap();
                    assert_eq!(paired_vals, vec![(Some(40), Some(5.5)), (None, Some(6.5))]);
                    include_null_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(include_null_vals, vec![Some(40), None]);
    }
1725
1726    #[test]
1727    fn test_filtered_scan_int_sqrt_float64() {
1728        // Trade-off note:
1729        // - We cast per batch and apply a compute unary kernel for sqrt.
1730        // - This keeps processing streaming and avoids per-value loops.
1731        // - `unary` operates on `PrimitiveArray<T>`; cast and downcast to
1732        //   `Float64Array` first.
1733        let table = setup_test_table();
1734        const COL_A_U64: FieldId = 10;
1735        const COL_C_I32: FieldId = 12;
1736
1737        let filter = pred_expr(Filter {
1738            field_id: COL_C_I32,
1739            op: Operator::GreaterThan(15.into()),
1740        });
1741
1742        let mut got: Vec<f64> = Vec::new();
1743        table
1744            .scan_stream(
1745                &[proj(&table, COL_A_U64)],
1746                &filter,
1747                ScanStreamOptions::default(),
1748                |b| {
1749                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
1750                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
1751
1752                    // unary::<Float64Type, _, Float64Type>(...)
1753                    let sqrt_arr = unary::<
1754                        arrow::datatypes::Float64Type,
1755                        _,
1756                        arrow::datatypes::Float64Type,
1757                    >(f64_arr, |v: f64| v.sqrt());
1758
1759                    for i in 0..sqrt_arr.len() {
1760                        if !sqrt_arr.is_null(i) {
1761                            got.push(sqrt_arr.value(i));
1762                        }
1763                    }
1764                },
1765            )
1766            .unwrap();
1767
1768        let expected = [200_f64.sqrt(), 300_f64.sqrt(), 200_f64.sqrt()];
1769        assert_eq!(got, expected);
1770    }
1771
1772    #[test]
1773    fn test_multi_field_kernels_with_filters() {
1774        // Trade-off note:
1775        // - All reductions use per-batch kernels + accumulation to stay
1776        //   streaming. No concat or whole-column materialization.
1777        use arrow::array::{Int16Array, UInt8Array, UInt32Array};
1778
1779        let table = Table::from_id(2, Arc::new(MemPager::default())).unwrap();
1780
1781        const COL_A_U64: FieldId = 20;
1782        const COL_D_U32: FieldId = 21;
1783        const COL_E_I16: FieldId = 22;
1784        const COL_F_U8: FieldId = 23;
1785        const COL_C_I32: FieldId = 24;
1786
1787        let schema = Arc::new(Schema::new(vec![
1788            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1789            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1790                crate::constants::FIELD_ID_META_KEY.to_string(),
1791                COL_A_U64.to_string(),
1792            )])),
1793            Field::new("d_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
1794                crate::constants::FIELD_ID_META_KEY.to_string(),
1795                COL_D_U32.to_string(),
1796            )])),
1797            Field::new("e_i16", DataType::Int16, false).with_metadata(HashMap::from([(
1798                crate::constants::FIELD_ID_META_KEY.to_string(),
1799                COL_E_I16.to_string(),
1800            )])),
1801            Field::new("f_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
1802                crate::constants::FIELD_ID_META_KEY.to_string(),
1803                COL_F_U8.to_string(),
1804            )])),
1805            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
1806                crate::constants::FIELD_ID_META_KEY.to_string(),
1807                COL_C_I32.to_string(),
1808            )])),
1809        ]));
1810
1811        // Data: 5 rows. We will filter c_i32 >= 20 -> keep rows 2..5.
1812        let batch = RecordBatch::try_new(
1813            schema.clone(),
1814            vec![
1815                Arc::new(UInt64Array::from(vec![1, 2, 3, 4, 5])),
1816                Arc::new(UInt64Array::from(vec![100, 225, 400, 900, 1600])),
1817                Arc::new(UInt32Array::from(vec![7, 8, 9, 10, 11])),
1818                Arc::new(Int16Array::from(vec![-3, 0, 3, -6, 6])),
1819                Arc::new(UInt8Array::from(vec![2, 4, 6, 8, 10])),
1820                Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
1821            ],
1822        )
1823        .unwrap();
1824
1825        table.append(&batch).unwrap();
1826
1827        // Filter: c_i32 >= 20.
1828        let filter = pred_expr(Filter {
1829            field_id: COL_C_I32,
1830            op: Operator::GreaterThanOrEquals(20.into()),
1831        });
1832
1833        // 1) SUM over d_u32 (per-batch sum + accumulate).
1834        let mut d_sum: u128 = 0;
1835        table
1836            .scan_stream(
1837                &[proj(&table, COL_D_U32)],
1838                &filter,
1839                ScanStreamOptions::default(),
1840                |b| {
1841                    let a = b.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
1842                    if let Some(part) = sum(a) {
1843                        d_sum += part as u128;
1844                    }
1845                },
1846            )
1847            .unwrap();
1848        assert_eq!(d_sum, (8 + 9 + 10 + 11) as u128);
1849
1850        // 2) MIN over e_i16 (per-batch min + fold).
1851        let mut e_min: Option<i16> = None;
1852        table
1853            .scan_stream(
1854                &[proj(&table, COL_E_I16)],
1855                &filter,
1856                ScanStreamOptions::default(),
1857                |b| {
1858                    let a = b.column(0).as_any().downcast_ref::<Int16Array>().unwrap();
1859                    if let Some(part_min) = min(a) {
1860                        e_min = Some(e_min.map_or(part_min, |m| m.min(part_min)));
1861                    }
1862                },
1863            )
1864            .unwrap();
1865        assert_eq!(e_min, Some(-6));
1866
1867        // 3) MAX over f_u8 (per-batch max + fold).
1868        let mut f_max: Option<u8> = None;
1869        table
1870            .scan_stream(
1871                &[proj(&table, COL_F_U8)],
1872                &filter,
1873                ScanStreamOptions::default(),
1874                |b| {
1875                    let a = b
1876                        .column(0)
1877                        .as_any()
1878                        .downcast_ref::<arrow::array::UInt8Array>()
1879                        .unwrap();
1880                    if let Some(part_max) = max(a) {
1881                        f_max = Some(f_max.map_or(part_max, |m| m.max(part_max)));
1882                    }
1883                },
1884            )
1885            .unwrap();
1886        assert_eq!(f_max, Some(10));
1887
1888        // 4) SQRT over a_u64 (cast to f64, then unary sqrt per batch).
1889        let mut got: Vec<f64> = Vec::new();
1890        table
1891            .scan_stream(
1892                &[proj(&table, COL_A_U64)],
1893                &filter,
1894                ScanStreamOptions::default(),
1895                |b| {
1896                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
1897                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
1898                    let sqrt_arr = unary::<
1899                        arrow::datatypes::Float64Type,
1900                        _,
1901                        arrow::datatypes::Float64Type,
1902                    >(f64_arr, |v: f64| v.sqrt());
1903
1904                    for i in 0..sqrt_arr.len() {
1905                        if !sqrt_arr.is_null(i) {
1906                            got.push(sqrt_arr.value(i));
1907                        }
1908                    }
1909                },
1910            )
1911            .unwrap();
1912        let expected = [15.0_f64, 20.0, 30.0, 40.0];
1913        assert_eq!(got, expected);
1914    }
1915
1916    #[test]
1917    fn test_scan_with_in_filter() {
1918        let table = setup_test_table();
1919        const COL_A_U64: FieldId = 10;
1920        const COL_C_I32: FieldId = 12;
1921
1922        // IN now uses untyped literals, too.
1923        let candidates = [10.into(), 30.into()];
1924        let filter = pred_expr(Filter {
1925            field_id: COL_C_I32,
1926            op: Operator::In(&candidates),
1927        });
1928
1929        let mut vals: Vec<Option<u64>> = Vec::new();
1930        table
1931            .scan_stream(
1932                &[proj(&table, COL_A_U64)],
1933                &filter,
1934                ScanStreamOptions::default(),
1935                |b| {
1936                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1937                    vals.extend(
1938                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1939                    );
1940                },
1941            )
1942            .unwrap();
1943        assert_eq!(vals, vec![Some(100), Some(300)]);
1944    }
1945
1946    #[test]
1947    fn test_scan_stream_single_column_batches() {
1948        let table = setup_test_table();
1949        const COL_A_U64: FieldId = 10;
1950        const COL_C_I32: FieldId = 12;
1951
1952        // Filter c_i32 == 20 -> two rows; stream a_u64 in batches of <= N.
1953        let filter = pred_expr(Filter {
1954            field_id: COL_C_I32,
1955            op: Operator::Equals(20.into()),
1956        });
1957
1958        let mut seen_cols = Vec::<u64>::new();
1959        table
1960            .scan_stream(
1961                &[proj(&table, COL_A_U64)],
1962                &filter,
1963                ScanStreamOptions::default(),
1964                |b| {
1965                    assert_eq!(b.num_columns(), 1);
1966                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1967                    // No kernel needed; just collect values for shape assertions.
1968                    for i in 0..a.len() {
1969                        if !a.is_null(i) {
1970                            seen_cols.push(a.value(i));
1971                        }
1972                    }
1973                },
1974            )
1975            .unwrap();
1976
1977        // In fixture, c_i32 == 20 corresponds to a_u64 values [200, 200].
1978        assert_eq!(seen_cols, vec![200, 200]);
1979    }
1980
1981    #[test]
1982    fn test_scan_with_multiple_projection_columns() {
1983        let table = setup_test_table();
1984        const COL_A_U64: FieldId = 10;
1985        const COL_C_I32: FieldId = 12;
1986
1987        let filter = pred_expr(Filter {
1988            field_id: COL_C_I32,
1989            op: Operator::Equals(20.into()),
1990        });
1991
1992        let expected_names = [COL_A_U64.to_string(), COL_C_I32.to_string()];
1993
1994        let mut combined: Vec<(Option<u64>, Option<i32>)> = Vec::new();
1995        table
1996            .scan_stream(
1997                &[proj(&table, COL_A_U64), proj(&table, COL_C_I32)],
1998                &filter,
1999                ScanStreamOptions::default(),
2000                |b| {
2001                    assert_eq!(b.num_columns(), 2);
2002                    assert_eq!(b.schema().field(0).name(), &expected_names[0]);
2003                    assert_eq!(b.schema().field(1).name(), &expected_names[1]);
2004
2005                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2006                    let c = b.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
2007                    for i in 0..b.num_rows() {
2008                        let left = if a.is_null(i) { None } else { Some(a.value(i)) };
2009                        let right = if c.is_null(i) { None } else { Some(c.value(i)) };
2010                        combined.push((left, right));
2011                    }
2012                },
2013            )
2014            .unwrap();
2015
2016        assert_eq!(combined, vec![(Some(200), Some(20)), (Some(200), Some(20))]);
2017    }
2018
2019    #[test]
2020    fn test_scan_stream_projection_validation() {
2021        let table = setup_test_table();
2022        const COL_A_U64: FieldId = 10;
2023        const COL_C_I32: FieldId = 12;
2024
2025        let filter = pred_expr(Filter {
2026            field_id: COL_C_I32,
2027            op: Operator::Equals(20.into()),
2028        });
2029
2030        let empty: [Projection; 0] = [];
2031        let result = table.scan_stream(&empty, &filter, ScanStreamOptions::default(), |_batch| {});
2032        assert!(matches!(result, Err(Error::InvalidArgumentError(_))));
2033
2034        // Duplicate projections are allowed: the same column will be
2035        // gathered once and duplicated in the output in the requested
2036        // order. Verify the call succeeds and produces two identical
2037        // columns per batch.
2038        let duplicate = [
2039            proj(&table, COL_A_U64),
2040            proj_alias(&table, COL_A_U64, "alias_a"),
2041        ];
2042        let mut collected = Vec::<u64>::new();
2043        table
2044            .scan_stream(&duplicate, &filter, ScanStreamOptions::default(), |b| {
2045                assert_eq!(b.num_columns(), 2);
2046                assert_eq!(b.schema().field(0).name(), &COL_A_U64.to_string());
2047                assert_eq!(b.schema().field(1).name(), "alias_a");
2048                let a0 = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2049                let a1 = b.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
2050                for i in 0..b.num_rows() {
2051                    if !a0.is_null(i) {
2052                        collected.push(a0.value(i));
2053                    }
2054                    if !a1.is_null(i) {
2055                        collected.push(a1.value(i));
2056                    }
2057                }
2058            })
2059            .unwrap();
2060        // Two matching rows, two columns per row -> four values.
2061        assert_eq!(collected, vec![200, 200, 200, 200]);
2062    }
2063
2064    #[test]
2065    fn test_scan_stream_computed_projection() {
2066        let table = setup_test_table();
2067        const COL_A_U64: FieldId = 10;
2068
2069        let projections = [
2070            ScanProjection::column(proj(&table, COL_A_U64)),
2071            ScanProjection::computed(
2072                ScalarExpr::binary(
2073                    ScalarExpr::column(COL_A_U64),
2074                    BinaryOp::Multiply,
2075                    ScalarExpr::literal(2),
2076                ),
2077                "a_times_two",
2078            ),
2079        ];
2080
2081        let filter = pred_expr(Filter {
2082            field_id: COL_A_U64,
2083            op: Operator::GreaterThanOrEquals(0.into()),
2084        });
2085
2086        let mut computed: Vec<(u64, f64)> = Vec::new();
2087        table
2088            .scan_stream_with_exprs(&projections, &filter, ScanStreamOptions::default(), |b| {
2089                assert_eq!(b.num_columns(), 2);
2090                let base = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2091                let comp = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
2092                for i in 0..b.num_rows() {
2093                    if base.is_null(i) || comp.is_null(i) {
2094                        continue;
2095                    }
2096                    computed.push((base.value(i), comp.value(i)));
2097                }
2098            })
2099            .unwrap();
2100
2101        let expected = vec![(100, 200.0), (200, 400.0), (300, 600.0), (200, 400.0)];
2102        assert_eq!(computed, expected);
2103    }
2104
2105    #[test]
2106    fn test_scan_stream_multi_column_filter_compare() {
2107        let table = setup_test_table();
2108        const COL_A_U64: FieldId = 10;
2109        const COL_C_I32: FieldId = 12;
2110
2111        let expr = Expr::Compare {
2112            left: ScalarExpr::binary(
2113                ScalarExpr::column(COL_A_U64),
2114                BinaryOp::Add,
2115                ScalarExpr::column(COL_C_I32),
2116            ),
2117            op: CompareOp::Gt,
2118            right: ScalarExpr::literal(220_i64),
2119        };
2120
2121        let mut vals: Vec<Option<u64>> = Vec::new();
2122        table
2123            .scan_stream(
2124                &[proj(&table, COL_A_U64)],
2125                &expr,
2126                ScanStreamOptions::default(),
2127                |b| {
2128                    let col = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2129                    for i in 0..b.num_rows() {
2130                        vals.push(if col.is_null(i) {
2131                            None
2132                        } else {
2133                            Some(col.value(i))
2134                        });
2135                    }
2136                },
2137            )
2138            .unwrap();
2139
2140        assert_eq!(vals.into_iter().flatten().collect::<Vec<_>>(), vec![300]);
2141    }
2142}