llkv_runtime/
runtime_lazy_frame.rs

1use std::sync::Arc;
2
3use llkv_executor::{ExecutorRowBatch, SelectExecution};
4use llkv_expr::expr::Expr as LlkvExpr;
5use llkv_plan::{AggregateExpr, PlanValue, SelectFilter, SelectPlan, SelectProjection};
6use llkv_result::Result;
7use llkv_storage::pager::Pager;
8use simd_r_drive_entry_handle::EntryHandle;
9
10use crate::{RuntimeContext, canonical_table_name};
11
12/// Lazily built logical plan (thin wrapper over SelectPlan).
13pub struct RuntimeLazyFrame<P>
14where
15    P: Pager<Blob = EntryHandle> + Send + Sync,
16{
17    context: Arc<RuntimeContext<P>>,
18    plan: SelectPlan,
19}
20
21impl<P> RuntimeLazyFrame<P>
22where
23    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
24{
25    pub fn scan(context: Arc<RuntimeContext<P>>, table: &str) -> Result<Self> {
26        let (display, canonical) = canonical_table_name(table)?;
27        context.lookup_table(&canonical)?;
28        Ok(Self {
29            context,
30            plan: SelectPlan::new(display),
31        })
32    }
33
34    pub fn filter(mut self, predicate: LlkvExpr<'static, String>) -> Self {
35        self.plan.filter = Some(SelectFilter {
36            predicate,
37            subqueries: Vec::new(),
38        });
39        self
40    }
41
42    pub fn select_all(mut self) -> Self {
43        self.plan.projections = vec![SelectProjection::AllColumns];
44        self
45    }
46
47    pub fn select_columns<S>(mut self, columns: impl IntoIterator<Item = S>) -> Self
48    where
49        S: AsRef<str>,
50    {
51        self.plan.projections = columns
52            .into_iter()
53            .map(|name| SelectProjection::Column {
54                name: name.as_ref().to_string(),
55                alias: None,
56            })
57            .collect();
58        self
59    }
60
61    pub fn select(mut self, projections: Vec<SelectProjection>) -> Self {
62        self.plan.projections = projections;
63        self
64    }
65
66    pub fn aggregate(mut self, aggregates: Vec<AggregateExpr>) -> Self {
67        self.plan.aggregates = aggregates;
68        self
69    }
70
71    pub fn collect(self) -> Result<SelectExecution<P>> {
72        let snapshot = self.context.default_snapshot();
73        self.context.execute_select(self.plan, snapshot)
74    }
75
76    pub fn collect_rows(self) -> Result<ExecutorRowBatch> {
77        let snapshot = self.context.default_snapshot();
78        let execution = self.context.execute_select(self.plan, snapshot)?;
79        execution.collect_rows()
80    }
81
82    pub fn collect_rows_vec(self) -> Result<Vec<Vec<PlanValue>>> {
83        Ok(self.collect_rows()?.rows)
84    }
85}