llkv_table/
table.rs

1use std::fmt;
2use std::sync::Arc;
3use std::sync::RwLock;
4
5use crate::planner::{TablePlanner, collect_row_ids_for_table};
6use crate::stream::ColumnStream;
7use crate::types::TableId;
8
9use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt32Array};
10use arrow::datatypes::{DataType, Field, Schema};
11use std::collections::HashMap;
12
13use crate::constants::STREAM_BATCH_ROWS;
14use llkv_column_map::store::{GatherNullPolicy, IndexKind, Projection, ROW_ID_COLUMN_NAME};
15use llkv_column_map::{ColumnStore, types::LogicalFieldId};
16use llkv_storage::pager::{MemPager, Pager};
17use simd_r_drive_entry_handle::EntryHandle;
18
19use crate::reserved::is_reserved_table_id;
20use crate::sys_catalog::{ColMeta, SysCatalog, TableMeta};
21use crate::types::{FieldId, RowId};
22use llkv_expr::{Expr, ScalarExpr};
23use llkv_result::{Error, Result as LlkvResult};
24
/// Cached information about which system columns exist in the table schema.
/// This avoids repeated string comparisons in hot paths like append().
#[derive(Debug, Clone, Copy)]
struct MvccColumnCache {
    // True if the schema contains the MVCC `created_by` system column.
    has_created_by: bool,
    // True if the schema contains the MVCC `deleted_by` system column.
    has_deleted_by: bool,
}
32
/// Handle for data operations on a table.
///
/// Wraps a shared [`ColumnStore`] plus this table's identifier; columns are
/// namespaced inside the store via [`LogicalFieldId`]s built from `table_id`.
pub struct Table<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Shared column store; may back multiple tables.
    store: Arc<ColumnStore<P>>,
    // Identifier used to scope this table's logical field ids.
    table_id: TableId,
    /// Cache of MVCC column presence. Initialized lazily by `get_mvcc_cache`
    /// (called from `append()`). None means not yet initialized.
    mvcc_cache: RwLock<Option<MvccColumnCache>>,
}
45
/// Filter row IDs before they are materialized into batches.
///
/// This trait allows implementations to enforce transaction visibility (MVCC),
/// access control, or other row-level filtering policies. The filter is applied
/// after column-level predicates but before data is gathered into Arrow batches.
///
/// # Example Use Case
///
/// MVCC implementations use this to hide rows that were:
/// - Created after the transaction's snapshot timestamp
/// - Deleted before the transaction's snapshot timestamp
pub trait RowIdFilter<P>: Send + Sync
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Filter a list of row IDs, returning only those that should be visible.
    ///
    /// The returned vector is a (possibly reordered-preserving) subset of
    /// `row_ids`; implementations may consult `table` to load visibility data.
    ///
    /// # Errors
    ///
    /// Returns an error if visibility metadata cannot be loaded or is corrupted.
    fn filter(&self, table: &Table<P>, row_ids: Vec<RowId>) -> LlkvResult<Vec<RowId>>;
}
68
/// Options for configuring table scans.
///
/// These options control how rows are filtered, ordered, and materialized during
/// scan operations. `Clone`, `Debug`, and `Default` are implemented manually
/// below because of the non-`Debug`, type-erased `row_id_filter` field.
pub struct ScanStreamOptions<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Whether to include rows where all projected columns are null.
    ///
    /// When `false` (default), rows with all-null projections are dropped before
    /// batches are yielded. This is useful for sparse data where many rows may not
    /// have values for the selected columns.
    pub include_nulls: bool,
    /// Optional ordering to apply to results.
    ///
    /// If specified, row IDs are sorted according to this specification before
    /// data is gathered into batches.
    pub order: Option<ScanOrderSpec>,
    /// Optional filter for row-level visibility (e.g., MVCC).
    ///
    /// Applied after column-level predicates but before data is materialized.
    /// Used to enforce transaction isolation.
    pub row_id_filter: Option<std::sync::Arc<dyn RowIdFilter<P>>>,
}
94
95impl<P> fmt::Debug for ScanStreamOptions<P>
96where
97    P: Pager<Blob = EntryHandle> + Send + Sync,
98{
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        f.debug_struct("ScanStreamOptions")
101            .field("include_nulls", &self.include_nulls)
102            .field("order", &self.order)
103            .field(
104                "row_id_filter",
105                &self.row_id_filter.as_ref().map(|_| "<RowIdFilter>"),
106            )
107            .finish()
108    }
109}
110
111impl<P> Clone for ScanStreamOptions<P>
112where
113    P: Pager<Blob = EntryHandle> + Send + Sync,
114{
115    fn clone(&self) -> Self {
116        Self {
117            include_nulls: self.include_nulls,
118            order: self.order,
119            row_id_filter: self.row_id_filter.clone(),
120        }
121    }
122}
123
124impl<P> Default for ScanStreamOptions<P>
125where
126    P: Pager<Blob = EntryHandle> + Send + Sync,
127{
128    fn default() -> Self {
129        Self {
130            include_nulls: false,
131            order: None,
132            row_id_filter: None,
133        }
134    }
135}
136
/// Specification for ordering scan results.
///
/// Defines how to sort rows based on a single column's values.
#[derive(Clone, Copy, Debug)]
pub struct ScanOrderSpec {
    /// The field to sort by.
    pub field_id: FieldId,
    /// Sort direction (ascending or descending).
    pub direction: ScanOrderDirection,
    /// Whether null values appear first or last.
    pub nulls_first: bool,
    /// Optional transformation to apply before sorting (see [`ScanOrderTransform`]).
    pub transform: ScanOrderTransform,
}
151
/// Sort direction for scan ordering.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderDirection {
    /// Sort from smallest to largest.
    Ascending,
    /// Sort from largest to smallest.
    Descending,
}
160
/// Value transformation to apply before sorting.
///
/// Used to enable sorting on columns that need type coercion or conversion.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScanOrderTransform {
    /// Sort integers as-is (no conversion).
    IdentityInteger,
    /// Sort strings lexicographically (no conversion).
    IdentityUtf8,
    /// Parse strings as integers, then sort numerically.
    CastUtf8ToInteger,
}
173
/// A column or computed expression to include in scan results.
///
/// Scans can project either stored columns or expressions computed from them.
#[derive(Clone, Debug)]
pub enum ScanProjection {
    /// Project a stored column directly.
    Column(Projection),
    /// Compute a value from an expression and return it with an alias.
    Computed {
        /// The expression to evaluate (can reference column field IDs).
        expr: ScalarExpr<FieldId>,
        /// The name to give the computed column in results.
        alias: String,
    },
}
189
190impl ScanProjection {
191    /// Create a projection for a stored column.
192    pub fn column<P: Into<Projection>>(proj: P) -> Self {
193        Self::Column(proj.into())
194    }
195
196    /// Create a projection for a computed expression.
197    pub fn computed<S: Into<String>>(expr: ScalarExpr<FieldId>, alias: S) -> Self {
198        Self::Computed {
199            expr,
200            alias: alias.into(),
201        }
202    }
203}
204
impl From<Projection> for ScanProjection {
    /// Wrap an owned stored-column projection.
    fn from(value: Projection) -> Self {
        ScanProjection::Column(value)
    }
}
210
impl From<&Projection> for ScanProjection {
    /// Clone a borrowed stored-column projection into a scan projection.
    fn from(value: &Projection) -> Self {
        ScanProjection::Column(value.clone())
    }
}
216
impl From<&ScanProjection> for ScanProjection {
    /// Clone a borrowed scan projection (convenient for iterator adapters).
    fn from(value: &ScanProjection) -> Self {
        value.clone()
    }
}
222
223impl<P> Table<P>
224where
225    P: Pager<Blob = EntryHandle> + Send + Sync,
226{
227    /// Create a new table from column specifications.
228    ///
229    /// Coordinates metadata persistence, catalog registration, and storage initialization.
230    pub fn create_from_columns(
231        display_name: &str,
232        canonical_name: &str,
233        columns: &[llkv_plan::ColumnSpec],
234        metadata: Arc<crate::metadata::MetadataManager<P>>,
235        catalog: Arc<crate::catalog::TableCatalog>,
236        store: Arc<ColumnStore<P>>,
237    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
238        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
239        service.create_table_from_columns(display_name, canonical_name, columns)
240    }
241
242    /// Create a new table from an Arrow schema (for CREATE TABLE AS SELECT).
243    pub fn create_from_schema(
244        display_name: &str,
245        canonical_name: &str,
246        schema: &arrow::datatypes::Schema,
247        metadata: Arc<crate::metadata::MetadataManager<P>>,
248        catalog: Arc<crate::catalog::TableCatalog>,
249        store: Arc<ColumnStore<P>>,
250    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
251        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
252        service.create_table_from_schema(display_name, canonical_name, schema)
253    }
254
255    /// Internal constructor: wrap a table ID with column store access.
256    ///
257    /// **This is for internal crate use only.** User code should create tables via
258    /// `CatalogService::create_table_*()`. For tests, use `Table::from_id()`.
259    #[doc(hidden)]
260    pub fn from_id(table_id: TableId, pager: Arc<P>) -> LlkvResult<Self> {
261        if is_reserved_table_id(table_id) {
262            return Err(Error::ReservedTableId(table_id));
263        }
264
265        tracing::trace!(
266            "Table::from_id: Opening table_id={} with pager at {:p}",
267            table_id,
268            &*pager
269        );
270        let store = ColumnStore::open(pager)?;
271        Ok(Self {
272            store: Arc::new(store),
273            table_id,
274            mvcc_cache: RwLock::new(None),
275        })
276    }
277
278    /// Internal constructor: wrap a table ID with a shared column store.
279    ///
280    /// **This is for internal crate use only.** Preferred over `from_id()` when
281    /// multiple tables share the same store. For tests, use `Table::from_id_with_store()`.
282    #[doc(hidden)]
283    pub fn from_id_and_store(table_id: TableId, store: Arc<ColumnStore<P>>) -> LlkvResult<Self> {
284        if is_reserved_table_id(table_id) {
285            return Err(Error::ReservedTableId(table_id));
286        }
287
288        Ok(Self {
289            store,
290            table_id,
291            mvcc_cache: RwLock::new(None),
292        })
293    }
294
295    /// Register a persisted sort index for the specified user column.
296    pub fn register_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
297        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
298        self.store
299            .register_index(logical_field_id, IndexKind::Sort)?;
300        Ok(())
301    }
302
303    /// Remove a persisted sort index for the specified user column if it exists.
304    pub fn unregister_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
305        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
306        match self
307            .store
308            .unregister_index(logical_field_id, IndexKind::Sort)
309        {
310            Ok(()) | Err(Error::NotFound) => Ok(()),
311            Err(err) => Err(err),
312        }
313    }
314
315    /// List the persisted index kinds registered for the given user column.
316    pub fn list_registered_indexes(&self, field_id: FieldId) -> LlkvResult<Vec<IndexKind>> {
317        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
318        match self.store.list_persisted_indexes(logical_field_id) {
319            Ok(kinds) => Ok(kinds),
320            Err(Error::NotFound) => Ok(Vec::new()),
321            Err(err) => Err(err),
322        }
323    }
324
325    /// Get or initialize the MVCC column cache from the provided schema.
326    /// This is an optimization to avoid repeated string comparisons in append().
327    fn get_mvcc_cache(&self, schema: &Arc<Schema>) -> MvccColumnCache {
328        // Fast path: check if cache is already initialized
329        {
330            let cache_read = self.mvcc_cache.read().unwrap();
331            if let Some(cache) = *cache_read {
332                return cache;
333            }
334        }
335
336        // Slow path: initialize cache from schema
337        let has_created_by = schema
338            .fields()
339            .iter()
340            .any(|f| f.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME);
341        let has_deleted_by = schema
342            .fields()
343            .iter()
344            .any(|f| f.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME);
345
346        let cache = MvccColumnCache {
347            has_created_by,
348            has_deleted_by,
349        };
350
351        // Store in cache for future calls
352        *self.mvcc_cache.write().unwrap() = Some(cache);
353
354        cache
355    }
356
    /// Append a [`RecordBatch`] to the table.
    ///
    /// The batch must include:
    /// - A `row_id` column (type `UInt64`) with unique row identifiers
    /// - `field_id` metadata for each user column, mapping to this table's field IDs
    ///
    /// ## MVCC Columns
    ///
    /// If the batch includes `created_by` or `deleted_by` columns, they are automatically
    /// assigned the correct [`LogicalFieldId`] for this table's MVCC metadata. If absent,
    /// they are injected here with auto-commit defaults (see constants below).
    ///
    /// ## Field ID Mapping
    ///
    /// Each column's `field_id` metadata is converted to a [`LogicalFieldId`] by combining
    /// it with this table's ID. This ensures columns from different tables don't collide
    /// in the underlying storage.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The batch is missing the `row_id` column
    /// - Any user column is missing `field_id` metadata
    /// - Field IDs are invalid or malformed
    /// - The underlying storage operation fails
    pub fn append(&self, batch: &RecordBatch) -> LlkvResult<()> {
        use arrow::array::UInt64Builder;

        // Check if MVCC columns already exist in the batch using cache.
        // This avoids repeated string comparisons on every append.
        // NOTE(review): the cache is filled from the first batch seen, so this
        // assumes later batches are shaped consistently — TODO confirm.
        let cache = self.get_mvcc_cache(&batch.schema());
        let has_created_by = cache.has_created_by;
        let has_deleted_by = cache.has_deleted_by;

        // Rebuilt schema/columns with storage-qualified field ids.
        // (+2 leaves room for the injected MVCC columns.)
        let mut new_fields = Vec::with_capacity(batch.schema().fields().len() + 2);
        let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.columns().len() + 2);

        for (idx, field) in batch.schema().fields().iter().enumerate() {
            let maybe_field_id = field.metadata().get(crate::constants::FIELD_ID_META_KEY);
            // System columns (row_id, MVCC columns) don't need field_id metadata.
            if maybe_field_id.is_none()
                && (field.name() == ROW_ID_COLUMN_NAME
                    || field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME
                    || field.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME)
            {
                if field.name() == ROW_ID_COLUMN_NAME {
                    // row_id passes through unchanged.
                    new_fields.push(field.as_ref().clone());
                    new_columns.push(batch.column(idx).clone());
                } else {
                    // MVCC columns get the table-scoped MVCC logical field id
                    // stamped into their metadata.
                    let lfid = if field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME {
                        LogicalFieldId::for_mvcc_created_by(self.table_id)
                    } else {
                        LogicalFieldId::for_mvcc_deleted_by(self.table_id)
                    };

                    let mut metadata = field.metadata().clone();
                    let lfid_val: u64 = lfid.into();
                    metadata.insert(
                        crate::constants::FIELD_ID_META_KEY.to_string(),
                        lfid_val.to_string(),
                    );

                    let new_field =
                        Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                            .with_metadata(metadata);
                    new_fields.push(new_field);
                    new_columns.push(batch.column(idx).clone());
                }
                continue;
            }

            // User columns must carry a parseable numeric field_id.
            let raw_field_id = maybe_field_id
                .ok_or_else(|| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' is missing a valid '{}' in its metadata.",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY
                    ))
                })?
                .parse::<u64>()
                .map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' contains an invalid '{}': {}",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY,
                        err
                    ))
                })?;

            // Reject values that are already table-qualified logical ids.
            if raw_field_id > FieldId::MAX as u64 {
                return Err(llkv_result::Error::Internal(format!(
                    "Field '{}' expected user FieldId (<= {}) but got logical id '{}'",
                    field.name(),
                    FieldId::MAX,
                    raw_field_id
                )));
            }

            let user_field_id = raw_field_id as FieldId;
            let logical_field_id = LogicalFieldId::for_user(self.table_id, user_field_id);

            // Store the fully-qualified logical field id in the metadata we hand to the
            // column store so descriptors are registered under the correct table id.
            let lfid = logical_field_id;
            let mut new_metadata = field.metadata().clone();
            let lfid_val: u64 = lfid.into();
            new_metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );

            let new_field =
                Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                    .with_metadata(new_metadata);
            new_fields.push(new_field);
            new_columns.push(batch.column(idx).clone());

            // Ensure the catalog remembers the human-friendly column name for
            // this field so callers of `Table::schema()` (and other metadata
            // consumers) can recover it later. The CSV ingest path (and other
            // writers) may only supply the `field_id` metadata on the batch,
            // so defensively persist the column name when absent.
            let need_meta = match self
                .catalog()
                .get_cols_meta(self.table_id, &[user_field_id])
            {
                metas if metas.is_empty() => true,
                metas => metas[0].as_ref().and_then(|m| m.name.as_ref()).is_none(),
            };

            if need_meta {
                let meta = ColMeta {
                    col_id: user_field_id,
                    name: Some(field.name().to_string()),
                    flags: 0,
                    default: None,
                };
                self.catalog().put_col_meta(self.table_id, &meta);
            }
        }

        // Inject MVCC columns if they don't exist.
        // For non-transactional appends (e.g., CSV ingest), we use TXN_ID_AUTO_COMMIT (1)
        // which is treated as "committed by system" and always visible.
        // Use TXN_ID_NONE (0) for deleted_by to indicate "not deleted".
        const TXN_ID_AUTO_COMMIT: u64 = 1;
        const TXN_ID_NONE: u64 = 0;
        let row_count = batch.num_rows();

        if !has_created_by {
            // Every row is "created by" the auto-commit pseudo-transaction.
            let mut created_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                created_by_builder.append_value(TXN_ID_AUTO_COMMIT);
            }
            let created_by_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = created_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::CREATED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(created_by_builder.finish()));
        }

        if !has_deleted_by {
            // Rows start out live: deleted_by = TXN_ID_NONE.
            let mut deleted_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                deleted_by_builder.append_value(TXN_ID_NONE);
            }
            let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = deleted_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::DELETED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(deleted_by_builder.finish()));
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let namespaced_batch = RecordBatch::try_new(new_schema, new_columns)?;

        tracing::trace!(
            table_id = self.table_id,
            num_columns = namespaced_batch.num_columns(),
            num_rows = namespaced_batch.num_rows(),
            "Attempting append to table"
        );

        if let Err(err) = self.store.append(&namespaced_batch) {
            // On failure, collect the logical field ids that were in the batch
            // so the error log can pinpoint a catalog mismatch.
            let batch_field_ids: Vec<LogicalFieldId> = namespaced_batch
                .schema()
                .fields()
                .iter()
                .filter_map(|f| f.metadata().get(crate::constants::FIELD_ID_META_KEY))
                .filter_map(|s| s.parse::<u64>().ok())
                .map(LogicalFieldId::from)
                .collect();

            // Check which fields are missing from the catalog.
            let missing_fields: Vec<LogicalFieldId> = batch_field_ids
                .iter()
                .filter(|&&field_id| !self.store.has_field(field_id))
                .copied()
                .collect();

            tracing::error!(
                table_id = self.table_id,
                error = ?err,
                batch_field_ids = ?batch_field_ids,
                missing_from_catalog = ?missing_fields,
                "Append failed - some fields missing from catalog"
            );
            return Err(err);
        }
        Ok(())
    }
