llkv_executor/scan/mod.rs

use arrow::array::RecordBatch;
use croaring::Treemap;
use llkv_expr::Expr;
use llkv_plan::{PlanGraph, ProgramSet};
use llkv_result::{Error, Result as ExecutorResult};
use llkv_table::table::Table as LlkvTable;
use llkv_types::{FieldId, LogicalFieldId, RowId, TableId};
use simd_r_drive_entry_handle::EntryHandle;

use llkv_column_map::store::{GatherNullPolicy, MultiGatherContext};
use llkv_compute::analysis::PredicateFusionCache;
use llkv_compute::program::OwnedFilter;
use llkv_scan::{RowIdSource, ScanProjection, ScanStorage, ScanStreamOptions};
use llkv_storage::pager::Pager;

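// Adapts `TableStorageAdapter` to the scan layer's `ScanStorage` trait. Each
// method delegates to the wrapped table (via `self.table()`) or its
// underlying column store.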
impl<P> ScanStorage<P> for crate::types::TableStorageAdapter<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn table_id(&self) -> TableId {
        self.table().table_id()
    }

    fn field_data_type(&self, fid: LogicalFieldId) -> ExecutorResult<arrow::datatypes::DataType> {
        self.table().store().data_type(fid)
    }

    fn total_rows(&self) -> ExecutorResult<u64> {
        self.table().total_rows()
    }

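    // Row gathering: `prepare_gather_context` builds a reusable
    // `MultiGatherContext` for a set of logical fields, and
    // `gather_row_window_with_context` materializes a `RecordBatch` for a
    // window of row ids, optionally reusing that context across calls.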
    fn prepare_gather_context(
        &self,
        logical_fields: &[LogicalFieldId],
    ) -> ExecutorResult<MultiGatherContext> {
        self.table().store().prepare_gather_context(logical_fields)
    }

    fn gather_row_window_with_context(
        &self,
        logical_fields: &[LogicalFieldId],
        row_ids: &[u64],
        null_policy: GatherNullPolicy,
        ctx: Option<&mut MultiGatherContext>,
    ) -> ExecutorResult<RecordBatch> {
        self.table().store().gather_row_window_with_context(
            logical_fields,
            row_ids,
            null_policy,
            ctx,
        )
    }

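    // Predicate evaluation is pushed down to the table, which returns the
    // matching row ids as roaring treemaps.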
    fn filter_row_ids<'expr>(&self, filter_expr: &Expr<'expr, FieldId>) -> ExecutorResult<Treemap> {
        self.table().filter_row_ids(filter_expr)
    }

    fn all_row_ids(&self) -> ExecutorResult<Treemap> {
        self.table().all_row_ids()
    }

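    // Full-table ordered scans are forwarded to the wrapped table; the fully
    // qualified path below makes explicit that `LlkvTable`'s own `ScanStorage`
    // implementation is the one being invoked.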
    fn sorted_row_ids_full_table(
        &self,
        order_spec: llkv_scan::ScanOrderSpec,
    ) -> ExecutorResult<Option<Vec<u64>>> {
        use llkv_scan::ScanStorage;
        <LlkvTable<P> as ScanStorage<P>>::sorted_row_ids_full_table(self.table(), order_spec)
    }

    fn filter_leaf(&self, filter: &OwnedFilter) -> ExecutorResult<Treemap> {
        self.table().filter_leaf(filter)
    }

    fn filter_fused(
        &self,
        field_id: FieldId,
        filters: &[OwnedFilter],
        cache: &PredicateFusionCache,
    ) -> ExecutorResult<RowIdSource> {
        self.table().filter_fused(field_id, filters, cache)
    }

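    // Streams every row id in the table in `chunk_size`-sized slices. The
    // current implementation materializes the full row-id set up front by
    // filtering the row-id column with an unbounded range, then walks it in
    // chunks (with a floor of one row per chunk).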
    fn stream_row_ids(
        &self,
        chunk_size: usize,
        on_chunk: &mut dyn FnMut(&[RowId]) -> ExecutorResult<()>,
    ) -> ExecutorResult<()> {
        use llkv_expr::{Expr, Filter, Operator};
        use std::ops::Bound;

        let ids = self.table().filter_row_ids(&Expr::Pred(Filter {
            field_id: llkv_table::ROW_ID_FIELD_ID,
            op: Operator::Range {
                lower: Bound::Unbounded,
                upper: Bound::Unbounded,
            },
        }))?;
        let rows: Vec<u64> = ids.iter().collect();
        for chunk in rows.chunks(chunk_size.max(1)) {
            on_chunk(chunk)?;
        }
        Ok(())
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

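/// Drives table scans against a `ScanStorage` backend.
///
/// The executor borrows its storage for the lifetime of the scan; the pager
/// type `P` is carried only through `PhantomData`.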
pub struct ScanExecutor<'a, P, S>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    S: ScanStorage<P>,
{
    storage: &'a S,
    _phantom: std::marker::PhantomData<P>,
}

impl<'a, P, S> ScanExecutor<'a, P, S>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
    S: ScanStorage<P>,
{
    pub fn new(storage: &'a S) -> Self {
        Self {
            storage,
            _phantom: std::marker::PhantomData,
        }
    }

    pub fn table_id(&self) -> TableId {
        self.storage.table_id()
    }

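    // `PlanGraph` and `ProgramSet` are accepted but not yet consumed: execution
    // currently requires table-backed storage (hence the downcast) and defers
    // to `scan_stream_with_exprs` on the underlying table for projection,
    // filtering, and batch emission.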
    pub fn execute<'expr, F>(
        &self,
        _plan_graph: PlanGraph,
        _programs: ProgramSet<'expr>,
        projections: &[ScanProjection],
        filter_expr: &Expr<'expr, FieldId>,
        options: ScanStreamOptions<P>,
        on_batch: &mut F,
    ) -> ExecutorResult<()>
    where
        F: FnMut(RecordBatch),
    {
        let table = self
            .storage
            .as_any()
            .downcast_ref::<LlkvTable<P>>()
            .ok_or_else(|| {
                Error::InvalidArgumentError(
                    "scan executor requires table-backed storage for now".into(),
                )
            })?;

        table.scan_stream_with_exprs(projections, filter_expr, options, on_batch)
    }
}