llkv_scan/
lib.rs

1//! Shared scan interfaces and streaming helpers for LLKV.
2//!
3//! This crate is intended to host the storage-agnostic scan surface used by
4//! both the table layer and the executor. It currently contains the core scan
5//! types and storage abstraction; execution wiring will migrate here over time.
6
7use std::sync::Arc;
8
9use arrow::array::RecordBatch;
10use arrow::datatypes::DataType;
11use croaring::Treemap;
12use llkv_column_map::store::{GatherNullPolicy, MultiGatherContext, Projection};
13use llkv_expr::{Expr, ScalarExpr};
14use llkv_result::Result as LlkvResult;
15use llkv_storage::pager::{MemPager, Pager};
16use llkv_types::{FieldId, LogicalFieldId, RowId, TableId};
17use rustc_hash::FxHashMap;
18use simd_r_drive_entry_handle::EntryHandle;
19pub mod row_stream;
20pub use row_stream::ScanRowStream;
21pub use row_stream::{
22    ColumnProjectionInfo, ComputedProjectionInfo, ProjectionEval, RowChunk, RowIdSource, RowStream,
23    RowStreamBuilder, materialize_row_window,
24};
25
26pub mod execute;
27pub mod ordering;
28pub mod predicate;
29pub use ordering::sort_row_ids_with_order;
30
/// Direction in which scan results are ordered.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ScanOrderDirection {
    /// Ascending sort order.
    Ascending,
    /// Descending sort order.
    Descending,
}
37
/// Transformation applied to a column's values before they are sorted.
///
/// NOTE(review): the per-variant notes below are read off the variant names;
/// confirm them against the ordering implementation.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ScanOrderTransform {
    /// Presumably: sort `Int64` values as they are, with no conversion.
    IdentityInt64,
    /// Presumably: sort `Int32` values as they are, with no conversion.
    IdentityInt32,
    /// Presumably: sort `Utf8` strings as they are, with no conversion.
    IdentityUtf8,
    /// Presumably: cast `Utf8` strings to integers and sort by the cast result.
    CastUtf8ToInteger,
}
46
47/// Specification for ordering scan results.
48#[derive(Clone, Copy, Debug)]
49pub struct ScanOrderSpec {
50    pub field_id: FieldId,
51    pub direction: ScanOrderDirection,
52    pub nulls_first: bool,
53    pub transform: ScanOrderTransform,
54}
55
56/// A column or computed expression to include in scan results.
57#[derive(Clone, Debug)]
58pub enum ScanProjection {
59    Column(Projection),
60    Computed {
61        expr: ScalarExpr<FieldId>,
62        alias: String,
63    },
64}
65
66impl ScanProjection {
67    pub fn column<P: Into<Projection>>(proj: P) -> Self {
68        Self::Column(proj.into())
69    }
70
71    pub fn computed<S: Into<String>>(expr: ScalarExpr<FieldId>, alias: S) -> Self {
72        Self::Computed {
73            expr,
74            alias: alias.into(),
75        }
76    }
77}
78
79impl From<Projection> for ScanProjection {
80    fn from(value: Projection) -> Self {
81        ScanProjection::Column(value)
82    }
83}
84
85impl From<&Projection> for ScanProjection {
86    fn from(value: &Projection) -> Self {
87        ScanProjection::Column(value.clone())
88    }
89}
90
91impl From<&ScanProjection> for ScanProjection {
92    fn from(value: &ScanProjection) -> Self {
93        value.clone()
94    }
95}
96
/// Options for configuring table scans.
///
/// `P` is the pager type; it defaults to [`MemPager`].
pub struct ScanStreamOptions<P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// NOTE(review): presumably controls whether null values are kept in the
    /// scan output — confirm against the executor.
    pub include_nulls: bool,
    /// Optional ordering specification for the scan results.
    pub order: Option<ScanOrderSpec>,
    /// Optional [`RowIdFilter`], shared behind an `Arc`.
    pub row_id_filter: Option<Arc<dyn RowIdFilter<P>>>,
    /// NOTE(review): presumably requests that row IDs be included in the scan
    /// output — confirm against the executor.
    pub include_row_ids: bool,
}
107
108impl<P> Clone for ScanStreamOptions<P>
109where
110    P: Pager<Blob = EntryHandle> + Send + Sync,
111{
112    fn clone(&self) -> Self {
113        Self {
114            include_nulls: self.include_nulls,
115            order: self.order,
116            row_id_filter: self.row_id_filter.clone(),
117            include_row_ids: self.include_row_ids,
118        }
119    }
120}
121
122impl<P> Default for ScanStreamOptions<P>
123where
124    P: Pager<Blob = EntryHandle> + Send + Sync,
125{
126    fn default() -> Self {
127        Self {
128            include_nulls: false,
129            order: None,
130            row_id_filter: None,
131            include_row_ids: false,
132        }
133    }
134}
135
136impl<P> std::fmt::Debug for ScanStreamOptions<P>
137where
138    P: Pager<Blob = EntryHandle> + Send + Sync,
139{
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        f.debug_struct("ScanStreamOptions")
142            .field("include_nulls", &self.include_nulls)
143            .field("order", &self.order)
144            .field(
145                "row_id_filter",
146                &self.row_id_filter.as_ref().map(|_| "<RowIdFilter>"),
147            )
148            .field("include_row_ids", &self.include_row_ids)
149            .finish()
150    }
151}
152
/// Filter row IDs before they are materialized into batches.
///
/// Implementors must be `Send + Sync`; the trait is generic over the pager
/// type `P`.
pub trait RowIdFilter<P>: Send + Sync
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Takes a set of row IDs by value and returns a [`Treemap`] of row IDs,
    /// or an error.
    ///
    /// `table_id` and `storage` are supplied alongside the candidate
    /// `row_ids`. NOTE(review): presumably the result is the subset of
    /// `row_ids` that should remain visible — confirm against implementors.
    fn filter(
        &self,
        table_id: TableId,
        storage: &dyn ScanStorage<P>,
        row_ids: Treemap,
    ) -> LlkvResult<Treemap>;
}
165
166/// Capabilities the scan executor needs from storage.
167pub trait ScanStorage<P>: Send + Sync
168where
169    P: Pager<Blob = EntryHandle> + Send + Sync,
170{
171    fn table_id(&self) -> TableId;
172    fn field_data_type(&self, fid: LogicalFieldId) -> LlkvResult<DataType>;
173    fn total_rows(&self) -> LlkvResult<u64>;
174    fn all_row_ids(&self) -> LlkvResult<croaring::Treemap>;
175    fn prepare_gather_context(
176        &self,
177        logical_fields: &[LogicalFieldId],
178    ) -> LlkvResult<MultiGatherContext>;
179    fn gather_row_window_with_context(
180        &self,
181        logical_fields: &[LogicalFieldId],
182        row_ids: &[u64],
183        null_policy: GatherNullPolicy,
184        ctx: Option<&mut MultiGatherContext>,
185    ) -> LlkvResult<RecordBatch>;
186    fn filter_row_ids<'expr>(
187        &self,
188        filter_expr: &Expr<'expr, FieldId>,
189    ) -> LlkvResult<croaring::Treemap>;
190
191    /// Evaluate a leaf predicate (single column filter) against the storage.
192    fn filter_leaf(
193        &self,
194        filter: &llkv_compute::program::OwnedFilter,
195    ) -> LlkvResult<croaring::Treemap>;
196
197    /// Evaluate fused predicates against a single column.
198    fn filter_fused(
199        &self,
200        field_id: FieldId,
201        filters: &[llkv_compute::program::OwnedFilter],
202        cache: &llkv_compute::analysis::PredicateFusionCache,
203    ) -> LlkvResult<RowIdSource>;
204
205    /// Optionally return row IDs ordered by a column's sorted permutation when
206    /// the caller is scanning the entire table without additional filtering.
207    ///
208    /// Implementations should return `Ok(None)` when the storage backend cannot
209    /// satisfy the request for the given [`ScanOrderSpec`].
210    fn sorted_row_ids_full_table(&self, order_spec: ScanOrderSpec) -> LlkvResult<Option<Vec<u64>>>;
211
212    fn stream_row_ids(
213        &self,
214        chunk_size: usize,
215        on_chunk: &mut dyn FnMut(&[RowId]) -> LlkvResult<()>,
216    ) -> LlkvResult<()>;
217    fn as_any(&self) -> &dyn std::any::Any;
218}
219
/// Utility alias for tracked numeric arrays during computed projection evaluation.
///
/// Maps a [`FieldId`] to an Arrow `ArrayRef` using an `FxHashMap`.
pub type NumericArrayMap = FxHashMap<FieldId, arrow_array::ArrayRef>;