589
590    /// Stream one or more projected columns as a sequence of RecordBatches.
591    ///
592    /// - Avoids `concat` and large materializations.
593    /// - Uses the same filter machinery as the old `scan` to produce
594    ///   `row_ids`.
595    /// - Splits `row_ids` into fixed-size windows and gathers rows per
596    ///   window to form a small `RecordBatch` that is sent to `on_batch`.
597    pub fn scan_stream<'a, I, T, F>(
598        &self,
599        projections: I,
600        filter_expr: &Expr<'a, FieldId>,
601        options: ScanStreamOptions<P>,
602        on_batch: F,
603    ) -> LlkvResult<()>
604    where
605        I: IntoIterator<Item = T>,
606        T: Into<ScanProjection>,
607        F: FnMut(RecordBatch),
608    {
609        let stream_projections: Vec<ScanProjection> =
610            projections.into_iter().map(|p| p.into()).collect();
611        self.scan_stream_with_exprs(&stream_projections, filter_expr, options, on_batch)
612    }
613
    /// Slice-based variant of [`Table::scan_stream`].
    ///
    /// Unlike `scan_stream`, which accepts any iterator of items convertible
    /// into [`ScanProjection`] and collects them first, this takes an
    /// already-materialized slice, avoiding the extra collection when the
    /// caller has one prepared. Both delegate to [`TablePlanner`].
    pub fn scan_stream_with_exprs<'a, F>(
        &self,
        projections: &[ScanProjection],
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions<P>,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(RecordBatch),
    {
        TablePlanner::new(self).scan_stream_with_exprs(projections, filter_expr, options, on_batch)
    }
