// llkv_executor/scan/mod.rs

1use arrow::array::RecordBatch;
2use croaring::Treemap;
3use llkv_expr::Expr;
4use llkv_plan::{PlanGraph, ProgramSet};
5use llkv_result::{Error, Result as ExecutorResult};
6use llkv_table::table::Table as LlkvTable;
7use llkv_types::{FieldId, LogicalFieldId, RowId, TableId};
8use simd_r_drive_entry_handle::EntryHandle;
9
10use llkv_column_map::store::{GatherNullPolicy, MultiGatherContext};
11use llkv_compute::analysis::PredicateFusionCache;
12use llkv_compute::program::OwnedFilter;
13use llkv_scan::{RowIdSource, ScanProjection, ScanStorage, ScanStreamOptions};
14use llkv_storage::pager::Pager;
15
16impl<P> ScanStorage<P> for crate::types::TableStorageAdapter<P>
17where
18    P: Pager<Blob = EntryHandle> + Send + Sync,
19{
20    fn table_id(&self) -> TableId {
21        self.table().table_id()
22    }
23
24    fn field_data_type(&self, fid: LogicalFieldId) -> ExecutorResult<arrow::datatypes::DataType> {
25        self.table().store().data_type(fid)
26    }
27
28    fn total_rows(&self) -> ExecutorResult<u64> {
29        self.table().total_rows()
30    }
31
32    fn prepare_gather_context(
33        &self,
34        logical_fields: &[LogicalFieldId],
35    ) -> ExecutorResult<MultiGatherContext> {
36        self.table().store().prepare_gather_context(logical_fields)
37    }
38
39    fn gather_row_window_with_context(
40        &self,
41        logical_fields: &[LogicalFieldId],
42        row_ids: &[u64],
43        null_policy: GatherNullPolicy,
44        ctx: Option<&mut MultiGatherContext>,
45    ) -> ExecutorResult<RecordBatch> {
46        self.table().store().gather_row_window_with_context(
47            logical_fields,
48            row_ids,
49            null_policy,
50            ctx,
51        )
52    }
53
54    fn filter_row_ids<'expr>(&self, filter_expr: &Expr<'expr, FieldId>) -> ExecutorResult<Treemap> {
55        self.table().filter_row_ids(filter_expr)
56    }
57
58    fn all_row_ids(&self) -> ExecutorResult<Treemap> {
59        self.table().all_row_ids()
60    }
61
62    fn sorted_row_ids_full_table(
63        &self,
64        order_spec: llkv_scan::ScanOrderSpec,
65    ) -> ExecutorResult<Option<Vec<u64>>> {
66        use llkv_scan::ScanStorage;
67        <LlkvTable<P> as ScanStorage<P>>::sorted_row_ids_full_table(self.table(), order_spec)
68    }
69
70    fn filter_leaf(&self, filter: &OwnedFilter) -> ExecutorResult<Treemap> {
71        self.table().filter_leaf(filter)
72    }
73
74    fn filter_fused(
75        &self,
76        field_id: FieldId,
77        filters: &[OwnedFilter],
78        cache: &PredicateFusionCache,
79    ) -> ExecutorResult<RowIdSource> {
80        self.table().filter_fused(field_id, filters, cache)
81    }
82
83    fn stream_row_ids(
84        &self,
85        chunk_size: usize,
86        on_chunk: &mut dyn FnMut(&[RowId]) -> ExecutorResult<()>,
87    ) -> ExecutorResult<()> {
88        use llkv_expr::{Expr, Filter, Operator};
89        use std::ops::Bound;
90
91        let ids = self.table().filter_row_ids(&Expr::Pred(Filter {
92            field_id: llkv_table::ROW_ID_FIELD_ID,
93            op: Operator::Range {
94                lower: Bound::Unbounded,
95                upper: Bound::Unbounded,
96            },
97        }))?;
98        let rows: Vec<u64> = ids.iter().collect();
99        for chunk in rows.chunks(chunk_size.max(1)) {
100            on_chunk(chunk)?;
101        }
102        Ok(())
103    }
104
105    fn as_any(&self) -> &dyn std::any::Any {
106        self
107    }
108}
109
110/// Thin wrapper capturing what the executor needs to run a scan.
111pub struct ScanExecutor<'a, P, S>
112where
113    P: Pager<Blob = EntryHandle> + Send + Sync,
114    S: ScanStorage<P>,
115{
116    storage: &'a S,
117    _phantom: std::marker::PhantomData<P>,
118}
119
120impl<'a, P, S> ScanExecutor<'a, P, S>
121where
122    P: Pager<Blob = EntryHandle> + Send + Sync,
123    S: ScanStorage<P>,
124{
125    pub fn new(storage: &'a S) -> Self {
126        Self {
127            storage,
128            _phantom: std::marker::PhantomData,
129        }
130    }
131
132    pub fn table_id(&self) -> TableId {
133        self.storage.table_id()
134    }
135
136    /// Execute a scan using the executor's storage abstraction.
137    pub fn execute<'expr, F>(
138        &self,
139        _plan_graph: PlanGraph,
140        _programs: ProgramSet<'expr>,
141        projections: &[ScanProjection],
142        filter_expr: &Expr<'expr, FieldId>,
143        options: ScanStreamOptions<P>,
144        on_batch: &mut F,
145    ) -> ExecutorResult<()>
146    where
147        F: FnMut(RecordBatch),
148    {
149        let table = self
150            .storage
151            .as_any()
152            .downcast_ref::<LlkvTable<P>>()
153            .ok_or_else(|| {
154                Error::InvalidArgumentError(
155                    "scan executor requires table-backed storage for now".into(),
156                )
157            })?;
158
159        table.scan_stream_with_exprs(projections, filter_expr, options, on_batch)
160    }
161}