llkv_runtime/
runtime_lazy_frame.rs1use std::sync::Arc;
2
3use llkv_executor::{ExecutorRowBatch, SelectExecution};
4use llkv_expr::expr::Expr as LlkvExpr;
5use llkv_plan::{AggregateExpr, PlanValue, SelectFilter, SelectPlan, SelectProjection};
6use llkv_result::Result;
7use llkv_storage::pager::Pager;
8use simd_r_drive_entry_handle::EntryHandle;
9
10use crate::{RuntimeContext, canonical_table_name};
11
12pub struct RuntimeLazyFrame<P>
14where
15 P: Pager<Blob = EntryHandle> + Send + Sync,
16{
17 context: Arc<RuntimeContext<P>>,
18 plan: SelectPlan,
19}
20
21impl<P> RuntimeLazyFrame<P>
22where
23 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
24{
25 pub fn scan(context: Arc<RuntimeContext<P>>, table: &str) -> Result<Self> {
26 let (display, canonical) = canonical_table_name(table)?;
27 context.lookup_table(&canonical)?;
28 Ok(Self {
29 context,
30 plan: SelectPlan::new(display),
31 })
32 }
33
34 pub fn filter(mut self, predicate: LlkvExpr<'static, String>) -> Self {
35 self.plan.filter = Some(SelectFilter {
36 predicate,
37 subqueries: Vec::new(),
38 });
39 self
40 }
41
42 pub fn select_all(mut self) -> Self {
43 self.plan.projections = vec![SelectProjection::AllColumns];
44 self
45 }
46
47 pub fn select_columns<S>(mut self, columns: impl IntoIterator<Item = S>) -> Self
48 where
49 S: AsRef<str>,
50 {
51 self.plan.projections = columns
52 .into_iter()
53 .map(|name| SelectProjection::Column {
54 name: name.as_ref().to_string(),
55 alias: None,
56 })
57 .collect();
58 self
59 }
60
61 pub fn select(mut self, projections: Vec<SelectProjection>) -> Self {
62 self.plan.projections = projections;
63 self
64 }
65
66 pub fn aggregate(mut self, aggregates: Vec<AggregateExpr>) -> Self {
67 self.plan.aggregates = aggregates;
68 self
69 }
70
71 pub fn collect(self) -> Result<SelectExecution<P>> {
72 let snapshot = self.context.default_snapshot();
73 self.context.execute_select(self.plan, snapshot)
74 }
75
76 pub fn collect_rows(self) -> Result<ExecutorRowBatch> {
77 let snapshot = self.context.default_snapshot();
78 let execution = self.context.execute_select(self.plan, snapshot)?;
79 execution.collect_rows()
80 }
81
82 pub fn collect_rows_vec(self) -> Result<Vec<Vec<PlanValue>>> {
83 Ok(self.collect_rows()?.rows)
84 }
85}