627
    /// Evaluate `filter_expr` against this table and return the matching row ids.
    pub fn filter_row_ids<'a>(&self, filter_expr: &Expr<'a, FieldId>) -> LlkvResult<Vec<RowId>> {
        collect_row_ids_for_table(self, filter_expr)
    }
631
    /// Borrow a system-catalog handle backed by this table's column store.
    #[inline]
    pub fn catalog(&self) -> SysCatalog<'_, P> {
        SysCatalog::new(&self.store)
    }
636
    /// Fetch this table's metadata from the system catalog, if recorded.
    #[inline]
    pub fn get_table_meta(&self) -> Option<TableMeta> {
        self.catalog().get_table_meta(self.table_id)
    }
641
    /// Fetch per-column metadata for the given column ids; entries are `None`
    /// for columns the catalog has no record of.
    #[inline]
    pub fn get_cols_meta(&self, col_ids: &[FieldId]) -> Vec<Option<ColMeta>> {
        self.catalog().get_cols_meta(self.table_id, col_ids)
    }
646
647    /// Build and return an Arrow `Schema` that describes this table.
648    ///
649    /// The returned schema includes the `row_id` field first, followed by
650    /// user fields. Each user field has its `field_id` stored in the field
651    /// metadata (under the "field_id" key) and the name is taken from the
652    /// catalog when available or falls back to `col_<id>`.
653    pub fn schema(&self) -> LlkvResult<Arc<Schema>> {
654        // Collect logical fields for this table and sort by field id.
655        let mut logical_fields = self.store.user_field_ids_for_table(self.table_id);
656        logical_fields.sort_by_key(|lfid| lfid.field_id());
657
658        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
659        let metas = self.get_cols_meta(&field_ids);
660
661        let mut fields: Vec<Field> = Vec::with_capacity(1 + field_ids.len());
662        // Add row_id first
663        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));
664
665        for (idx, lfid) in logical_fields.into_iter().enumerate() {
666            let fid = lfid.field_id();
667            let dtype = self.store.data_type(lfid)?;
668            let name = metas
669                .get(idx)
670                .and_then(|m| m.as_ref().and_then(|meta| meta.name.clone()))
671                .unwrap_or_else(|| format!("col_{}", fid));
672
673            let mut metadata: HashMap<String, String> = HashMap::new();
674            metadata.insert(
675                crate::constants::FIELD_ID_META_KEY.to_string(),
676                fid.to_string(),
677            );
678
679            fields.push(Field::new(&name, dtype.clone(), true).with_metadata(metadata));
680        }
681
682        Ok(Arc::new(Schema::new(fields)))
683    }
684
685    /// Return the table schema formatted as an Arrow RecordBatch suitable
686    /// for pretty printing. The batch has three columns: `name` (Utf8),
687    /// `field_id` (UInt32) and `data_type` (Utf8).
688    pub fn schema_recordbatch(&self) -> LlkvResult<RecordBatch> {
689        let schema = self.schema()?;
690        let fields = schema.fields();
691
692        let mut names: Vec<String> = Vec::with_capacity(fields.len());
693        let mut fids: Vec<u32> = Vec::with_capacity(fields.len());
694        let mut dtypes: Vec<String> = Vec::with_capacity(fields.len());
695
696        for field in fields.iter() {
697            names.push(field.name().to_string());
698            let fid = field
699                .metadata()
700                .get(crate::constants::FIELD_ID_META_KEY)
701                .and_then(|s| s.parse::<u32>().ok())
702                .unwrap_or(0u32);
703            fids.push(fid);
704            dtypes.push(format!("{:?}", field.data_type()));
705        }
706
707        // Build Arrow arrays
708        let name_array: ArrayRef = Arc::new(StringArray::from(names));
709        let fid_array: ArrayRef = Arc::new(UInt32Array::from(fids));
710        let dtype_array: ArrayRef = Arc::new(StringArray::from(dtypes));
711
712        let rb_schema = Arc::new(Schema::new(vec![
713            Field::new("name", DataType::Utf8, false),
714            Field::new(crate::constants::FIELD_ID_META_KEY, DataType::UInt32, false),
715            Field::new("data_type", DataType::Utf8, false),
716        ]));
717
718        let batch = RecordBatch::try_new(rb_schema, vec![name_array, fid_array, dtype_array])?;
719        Ok(batch)
720    }
721
722    /// Create a streaming view over the provided row IDs for the specified logical fields.
723    pub fn stream_columns(
724        &self,
725        logical_fields: impl Into<Arc<[LogicalFieldId]>>,
726        row_ids: Vec<RowId>,
727        policy: GatherNullPolicy,
728    ) -> LlkvResult<ColumnStream<'_, P>> {
729        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
730        let ctx = self.store.prepare_gather_context(logical_fields.as_ref())?;
731        Ok(ColumnStream::new(
732            &self.store,
733            ctx,
734            row_ids,
735            STREAM_BATCH_ROWS,
736            policy,
737            logical_fields,
738        ))
739    }
740
    /// Access the underlying column store shared by this table.
    pub fn store(&self) -> &ColumnStore<P> {
        &self.store
    }
744
    /// The identifier that scopes this table's columns within the store.
    #[inline]
    pub fn table_id(&self) -> TableId {
        self.table_id
    }
