llkv_runtime/
runtime_lazy_frame.rs

1use std::sync::Arc;
2
3use llkv_executor::{ExecutorRowBatch, SelectExecution};
4use llkv_expr::expr::Expr as LlkvExpr;
5use llkv_plan::{AggregateExpr, PlanValue, SelectPlan, SelectProjection};
6use llkv_result::Result;
7use llkv_storage::pager::Pager;
8use simd_r_drive_entry_handle::EntryHandle;
9
10use crate::{RuntimeContext, canonical_table_name};
11
12/// Lazily built logical plan (thin wrapper over SelectPlan).
13pub struct RuntimeLazyFrame<P>
14where
15    P: Pager<Blob = EntryHandle> + Send + Sync,
16{
17    context: Arc<RuntimeContext<P>>,
18    plan: SelectPlan,
19}
20
21impl<P> RuntimeLazyFrame<P>
22where
23    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
24{
25    pub fn scan(context: Arc<RuntimeContext<P>>, table: &str) -> Result<Self> {
26        let (display, canonical) = canonical_table_name(table)?;
27        context.lookup_table(&canonical)?;
28        Ok(Self {
29            context,
30            plan: SelectPlan::new(display),
31        })
32    }
33
34    pub fn filter(mut self, predicate: LlkvExpr<'static, String>) -> Self {
35        self.plan.filter = Some(predicate);
36        self
37    }
38
39    pub fn select_all(mut self) -> Self {
40        self.plan.projections = vec![SelectProjection::AllColumns];
41        self
42    }
43
44    pub fn select_columns<S>(mut self, columns: impl IntoIterator<Item = S>) -> Self
45    where
46        S: AsRef<str>,
47    {
48        self.plan.projections = columns
49            .into_iter()
50            .map(|name| SelectProjection::Column {
51                name: name.as_ref().to_string(),
52                alias: None,
53            })
54            .collect();
55        self
56    }
57
58    pub fn select(mut self, projections: Vec<SelectProjection>) -> Self {
59        self.plan.projections = projections;
60        self
61    }
62
63    pub fn aggregate(mut self, aggregates: Vec<AggregateExpr>) -> Self {
64        self.plan.aggregates = aggregates;
65        self
66    }
67
68    pub fn collect(self) -> Result<SelectExecution<P>> {
69        let snapshot = self.context.default_snapshot();
70        self.context.execute_select(self.plan, snapshot)
71    }
72
73    pub fn collect_rows(self) -> Result<ExecutorRowBatch> {
74        let snapshot = self.context.default_snapshot();
75        let execution = self.context.execute_select(self.plan, snapshot)?;
76        execution.collect_rows()
77    }
78
79    pub fn collect_rows_vec(self) -> Result<Vec<Vec<PlanValue>>> {
80        Ok(self.collect_rows()?.rows)
81    }
82}