llkv_table/
table.rs

use croaring::Treemap;
use std::cmp;
use std::sync::Arc;
use std::sync::RwLock;

use arrow::array::{
    Array, ArrayRef, OffsetSizeTrait, RecordBatch, StringArray, UInt32Array, UInt64Array,
};
use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema, UInt64Type};
use std::collections::HashMap;

use crate::constants::STREAM_BATCH_ROWS;
use llkv_column_map::ColumnStore;
use llkv_column_map::ScanBuilder;
use llkv_column_map::store::scan::ScanOptions;
use llkv_column_map::store::scan::filter::{FilterDispatch, FilterPrimitive, Utf8Filter};
use llkv_column_map::store::{GatherNullPolicy, IndexKind, MultiGatherContext, ROW_ID_COLUMN_NAME};
use llkv_column_map::{
    llkv_for_each_arrow_boolean, llkv_for_each_arrow_numeric, llkv_for_each_arrow_string,
};
use llkv_storage::pager::{MemPager, Pager};
use llkv_types::ids::{LogicalFieldId, TableId};
use simd_r_drive_entry_handle::EntryHandle;

use crate::ROW_ID_FIELD_ID;
use crate::reserved::is_reserved_table_id;
use crate::sys_catalog::{ColMeta, SysCatalog, TableMeta};
use crate::types::{FieldId, RowId};
use llkv_compute::analysis::PredicateFusionCache;
use llkv_compute::program::{OwnedFilter, OwnedOperator, ProgramCompiler};
use llkv_compute::rowid::RowIdFilter as RowIdBitmapFilter;
use llkv_expr::literal::FromLiteral;
use llkv_expr::typed_predicate::{
    Predicate, PredicateValue, build_bool_predicate, build_fixed_width_predicate,
    build_var_width_predicate,
};
use llkv_expr::{Expr, Operator};
use llkv_result::{Error, Result as LlkvResult};
use llkv_scan::execute::execute_scan;
use llkv_scan::row_stream::{
    ColumnProjectionInfo, ProjectionEval, RowIdSource, RowStreamBuilder, ScanRowStream,
};
pub use llkv_scan::{
    RowIdFilter, ScanOrderDirection, ScanOrderSpec, ScanOrderTransform, ScanProjection,
    ScanStorage, ScanStreamOptions,
};
use rustc_hash::{FxHashMap, FxHashSet};
use std::ops::Bound;

/// Cached information about which system columns exist in the table schema.
/// This avoids repeated string comparisons in hot paths like `append()`.
#[derive(Debug, Clone, Copy)]
struct MvccColumnCache {
    has_created_by: bool,
    has_deleted_by: bool,
}

/// Handle for data operations on a table.
pub struct Table<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    store: Arc<ColumnStore<P>>,
    table_id: TableId,
    /// Cache of MVCC column presence. Initialized lazily on first use
    /// (see `get_mvcc_cache`). `None` means not yet initialized.
    mvcc_cache: RwLock<Option<MvccColumnCache>>,
}

pub type TableScanStream<'table, P> = ScanRowStream<'table, P, Table<P>>;

impl<P> Table<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Create a new table from column specifications.
    ///
    /// Coordinates metadata persistence, catalog registration, and storage initialization.
    pub fn create_from_columns(
        display_name: &str,
        canonical_name: &str,
        columns: &[llkv_plan::PlanColumnSpec],
        metadata: Arc<crate::metadata::MetadataManager<P>>,
        catalog: Arc<crate::catalog::TableCatalog>,
        store: Arc<ColumnStore<P>>,
    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
        service.create_table_from_columns(display_name, canonical_name, columns)
    }

    /// Create a new table from an Arrow schema (for CREATE TABLE AS SELECT).
    pub fn create_from_schema(
        display_name: &str,
        canonical_name: &str,
        schema: &arrow::datatypes::Schema,
        metadata: Arc<crate::metadata::MetadataManager<P>>,
        catalog: Arc<crate::catalog::TableCatalog>,
        store: Arc<ColumnStore<P>>,
    ) -> LlkvResult<crate::catalog::CreateTableResult<P>> {
        let service = crate::catalog::CatalogManager::new(metadata, catalog, store);
        service.create_table_from_schema(display_name, canonical_name, schema)
    }

    /// Internal constructor: wrap a table ID with column store access.
    ///
    /// **This is for internal crate use only.** User code should create tables via
    /// `CatalogService::create_table_*()`. For tests, use `Table::from_id()`.
    #[doc(hidden)]
    pub fn from_id(table_id: TableId, pager: Arc<P>) -> LlkvResult<Self> {
        if is_reserved_table_id(table_id) {
            return Err(Error::ReservedTableId(table_id));
        }

        tracing::trace!(
            "Table::from_id: Opening table_id={} with pager at {:p}",
            table_id,
            &*pager
        );
        let store = ColumnStore::open(pager)?;
        Ok(Self {
            store: Arc::new(store),
            table_id,
            mvcc_cache: RwLock::new(None),
        })
    }

    /// Internal constructor: wrap a table ID with a shared column store.
    ///
    /// **This is for internal crate use only.** Preferred over `from_id()` when
    /// multiple tables share the same store. For tests, use `Table::from_id_and_store()`.
    #[doc(hidden)]
    pub fn from_id_and_store(table_id: TableId, store: Arc<ColumnStore<P>>) -> LlkvResult<Self> {
        if is_reserved_table_id(table_id) {
            return Err(Error::ReservedTableId(table_id));
        }

        Ok(Self {
            store,
            table_id,
            mvcc_cache: RwLock::new(None),
        })
    }

    /// Register a persisted sort index for the specified user column.
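    ///
    /// A minimal sketch (the field id is illustrative):
    ///
    /// ```ignore
    /// table.register_sort_index(10)?; // persist a sort index for user column 10
    /// ```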
    pub fn register_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
        self.store
            .register_index(logical_field_id, IndexKind::Sort)?;
        Ok(())
    }

    /// Remove a persisted sort index for the specified user column if it exists.
    pub fn unregister_sort_index(&self, field_id: FieldId) -> LlkvResult<()> {
        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
        match self
            .store
            .unregister_index(logical_field_id, IndexKind::Sort)
        {
            Ok(()) | Err(Error::NotFound) => Ok(()),
            Err(err) => Err(err),
        }
    }

    /// List the persisted index kinds registered for the given user column.
    pub fn list_registered_indexes(&self, field_id: FieldId) -> LlkvResult<Vec<IndexKind>> {
        let logical_field_id = LogicalFieldId::for_user(self.table_id, field_id);
        match self.store.list_persisted_indexes(logical_field_id) {
            Ok(kinds) => Ok(kinds),
            Err(Error::NotFound) => Ok(Vec::new()),
            Err(err) => Err(err),
        }
    }

    /// Get or initialize the MVCC column cache from the provided schema.
    /// This is an optimization to avoid repeated string comparisons in `append()`.
    fn get_mvcc_cache(&self, schema: &Arc<Schema>) -> MvccColumnCache {
        // Fast path: check if cache is already initialized
        {
            let cache_read = self.mvcc_cache.read().unwrap();
            if let Some(cache) = *cache_read {
                return cache;
            }
        }

        // Slow path: initialize cache from schema
        let has_created_by = schema
            .fields()
            .iter()
            .any(|f| f.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME);
        let has_deleted_by = schema
            .fields()
            .iter()
            .any(|f| f.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME);

        let cache = MvccColumnCache {
            has_created_by,
            has_deleted_by,
        };

        // Store in cache for future calls
        *self.mvcc_cache.write().unwrap() = Some(cache);

        cache
    }

    /// Append a [`RecordBatch`] to the table.
    ///
    /// The batch must include:
    /// - A `row_id` column (type `UInt64`) with unique row identifiers
    /// - `field_id` metadata for each user column, mapping to this table's field IDs
    ///
    /// # MVCC Columns
    ///
    /// If the batch includes `created_by` or `deleted_by` columns, they are automatically
    /// assigned the correct [`LogicalFieldId`] for this table's MVCC metadata.
    ///
    /// # Field ID Mapping
    ///
    /// Each column's `field_id` metadata is converted to a [`LogicalFieldId`] by combining
    /// it with this table's ID. This ensures columns from different tables don't collide
    /// in the underlying storage.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The batch is missing the `row_id` column
    /// - Any user column is missing `field_id` metadata
    /// - Field IDs are invalid or malformed
    /// - The underlying storage operation fails
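    ///
    /// # Example
    ///
    /// A minimal sketch of appending one user column. The column name and
    /// field id are illustrative, the `table` handle is assumed to exist,
    /// and error handling is elided.
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use std::sync::Arc;
    /// use arrow::array::{ArrayRef, Int64Array, UInt64Array};
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use arrow::record_batch::RecordBatch;
    ///
    /// const VALUE_COL: FieldId = 10; // hypothetical user field id
    ///
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
    ///     Field::new("value", DataType::Int64, false).with_metadata(HashMap::from([(
    ///         crate::constants::FIELD_ID_META_KEY.to_string(),
    ///         VALUE_COL.to_string(),
    ///     )])),
    /// ]));
    /// let columns: Vec<ArrayRef> = vec![
    ///     Arc::new(UInt64Array::from(vec![0, 1, 2])), // unique row ids
    ///     Arc::new(Int64Array::from(vec![10, 20, 30])),
    /// ];
    /// let batch = RecordBatch::try_new(schema, columns)?;
    /// table.append(&batch)?;
    /// ```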
    pub fn append(&self, batch: &RecordBatch) -> LlkvResult<()> {
        use arrow::array::UInt64Builder;

        // Check whether the MVCC columns already exist in the batch, using the
        // cached lookup to avoid repeated string comparisons on every append.
        let cache = self.get_mvcc_cache(&batch.schema());
        let has_created_by = cache.has_created_by;
        let has_deleted_by = cache.has_deleted_by;

        let mut new_fields = Vec::with_capacity(batch.schema().fields().len() + 2);
        let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.columns().len() + 2);

        for (idx, field) in batch.schema().fields().iter().enumerate() {
            let maybe_field_id = field.metadata().get(crate::constants::FIELD_ID_META_KEY);
            // System columns (row_id, MVCC columns) don't need field_id metadata.
            if maybe_field_id.is_none()
                && (field.name() == ROW_ID_COLUMN_NAME
                    || field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME
                    || field.name() == llkv_column_map::store::DELETED_BY_COLUMN_NAME)
            {
                if field.name() == ROW_ID_COLUMN_NAME {
                    new_fields.push(field.as_ref().clone());
                    new_columns.push(batch.column(idx).clone());
                } else {
                    let lfid = if field.name() == llkv_column_map::store::CREATED_BY_COLUMN_NAME {
                        LogicalFieldId::for_mvcc_created_by(self.table_id)
                    } else {
                        LogicalFieldId::for_mvcc_deleted_by(self.table_id)
                    };

                    let mut metadata = field.metadata().clone();
                    let lfid_val: u64 = lfid.into();
                    metadata.insert(
                        crate::constants::FIELD_ID_META_KEY.to_string(),
                        lfid_val.to_string(),
                    );

                    let new_field =
                        Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                            .with_metadata(metadata);
                    new_fields.push(new_field);
                    new_columns.push(batch.column(idx).clone());
                }
                continue;
            }

            let raw_field_id = maybe_field_id
                .ok_or_else(|| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' is missing a valid '{}' in its metadata.",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY
                    ))
                })?
                .parse::<u64>()
                .map_err(|err| {
                    llkv_result::Error::Internal(format!(
                        "Field '{}' contains an invalid '{}': {}",
                        field.name(),
                        crate::constants::FIELD_ID_META_KEY,
                        err
                    ))
                })?;

            if raw_field_id > FieldId::MAX as u64 {
                return Err(llkv_result::Error::Internal(format!(
                    "Field '{}' expected user FieldId (<= {}) but got logical id '{}'",
                    field.name(),
                    FieldId::MAX,
                    raw_field_id
                )));
            }

            let user_field_id = raw_field_id as FieldId;
            let logical_field_id = LogicalFieldId::for_user(self.table_id, user_field_id);

            // Store the fully-qualified logical field id in the metadata we hand to the
            // column store so descriptors are registered under the correct table id.
            let lfid = logical_field_id;
            let mut new_metadata = field.metadata().clone();
            let lfid_val: u64 = lfid.into();
            new_metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );

            let new_field =
                Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                    .with_metadata(new_metadata);
            new_fields.push(new_field);
            new_columns.push(batch.column(idx).clone());

            // Ensure the catalog remembers the human-friendly column name for
            // this field so callers of `Table::schema()` (and other metadata
            // consumers) can recover it later. The CSV ingest path (and other
            // writers) may only supply the `field_id` metadata on the batch,
            // so defensively persist the column name when absent.
            let need_meta = match self
                .catalog()
                .get_cols_meta(self.table_id, &[user_field_id])
            {
                metas if metas.is_empty() => true,
                metas => metas[0].as_ref().and_then(|m| m.name.as_ref()).is_none(),
            };

            if need_meta {
                let meta = ColMeta {
                    col_id: user_field_id,
                    name: Some(field.name().to_string()),
                    flags: 0,
                    default: None,
                };
                self.catalog().put_col_meta(self.table_id, &meta);
            }
        }

        // Inject MVCC columns if they don't exist.
        // For non-transactional appends (e.g., CSV ingest), we use TXN_ID_AUTO_COMMIT (1),
        // which is treated as "committed by system" and always visible.
        // Use TXN_ID_NONE (0) for deleted_by to indicate "not deleted".
        const TXN_ID_AUTO_COMMIT: u64 = 1;
        const TXN_ID_NONE: u64 = 0;
        let row_count = batch.num_rows();

        if !has_created_by {
            let mut created_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                created_by_builder.append_value(TXN_ID_AUTO_COMMIT);
            }
            let created_by_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = created_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::CREATED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(created_by_builder.finish()));
        }

        if !has_deleted_by {
            let mut deleted_by_builder = UInt64Builder::with_capacity(row_count);
            for _ in 0..row_count {
                deleted_by_builder.append_value(TXN_ID_NONE);
            }
            let deleted_by_lfid = LogicalFieldId::for_mvcc_deleted_by(self.table_id);
            let mut metadata = HashMap::new();
            let lfid_val: u64 = deleted_by_lfid.into();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                lfid_val.to_string(),
            );
            new_fields.push(
                Field::new(
                    llkv_column_map::store::DELETED_BY_COLUMN_NAME,
                    DataType::UInt64,
                    false,
                )
                .with_metadata(metadata),
            );
            new_columns.push(Arc::new(deleted_by_builder.finish()));
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let namespaced_batch = RecordBatch::try_new(new_schema, new_columns)?;

        tracing::trace!(
            table_id = self.table_id,
            num_columns = namespaced_batch.num_columns(),
            num_rows = namespaced_batch.num_rows(),
            "Attempting append to table"
        );

        if let Err(err) = self.store.append(&namespaced_batch) {
            let batch_field_ids: Vec<LogicalFieldId> = namespaced_batch
                .schema()
                .fields()
                .iter()
                .filter_map(|f| f.metadata().get(crate::constants::FIELD_ID_META_KEY))
                .filter_map(|s| s.parse::<u64>().ok())
                .map(LogicalFieldId::from)
                .collect();

            // Check which fields are missing from the catalog.
            let missing_fields: Vec<LogicalFieldId> = batch_field_ids
                .iter()
                .filter(|&&field_id| !self.store.has_field(field_id))
                .copied()
                .collect();

            tracing::error!(
                table_id = self.table_id,
                error = ?err,
                batch_field_ids = ?batch_field_ids,
                missing_from_catalog = ?missing_fields,
                "Append failed - some fields missing from catalog"
            );
            return Err(err);
        }
        Ok(())
    }

    /// Stream one or more projected columns as a sequence of RecordBatches.
    ///
    /// - Avoids `concat` and large materializations.
    /// - Uses the same filter machinery as the old `scan` to produce
    ///   `row_ids`.
    /// - Splits `row_ids` into fixed-size windows and gathers rows per
    ///   window to form a small `RecordBatch` that is sent to `on_batch`.
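    ///
    /// # Example
    ///
    /// A minimal sketch that streams one projected column under an unbounded
    /// (match-everything) range filter. The `Expr`/`Filter` constructors and
    /// the `Projection` conversion shown here are assumptions about the
    /// `llkv_expr` and `llkv_scan` APIs, and the field id is illustrative.
    ///
    /// ```ignore
    /// use std::ops::Bound;
    /// use llkv_expr::{Expr, Filter, Operator};
    ///
    /// let match_all = Expr::Pred(Filter {
    ///     field_id: 10,
    ///     op: Operator::Range {
    ///         lower: Bound::Unbounded,
    ///         upper: Bound::Unbounded,
    ///     },
    /// });
    /// let lfid = LogicalFieldId::for_user(table.table_id(), 10);
    /// table.scan_stream(
    ///     [ScanProjection::from(Projection::from(lfid))],
    ///     &match_all,
    ///     ScanStreamOptions::default(),
    ///     |batch| {
    ///         // Each callback receives one bounded window of matching rows.
    ///         println!("got {} rows", batch.num_rows());
    ///     },
    /// )?;
    /// ```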
    pub fn scan_stream<'a, I, T, F>(
        &self,
        projections: I,
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions<P>,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        I: IntoIterator<Item = T>,
        T: Into<ScanProjection>,
        F: FnMut(RecordBatch),
    {
        let stream_projections: Vec<ScanProjection> =
            projections.into_iter().map(|p| p.into()).collect();
        self.scan_stream_with_exprs(&stream_projections, filter_expr, options, on_batch)
    }

    /// Stream projections using fully resolved expression inputs.
    ///
    /// Callers that already parsed expressions into [`ScanProjection`] values can
    /// use this entry point to skip the iterator conversion performed by
    /// [`Self::scan_stream`]. The execution semantics and callbacks are identical.
    pub fn scan_stream_with_exprs<'a, F>(
        &self,
        projections: &[ScanProjection],
        filter_expr: &Expr<'a, FieldId>,
        options: ScanStreamOptions<P>,
        on_batch: F,
    ) -> LlkvResult<()>
    where
        F: FnMut(RecordBatch),
    {
        let mut cb = on_batch;
        execute_scan(
            self,
            self.table_id,
            projections,
            filter_expr,
            options,
            &mut cb,
        )
    }

    /// Evaluate `filter_expr` against this table and return the matching row
    /// ids as a bitmap.
    pub fn filter_row_ids<'a>(&self, filter_expr: &Expr<'a, FieldId>) -> LlkvResult<Treemap> {
        let source = self.collect_row_ids_for_table(filter_expr)?;
        Ok(match source {
            RowIdSource::Bitmap(b) => b,
            RowIdSource::Vector(v) => Treemap::from_iter(v),
        })
    }

    #[inline]
    pub fn catalog(&self) -> SysCatalog<'_, P> {
        SysCatalog::new(&self.store)
    }

    #[inline]
    pub fn get_table_meta(&self) -> Option<TableMeta> {
        self.catalog().get_table_meta(self.table_id)
    }

    #[inline]
    pub fn get_cols_meta(&self, col_ids: &[FieldId]) -> Vec<Option<ColMeta>> {
        self.catalog().get_cols_meta(self.table_id, col_ids)
    }

    /// Build and return an Arrow `Schema` that describes this table.
    ///
    /// The returned schema includes the `row_id` field first, followed by
    /// user fields. Each user field has its `field_id` stored in the field
    /// metadata (under the "field_id" key), and its name is taken from the
    /// catalog when available, falling back to `col_<id>`.
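    ///
    /// # Example
    ///
    /// A sketch of recovering each user column's `field_id` from the returned
    /// schema (skipping the leading `row_id` field); the `table` handle is
    /// assumed.
    ///
    /// ```ignore
    /// let schema = table.schema()?;
    /// for field in schema.fields().iter().skip(1) {
    ///     let fid = field
    ///         .metadata()
    ///         .get(crate::constants::FIELD_ID_META_KEY)
    ///         .and_then(|s| s.parse::<u32>().ok());
    ///     println!("{} -> {:?}", field.name(), fid);
    /// }
    /// ```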
    pub fn schema(&self) -> LlkvResult<Arc<Schema>> {
        // Collect logical fields for this table and sort by field id.
        let mut logical_fields = self.store.user_field_ids_for_table(self.table_id);
        logical_fields.sort_by_key(|lfid| lfid.field_id());

        let field_ids: Vec<FieldId> = logical_fields.iter().map(|lfid| lfid.field_id()).collect();
        let metas = self.get_cols_meta(&field_ids);

        let mut fields: Vec<Field> = Vec::with_capacity(1 + field_ids.len());
        // Add row_id first.
        fields.push(Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false));

        for (idx, lfid) in logical_fields.into_iter().enumerate() {
            let fid = lfid.field_id();
            let dtype = self.store.data_type(lfid)?;
            let name = metas
                .get(idx)
                .and_then(|m| m.as_ref().and_then(|meta| meta.name.clone()))
                .unwrap_or_else(|| format!("col_{}", fid));

            let mut metadata: HashMap<String, String> = HashMap::new();
            metadata.insert(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                fid.to_string(),
            );

            fields.push(Field::new(&name, dtype.clone(), true).with_metadata(metadata));
        }

        Ok(Arc::new(Schema::new(fields)))
    }

    /// Return the table schema formatted as an Arrow RecordBatch suitable
    /// for pretty printing. The batch has three columns: `name` (Utf8),
    /// `field_id` (UInt32), and `data_type` (Utf8).
    pub fn schema_recordbatch(&self) -> LlkvResult<RecordBatch> {
        let schema = self.schema()?;
        let fields = schema.fields();

        let mut names: Vec<String> = Vec::with_capacity(fields.len());
        let mut fids: Vec<u32> = Vec::with_capacity(fields.len());
        let mut dtypes: Vec<String> = Vec::with_capacity(fields.len());

        for field in fields.iter() {
            names.push(field.name().to_string());
            let fid = field
                .metadata()
                .get(crate::constants::FIELD_ID_META_KEY)
                .and_then(|s| s.parse::<u32>().ok())
                .unwrap_or(0u32);
            fids.push(fid);
            dtypes.push(format!("{:?}", field.data_type()));
        }

        // Build Arrow arrays.
        let name_array: ArrayRef = Arc::new(StringArray::from(names));
        let fid_array: ArrayRef = Arc::new(UInt32Array::from(fids));
        let dtype_array: ArrayRef = Arc::new(StringArray::from(dtypes));

        let rb_schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new(crate::constants::FIELD_ID_META_KEY, DataType::UInt32, false),
            Field::new("data_type", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(rb_schema, vec![name_array, fid_array, dtype_array])?;
        Ok(batch)
    }

    /// Create a streaming view over the provided row IDs for the specified logical fields.
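    ///
    /// A sketch of streaming two columns for an explicit set of row ids. The
    /// field ids are illustrative, and the `Vec<u64>` conversion into
    /// [`RowIdSource`], the `GatherNullPolicy` variant, and the stream's
    /// iteration method are assumptions about the surrounding APIs.
    ///
    /// ```ignore
    /// let fields = vec![
    ///     LogicalFieldId::for_user(table.table_id(), 10),
    ///     LogicalFieldId::for_user(table.table_id(), 11),
    /// ];
    /// let mut stream = table.stream_columns(
    ///     fields,
    ///     vec![0u64, 1, 2],               // row ids to gather
    ///     GatherNullPolicy::IncludeNulls, // hypothetical variant name
    /// )?;
    /// while let Some(batch) = stream.next_batch()? {
    ///     // hypothetical iteration method
    ///     println!("got {} rows", batch.num_rows());
    /// }
    /// ```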
    pub fn stream_columns<'table>(
        &'table self,
        logical_fields: impl Into<Arc<[LogicalFieldId]>>,
        row_ids: impl Into<RowIdSource>,
        policy: GatherNullPolicy,
    ) -> LlkvResult<TableScanStream<'table, P>> {
        let logical_fields: Arc<[LogicalFieldId]> = logical_fields.into();
        let mut projection_evals = Vec::with_capacity(logical_fields.len());
        let mut schema_fields = Vec::with_capacity(logical_fields.len());
        let mut unique_index: FxHashMap<LogicalFieldId, usize> = FxHashMap::default();
        let mut unique_lfids: Vec<LogicalFieldId> = Vec::new();

        for &lfid in logical_fields.iter() {
            let dtype = self.store.data_type(lfid)?;
            if let std::collections::hash_map::Entry::Vacant(entry) = unique_index.entry(lfid) {
                entry.insert(unique_lfids.len());
                unique_lfids.push(lfid);
            }

            let field_name = self
                .catalog()
                .get_cols_meta(self.table_id, &[lfid.field_id()])
                .into_iter()
                .flatten()
                .next()
                .and_then(|meta| meta.name)
                .unwrap_or_else(|| format!("col_{}", lfid.field_id()));

            projection_evals.push(ProjectionEval::Column(ColumnProjectionInfo {
                logical_field_id: lfid,
                data_type: dtype.clone(),
                output_name: field_name.clone(),
            }));
            schema_fields.push(Field::new(field_name, dtype, true));
        }

        let schema = Arc::new(Schema::new(schema_fields));
        let passthrough_fields = vec![None; projection_evals.len()];
        let row_source = row_ids.into();

        RowStreamBuilder::new(
            self,
            self.table_id,
            Arc::clone(&schema),
            Arc::new(unique_lfids),
            Arc::new(projection_evals),
            Arc::new(passthrough_fields),
            Arc::new(unique_index),
            Arc::new(FxHashSet::default()),
            false,
            policy,
            row_source,
            STREAM_BATCH_ROWS,
            true,
        )
        .build()
    }

    pub fn store(&self) -> &ColumnStore<P> {
        &self.store
    }

    #[inline]
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Return the total number of rows for a given user column id in this table.
    ///
    /// This delegates to the ColumnStore descriptor for the logical field that
    /// corresponds to (table_id, col_id) and returns the persisted total_row_count.
    pub fn total_rows_for_col(&self, col_id: FieldId) -> llkv_result::Result<u64> {
        let lfid = LogicalFieldId::for_user(self.table_id, col_id);
        self.store.total_rows_for_field(lfid)
    }

    /// Return the total number of rows for this table.
    ///
    /// Prefer reading the dedicated row-id shadow column if present; otherwise
    /// fall back to inspecting any persisted user column descriptor.
    pub fn total_rows(&self) -> llkv_result::Result<u64> {
        use llkv_column_map::store::rowid_fid;
        let rid_lfid = rowid_fid(LogicalFieldId::for_user(self.table_id, 0));
        // Try the row-id shadow column first.
        match self.store.total_rows_for_field(rid_lfid) {
            Ok(n) => Ok(n),
            Err(_) => {
                // Fall back to scanning the catalog for any user-data column.
                self.store.total_rows_for_table(self.table_id)
            }
        }
    }
}

macro_rules! impl_row_id_ignore_chunk {
    (
        $_base:ident,
        $chunk:ident,
        $_chunk_with_rids:ident,
        $_run:ident,
        $_run_with_rids:ident,
        $array_ty:ty,
        $_arrow_ty:ty,
        $_dtype:expr,
        $_native_ty:ty,
        $_cast:expr
    ) => {
        fn $chunk(&mut self, _values: &$array_ty) {}
    };
}

macro_rules! impl_row_id_ignore_sorted_run {
    (
        $_base:ident,
        $_chunk:ident,
        $_chunk_with_rids:ident,
        $run:ident,
        $_run_with_rids:ident,
        $array_ty:ty,
        $_arrow_ty:ty,
        $_dtype:expr,
        $_native_ty:ty,
        $_cast:expr
    ) => {
        fn $run(&mut self, _values: &$array_ty, _start: usize, _len: usize) {}
    };
}

macro_rules! impl_row_id_collect_chunk_with_rids {
    (
        $_base:ident,
        $_chunk:ident,
        $chunk_with_rids:ident,
        $_run:ident,
        $_run_with_rids:ident,
        $array_ty:ty,
        $_arrow_ty:ty,
        $_dtype:expr,
        $_native_ty:ty,
        $_cast:expr
    ) => {
        fn $chunk_with_rids(&mut self, _: &$array_ty, row_ids: &UInt64Array) {
            self.extend_from_array(row_ids);
        }
    };
}

macro_rules! impl_row_id_collect_sorted_run_with_rids {
    (
        $_base:ident,
        $_chunk:ident,
        $_chunk_with_rids:ident,
        $_run:ident,
        $run_with_rids:ident,
        $array_ty:ty,
        $_arrow_ty:ty,
        $_dtype:expr,
        $_native_ty:ty,
        $_cast:expr
    ) => {
        fn $run_with_rids(
            &mut self,
            _: &$array_ty,
            row_ids: &UInt64Array,
            start: usize,
            len: usize,
        ) {
            self.extend_from_slice(row_ids, start, len);
        }
    };
}

macro_rules! impl_row_id_stream_chunk_with_rids {
    (
        $_base:ident,
        $_chunk:ident,
        $chunk_with_rids:ident,
        $_run:ident,
        $_run_with_rids:ident,
        $array_ty:ty,
        $_arrow_ty:ty,
        $_dtype:expr,
        $_native_ty:ty,
        $_cast:expr
    ) => {
        fn $chunk_with_rids(&mut self, _: &$array_ty, row_ids: &UInt64Array) {
            self.extend_from_array(row_ids);
        }
    };
}

macro_rules! impl_row_id_stream_sorted_run_with_rids {
    (
        $_base:ident,
        $_chunk:ident,
        $_chunk_with_rids:ident,
        $_run:ident,
        $run_with_rids:ident,
        $array_ty:ty,
        $_arrow_ty:ty,
        $_dtype:expr,
        $_native_ty:ty,
        $_cast:expr
    ) => {
        fn $run_with_rids(
            &mut self,
            _: &$array_ty,
            row_ids: &UInt64Array,
            start: usize,
            len: usize,
        ) {
            self.extend_sorted_run(row_ids, start, len);
        }
    };
}

/// Collects row ids from column scans into a roaring bitmap.
#[derive(Default)]
struct RowIdScanCollector {
    row_ids: Treemap,
}

impl RowIdScanCollector {
    fn extend_from_array(&mut self, row_ids: &UInt64Array) {
        for idx in 0..row_ids.len() {
            self.row_ids.add(row_ids.value(idx));
        }
    }

    fn extend_from_slice(&mut self, row_ids: &UInt64Array, start: usize, len: usize) {
        if len == 0 {
            return;
        }
        let end = (start + len).min(row_ids.len());
        for idx in start..end {
            self.row_ids.add(row_ids.value(idx));
        }
    }

    fn into_inner(self) -> Treemap {
        self.row_ids
    }
}

impl llkv_column_map::scan::PrimitiveVisitor for RowIdScanCollector {
    llkv_for_each_arrow_numeric!(impl_row_id_ignore_chunk);
    llkv_for_each_arrow_boolean!(impl_row_id_ignore_chunk);
    llkv_for_each_arrow_string!(impl_row_id_ignore_chunk);
}

impl llkv_column_map::scan::PrimitiveSortedVisitor for RowIdScanCollector {
    llkv_for_each_arrow_numeric!(impl_row_id_ignore_sorted_run);
    llkv_for_each_arrow_boolean!(impl_row_id_ignore_sorted_run);
    llkv_for_each_arrow_string!(impl_row_id_ignore_sorted_run);
}

impl llkv_column_map::scan::PrimitiveWithRowIdsVisitor for RowIdScanCollector {
    llkv_for_each_arrow_numeric!(impl_row_id_collect_chunk_with_rids);
    llkv_for_each_arrow_boolean!(impl_row_id_collect_chunk_with_rids);
    llkv_for_each_arrow_string!(impl_row_id_collect_chunk_with_rids);
}

impl llkv_column_map::scan::PrimitiveSortedWithRowIdsVisitor for RowIdScanCollector {
    llkv_for_each_arrow_numeric!(impl_row_id_collect_sorted_run_with_rids);
    llkv_for_each_arrow_boolean!(impl_row_id_collect_sorted_run_with_rids);
    llkv_for_each_arrow_string!(impl_row_id_collect_sorted_run_with_rids);

    fn null_run(&mut self, row_ids: &UInt64Array, start: usize, len: usize) {
        self.extend_from_slice(row_ids, start, len);
    }
}

/// Buffers row ids from a scan and emits them to a callback in fixed-size
/// chunks, capturing the first callback error so the scan can stop early.
struct RowIdChunkEmitter<'a> {
    chunk_size: usize,
    buffer: Vec<RowId>,
    reverse_sorted_runs: bool,
    on_chunk: &'a mut dyn FnMut(&[RowId]) -> LlkvResult<()>,
    error: Option<Error>,
}

impl<'a> RowIdChunkEmitter<'a> {
    fn new(
        chunk_size: usize,
        reverse_sorted_runs: bool,
        on_chunk: &'a mut dyn FnMut(&[RowId]) -> LlkvResult<()>,
    ) -> Self {
        let chunk_size = cmp::max(1, chunk_size);
        Self {
            chunk_size,
            buffer: Vec::with_capacity(chunk_size),
            reverse_sorted_runs,
            on_chunk,
            error: None,
        }
    }

    fn extend_from_array(&mut self, row_ids: &UInt64Array) {
        if self.error.is_some() {
            return;
        }

        // Optimization: if the buffer is empty, pass full chunks straight from
        // the input slice to avoid a copy.
        if self.buffer.is_empty() {
            let values = row_ids.values();
            let mut offset = 0;
            while offset < values.len() {
                let remaining = values.len() - offset;
                if remaining >= self.chunk_size {
                    if let Err(err) = (self.on_chunk)(&values[offset..offset + self.chunk_size]) {
                        self.error = Some(err);
                        return;
                    }
                    offset += self.chunk_size;
                } else {
                    // Buffer the remainder.
                    self.buffer.extend_from_slice(&values[offset..]);
                    break;
                }
            }
            return;
        }

        let values = row_ids.values();
        let mut offset = 0;
        while offset < values.len() {
            let remaining = self.chunk_size - self.buffer.len();
            let available = values.len() - offset;
            let count = remaining.min(available);

            self.buffer
                .extend_from_slice(&values[offset..offset + count]);
            offset += count;

            if self.buffer.len() >= self.chunk_size {
                self.flush();
            }
        }
    }

    fn extend_from_slice(&mut self, row_ids: &UInt64Array, start: usize, len: usize) {
        if self.error.is_some() || len == 0 {
            return;
        }
        let end = (start + len).min(row_ids.len());
        let values = &row_ids.values()[start..end];

        let mut offset = 0;
        while offset < values.len() {
            let remaining = self.chunk_size - self.buffer.len();
            let available = values.len() - offset;
            let count = remaining.min(available);

            self.buffer
                .extend_from_slice(&values[offset..offset + count]);
            offset += count;

            if self.buffer.len() >= self.chunk_size {
                self.flush();
            }
        }
    }

    fn extend_sorted_run(&mut self, row_ids: &UInt64Array, start: usize, len: usize) {
        if self.reverse_sorted_runs {
            if self.error.is_some() || len == 0 {
                return;
            }
            let mut idx = (start + len).min(row_ids.len());
            while idx > start {
                idx -= 1;
                self.push(row_ids.value(idx));
            }
        } else {
            self.extend_from_slice(row_ids, start, len);
        }
    }

    fn push(&mut self, value: RowId) {
        if self.error.is_some() {
            return;
        }
        self.buffer.push(value);
        if self.buffer.len() >= self.chunk_size {
            self.flush();
        }
    }

    fn flush(&mut self) {
        if self.error.is_some() || self.buffer.is_empty() {
            return;
        }
        if let Err(err) = (self.on_chunk)(self.buffer.as_slice()) {
            self.error = Some(err);
        } else {
            self.buffer.clear();
        }
    }

    fn finish(mut self) -> LlkvResult<()> {
        self.flush();
        match self.error {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }
}

impl<'a> llkv_column_map::scan::PrimitiveVisitor for RowIdChunkEmitter<'a> {
    llkv_for_each_arrow_numeric!(impl_row_id_ignore_chunk);
    llkv_for_each_arrow_boolean!(impl_row_id_ignore_chunk);
}

impl<'a> llkv_column_map::scan::PrimitiveSortedVisitor for RowIdChunkEmitter<'a> {
    llkv_for_each_arrow_numeric!(impl_row_id_ignore_sorted_run);
    llkv_for_each_arrow_boolean!(impl_row_id_ignore_sorted_run);
}

impl<'a> llkv_column_map::scan::PrimitiveWithRowIdsVisitor for RowIdChunkEmitter<'a> {
    llkv_for_each_arrow_numeric!(impl_row_id_stream_chunk_with_rids);
    llkv_for_each_arrow_boolean!(impl_row_id_stream_chunk_with_rids);
}

impl<'a> llkv_column_map::scan::PrimitiveSortedWithRowIdsVisitor for RowIdChunkEmitter<'a> {
    llkv_for_each_arrow_numeric!(impl_row_id_stream_sorted_run_with_rids);
    llkv_for_each_arrow_boolean!(impl_row_id_stream_sorted_run_with_rids);

    fn null_run(&mut self, row_ids: &UInt64Array, start: usize, len: usize) {
        self.extend_sorted_run(row_ids, start, len);
    }
}

impl<P> ScanStorage<P> for Table<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn table_id(&self) -> TableId {
        self.table_id
    }

    fn field_data_type(&self, fid: LogicalFieldId) -> LlkvResult<DataType> {
        self.store.data_type(fid)
    }

    fn total_rows(&self) -> LlkvResult<u64> {
        Table::total_rows(self)
    }

    fn prepare_gather_context(
        &self,
        logical_fields: &[LogicalFieldId],
    ) -> LlkvResult<MultiGatherContext> {
        self.store.prepare_gather_context(logical_fields)
    }

    fn gather_row_window_with_context(
        &self,
        logical_fields: &[LogicalFieldId],
        row_ids: &[u64],
        null_policy: GatherNullPolicy,
        ctx: Option<&mut MultiGatherContext>,
    ) -> LlkvResult<RecordBatch> {
        self.store
            .gather_row_window_with_context(logical_fields, row_ids, null_policy, ctx)
    }

    fn filter_row_ids<'expr>(&self, filter_expr: &Expr<'expr, FieldId>) -> LlkvResult<Treemap> {
        Table::filter_row_ids(self, filter_expr)
    }

    fn filter_leaf(&self, filter: &OwnedFilter) -> LlkvResult<Treemap> {
        let source = self.collect_row_ids_for_filter(filter)?;
        Ok(match source {
            RowIdSource::Bitmap(b) => b,
            RowIdSource::Vector(v) => Treemap::from_iter(v),
        })
    }

    fn filter_fused(
        &self,
        field_id: FieldId,
        filters: &[OwnedFilter],
        cache: &PredicateFusionCache,
    ) -> LlkvResult<RowIdSource> {
        self.collect_fused_predicates(field_id, filters, cache)
    }

    fn all_row_ids(&self) -> LlkvResult<Treemap> {
        self.compute_table_row_ids()
    }

    fn sorted_row_ids_full_table(&self, order_spec: ScanOrderSpec) -> LlkvResult<Option<Vec<u64>>> {
        self.collect_full_table_sorted_row_ids(order_spec)
    }

    fn stream_row_ids(
        &self,
        chunk_size: usize,
        on_chunk: &mut dyn FnMut(&[u64]) -> LlkvResult<()>,
    ) -> LlkvResult<()> {
        self.stream_table_row_ids(chunk_size, on_chunk)
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

impl<P> Table<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn collect_row_ids_for_table<'expr>(
        &self,
        filter_expr: &Expr<'expr, FieldId>,
    ) -> LlkvResult<RowIdSource> {
        let fusion_cache = PredicateFusionCache::from_expr(filter_expr);
        let mut all_rows_cache: FxHashMap<FieldId, Treemap> = FxHashMap::default();
        let filter_arc = Arc::new(filter_expr.clone());
        let programs = ProgramCompiler::new(filter_arc).compile()?;
        llkv_scan::predicate::collect_row_ids_for_program(
            self,
            &programs,
            &fusion_cache,
            &mut all_rows_cache,
        )
    }

    fn collect_row_ids_for_filter(&self, filter: &OwnedFilter) -> LlkvResult<RowIdSource> {
        if filter.field_id == ROW_ID_FIELD_ID {
            let op = filter.op.to_operator();
            let row_ids = self.collect_row_ids_for_rowid_filter(&op)?;
            return Ok(RowIdSource::Bitmap(row_ids));
        }

        let filter_lfid = LogicalFieldId::for_user(self.table_id, filter.field_id);
        let dtype = self.store.data_type(filter_lfid)?;

        match &filter.op {
            OwnedOperator::IsNotNull => {
                let mut cache = FxHashMap::default();
                let non_null = self.collect_all_row_ids_for_field(filter.field_id, &mut cache)?;
                return Ok(RowIdSource::Bitmap(non_null));
            }
            OwnedOperator::IsNull => {
                let all_row_ids = self.compute_table_row_ids()?;
                if all_row_ids.is_empty() {
                    return Ok(RowIdSource::Bitmap(Treemap::new()));
                }
                let mut cache = FxHashMap::default();
                let non_null = self.collect_all_row_ids_for_field(filter.field_id, &mut cache)?;
                let null_ids = all_row_ids - non_null;
                return Ok(RowIdSource::Bitmap(null_ids));
            }
            _ => {}
        }

        if let OwnedOperator::Range {
            lower: Bound::Unbounded,
            upper: Bound::Unbounded,
        } = &filter.op
        {
            let all_rows = self.compute_table_row_ids()?;
            return Ok(RowIdSource::Bitmap(all_rows));
        }

        let op = filter.op.to_operator();
        let row_ids = match &dtype {
            DataType::Utf8 => self.collect_matching_row_ids_string::<i32>(filter_lfid, &op),
            DataType::LargeUtf8 => self.collect_matching_row_ids_string::<i64>(filter_lfid, &op),
            DataType::Boolean => self.collect_matching_row_ids_bool(filter_lfid, &op),
            other => llkv_column_map::with_integer_arrow_type!(
                other.clone(),
                |ArrowTy| self.collect_matching_row_ids::<ArrowTy>(filter_lfid, &op),
                Err(Error::Internal(format!(
                    "Filtering on type {:?} is not supported",
                    other
                )))
            ),
        }?;

        Ok(RowIdSource::Bitmap(row_ids))
    }

    fn collect_fused_predicates(
        &self,
        _field_id: FieldId,
        filters: &[OwnedFilter],
        _cache: &PredicateFusionCache,
    ) -> LlkvResult<RowIdSource> {
        let mut result: Option<Treemap> = None;

        for filter in filters {
            let rows = match self.collect_row_ids_for_filter(filter)? {
                RowIdSource::Bitmap(b) => b,
                RowIdSource::Vector(v) => Treemap::from_iter(v),
            };

            result = Some(match result {
                Some(acc) => acc & rows,
                None => rows,
            });

            if let Some(ref r) = result
                && r.is_empty()
            {
                return Ok(RowIdSource::Bitmap(Treemap::new()));
            }
        }

        Ok(RowIdSource::Bitmap(result.unwrap_or_default()))
    }

    fn collect_all_row_ids_for_field(
        &self,
        field_id: FieldId,
        cache: &mut FxHashMap<FieldId, Treemap>,
    ) -> LlkvResult<Treemap> {
        if let Some(rows) = cache.get(&field_id) {
            return Ok(rows.clone());
        }

        let lfid = LogicalFieldId::for_user(self.table_id, field_id);
        let mut collector = RowIdScanCollector::default();
        ScanBuilder::new(self.store(), lfid)
            .options(ScanOptions {
                with_row_ids: true,
                ..Default::default()
            })
            .run(&mut collector)?;

        let rows = collector.into_inner();
        cache.insert(field_id, rows.clone());
        Ok(rows)
    }

    fn collect_matching_row_ids<T>(
        &self,
        lfid: LogicalFieldId,
        op: &Operator,
    ) -> LlkvResult<Treemap>
    where
        T: ArrowPrimitiveType
            + FilterPrimitive<Native = <T as ArrowPrimitiveType>::Native>
            + FilterDispatch<Value = <T as ArrowPrimitiveType>::Native>,
        <T as ArrowPrimitiveType>::Native: PartialOrd + Copy + FromLiteral + PredicateValue,
    {
        let predicate = build_fixed_width_predicate::<T>(op).map_err(Error::predicate_build)?;
        let row_ids =
            <T as FilterPrimitive>::run_nullable_filter(self.store(), lfid, |v| match v {
                Some(val) => predicate.matches(PredicateValue::borrowed(&val)),
                None => false,
            })?;
        Ok(Treemap::from_iter(row_ids))
    }

    fn collect_matching_row_ids_string<O>(
        &self,
        lfid: LogicalFieldId,
        op: &Operator,
    ) -> LlkvResult<Treemap>
    where
        O: OffsetSizeTrait + llkv_column_map::store::scan::StringContainsKernel,
    {
        let predicate = build_var_width_predicate(op).map_err(Error::predicate_build)?;
        let row_ids = Utf8Filter::<O>::run_filter(self.store(), lfid, &predicate)?;
        Ok(Treemap::from_iter(row_ids))
    }

    fn collect_matching_row_ids_bool(
        &self,
        lfid: LogicalFieldId,
        op: &Operator,
    ) -> LlkvResult<Treemap> {
        let predicate = build_bool_predicate(op).map_err(Error::predicate_build)?;

        let row_ids = arrow::datatypes::BooleanType::run_nullable_filter(
            self.store(),
            lfid,
            |val: Option<bool>| match val {
                Some(v) => predicate.matches(&v),
                None => false,
            },
        )?;
        Ok(Treemap::from_iter(row_ids))
    }

    fn collect_row_ids_for_rowid_filter(&self, op: &Operator<'_>) -> LlkvResult<Treemap> {
        let all_row_ids = self.compute_table_row_ids()?;
        if all_row_ids.is_empty() {
            return Ok(Treemap::new());
        }
        RowIdBitmapFilter::filter_by_operator(&all_row_ids, op)
    }

    fn collect_full_table_sorted_row_ids(
        &self,
        order_spec: ScanOrderSpec,
    ) -> LlkvResult<Option<Vec<u64>>> {
        use llkv_column_map::store::rowid_fid;

        if !matches!(
            order_spec.transform,
            ScanOrderTransform::IdentityInt64
                | ScanOrderTransform::IdentityInt32
                | ScanOrderTransform::IdentityUtf8
        ) {
            return Ok(None);
        }

        let lfid = LogicalFieldId::for_user(self.table_id, order_spec.field_id);
        let dtype = match self.store.data_type(lfid) {
            Ok(dt) => dt,
            Err(Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };

        if !Self::order_transform_matches_dtype(order_spec.transform, &dtype) {
            return Ok(None);
        }

        let mut ordered: Vec<u64> = Vec::new();

        if let Ok(total_rows) = self.total_rows()
            && let Ok(cap) = usize::try_from(total_rows)
        {
            ordered.reserve(cap);
        }

        let mut on_chunk = |chunk: &[RowId]| -> LlkvResult<()> {
            ordered.extend_from_slice(chunk);
            Ok(())
        };
        let reverse_sorted_runs = matches!(order_spec.direction, ScanOrderDirection::Descending);
        let mut emitter =
            RowIdChunkEmitter::new(STREAM_BATCH_ROWS, reverse_sorted_runs, &mut on_chunk);
        let options = ScanOptions {
            sorted: true,
            reverse: reverse_sorted_runs,
            with_row_ids: true,
            include_nulls: true,
            nulls_first: order_spec.nulls_first,
            anchor_row_id_field: Some(rowid_fid(lfid)),
            ..Default::default()
        };

        match ScanBuilder::new(self.store(), lfid)
            .options(options)
            .run(&mut emitter)
        {
            Ok(()) => emitter.finish()?,
            Err(Error::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        }

        Ok(Some(ordered))
    }

    fn order_transform_matches_dtype(transform: ScanOrderTransform, dtype: &DataType) -> bool {
        match transform {
            ScanOrderTransform::IdentityInt64 => matches!(dtype, DataType::Int64),
            ScanOrderTransform::IdentityInt32 => matches!(dtype, DataType::Int32),
            ScanOrderTransform::IdentityUtf8 => matches!(dtype, DataType::Utf8),
            ScanOrderTransform::CastUtf8ToInteger => false,
        }
    }

    fn compute_table_row_ids(&self) -> LlkvResult<Treemap> {
        use llkv_column_map::store::rowid_fid;

        if let Some(rows) = self.collect_row_ids_from_mvcc()? {
            return Ok(rows);
        }

        let fields = self.store.user_field_ids_for_table(self.table_id);
        if fields.is_empty() {
            return Ok(Treemap::new());
        }

        let expected = self
            .store
            .total_rows_for_table(self.table_id)
            .unwrap_or_default();

        if expected > 0
            && let Some(&first_field) = fields.first()
        {
            let rid_shadow = rowid_fid(first_field);
            let mut collector = RowIdScanCollector::default();

            match ScanBuilder::new(self.store(), rid_shadow)
                .options(ScanOptions {
                    with_row_ids: true,
                    ..Default::default()
                })
                .run(&mut collector)
            {
                Ok(_) => {
                    let row_ids = collector.into_inner();
                    if row_ids.cardinality() == expected {
                        return Ok(row_ids);
                    }
                }
                Err(llkv_result::Error::NotFound) => {}
                Err(_) => {}
            }
        }

        let mut collected = Treemap::new();

        for lfid in fields {
            let mut collector = RowIdScanCollector::default();
            ScanBuilder::new(self.store(), lfid)
                .options(ScanOptions {
                    with_row_ids: true,
                    ..Default::default()
                })
                .run(&mut collector)?;
            let rows = collector.into_inner();
            collected.or_inplace(&rows);

            if expected > 0 && collected.cardinality() >= expected {
                break;
            }
        }

        Ok(collected)
    }

    fn collect_row_ids_from_mvcc(&self) -> LlkvResult<Option<Treemap>> {
        let Some(rows) = self.fetch_mvcc_row_ids()? else {
            return Ok(None);
        };
        Ok(Some(Treemap::from_iter(rows)))
    }

1423    fn stream_table_row_ids(
1424        &self,
1425        chunk_size: usize,
1426        on_chunk: &mut dyn FnMut(&[RowId]) -> LlkvResult<()>,
1427    ) -> LlkvResult<()> {
1428        use llkv_column_map::store::rowid_fid;
1429
1430        if self.try_stream_row_ids_from_mvcc(chunk_size, on_chunk)? {
1431            return Ok(());
1432        }
1433
1434        let fields = self.store.user_field_ids_for_table(self.table_id);
1435        if fields.is_empty() {
1436            return Ok(());
1437        }
1438
1439        let Some(&first_field) = fields.first() else {
1440            return Ok(());
1441        };
1442
1443        let rid_shadow = rowid_fid(first_field);
1444        let mut emitter = RowIdChunkEmitter::new(chunk_size, false, on_chunk);
1445        let scan_result = ScanBuilder::new(self.store(), rid_shadow)
1446            .options(ScanOptions {
1447                with_row_ids: true,
1448                ..Default::default()
1449            })
1450            .run(&mut emitter);
1451
1452        match scan_result {
1453            Ok(()) => emitter.finish(),
1454            Err(Error::NotFound) => {
1455                let _ = emitter.finish();
1456                let all_rows = self.compute_table_row_ids()?;
1457                if all_rows.is_empty() {
1458                    return Ok(());
1459                }
1460
1461                let chunk_cap = cmp::max(1, chunk_size);
1462                let mut chunk = Vec::with_capacity(chunk_cap);
1463                for row_id in all_rows.iter() {
1464                    chunk.push(row_id);
1465                    if chunk.len() >= chunk_cap {
1466                        (on_chunk)(chunk.as_slice())?;
1467                        chunk.clear();
1468                    }
1469                }
1470                if !chunk.is_empty() {
1471                    (on_chunk)(chunk.as_slice())?;
1472                }
1473                Ok(())
1474            }
1475            Err(err) => {
1476                let _ = emitter.finish();
1477                Err(err)
1478            }
1479        }
1480    }
1481
    fn try_stream_row_ids_from_mvcc(
        &self,
        chunk_size: usize,
        on_chunk: &mut dyn FnMut(&[RowId]) -> LlkvResult<()>,
    ) -> LlkvResult<bool> {
        let created_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
        let mut emitter = RowIdChunkEmitter::new(chunk_size, false, on_chunk);

        let scan_result = ScanBuilder::new(self.store(), created_lfid)
            .options(ScanOptions {
                with_row_ids: true,
                ..Default::default()
            })
            .run(&mut emitter);

        match scan_result {
            Ok(()) => {
                emitter.finish()?;
                Ok(true)
            }
            Err(Error::NotFound) => Ok(false),
            Err(err) => Err(err),
        }
    }

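    /// Fetch every row id recorded in the MVCC `created_by` column, or
    /// `None` when that column has never been materialized for this table.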
    fn fetch_mvcc_row_ids(&self) -> LlkvResult<Option<Vec<RowId>>> {
        let created_lfid = LogicalFieldId::for_mvcc_created_by(self.table_id);
        match self
            .store
            .filter_row_ids::<UInt64Type>(created_lfid, &Predicate::All)
        {
            Ok(rows) => Ok(Some(rows)),
            Err(Error::NotFound) => Ok(None),
            Err(err) => Err(err),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::reserved::CATALOG_TABLE_ID;
    use crate::types::RowId;
    use arrow::array::Array;
    use arrow::array::ArrayRef;
    use arrow::array::{
        BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt8Array,
        UInt32Array, UInt64Array,
    };
    use arrow::compute::{cast, max, min, sum, unary};
    use arrow::datatypes::DataType;
    use llkv_column_map::ColumnStore;
    use llkv_column_map::store::{GatherNullPolicy, Projection};
    use llkv_expr::{BinaryOp, CompareOp, Filter, Operator, ScalarExpr};
    use std::collections::HashMap;
    use std::ops::Bound;

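    /// Builds the standard fixture table on a fresh in-memory pager.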
    fn setup_test_table() -> Table {
        let pager = Arc::new(MemPager::default());
        setup_test_table_with_pager(&pager)
    }

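    /// Builds the fixture table (four rows across u64, binary, i32, f64,
    /// and f32 columns) on the provided pager so tests can reopen it later.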
    fn setup_test_table_with_pager(pager: &Arc<MemPager>) -> Table {
        let table = Table::from_id(1, Arc::clone(pager)).unwrap();
        const COL_A_U64: FieldId = 10;
        const COL_B_BIN: FieldId = 11;
        const COL_C_I32: FieldId = 12;
        const COL_D_F64: FieldId = 13;
        const COL_E_F32: FieldId = 14;

        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_B_BIN.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_F64.to_string(),
            )])),
            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_F32.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
                Arc::new(UInt64Array::from(vec![100, 200, 300, 200])),
                Arc::new(BinaryArray::from(vec![
                    b"foo" as &[u8],
                    b"bar",
                    b"baz",
                    b"qux",
                ])),
                Arc::new(Int32Array::from(vec![10, 20, 30, 20])),
                Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5, 2.5])),
                Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 2.0])),
            ],
        )
        .unwrap();

        table.append(&batch).unwrap();
        table
    }

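    /// Gathers a single column for `row_ids`, erroring if any row is missing.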
    fn gather_single(
        store: &ColumnStore<MemPager>,
        field_id: LogicalFieldId,
        row_ids: &[u64],
    ) -> ArrayRef {
        store
            .gather_rows(&[field_id], row_ids, GatherNullPolicy::ErrorOnMissing)
            .unwrap()
            .column(0)
            .clone()
    }

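    /// Wraps a single filter in a predicate expression.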
    fn pred_expr<'a>(filter: Filter<'a, FieldId>) -> Expr<'a, FieldId> {
        Expr::Pred(filter)
    }

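    /// Builds a projection over a user column of `table`.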
    fn proj(table: &Table, field_id: FieldId) -> Projection {
        Projection::from(LogicalFieldId::for_user(table.table_id, field_id))
    }

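    /// Like [`proj`], but with an explicit output alias.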
    fn proj_alias<S: Into<String>>(table: &Table, field_id: FieldId, alias: S) -> Projection {
        Projection::with_alias(LogicalFieldId::for_user(table.table_id, field_id), alias)
    }

    #[test]
    fn row_id_chunk_emitter_reverses_sorted_runs() {
        let array = UInt64Array::from(vec![10_u64, 20, 30, 40, 50]);
        let mut emitted: Vec<RowId> = Vec::new();

        {
            let mut on_chunk = |chunk: &[RowId]| -> LlkvResult<()> {
                emitted.extend_from_slice(chunk);
                Ok(())
            };
            let mut emitter = RowIdChunkEmitter::new(2, true, &mut on_chunk);
            emitter.extend_sorted_run(&array, 0, array.len());
            emitter.finish().unwrap();
        }

        assert_eq!(emitted, vec![50, 40, 30, 20, 10]);
    }

    #[test]
    fn table_new_rejects_reserved_table_id() {
        let result = Table::from_id(CATALOG_TABLE_ID, Arc::new(MemPager::default()));
        assert!(matches!(
            result,
            Err(Error::ReservedTableId(id)) if id == CATALOG_TABLE_ID
        ));
    }

    #[test]
    fn test_append_rejects_logical_field_id_in_metadata() {
        // Create a table and build a schema where the column's metadata
        // contains a fully-qualified LogicalFieldId (u64). Append should
        // reject this and require a plain user FieldId instead.
        let table = Table::from_id(7, Arc::new(MemPager::default())).unwrap();

        const USER_FID: FieldId = 42;
        // Build a logical id (namespaced) and put its numeric value into metadata
        let logical: LogicalFieldId = LogicalFieldId::for_user(table.table_id(), USER_FID);
        let logical_val: u64 = logical.into();

        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("bad", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                logical_val.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2u64])),
                Arc::new(UInt64Array::from(vec![10u64, 20u64])),
            ],
        )
        .unwrap();

        let res = table.append(&batch);
        assert!(matches!(res, Err(Error::Internal(_))));
    }

    #[test]
    fn test_scan_with_u64_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let expr = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::Equals(200.into()),
        });

        let mut vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(20), Some(20)]);
    }

    #[test]
    fn test_scan_with_string_filter() {
        let pager = Arc::new(MemPager::default());
        let table = Table::from_id(500, Arc::clone(&pager)).unwrap();

        const COL_STR: FieldId = 42;
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_STR.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3, 4])),
                Arc::new(StringArray::from(vec!["alice", "bob", "albert", "carol"])),
            ],
        )
        .unwrap();
        table.append(&batch).unwrap();

        let expr = pred_expr(Filter {
            field_id: COL_STR,
            op: Operator::starts_with("al".to_string(), true),
        });

        let mut collected: Vec<Option<String>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_STR)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
                    collected.extend(arr.iter().map(|v| v.map(|s| s.to_string())));
                },
            )
            .unwrap();

        assert_eq!(
            collected,
            vec![Some("alice".to_string()), Some("albert".to_string())]
        );
    }

    #[test]
    fn test_table_reopen_with_shared_pager() {
        const TABLE_ALPHA: TableId = 42;
        const TABLE_BETA: TableId = 43;
        const TABLE_GAMMA: TableId = 44;
        const COL_ALPHA_U64: FieldId = 100;
        const COL_ALPHA_I32: FieldId = 101;
        const COL_ALPHA_U32: FieldId = 102;
        const COL_ALPHA_I16: FieldId = 103;
        const COL_BETA_U64: FieldId = 200;
        const COL_BETA_U8: FieldId = 201;
        const COL_GAMMA_I16: FieldId = 300;

        let pager = Arc::new(MemPager::default());

        let alpha_rows: Vec<RowId> = vec![1, 2, 3, 4];
        let alpha_vals_u64: Vec<u64> = vec![10, 20, 30, 40];
        let alpha_vals_i32: Vec<i32> = vec![-5, 15, 25, 35];
        let alpha_vals_u32: Vec<u32> = vec![7, 11, 13, 17];
        let alpha_vals_i16: Vec<i16> = vec![-2, 4, -6, 8];

        let beta_rows: Vec<RowId> = vec![101, 102, 103];
        let beta_vals_u64: Vec<u64> = vec![900, 901, 902];
        let beta_vals_u8: Vec<u8> = vec![1, 2, 3];

        let gamma_rows: Vec<RowId> = vec![501, 502];
        let gamma_vals_i16: Vec<i16> = vec![123, -321];

        // First session: create tables and write data.
        {
            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("alpha_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U64.to_string(),
                )])),
                Field::new("alpha_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I32.to_string(),
                )])),
                Field::new("alpha_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_U32.to_string(),
                )])),
                Field::new("alpha_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_ALPHA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(alpha_rows.clone())),
                    Arc::new(UInt64Array::from(alpha_vals_u64.clone())),
                    Arc::new(Int32Array::from(alpha_vals_i32.clone())),
                    Arc::new(UInt32Array::from(alpha_vals_u32.clone())),
                    Arc::new(Int16Array::from(alpha_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        {
            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("beta_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U64.to_string(),
                )])),
                Field::new("beta_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_BETA_U8.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(beta_rows.clone())),
                    Arc::new(UInt64Array::from(beta_vals_u64.clone())),
                    Arc::new(UInt8Array::from(beta_vals_u8.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        {
            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let schema = Arc::new(Schema::new(vec![
                Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
                Field::new("gamma_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                    crate::constants::FIELD_ID_META_KEY.to_string(),
                    COL_GAMMA_I16.to_string(),
                )])),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(UInt64Array::from(gamma_rows.clone())),
                    Arc::new(Int16Array::from(gamma_vals_i16.clone())),
                ],
            )
            .unwrap();
            table.append(&batch).unwrap();
        }

        // Second session: reopen each table and ensure schema and values are intact.
        {
            let table = Table::from_id(TABLE_ALPHA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let expectations: &[(FieldId, DataType)] = &[
                (COL_ALPHA_U64, DataType::UInt64),
                (COL_ALPHA_I32, DataType::Int32),
                (COL_ALPHA_U32, DataType::UInt32),
                (COL_ALPHA_I16, DataType::Int16),
            ];

            for &(col, ref ty) in expectations {
                let lfid = LogicalFieldId::for_user(TABLE_ALPHA, col);
                assert_eq!(store.data_type(lfid).unwrap(), *ty);
                let arr = gather_single(store, lfid, &alpha_rows);
                match ty {
                    DataType::UInt64 => {
                        let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u64.as_slice());
                    }
                    DataType::Int32 => {
                        let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i32.as_slice());
                    }
                    DataType::UInt32 => {
                        let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_u32.as_slice());
                    }
                    DataType::Int16 => {
                        let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
                        assert_eq!(arr.values(), alpha_vals_i16.as_slice());
                    }
                    other => panic!("unexpected dtype {other:?}"),
                }
            }
        }

        {
            let table = Table::from_id(TABLE_BETA, Arc::clone(&pager)).unwrap();
            let store = table.store();

            let lfid_u64 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U64);
            assert_eq!(store.data_type(lfid_u64).unwrap(), DataType::UInt64);
            let arr_u64 = gather_single(store, lfid_u64, &beta_rows);
            let arr_u64 = arr_u64.as_any().downcast_ref::<UInt64Array>().unwrap();
            assert_eq!(arr_u64.values(), beta_vals_u64.as_slice());

            let lfid_u8 = LogicalFieldId::for_user(TABLE_BETA, COL_BETA_U8);
            assert_eq!(store.data_type(lfid_u8).unwrap(), DataType::UInt8);
            let arr_u8 = gather_single(store, lfid_u8, &beta_rows);
            let arr_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
            assert_eq!(arr_u8.values(), beta_vals_u8.as_slice());
        }

        {
            let table = Table::from_id(TABLE_GAMMA, Arc::clone(&pager)).unwrap();
            let store = table.store();
            let lfid = LogicalFieldId::for_user(TABLE_GAMMA, COL_GAMMA_I16);
            assert_eq!(store.data_type(lfid).unwrap(), DataType::Int16);
            let arr = gather_single(store, lfid, &gamma_rows);
            let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
            assert_eq!(arr.values(), gamma_vals_i16.as_slice());
        }
    }

    #[test]
    fn test_scan_with_i32_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(20.into()),
        });

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(200), Some(200)]);
    }

    #[test]
    fn test_scan_with_greater_than_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::GreaterThan(15.into()),
        });

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(200), Some(300), Some(200)]);
    }

    #[test]
    fn test_scan_with_range_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::Range {
                lower: Bound::Included(150.into()),
                upper: Bound::Excluded(300.into()),
            },
        });

        let mut vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(20), Some(20)]);
    }

    #[test]
    fn test_filtered_scan_sum_kernel() {
        // Trade-off note:
        // - We use Arrow's sum kernel per batch, then add the partial sums.
        // - This preserves Arrow null semantics and avoids concat.
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;

        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::Range {
                lower: Bound::Included(150.into()),
                upper: Bound::Excluded(300.into()),
            },
        });

        let mut total: u128 = 0;
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    if let Some(part) = sum(a) {
                        total += part as u128;
                    }
                },
            )
            .unwrap();

        assert_eq!(total, 400);
    }

    #[test]
    fn test_filtered_scan_sum_i32_kernel() {
        // Trade-off note:
        // - Per-batch sum + accumulate avoids building one big Array.
        // - For tiny batches overhead may match manual loops, but keeps
        //   Arrow semantics exact.
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let candidates = [100.into(), 300.into()];
        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::In(&candidates),
        });

        let mut total: i64 = 0;
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    if let Some(part) = sum(a) {
                        total += part as i64;
                    }
                },
            )
            .unwrap();
        assert_eq!(total, 40);
    }

    #[test]
    fn test_filtered_scan_min_max_kernel() {
        // Trade-off note:
        // - min/max are computed per batch and folded. This preserves
        //   Arrow's null behavior and avoids concat.
        // - Be mindful of NaN semantics if extended to floats later.
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let candidates = [100.into(), 300.into()];
        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::In(&candidates),
        });

        let mut mn: Option<i32> = None;
        let mut mx: Option<i32> = None;
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();

                    if let Some(part_min) = min(a) {
                        mn = Some(mn.map_or(part_min, |m| m.min(part_min)));
                    }
                    if let Some(part_max) = max(a) {
                        mx = Some(mx.map_or(part_max, |m| m.max(part_max)));
                    }
                },
            )
            .unwrap();
        assert_eq!(mn, Some(10));
        assert_eq!(mx, Some(30));
    }

    #[test]
    fn test_filtered_scan_float64_column() {
        let table = setup_test_table();
        const COL_D_F64: FieldId = 13;

        let filter = pred_expr(Filter {
            field_id: COL_D_F64,
            op: Operator::GreaterThan(2.0_f64.into()),
        });

        let mut got = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_D_F64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
                    for i in 0..arr.len() {
                        if arr.is_valid(i) {
                            got.push(arr.value(i));
                        }
                    }
                },
            )
            .unwrap();

        assert_eq!(got, vec![2.5, 3.5, 2.5]);
    }

    #[test]
    fn test_filtered_scan_float32_in_operator() {
        let table = setup_test_table();
        const COL_E_F32: FieldId = 14;

        let candidates = [2.0_f32.into(), 3.0_f32.into()];
        let filter = pred_expr(Filter {
            field_id: COL_E_F32,
            op: Operator::In(&candidates),
        });

        let mut vals: Vec<Option<f32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_E_F32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
                    vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();

        let collected: Vec<f32> = vals.into_iter().flatten().collect();
        assert_eq!(collected, vec![2.0, 3.0, 2.0]);
    }

    #[test]
    fn test_scan_stream_and_expression() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;
        const COL_E_F32: FieldId = 14;

        let expr = Expr::all_of(vec![
            Filter {
                field_id: COL_C_I32,
                op: Operator::GreaterThan(15.into()),
            },
            Filter {
                field_id: COL_A_U64,
                op: Operator::LessThan(250.into()),
            },
        ]);

        let mut vals: Vec<Option<f32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_E_F32)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
                    vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();

        assert_eq!(vals, vec![Some(2.0_f32), Some(2.0_f32)]);
    }

    #[test]
    fn test_scan_stream_or_expression() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let expr = Expr::any_of(vec![
            Filter {
                field_id: COL_C_I32,
                op: Operator::Equals(10.into()),
            },
            Filter {
                field_id: COL_C_I32,
                op: Operator::Equals(30.into()),
            },
        ]);

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();

        assert_eq!(vals, vec![Some(100), Some(300)]);
    }

    #[test]
    fn test_scan_stream_not_predicate() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let expr = Expr::not(pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(20.into()),
        }));

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();

        assert_eq!(vals, vec![Some(100), Some(300)]);
    }

    #[test]
    fn test_scan_stream_not_and_expression() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let expr = Expr::not(Expr::all_of(vec![
            Filter {
                field_id: COL_A_U64,
                op: Operator::GreaterThan(150.into()),
            },
            Filter {
                field_id: COL_C_I32,
                op: Operator::LessThan(40.into()),
            },
        ]));

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();

        assert_eq!(vals, vec![Some(100)]);
    }

    #[test]
    fn test_scan_stream_include_nulls_toggle() {
        let pager = Arc::new(MemPager::default());
        let table = setup_test_table_with_pager(&pager);
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;
        const COL_B_BIN: FieldId = 11;
        const COL_D_F64: FieldId = 13;
        const COL_E_F32: FieldId = 14;

        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("b_bin", DataType::Binary, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_B_BIN.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, true).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
            Field::new("d_f64", DataType::Float64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_F64.to_string(),
            )])),
            Field::new("e_f32", DataType::Float32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_F32.to_string(),
            )])),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![5, 6])),
                Arc::new(UInt64Array::from(vec![500, 600])),
                Arc::new(BinaryArray::from(vec![
                    Some(&b"new"[..]),
                    Some(&b"alt"[..]),
                ])),
                Arc::new(Int32Array::from(vec![Some(40), None])),
                Arc::new(Float64Array::from(vec![5.5, 6.5])),
                Arc::new(Float32Array::from(vec![5.0, 6.0])),
            ],
        )
        .unwrap();
        table.append(&batch).unwrap();

        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::GreaterThan(450.into()),
        });

        let mut default_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                    default_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(default_vals, vec![Some(40)]);

        let mut include_null_vals: Vec<Option<i32>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions {
                    include_nulls: true,
                    order: None,
                    row_id_filter: None,
                    include_row_ids: true,
                },
                |b| {
                    let arr = b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();

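                    // Launch a second, independent scan from inside this
                    // callback; the outer scan continues streaming after the
                    // nested one completes.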
                    let mut paired_vals: Vec<(Option<i32>, Option<f64>)> = Vec::new();
                    table
                        .scan_stream(
                            &[proj(&table, COL_C_I32), proj(&table, COL_D_F64)],
                            &filter,
                            ScanStreamOptions::default(),
                            |b| {
                                assert_eq!(b.num_columns(), 2);
                                let c_arr =
                                    b.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
                                let d_arr =
                                    b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
                                for i in 0..b.num_rows() {
                                    let c_val = if c_arr.is_null(i) {
                                        None
                                    } else {
                                        Some(c_arr.value(i))
                                    };
                                    let d_val = if d_arr.is_null(i) {
                                        None
                                    } else {
                                        Some(d_arr.value(i))
                                    };
                                    paired_vals.push((c_val, d_val));
                                }
                            },
                        )
                        .unwrap();
                    assert_eq!(paired_vals, vec![(Some(40), Some(5.5)), (None, Some(6.5))]);
                    include_null_vals.extend((0..arr.len()).map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(arr.value(i))
                        }
                    }));
                },
            )
            .unwrap();
        assert_eq!(include_null_vals, vec![Some(40), None]);
    }

    #[test]
    fn test_filtered_scan_int_sqrt_float64() {
        // Trade-off note:
        // - We cast per batch and apply a compute unary kernel for sqrt.
        // - This keeps processing streaming and avoids per-value loops.
        // - `unary` operates on `PrimitiveArray<T>`; cast and downcast to
        //   `Float64Array` first.
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::GreaterThan(15.into()),
        });

        let mut got: Vec<f64> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();

                    // `unary` maps each value and carries the null buffer through unchanged.
                    let sqrt_arr = unary::<
                        arrow::datatypes::Float64Type,
                        _,
                        arrow::datatypes::Float64Type,
                    >(f64_arr, |v: f64| v.sqrt());

                    for i in 0..sqrt_arr.len() {
                        if !sqrt_arr.is_null(i) {
                            got.push(sqrt_arr.value(i));
                        }
                    }
                },
            )
            .unwrap();

        let expected = [200_f64.sqrt(), 300_f64.sqrt(), 200_f64.sqrt()];
        assert_eq!(got, expected);
    }

    #[test]
    fn test_multi_field_kernels_with_filters() {
        // Trade-off note:
        // - All reductions use per-batch kernels + accumulation to stay
        //   streaming. No concat or whole-column materialization.
        use arrow::array::{Int16Array, UInt8Array, UInt32Array};

        let table = Table::from_id(2, Arc::new(MemPager::default())).unwrap();

        const COL_A_U64: FieldId = 20;
        const COL_D_U32: FieldId = 21;
        const COL_E_I16: FieldId = 22;
        const COL_F_U8: FieldId = 23;
        const COL_C_I32: FieldId = 24;

        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
            Field::new("a_u64", DataType::UInt64, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_A_U64.to_string(),
            )])),
            Field::new("d_u32", DataType::UInt32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_D_U32.to_string(),
            )])),
            Field::new("e_i16", DataType::Int16, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_E_I16.to_string(),
            )])),
            Field::new("f_u8", DataType::UInt8, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_F_U8.to_string(),
            )])),
            Field::new("c_i32", DataType::Int32, false).with_metadata(HashMap::from([(
                crate::constants::FIELD_ID_META_KEY.to_string(),
                COL_C_I32.to_string(),
            )])),
        ]));

        // Data: 5 rows. Filtering on c_i32 >= 20 keeps row ids 2 through 5.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(UInt64Array::from(vec![100, 225, 400, 900, 1600])),
                Arc::new(UInt32Array::from(vec![7, 8, 9, 10, 11])),
                Arc::new(Int16Array::from(vec![-3, 0, 3, -6, 6])),
                Arc::new(UInt8Array::from(vec![2, 4, 6, 8, 10])),
                Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
            ],
        )
        .unwrap();

        table.append(&batch).unwrap();

        // Filter: c_i32 >= 20.
        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::GreaterThanOrEquals(20.into()),
        });

        // 1) SUM over d_u32 (per-batch sum + accumulate).
        let mut d_sum: u128 = 0;
        table
            .scan_stream(
                &[proj(&table, COL_D_U32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
                    if let Some(part) = sum(a) {
                        d_sum += part as u128;
                    }
                },
            )
            .unwrap();
        assert_eq!(d_sum, (8 + 9 + 10 + 11) as u128);

        // 2) MIN over e_i16 (per-batch min + fold).
        let mut e_min: Option<i16> = None;
        table
            .scan_stream(
                &[proj(&table, COL_E_I16)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<Int16Array>().unwrap();
                    if let Some(part_min) = min(a) {
                        e_min = Some(e_min.map_or(part_min, |m| m.min(part_min)));
                    }
                },
            )
            .unwrap();
        assert_eq!(e_min, Some(-6));

        // 3) MAX over f_u8 (per-batch max + fold).
        let mut f_max: Option<u8> = None;
        table
            .scan_stream(
                &[proj(&table, COL_F_U8)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b
                        .column(0)
                        .as_any()
                        .downcast_ref::<arrow::array::UInt8Array>()
                        .unwrap();
                    if let Some(part_max) = max(a) {
                        f_max = Some(f_max.map_or(part_max, |m| m.max(part_max)));
                    }
                },
            )
            .unwrap();
        assert_eq!(f_max, Some(10));

        // 4) SQRT over a_u64 (cast to f64, then unary sqrt per batch).
        let mut got: Vec<f64> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let casted = cast(b.column(0), &arrow::datatypes::DataType::Float64).unwrap();
                    let f64_arr = casted.as_any().downcast_ref::<Float64Array>().unwrap();
                    let sqrt_arr = unary::<
                        arrow::datatypes::Float64Type,
                        _,
                        arrow::datatypes::Float64Type,
                    >(f64_arr, |v: f64| v.sqrt());

                    for i in 0..sqrt_arr.len() {
                        if !sqrt_arr.is_null(i) {
                            got.push(sqrt_arr.value(i));
                        }
                    }
                },
            )
            .unwrap();
        let expected = [15.0_f64, 20.0, 30.0, 40.0];
        assert_eq!(got, expected);
    }

    #[test]
    fn test_scan_with_in_filter() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        // IN takes untyped literals, like the comparison operators.
        let candidates = [10.into(), 30.into()];
        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::In(&candidates),
        });

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    vals.extend(
                        (0..a.len()).map(|i| if a.is_null(i) { None } else { Some(a.value(i)) }),
                    );
                },
            )
            .unwrap();
        assert_eq!(vals, vec![Some(100), Some(300)]);
    }

    #[test]
    fn test_scan_stream_single_column_batches() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        // Filter c_i32 == 20 -> two rows; stream a_u64 in one or more bounded batches.
        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(20.into()),
        });

        let mut seen_cols = Vec::<u64>::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    assert_eq!(b.num_columns(), 1);
                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    // No kernel needed; just collect values for shape assertions.
                    for i in 0..a.len() {
                        if !a.is_null(i) {
                            seen_cols.push(a.value(i));
                        }
                    }
                },
            )
            .unwrap();

        // In fixture, c_i32 == 20 corresponds to a_u64 values [200, 200].
        assert_eq!(seen_cols, vec![200, 200]);
    }

    #[test]
    fn test_scan_with_multiple_projection_columns() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(20.into()),
        });

        let expected_names = [COL_A_U64.to_string(), COL_C_I32.to_string()];

        let mut combined: Vec<(Option<u64>, Option<i32>)> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64), proj(&table, COL_C_I32)],
                &filter,
                ScanStreamOptions::default(),
                |b| {
                    assert_eq!(b.num_columns(), 2);
                    assert_eq!(b.schema().field(0).name(), &expected_names[0]);
                    assert_eq!(b.schema().field(1).name(), &expected_names[1]);

                    let a = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    let c = b.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
                    for i in 0..b.num_rows() {
                        let left = if a.is_null(i) { None } else { Some(a.value(i)) };
                        let right = if c.is_null(i) { None } else { Some(c.value(i)) };
                        combined.push((left, right));
                    }
                },
            )
            .unwrap();

        assert_eq!(combined, vec![(Some(200), Some(20)), (Some(200), Some(20))]);
    }

    #[test]
    fn test_scan_stream_projection_validation() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let filter = pred_expr(Filter {
            field_id: COL_C_I32,
            op: Operator::Equals(20.into()),
        });

        let empty: [Projection; 0] = [];
        let result = table.scan_stream(&empty, &filter, ScanStreamOptions::default(), |_batch| {});
        assert!(matches!(result, Err(Error::InvalidArgumentError(_))));

        // Duplicate projections are allowed: the same column will be
        // gathered once and duplicated in the output in the requested
        // order. Verify the call succeeds and produces two identical
        // columns per batch.
        let duplicate = [
            proj(&table, COL_A_U64),
            proj_alias(&table, COL_A_U64, "alias_a"),
        ];
        let mut collected = Vec::<u64>::new();
        table
            .scan_stream(&duplicate, &filter, ScanStreamOptions::default(), |b| {
                assert_eq!(b.num_columns(), 2);
                assert_eq!(b.schema().field(0).name(), &COL_A_U64.to_string());
                assert_eq!(b.schema().field(1).name(), "alias_a");
                let a0 = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                let a1 = b.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
                for i in 0..b.num_rows() {
                    if !a0.is_null(i) {
                        collected.push(a0.value(i));
                    }
                    if !a1.is_null(i) {
                        collected.push(a1.value(i));
                    }
                }
            })
            .unwrap();
        // Two matching rows, two columns per row -> four values.
        assert_eq!(collected, vec![200, 200, 200, 200]);
    }

    #[test]
    fn test_scan_stream_computed_projection() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;

        let projections = [
            ScanProjection::column(proj(&table, COL_A_U64)),
            ScanProjection::computed(
                ScalarExpr::binary(
                    ScalarExpr::column(COL_A_U64),
                    BinaryOp::Multiply,
                    ScalarExpr::literal(2),
                ),
                "a_times_two",
            ),
        ];

        let filter = pred_expr(Filter {
            field_id: COL_A_U64,
            op: Operator::GreaterThanOrEquals(0.into()),
        });

        let mut computed: Vec<(u64, f64)> = Vec::new();
        table
            .scan_stream_with_exprs(&projections, &filter, ScanStreamOptions::default(), |b| {
                assert_eq!(b.num_columns(), 2);
                let base = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                let comp = b.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
                for i in 0..b.num_rows() {
                    if base.is_null(i) || comp.is_null(i) {
                        continue;
                    }
                    computed.push((base.value(i), comp.value(i)));
                }
            })
            .unwrap();

        let expected = vec![(100, 200.0), (200, 400.0), (300, 600.0), (200, 400.0)];
        assert_eq!(computed, expected);
    }

    #[test]
    fn test_scan_stream_multi_column_filter_compare() {
        let table = setup_test_table();
        const COL_A_U64: FieldId = 10;
        const COL_C_I32: FieldId = 12;

        let expr = Expr::Compare {
            left: ScalarExpr::binary(
                ScalarExpr::column(COL_A_U64),
                BinaryOp::Add,
                ScalarExpr::column(COL_C_I32),
            ),
            op: CompareOp::Gt,
            right: ScalarExpr::literal(220_i64),
        };

        let mut vals: Vec<Option<u64>> = Vec::new();
        table
            .scan_stream(
                &[proj(&table, COL_A_U64)],
                &expr,
                ScanStreamOptions::default(),
                |b| {
                    let col = b.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
                    for i in 0..b.num_rows() {
                        vals.push(if col.is_null(i) {
                            None
                        } else {
                            Some(col.value(i))
                        });
                    }
                },
            )
            .unwrap();

        assert_eq!(vals.into_iter().flatten().collect::<Vec<_>>(), vec![300]);
    }
}