749
750    /// Return the total number of rows for a given user column id in this table.
751    ///
752    /// This delegates to the ColumnStore descriptor for the logical field that
753    /// corresponds to (table_id, col_id) and returns the persisted total_row_count.
754    pub fn total_rows_for_col(&self, col_id: FieldId) -> llkv_result::Result<u64> {
755        let lfid = LogicalFieldId::for_user(self.table_id, col_id);
756        self.store.total_rows_for_field(lfid)
757    }
758
759    /// Return the total number of rows for this table.
760    ///
761    /// Prefer reading the dedicated row-id shadow column if present; otherwise
762    /// fall back to inspecting any persisted user column descriptor.
763    pub fn total_rows(&self) -> llkv_result::Result<u64> {
764        use llkv_column_map::store::rowid_fid;
765        let rid_lfid = rowid_fid(LogicalFieldId::for_user(self.table_id, 0));
766        // Try the row-id shadow column first
767        match self.store.total_rows_for_field(rid_lfid) {
768            Ok(n) => Ok(n),
769            Err(_) => {
770                // Fall back to scanning the catalog for any user-data column
771                self.store.total_rows_for_table(self.table_id)
772            }
773        }
774    }
775}
776
777#[cfg(test)]
778mod tests {
779    use super::*;
780    use crate::reserved::CATALOG_TABLE_ID;
781    use crate::types::RowId;
782    use arrow::array::Array;
783    use arrow::array::ArrayRef;
784    use arrow::array::{
785        BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt8Array,
786        UInt32Array, UInt64Array,
787    };
788    use arrow::compute::{cast, max, min, sum, unary};
789    use arrow::datatypes::DataType;
790    use llkv_column_map::ColumnStore;
791    use llkv_column_map::store::GatherNullPolicy;
792    use llkv_expr::{BinaryOp, CompareOp, Filter, Operator, ScalarExpr};
793    use std::collections::HashMap;
794    use std::ops::Bound;
795
    // Build a throwaway test table backed by a fresh in-memory pager.
    fn setup_test_table() -> Table {
        let pager = Arc::new(MemPager::default());
        setup_test_table_with_pager(&pager)
    }
800
801    fn setup_test_table_with_pager(pager: &Arc<MemPager>) -> Table {
802        let table = Table::from_id(1, Arc::clone(pager)).unwrap();
803        const COL_A_U64: FieldId = 10;
804        const COL_B_BIN: FieldId = 11;
805        const COL_C_I32: FieldId = 12;
806        const COL_D_F64: FieldId = 13;
807        const COL_E_F32: FieldId = 14;
808
809        let schema = Arc::new(Schema::new(vec![
810            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
811            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
812                crate::constants::FIELD_ID_META_KEY.to_string(),
813                COL_A_U64.to_string(),
814            )])),
815            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
816                crate::constants::FIELD_ID_META_KEY.to_string(),
817                COL_B_BIN.to_string(),
818            )])),
819            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
820                crate::constants::FIELD_ID_META_KEY.to_string(),
821                COL_C_I32.to_string(),
822            )])),
823            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
824                crate::constants::FIELD_ID_META_KEY.to_string(),
825                COL_D_F64.to_string(),
826            )])),
827            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
828                crate::constants::FIELD_ID_META_KEY.to_string(),
829                COL_E_F32.to_string(),
830            )])),
831        ]));
832
833        let batch = RecordBatch::try_new(
834            schema.clone(),
835            vec![
836                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
837                Arc::new(UInt64Array::from(vec![100, 200, 300, 200])),
838                Arc::new(BinaryArray::from(vec![
839                    b"foo" as &[u8],
840                    b"bar",
841                    b"baz",
842                    b"qux",
843                ])),
844                Arc::new(Int32Array::from(vec![10, 20, 30, 20])),
845                Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5, 2.5])),
846                Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 2.0])),
847            ],
848        )
849        .unwrap();
850
851        table.append(&batch).unwrap();
852        table
853    }
854
855    fn gather_single(
856        store: &ColumnStore<MemPager>,
857        field_id: LogicalFieldId,
858        row_ids: &[u64],
859    ) -> ArrayRef {
860        store
861            .gather_rows(&[field_id], row_ids, GatherNullPolicy::ErrorOnMissing)
862            .unwrap()
863            .column(0)
864            .clone()
865    }
866
867    fn pred_expr<'a>(filter: Filter<'a, FieldId>) -> Expr<'a, FieldId> {
868        Expr::Pred(filter)
869    }
870
871    fn proj(table: &Table, field_id: FieldId) -> Projection {
872        Projection::from(LogicalFieldId::for_user(table.table_id, field_id))
873    }
874
875    fn proj_alias<S: Into<String>>(table: &Table, field_id: FieldId, alias: S) -> Projection {
876        Projection::with_alias(LogicalFieldId::for_user(table.table_id, field_id), alias)
877    }
878
879    #[test]
880    fn table_new_rejects_reserved_table_id() {
881        let result = Table::from_id(CATALOG_TABLE_ID, Arc::new(MemPager::default()));
882        assert!(matches!(
883            result,
884            Err(Error::ReservedTableId(id)) if id == CATALOG_TABLE_ID
885        ));
886    }
887
888    #[test]
889    fn test_append_rejects_logical_field_id_in_metadata() {
890        // Create a table and build a schema where the column's metadata
891        // contains a fully-qualified LogicalFieldId (u64). Append should
892        // reject this and require a plain user FieldId instead.
893        let table = Table::from_id(7, Arc::new(MemPager::default())).unwrap();
894
895        const USER_FID: FieldId = 42;
896        // Build a logical id (namespaced) and put its numeric value into metadata
897        let logical: LogicalFieldId = LogicalFieldId::for_user(table.table_id(), USER_FID);
898        let logical_val: u64 = logical.into();
899
900        let schema = Arc::new(Schema::new(vec![
901            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
902            Field::new("bad", DataType::UInt64, false).with_metadata(HashMap::from([(
903                crate::constants::FIELD_ID_META_KEY.to_string(),
904                logical_val.to_string(),
905            )])),
906        ]));
907
908        let batch = RecordBatch::try_new(
909            schema,
910            vec![
911                Arc::new(UInt64Array::from(vec![1u64, 2u64])),
912                Arc::new(UInt64Array::from(vec![10u64, 20u64])),
913            ],
914        )
915        .unwrap();
916
917        let res = table.append(&batch);
918        assert!(matches!(res, Err(Error::Internal(_))));
919    }
920
921    #[test]
922    fn test_scan_with_u64_filter() {
923        let table = setup_test_table();
924        const COL_A_U64: FieldId = 10;
925        const COL_C_I32: FieldId = 12;
926
927        let expr = pred_expr(Filter {
928            field_id: COL_A_U64,
929            op: Operator::Equals(200.into()),
930        });
931
932        let mut vals: Vec<Option<i32>> = Vec::new();
933        table
934            .scan_stream(
935                &[proj(&table, COL_C_I32)],
936                &expr,
937                ScanStreamOptions::default(),
938                |b| {
939                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
940                    vals.extend(
941                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
942                    );
943                },
944            )
945            .unwrap();
946        assert_eq!(vals, vec![Some(20), Some(20)]);
947    }
948
949    #[test]
950    fn test_scan_with_string_filter() {
951        let pager = Arc::new(MemPager::default());
952        let table = Table::from_id(500, Arc::clone(&pager)).unwrap();
953
954        const COL_STR: FieldId = 42;
955        let schema = Arc::new(Schema::new(vec![
956            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
957            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
958                crate::constants::FIELD_ID_META_KEY.to_string(),
959                COL_STR.to_string(),
960            )])),
961        ]));
962
963        let batch = RecordBatch::try_new(
964            schema,
965            vec![
966                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
967                Arc::new(StringArray::from(vec!["alice", "bob", "albert", "carol"])),
968            ],
969        )
970        .unwrap();
971        table.append(&batch).unwrap();
972
973        let expr = pred_expr(Filter {
974            field_id: COL_STR,
975            op: Operator::starts_with("al", true),
976        });
977
978        let mut collected: Vec<Option<String>> = Vec::new();
979        table
980            .scan_stream(
981                &[proj(&table, COL_STR)],
982                &expr,
983                ScanStreamOptions::default(),
984                |b| {
985                    let arr = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
986                    collected.extend(arr.iter().map(|v| v.map(|s| s.to_string())));
987                },
988            )
989            .unwrap();
990
991        assert_eq!(
992            collected,
993            vec![Some("alice".to_string()), Some("albert".to_string())]
994        );
995    }
996
997    #[test]
998    fn test_table_reopen_with_shared_pager() {
999        const TABLE_ALPHA: TableId = 42;
1000        const TABLE_BETA: TableId = 43;
1001        const TABLE_GAMMA: TableId = 44;
1002        const COL_ALPHA_U64: FieldId = 100;
1003        const COL_ALPHA_I32: FieldId = 101;
1004        const COL_ALPHA_U32: FieldId = 102;
1005        const COL_ALPHA_I16: FieldId = 103;
1006        const COL_BETA_U64: FieldId = 200;
1007        const COL_BETA_U8: FieldId = 201;
1008        const COL_GAMMA_I16: FieldId = 300;
1009
1010        let pager = Arc::new(MemPager::default());
1011
1012        let alpha_rows: Vec<RowId> = vec![1, 2, 3, 4];
1013        let alpha_vals_u64: Vec<u64> = vec![10, 20, 30, 40];
1014        let alpha_vals_i32: Vec<i32> = vec![-5, 15, 25, 35];
1015        let alpha_vals_u32: Vec<u32> = vec![7, 11, 13, 17];
1016        let alpha_vals_i16: Vec<i16> = vec![-2, 4, -6, 8];
1017
1018        let beta_rows: Vec<RowId> = vec![101, 102, 103];
1019        let beta_vals_u64: Vec<u64> = vec![900, 901, 902];
1020        let beta_vals_u8: Vec<u8> = vec![1, 2, 3];
1021
1022        let gamma_rows: Vec<RowId> = vec![501, 502];
1023        let gamma_vals_i16: Vec<i16> = vec![123, -321];
1024
1025        // First session: create tables and write data.
1026        {
1027            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
1028            let schema = Arc::new(Schema::new(vec![
1029                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1030                Field::new("alpha_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1031                    crate::constants::FIELD_ID_META_KEY.to_string(),
1032                    COL_ALPHA_U64.to_string(),
1033                )])),
1034                Field::new("alpha_i32", DataType::Int32, false).with_metadata(HashMap::from([(
1035                    crate::constants::FIELD_ID_META_KEY.to_string(),
1036                    COL_ALPHA_I32.to_string(),
1037                )])),
1038                Field::new("alpha_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
1039                    crate::constants::FIELD_ID_META_KEY.to_string(),
1040                    COL_ALPHA_U32.to_string(),
1041                )])),
1042                Field::new("alpha_i16", DataType::Int16, false).with_metadata(HashMap::from([(
1043                    crate::constants::FIELD_ID_META_KEY.to_string(),
1044                    COL_ALPHA_I16.to_string(),
1045                )])),
1046            ]));
1047            let batch = RecordBatch::try_new(
1048                schema,
1049                vec![
1050                    Arc::new(UInt64Array::from(alpha_rows.clone())),
1051                    Arc::new(UInt64Array::from(alpha_vals_u64.clone())),
1052                    Arc::new(Int32Array::from(alpha_vals_i32.clone())),
1053                    Arc::new(UInt32Array::from(alpha_vals_u32.clone())),
1054                    Arc::new(Int16Array::from(alpha_vals_i16.clone())),
1055                ],
1056            )
1057            .unwrap();
1058            table.append(&batch).unwrap();
1059        }
1060
1061        {
1062            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
1063            let schema = Arc::new(Schema::new(vec![
1064                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1065                Field::new("beta_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
1066                    crate::constants::FIELD_ID_META_KEY.to_string(),
1067                    COL_BETA_U64.to_string(),
1068                )])),
1069                Field::new("beta_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
1070                    crate::constants::FIELD_ID_META_KEY.to_string(),
1071                    COL_BETA_U8.to_string(),
1072                )])),
1073            ]));
1074            let batch = RecordBatch::try_new(
1075                schema,
1076                vec![
1077                    Arc::new(UInt64Array::from(beta_rows.clone())),
1078                    Arc::new(UInt64Array::from(beta_vals_u64.clone())),
1079                    Arc::new(UInt8Array::from(beta_vals_u8.clone())),
1080                ],
1081            )
1082            .unwrap();
1083            table.append(&batch).unwrap();
1084        }
1085
1086        {
1087            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
1088            let schema = Arc::new(Schema::new(vec![
1089                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
1090                Field::new("gamma_i16", DataType::Int16, false).with_metadata(HashMap::from([(
1091                    crate::constants::FIELD_ID_META_KEY.to_string(),
1092                    COL_GAMMA_I16.to_string(),
1093                )])),
1094            ]));
1095            let batch = RecordBatch::try_new(
1096                schema,
1097                vec![
1098                    Arc::new(UInt64Array::from(gamma_rows.clone())),
1099                    Arc::new(Int16Array::from(gamma_vals_i16.clone())),
1100                ],
1101            )
1102            .unwrap();
1103            table.append(&batch).unwrap();
1104        }
1105
1106        // Second session: reopen each table and ensure schema and values are intact.
1107        {
1108            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
1109            let store = table.store();
1110
1111            let expectations: &[(FieldId, DataType)] = &[
1112                (COL_ALPHA_U64, DataType::UInt64),
1113                (COL_ALPHA_I32, DataType::Int32),
1114                (COL_ALPHA_U32, DataType::UInt32),
1115                (COL_ALPHA_I16, DataType::Int16),
1116            ];
1117
1118            for &(col, ref ty) in expectations {
1119                let lfid = LogicalFieldId::for_user(TABLE_ALPHA, col);
1120                assert_eq!(store.data_type(lfid).unwrap(), *ty);
1121                let arr = gather_single(store, lfid, &alpha_rows);
1122                match ty {
1123                    DataType::UInt64 => {
1124                        let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
1125                        assert_eq!(arr.values(), alpha_vals_u64.as_slice());
1126                    }
1127                    DataType::Int32 => {
1128                        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
1129                        assert_eq!(arr.values(), alpha_vals_i32.as_slice());
1130                    }
1131                    DataType::UInt32 => {
1132                        let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
1133                        assert_eq!(arr.values(), alpha_vals_u32.as_slice());
1134                    }
1135                    DataType::Int16 => {
1136                        let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
1137                        assert_eq!(arr.values(), alpha_vals_i16.as_slice());
1138                    }
1139                    other => panic!("unexpected dtype {other:?}"),
1140                }
1141            }
1142        }
1143
1144        {
1145            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
1146            let store = table.store();
1147
1148            let lfid_u64 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U64);
1149            assert_eq!(store.data_type(lfid_u64).unwrap(), DataType::UInt64);
1150            let arr_u64 = gather_single(store, lfid_u64, &beta_rows);
1151            let arr_u64 = arr_u64.as_any().downcast_ref::<UInt64Array>().unwrap();
1152            assert_eq!(arr_u64.values(), beta_vals_u64.as_slice());
1153
1154            let lfid_u8 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U8);
1155            assert_eq!(store.data_type(lfid_u8).unwrap(), DataType::UInt8);
1156            let arr_u8 = gather_single(store, lfid_u8, &beta_rows);
1157            let arr_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
1158            assert_eq!(arr_u8.values(), beta_vals_u8.as_slice());
1159        }
1160
1161        {
1162            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
1163            let store = table.store();
1164            let lfid = LogicalFieldId::for_user(TABLE_GAMMA, COL_GAMMA_I16);
1165            assert_eq!(store.data_type(lfid).unwrap(), DataType::Int16);
1166            let arr = gather_single(store, lfid, &gamma_rows);
1167            let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
1168            assert_eq!(arr.values(), gamma_vals_i16.as_slice());
1169        }
1170    }
1171
1172    #[test]
1173    fn test_scan_with_i32_filter() {
1174        let table = setup_test_table();
1175        const COL_A_U64: FieldId = 10;
1176        const COL_C_I32: FieldId = 12;
1177
1178        let filter = pred_expr(Filter {
1179            field_id: COL_C_I32,
1180            op: Operator::Equals(20.into()),
1181        });
1182
1183        let mut vals: Vec<Option<u64>> = Vec::new();
1184        table
1185            .scan_stream(
1186                &[proj(&table, COL_A_U64)],
1187                &filter,
1188                ScanStreamOptions::default(),
1189                |b| {
1190                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1191                    vals.extend(
1192                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1193                    );
1194                },
1195            )
1196            .unwrap();
1197        assert_eq!(vals, vec![Some(200), Some(200)]);
1198    }
1199
1200    #[test]
1201    fn test_scan_with_greater_than_filter() {
1202        let table = setup_test_table();
1203        const COL_A_U64: FieldId = 10;
1204        const COL_C_I32: FieldId = 12;
1205
1206        let filter = pred_expr(Filter {
1207            field_id: COL_C_I32,
1208            op: Operator::GreaterThan(15.into()),
1209        });
1210
1211        let mut vals: Vec<Option<u64>> = Vec::new();
1212        table
1213            .scan_stream(
1214                &[proj(&table, COL_A_U64)],
1215                &filter,
1216                ScanStreamOptions::default(),
1217                |b| {
1218                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1219                    vals.extend(
1220                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1221                    );
1222                },
1223            )
1224            .unwrap();
1225        assert_eq!(vals, vec![Some(200), Some(300), Some(200)]);
1226    }
1227
1228    #[test]
1229    fn test_scan_with_range_filter() {
1230        let table = setup_test_table();
1231        const COL_A_U64: FieldId = 10;
1232        const COL_C_I32: FieldId = 12;
1233
1234        let filter = pred_expr(Filter {
1235            field_id: COL_A_U64,
1236            op: Operator::Range {
1237                lower: Bound::Included(150.into()),
1238                upper: Bound::Excluded(300.into()),
1239            },
1240        });
1241
1242        let mut vals: Vec<Option<i32>> = Vec::new();
1243        table
1244            .scan_stream(
1245                &[proj(&table, COL_C_I32)],
1246                &filter,
1247                ScanStreamOptions::default(),
1248                |b| {
1249                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1250                    vals.extend(
1251                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1252                    );
1253                },
1254            )
1255            .unwrap();
1256        assert_eq!(vals, vec![Some(20), Some(20)]);
1257    }
1258
1259    #[test]
1260    fn test_filtered_scan_sum_kernel() {
1261        // Trade-off note:
1262        // - We use Arrow's sum kernel per batch, then add the partial sums.
1263        // - This preserves Arrow null semantics and avoids concat.
1264        let table = setup_test_table();
1265        const COL_A_U64: FieldId = 10;
1266
1267        let filter = pred_expr(Filter {
1268            field_id: COL_A_U64,
1269            op: Operator::Range {
1270                lower: Bound::Included(150.into()),
1271                upper: Bound::Excluded(300.into()),
1272            },
1273        });
1274
1275        let mut total: u128 = 0;
1276        table
1277            .scan_stream(
1278                &[proj(&table, COL_A_U64)],
1279                &filter,
1280                ScanStreamOptions::default(),
1281                |b| {
1282                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1283                    if let Some(part) = sum(a) {
1284                        total += part as u128;
1285                    }
1286                },
1287            )
1288            .unwrap();
1289
1290        assert_eq!(total, 400);
1291    }
1292
1293    #[test]
1294    fn test_filtered_scan_sum_i32_kernel() {
1295        // Trade-off note:
1296        // - Per-batch sum + accumulate avoids building one big Array.
1297        // - For tiny batches overhead may match manual loops, but keeps
1298        //   Arrow semantics exact.
1299        let table = setup_test_table();
1300        const COL_A_U64: FieldId = 10;
1301        const COL_C_I32: FieldId = 12;
1302
1303        let candidates = [100.into(), 300.into()];
1304        let filter = pred_expr(Filter {
1305            field_id: COL_A_U64,
1306            op: Operator::In(&candidates),
1307        });
1308
1309        let mut total: i64 = 0;
1310        table
1311            .scan_stream(
1312                &[proj(&table, COL_C_I32)],
1313                &filter,
1314                ScanStreamOptions::default(),
1315                |b| {
1316                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1317                    if let Some(part) = sum(a) {
1318                        total += part as i64;
1319                    }
1320                },
1321            )
1322            .unwrap();
1323        assert_eq!(total, 40);
1324    }
1325
1326    #[test]
1327    fn test_filtered_scan_min_max_kernel() {
1328        // Trade-off note:
1329        // - min/max are computed per batch and folded. This preserves
1330        //   Arrow's null behavior and avoids concat.
1331        // - Be mindful of NaN semantics if extended to floats later.
1332        let table = setup_test_table();
1333        const COL_A_U64: FieldId = 10;
1334        const COL_C_I32: FieldId = 12;
1335
1336        let candidates = [100.into(), 300.into()];
1337        let filter = pred_expr(Filter {
1338            field_id: COL_A_U64,
1339            op: Operator::In(&candidates),
1340        });
1341
1342        let mut mn: Option<i32> = None;
1343        let mut mx: Option<i32> = None;
1344        table
1345            .scan_stream(
1346                &[proj(&table, COL_C_I32)],
1347                &filter,
1348                ScanStreamOptions::default(),
1349                |b| {
1350                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1351
1352                    if let Some(part_min) = min(a) {
1353                        mn = Some(mn.map_or(part_min, |m| m.min(part_min)));
1354                    }
1355                    if let Some(part_max) = max(a) {
1356                        mx = Some(mx.map_or(part_max, |m| m.max(part_max)));
1357                    }
1358                },
1359            )
1360            .unwrap();
1361        assert_eq!(mn, Some(10));
1362        assert_eq!(mx, Some(30));
1363    }
1364
1365    #[test]
1366    fn test_filtered_scan_float64_column() {
1367        let table = setup_test_table();
1368        const COL_D_F64: FieldId = 13;
1369
1370        let filter = pred_expr(Filter {
1371            field_id: COL_D_F64,
1372            op: Operator::GreaterThan(2.0_f64.into()),
1373        });
1374
1375        let mut got = Vec::new();
1376        table
1377            .scan_stream(
1378                &[proj(&table, COL_D_F64)],
1379                &filter,
1380                ScanStreamOptions::default(),
1381                |b| {
1382                    let arr = b.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
1383                    for i in 0..arr.len() {
1384                        if arr.is_valid(i) {
1385                            got.push(arr.value(i));
1386                        }
1387                    }
1388                },
1389            )
1390            .unwrap();
1391
1392        assert_eq!(got, vec![2.5, 3.5, 2.5]);
1393    }
1394
1395    #[test]
1396    fn test_filtered_scan_float32_in_operator() {
1397        let table = setup_test_table();
1398        const COL_E_F32: FieldId = 14;
1399
1400        let candidates = [2.0_f32.into(), 3.0_f32.into()];
1401        let filter = pred_expr(Filter {
1402            field_id: COL_E_F32,
1403            op: Operator::In(&candidates),
1404        });
1405
1406        let mut vals: Vec<Option<f32>> = Vec::new();
1407        table
1408            .scan_stream(
1409                &[proj(&table, COL_E_F32)],
1410                &filter,
1411                ScanStreamOptions::default(),
1412                |b| {
1413                    let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
1414                    vals.extend((0..arr.len()).map(|i| {
1415                        if arr.is_null(i) {
1416                            None
1417                        } else {
1418                            Some(arr.value(i))
1419                        }
1420                    }));
1421                },
1422            )
1423            .unwrap();
1424
1425        let collected: Vec<f32> = vals.into_iter().flatten().collect();
1426        assert_eq!(collected, vec![2.0, 3.0, 2.0]);
1427    }
1428
1429    #[test]
1430    fn test_scan_stream_and_expression() {
1431        let table = setup_test_table();
1432        const COL_A_U64: FieldId = 10;
1433        const COL_C_I32: FieldId = 12;
1434        const COL_E_F32: FieldId = 14;
1435
1436        let expr = Expr::all_of(vec![
1437            Filter {
1438                field_id: COL_C_I32,
1439                op: Operator::GreaterThan(15.into()),
1440            },
1441            Filter {
1442                field_id: COL_A_U64,
1443                op: Operator::LessThan(250.into()),
1444            },
1445        ]);
1446
1447        let mut vals: Vec<Option<f32>> = Vec::new();
1448        table
1449            .scan_stream(
1450                &[proj(&table, COL_E_F32)],
1451                &expr,
1452                ScanStreamOptions::default(),
1453                |b| {
1454                    let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
1455                    vals.extend((0..arr.len()).map(|i| {
1456                        if arr.is_null(i) {
1457                            None
1458                        } else {
1459                            Some(arr.value(i))
1460                        }
1461                    }));
1462                },
1463            )
1464            .unwrap();
1465
1466        assert_eq!(vals, vec![Some(2.0_f32), Some(2.0_f32)]);
1467    }
1468
1469    #[test]
1470    fn test_scan_stream_or_expression() {
1471        let table = setup_test_table();
1472        const COL_A_U64: FieldId = 10;
1473        const COL_C_I32: FieldId = 12;
1474
1475        let expr = Expr::any_of(vec![
1476            Filter {
1477                field_id: COL_C_I32,
1478                op: Operator::Equals(10.into()),
1479            },
1480            Filter {
1481                field_id: COL_C_I32,
1482                op: Operator::Equals(30.into()),
1483            },
1484        ]);
1485
1486        let mut vals: Vec<Option<u64>> = Vec::new();
1487        table
1488            .scan_stream(
1489                &[proj(&table, COL_A_U64)],
1490                &expr,
1491                ScanStreamOptions::default(),
1492                |b| {
1493                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1494                    vals.extend((0..arr.len()).map(|i| {
1495                        if arr.is_null(i) {
1496                            None
1497                        } else {
1498                            Some(arr.value(i))
1499                        }
1500                    }));
1501                },
1502            )
1503            .unwrap();
1504
1505        assert_eq!(vals, vec![Some(100), Some(300)]);
1506    }
1507
1508    #[test]
1509    fn test_scan_stream_not_predicate() {
1510        let table = setup_test_table();
1511        const COL_A_U64: FieldId = 10;
1512        const COL_C_I32: FieldId = 12;
1513
1514        let expr = Expr::not(pred_expr(Filter {
1515            field_id: COL_C_I32,
1516            op: Operator::Equals(20.into()),
1517        }));
1518
1519        let mut vals: Vec<Option<u64>> = Vec::new();
1520        table
1521            .scan_stream(
1522                &[proj(&table, COL_A_U64)],
1523                &expr,
1524                ScanStreamOptions::default(),
1525                |b| {
1526                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1527                    vals.extend((0..arr.len()).map(|i| {
1528                        if arr.is_null(i) {
1529                            None
1530                        } else {
1531                            Some(arr.value(i))
1532                        }
1533                    }));
1534                },
1535            )
1536            .unwrap();
1537
1538        assert_eq!(vals, vec![Some(100), Some(300)]);
1539    }
1540
1541    #[test]
1542    fn test_scan_stream_not_and_expression() {
1543        let table = setup_test_table();
1544        const COL_A_U64: FieldId = 10;
1545        const COL_C_I32: FieldId = 12;
1546
1547        let expr = Expr::not(Expr::all_of(vec![
1548            Filter {
1549                field_id: COL_A_U64,
1550                op: Operator::GreaterThan(150.into()),
1551            },
1552            Filter {
1553                field_id: COL_C_I32,
1554                op: Operator::LessThan(40.into()),
1555            },
1556        ]));
1557
1558        let mut vals: Vec<Option<u64>> = Vec::new();
1559        table
1560            .scan_stream(
1561                &[proj(&table, COL_A_U64)],
1562                &expr,
1563                ScanStreamOptions::default(),
1564                |b| {
1565                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1566                    vals.extend((0..arr.len()).map(|i| {
1567                        if arr.is_null(i) {
1568                            None
1569                        } else {
1570                            Some(arr.value(i))
1571                        }
1572                    }));
1573                },
1574            )
1575            .unwrap();
1576
1577        assert_eq!(vals, vec![Some(100)]);
1578    }
1579
    #[test]
    fn test_scan_stream_include_nulls_toggle() {
        // Verifies the `ScanStreamOptions::include_nulls` toggle on a nullable
        // projected column:
        // - default (include_nulls = false): rows whose projected column is
        //   NULL are dropped from the stream;
        // - include_nulls = true: those rows are kept and surface as nulls.
        // It also issues a second scan_stream from INSIDE the first scan's
        // callback, exercising re-entrant scans against the same table.
        let pager = Arc::new(MemPager::default());
        let table = setup_test_table_with_pager(&pager);
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;
        const COL_B_BIN: FieldId = 11;
        const COL_D_F64: FieldId = 13;
        const COL_E_F32: FieldId = 14;

        // Schema mirrors the fixture table; only c_i32 is nullable, which is
        // the column whose null handling is under test.
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_B_BIN.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, true).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_F64.to_string(),
            )])),
            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_F32.to_string(),
            )])),
        ]));

        // Two extra rows (row_ids 5 and 6): both pass the a_u64 > 450 filter
        // below; row 5 has c_i32 = 40, row 6 has c_i32 = NULL.
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![5, 6])),
                Arc::new(UInt64Array::from(vec![500, 600])),
                Arc::new(BinaryArray::from(vec![
                    Some(&b"new"[..]),
                    Some(&b"alt"[..]),
                ])),
                Arc::new(Int32Array::from(vec![Some(40), None])),
                Arc::new(Float64Array::from(vec![5.5, 6.5])),
                Arc::new(Float32Array::from(vec![5.0, 6.0])),
            ],
        )
        .unwrap();
        table.append(&batch).unwrap();

        // Selects exactly the two freshly appended rows (500 and 600).
        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::GreaterThan(450.into()),
        });

        // Default options: the c_i32 = NULL row is filtered out, so only
        // Some(40) is observed.
        let mut default_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    default_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(default_vals, vec![Some(40)]);

        // include_nulls = true: the NULL row must now appear in the stream.
        let mut include_null_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions {
                    include_nulls: true,
                    order: None,
                    row_id_filter: None,
                },
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();

                    // Re-entrant scan: a second scan_stream on the same table,
                    // started while this callback is still running. With two
                    // projected columns and default options the c_i32 = NULL
                    // row is still emitted (see assertion below) — apparently
                    // only rows null in the relevant projection(s) are
                    // dropped; NOTE(review): confirm the exact multi-column
                    // null-dropping rule against scan_stream's docs.
                    let mut paired_vals: Vec<(Option<i32>, Option<f64>)> = Vec::new();
                    table
                        .scan_stream(
                            &[proj(&table, COL_C_I32), proj(&table, COL_D_F64)],
                            &filter,
                            ScanStreamOptions::default(),
                            |b| {
                                assert_eq!(b.num_columns(), 2);
                                let c_arr =
                                    b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                                let d_arr =
                                    b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
                                for i in 0..b.num_rows() {
                                    let c_val = if c_arr.is_null(i) {
                                        None
                                    } else {
                                        Some(c_arr.value(i))
                                    };
                                    let d_val = if d_arr.is_null(i) {
                                        None
                                    } else {
                                        Some(d_arr.value(i))
                                    };
                                    paired_vals.push((c_val, d_val));
                                }
                            },
                        )
                        .unwrap();
                    assert_eq!(paired_vals, vec![(Some(40), Some(5.5)), (None, Some(6.5))]);
                    include_null_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        // Both rows present, NULL preserved in order.
        assert_eq!(include_null_vals, vec![Some(40), None]);
    }
