1use std::sync::Arc;
8
9use arrow::array::RecordBatch;
10use arrow::datatypes::DataType;
11use croaring::Treemap;
12use llkv_column_map::store::{GatherNullPolicy, MultiGatherContext, Projection};
13use llkv_expr::{Expr, ScalarExpr};
14use llkv_result::Result as LlkvResult;
15use llkv_storage::pager::{MemPager, Pager};
16use llkv_types::{FieldId, LogicalFieldId, RowId, TableId};
17use rustc_hash::FxHashMap;
18use simd_r_drive_entry_handle::EntryHandle;
19pub mod row_stream;
20pub use row_stream::ScanRowStream;
21pub use row_stream::{
22 ColumnProjectionInfo, ComputedProjectionInfo, ProjectionEval, RowChunk, RowIdSource, RowStream,
23 RowStreamBuilder, materialize_row_window,
24};
25
26pub mod execute;
27pub mod ordering;
28pub mod predicate;
29pub use ordering::sort_row_ids_with_order;
30
/// Direction used when ordering the rows produced by a scan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanOrderDirection {
    /// Order rows in ascending order.
    Ascending,
    /// Order rows in descending order.
    Descending,
}
37
/// Selects how the ordering field's values are interpreted when sorting.
///
/// NOTE(review): the variant names suggest the expected column type and any
/// conversion applied before comparison; the actual handling lives in the
/// ordering code, which is not part of this block — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanOrderTransform {
    /// Presumably: use 64-bit integer values as-is.
    IdentityInt64,
    /// Presumably: use 32-bit integer values as-is.
    IdentityInt32,
    /// Presumably: use UTF-8 string values as-is.
    IdentityUtf8,
    /// Presumably: interpret UTF-8 string values as integers before comparing.
    CastUtf8ToInteger,
}
46
47#[derive(Clone, Copy, Debug)]
49pub struct ScanOrderSpec {
50 pub field_id: FieldId,
51 pub direction: ScanOrderDirection,
52 pub nulls_first: bool,
53 pub transform: ScanOrderTransform,
54}
55
56#[derive(Clone, Debug)]
58pub enum ScanProjection {
59 Column(Projection),
60 Computed {
61 expr: ScalarExpr<FieldId>,
62 alias: String,
63 },
64}
65
66impl ScanProjection {
67 pub fn column<P: Into<Projection>>(proj: P) -> Self {
68 Self::Column(proj.into())
69 }
70
71 pub fn computed<S: Into<String>>(expr: ScalarExpr<FieldId>, alias: S) -> Self {
72 Self::Computed {
73 expr,
74 alias: alias.into(),
75 }
76 }
77}
78
79impl From<Projection> for ScanProjection {
80 fn from(value: Projection) -> Self {
81 ScanProjection::Column(value)
82 }
83}
84
85impl From<&Projection> for ScanProjection {
86 fn from(value: &Projection) -> Self {
87 ScanProjection::Column(value.clone())
88 }
89}
90
91impl From<&ScanProjection> for ScanProjection {
92 fn from(value: &ScanProjection) -> Self {
93 value.clone()
94 }
95}
96
97pub struct ScanStreamOptions<P = MemPager>
99where
100 P: Pager<Blob = EntryHandle> + Send + Sync,
101{
102 pub include_nulls: bool,
103 pub order: Option<ScanOrderSpec>,
104 pub row_id_filter: Option<Arc<dyn RowIdFilter<P>>>,
105 pub include_row_ids: bool,
106}
107
108impl<P> Clone for ScanStreamOptions<P>
109where
110 P: Pager<Blob = EntryHandle> + Send + Sync,
111{
112 fn clone(&self) -> Self {
113 Self {
114 include_nulls: self.include_nulls,
115 order: self.order,
116 row_id_filter: self.row_id_filter.clone(),
117 include_row_ids: self.include_row_ids,
118 }
119 }
120}
121
122impl<P> Default for ScanStreamOptions<P>
123where
124 P: Pager<Blob = EntryHandle> + Send + Sync,
125{
126 fn default() -> Self {
127 Self {
128 include_nulls: false,
129 order: None,
130 row_id_filter: None,
131 include_row_ids: false,
132 }
133 }
134}
135
136impl<P> std::fmt::Debug for ScanStreamOptions<P>
137where
138 P: Pager<Blob = EntryHandle> + Send + Sync,
139{
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.debug_struct("ScanStreamOptions")
142 .field("include_nulls", &self.include_nulls)
143 .field("order", &self.order)
144 .field(
145 "row_id_filter",
146 &self.row_id_filter.as_ref().map(|_| "<RowIdFilter>"),
147 )
148 .field("include_row_ids", &self.include_row_ids)
149 .finish()
150 }
151}
152
153pub trait RowIdFilter<P>: Send + Sync
155where
156 P: Pager<Blob = EntryHandle> + Send + Sync,
157{
158 fn filter(
159 &self,
160 table_id: TableId,
161 storage: &dyn ScanStorage<P>,
162 row_ids: Treemap,
163 ) -> LlkvResult<Treemap>;
164}
165
166pub trait ScanStorage<P>: Send + Sync
168where
169 P: Pager<Blob = EntryHandle> + Send + Sync,
170{
171 fn table_id(&self) -> TableId;
172 fn field_data_type(&self, fid: LogicalFieldId) -> LlkvResult<DataType>;
173 fn total_rows(&self) -> LlkvResult<u64>;
174 fn all_row_ids(&self) -> LlkvResult<croaring::Treemap>;
175 fn prepare_gather_context(
176 &self,
177 logical_fields: &[LogicalFieldId],
178 ) -> LlkvResult<MultiGatherContext>;
179 fn gather_row_window_with_context(
180 &self,
181 logical_fields: &[LogicalFieldId],
182 row_ids: &[u64],
183 null_policy: GatherNullPolicy,
184 ctx: Option<&mut MultiGatherContext>,
185 ) -> LlkvResult<RecordBatch>;
186 fn filter_row_ids<'expr>(
187 &self,
188 filter_expr: &Expr<'expr, FieldId>,
189 ) -> LlkvResult<croaring::Treemap>;
190
191 fn filter_leaf(
193 &self,
194 filter: &llkv_compute::program::OwnedFilter,
195 ) -> LlkvResult<croaring::Treemap>;
196
197 fn filter_fused(
199 &self,
200 field_id: FieldId,
201 filters: &[llkv_compute::program::OwnedFilter],
202 cache: &llkv_compute::analysis::PredicateFusionCache,
203 ) -> LlkvResult<RowIdSource>;
204
205 fn sorted_row_ids_full_table(&self, order_spec: ScanOrderSpec) -> LlkvResult<Option<Vec<u64>>>;
211
212 fn stream_row_ids(
213 &self,
214 chunk_size: usize,
215 on_chunk: &mut dyn FnMut(&[RowId]) -> LlkvResult<()>,
216 ) -> LlkvResult<()>;
217 fn as_any(&self) -> &dyn std::any::Any;
218}
219
220pub type NumericArrayMap = FxHashMap<FieldId, arrow_array::ArrayRef>;