1710
1711    #[test]
1712    fn test_filtered_scan_int_sqrt_float64() {
1713        // Trade-off note:
1714        // - We cast per batch and apply a compute unary kernel for sqrt.
1715        // - This keeps processing streaming and avoids per-value loops.
1716        // - `unary` operates on `PrimitiveArray<T>`; cast and downcast to
1717        //   `Float64Array` first.
1718        let table = setup_test_table();
1719        const COL_A_U64: FieldId = 10;
1720        const COL_C_I32: FieldId = 12;
1721
1722        let filter = pred_expr(Filter {
1723            field_id: COL_C_I32,
1724            op: Operator::GreaterThan(15.into()),
1725        });
1726
1727        let mut got: Vec<f64> = Vec::new();
1728        table
1729            .scan_stream(
1730                &[proj(&table, COL_A_U64)],
1731                &filter,
1732                ScanStreamOptions::default(),
1733                |b| {
1734                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
1735                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
1736
1737                    // unary::<Float64Type, _, Float64Type>(...)
1738                    let sqrt_arr = unary::<
1739                        arrow::datatypes::Float64Type,
1740                        _,
1741                        arrow::datatypes::Float64Type,
1742                    >(f64_arr, |v: f64| v.sqrt());
1743
1744                    for i in 0..sqrt_arr.len() {
1745                        if !sqrt_arr.is_null(i) {
1746                            got.push(sqrt_arr.value(i));
1747                        }
1748                    }
1749                },
1750            )
1751            .unwrap();
1752
1753        let expected = [200_f64.sqrt(), 300_f64.sqrt(), 200_f64.sqrt()];
1754        assert_eq!(got, expected);
1755    }
1756
    #[test]
    fn test_multi_field_kernels_with_filters() {
        // End-to-end check of four kernel-based reductions (sum / min / max /
        // sqrt-map) over different column types, each under the same filter.
        // Trade-off note:
        // - All reductions use per-batch kernels + accumulation to stay
        //   streaming. No concat or whole-column materialization.
        use arrow::array::{Int16Array, UInt8Array, UInt32Array};

        // Fresh table (id 2) so this test's field ids don't collide with the
        // shared fixture.
        let table = Table::from_id(2, Arc::new(MemPager::default())).unwrap();

        const COL_A_U64: FieldId = 20;
        const COL_D_U32: FieldId = 21;
        const COL_E_I16: FieldId = 22;
        const COL_F_U8: FieldId = 23;
        const COL_C_I32: FieldId = 24;

        // Five data columns of distinct primitive types, each tagged with its
        // field id via schema metadata.
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("d_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_U32.to_string(),
            )])),
            Field::new("e_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_I16.to_string(),
            )])),
            Field::new("f_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_F_U8.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
        ]));

        // Data: 5 rows. We will filter c_i32 >= 20 -> keep rows 2..5.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(UInt64Array::from(vec![100, 225, 400, 900, 1600])),
                Arc::new(UInt32Array::from(vec![7, 8, 9, 10, 11])),
                Arc::new(Int16Array::from(vec![-3, 0, 3, -6, 6])),
                Arc::new(UInt8Array::from(vec![2, 4, 6, 8, 10])),
                Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
            ],
        )
        .unwrap();

        table.append(&batch).unwrap();

        // Filter: c_i32 >= 20.
        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::GreaterThanOrEquals(20.into()),
        });

        // 1) SUM over d_u32 (per-batch sum + accumulate).
        //    u128 accumulator avoids any overflow concern when folding
        //    per-batch u32 sums.
        let mut d_sum: u128 = 0;
        table
            .scan_stream(
                &[proj(&table, COL_D_U32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
                    if let Some(part) = sum(a) {
                        d_sum += part as u128;
                    }
                },
            )
            .unwrap();
        assert_eq!(d_sum, (8 + 9 + 10 + 11) as u128);

        // 2) MIN over e_i16 (per-batch min + fold).
        let mut e_min: Option<i16> = None;
        table
            .scan_stream(
                &[proj(&table, COL_E_I16)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<Int16Array>().unwrap();
                    if let Some(part_min) = min(a) {
                        e_min = Some(e_min.map_or(part_min, |m| m.min(part_min)));
                    }
                },
            )
            .unwrap();
        assert_eq!(e_min, Some(-6));

        // 3) MAX over f_u8 (per-batch max + fold).
        let mut f_max: Option<u8> = None;
        table
            .scan_stream(
                &[proj(&table, COL_F_U8)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b
                        .column(0)
                        .as_any()
                        .downcast_ref::<arrow::array::UInt8Array>()
                        .unwrap();
                    if let Some(part_max) = max(a) {
                        f_max = Some(f_max.map_or(part_max, |m| m.max(part_max)));
                    }
                },
            )
            .unwrap();
        assert_eq!(f_max, Some(10));

        // 4) SQRT over a_u64 (cast to f64, then unary sqrt per batch).
        //    a_u64 values for the kept rows are perfect squares, so the
        //    expected roots are exact.
        let mut got: Vec<f64> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
                    let sqrt_arr = unary::<
                        arrow::datatypes::Float64Type,
                        _,
                        arrow::datatypes::Float64Type,
                    >(f64_arr, |v: f64| v.sqrt());

                    for i in 0..sqrt_arr.len() {
                        if !sqrt_arr.is_null(i) {
                            got.push(sqrt_arr.value(i));
                        }
                    }
                },
            )
            .unwrap();
        let expected = [15.0_f64, 20.0, 30.0, 40.0];
        assert_eq!(got, expected);
    }
1900
1901    #[test]
1902    fn test_scan_with_in_filter() {
1903        let table = setup_test_table();
1904        const COL_A_U64: FieldId = 10;
1905        const COL_C_I32: FieldId = 12;
1906
1907        // IN now uses untyped literals, too.
1908        let candidates = [10.into(), 30.into()];
1909        let filter = pred_expr(Filter {
1910            field_id: COL_C_I32,
1911            op: Operator::In(&candidates),
1912        });
1913
1914        let mut vals: Vec<Option<u64>> = Vec::new();
1915        table
1916            .scan_stream(
1917                &[proj(&table, COL_A_U64)],
1918                &filter,
1919                ScanStreamOptions::default(),
1920                |b| {
1921                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1922                    vals.extend(
1923                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
1924                    );
1925                },
1926            )
1927            .unwrap();
1928        assert_eq!(vals, vec![Some(100), Some(300)]);
1929    }
1930
1931    #[test]
1932    fn test_scan_stream_single_column_batches() {
1933        let table = setup_test_table();
1934        const COL_A_U64: FieldId = 10;
1935        const COL_C_I32: FieldId = 12;
1936
1937        // Filter c_i32 == 20 -> two rows; stream a_u64 in batches of <= N.
1938        let filter = pred_expr(Filter {
1939            field_id: COL_C_I32,
1940            op: Operator::Equals(20.into()),
1941        });
1942
1943        let mut seen_cols = Vec::<u64>::new();
1944        table
1945            .scan_stream(
1946                &[proj(&table, COL_A_U64)],
1947                &filter,
1948                ScanStreamOptions::default(),
1949                |b| {
1950                    assert_eq!(b.num_columns(), 1);
1951                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1952                    // No kernel needed; just collect values for shape assertions.
1953                    for i in 0..a.len() {
1954                        if !a.is_null(i) {
1955                            seen_cols.push(a.value(i));
1956                        }
1957                    }
1958                },
1959            )
1960            .unwrap();
1961
1962        // In fixture, c_i32 == 20 corresponds to a_u64 values [200, 200].
1963        assert_eq!(seen_cols, vec![200, 200]);
1964    }
1965
1966    #[test]
1967    fn test_scan_with_multiple_projection_columns() {
1968        let table = setup_test_table();
1969        const COL_A_U64: FieldId = 10;
1970        const COL_C_I32: FieldId = 12;
1971
1972        let filter = pred_expr(Filter {
1973            field_id: COL_C_I32,
1974            op: Operator::Equals(20.into()),
1975        });
1976
1977        let expected_names = [COL_A_U64.to_string(), COL_C_I32.to_string()];
1978
1979        let mut combined: Vec<(Option<u64>, Option<i32>)> = Vec::new();
1980        table
1981            .scan_stream(
1982                &[proj(&table, COL_A_U64), proj(&table, COL_C_I32)],
1983                &filter,
1984                ScanStreamOptions::default(),
1985                |b| {
1986                    assert_eq!(b.num_columns(), 2);
1987                    assert_eq!(b.schema().field(0).name(), &expected_names[0]);
1988                    assert_eq!(b.schema().field(1).name(), &expected_names[1]);
1989
1990                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
1991                    let c = b.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
1992                    for i in 0..b.num_rows() {
1993                        let left = if a.is_null(i) { None } else { Some(a.value(i)) };
1994                        let right = if c.is_null(i) { None } else { Some(c.value(i)) };
1995                        combined.push((left, right));
1996                    }
1997                },
1998            )
1999            .unwrap();
2000
2001        assert_eq!(combined, vec![(Some(200), Some(20)), (Some(200), Some(20))]);
2002    }
2003
2004    #[test]
2005    fn test_scan_stream_projection_validation() {
2006        let table = setup_test_table();
2007        const COL_A_U64: FieldId = 10;
2008        const COL_C_I32: FieldId = 12;
2009
2010        let filter = pred_expr(Filter {
2011            field_id: COL_C_I32,
2012            op: Operator::Equals(20.into()),
2013        });
2014
2015        let empty: [Projection; 0] = [];
2016        let result = table.scan_stream(&empty, &filter, ScanStreamOptions::default(), |_batch| {});
2017        assert!(matches!(result, Err(Error::InvalidArgumentError(_))));
2018
2019        // Duplicate projections are allowed: the same column will be
2020        // gathered once and duplicated in the output in the requested
2021        // order. Verify the call succeeds and produces two identical
2022        // columns per batch.
2023        let duplicate = [
2024            proj(&table, COL_A_U64),
2025            proj_alias(&table, COL_A_U64, "alias_a"),
2026        ];
2027        let mut collected = Vec::<u64>::new();
2028        table
2029            .scan_stream(&duplicate, &filter, ScanStreamOptions::default(), |b| {
2030                assert_eq!(b.num_columns(), 2);
2031                assert_eq!(b.schema().field(0).name(), &COL_A_U64.to_string());
2032                assert_eq!(b.schema().field(1).name(), "alias_a");
2033                let a0 = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2034                let a1 = b.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
2035                for i in 0..b.num_rows() {
2036                    if !a0.is_null(i) {
2037                        collected.push(a0.value(i));
2038                    }
2039                    if !a1.is_null(i) {
2040                        collected.push(a1.value(i));
2041                    }
2042                }
2043            })
2044            .unwrap();
2045        // Two matching rows, two columns per row -> four values.
2046        assert_eq!(collected, vec![200, 200, 200, 200]);
2047    }
2048
2049    #[test]
2050    fn test_scan_stream_computed_projection() {
2051        let table = setup_test_table();
2052        const COL_A_U64: FieldId = 10;
2053
2054        let projections = [
2055            ScanProjection::column(proj(&table, COL_A_U64)),
2056            ScanProjection::computed(
2057                ScalarExpr::binary(
2058                    ScalarExpr::column(COL_A_U64),
2059                    BinaryOp::Multiply,
2060                    ScalarExpr::literal(2),
2061                ),
2062                "a_times_two",
2063            ),
2064        ];
2065
2066        let filter = pred_expr(Filter {
2067            field_id: COL_A_U64,
2068            op: Operator::GreaterThanOrEquals(0.into()),
2069        });
2070
2071        let mut computed: Vec<(u64, f64)> = Vec::new();
2072        table
2073            .scan_stream_with_exprs(&projections, &filter, ScanStreamOptions::default(), |b| {
2074                assert_eq!(b.num_columns(), 2);
2075                let base = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2076                let comp = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
2077                for i in 0..b.num_rows() {
2078                    if base.is_null(i) || comp.is_null(i) {
2079                        continue;
2080                    }
2081                    computed.push((base.value(i), comp.value(i)));
2082                }
2083            })
2084            .unwrap();
2085
2086        let expected = vec![(100, 200.0), (200, 400.0), (300, 600.0), (200, 400.0)];
2087        assert_eq!(computed, expected);
2088    }
2089
2090    #[test]
2091    fn test_scan_stream_multi_column_filter_compare() {
2092        let table = setup_test_table();
2093        const COL_A_U64: FieldId = 10;
2094        const COL_C_I32: FieldId = 12;
2095
2096        let expr = Expr::Compare {
2097            left: ScalarExpr::binary(
2098                ScalarExpr::column(COL_A_U64),
2099                BinaryOp::Add,
2100                ScalarExpr::column(COL_C_I32),
2101            ),
2102            op: CompareOp::Gt,
2103            right: ScalarExpr::literal(220_i64),
2104        };
2105
2106        let mut vals: Vec<Option<u64>> = Vec::new();
2107        table
2108            .scan_stream(
2109                &[proj(&table, COL_A_U64)],
2110                &expr,
2111                ScanStreamOptions::default(),
2112                |b| {
2113                    let col = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
2114                    for i in 0..b.num_rows() {
2115                        vals.push(if col.is_null(i) {
2116                            None
2117                        } else {
2118                            Some(col.value(i))
2119                        });
2120                    }
2121                },
2122            )
2123            .unwrap();
2124
2125        assert_eq!(vals.into_iter().flatten().collect::<Vec<_>>(), vec![300]);
2126    }
2127}