1use arrow::datatypes::DataType;
2
3use llkv_expr::{Expr, ScalarExpr};
4use llkv_types::{FieldId, LogicalFieldId, TableId};
5
6use crate::{
7 PlanEdge, PlanExpression, PlanField, PlanGraph, PlanGraphBuilder, PlanGraphError, PlanNode,
8 PlanNodeId, PlanOperator,
9};
10
/// Describes a single projection requested from a table scan plan.
///
/// Each entry passed to `build_table_scan_plan` contributes one projection
/// expression and one output field to the plan's `Project` node.
pub enum TableScanProjectionSpec {
    /// Projects a column identified by a logical field id.
    Column {
        /// Identifies the projected column; its `field_id()` rendered as a
        /// string is used as the output name when `alias` is `None`.
        logical_field_id: LogicalFieldId,
        /// Arrow type of the column; rendered with `Debug` formatting as the
        /// output field's type label.
        data_type: DataType,
        /// Optional output name that overrides the field-id fallback.
        alias: Option<String>,
    },
    /// Projects the result of a scalar expression.
    Computed {
        /// Expression displayed via `format_display()` in the projection text.
        expr: ScalarExpr<FieldId>,
        /// Output name of the computed value (mandatory for this variant).
        alias: String,
        /// Arrow type of the computed value; rendered with `Debug` formatting
        /// as the output field's type label.
        data_type: DataType,
    },
}
25
26pub fn build_table_scan_plan(
31 table_id: TableId,
32 projections: &[TableScanProjectionSpec],
33 filter_expr: &Expr<'_, FieldId>,
34 include_nulls: bool,
35) -> Result<PlanGraph, PlanGraphError> {
36 let mut builder = PlanGraphBuilder::new();
37
38 let scan_node_id = PlanNodeId::new(1);
39 let mut scan_node = PlanNode::new(scan_node_id, PlanOperator::TableScan);
40 scan_node.metadata.insert("table_id", table_id.to_string());
41 scan_node
42 .metadata
43 .insert("projection_count", projections.len().to_string());
44 builder.add_node(scan_node)?;
45 builder.add_root(scan_node_id)?;
46
47 let mut next_node = 2u32;
48 let mut parent = scan_node_id;
49
50 if !filter_expr.is_trivially_true() {
51 let filter_node_id = PlanNodeId::new(next_node);
52 next_node += 1;
53 let mut filter_node = PlanNode::new(filter_node_id, PlanOperator::Filter);
54 filter_node.add_predicate(PlanExpression::new(filter_expr.format_display()));
55 builder.add_node(filter_node)?;
56 builder.add_edge(PlanEdge::new(parent, filter_node_id))?;
57 parent = filter_node_id;
58 }
59
60 let project_node_id = PlanNodeId::new(next_node);
61 next_node += 1;
62 let mut project_node = PlanNode::new(project_node_id, PlanOperator::Project);
63
64 for projection in projections {
65 match projection {
66 TableScanProjectionSpec::Column {
67 logical_field_id,
68 data_type,
69 alias,
70 } => {
71 let fallback = logical_field_id.field_id().to_string();
72 let name = alias.clone().unwrap_or(fallback);
73 project_node.add_projection(PlanExpression::new(format!("column({name})")));
74 project_node.add_field(
75 PlanField::new(name, format!("{data_type:?}")).with_nullability(true),
76 );
77 }
78 TableScanProjectionSpec::Computed {
79 expr,
80 alias,
81 data_type,
82 } => {
83 project_node.add_projection(PlanExpression::new(format!(
84 "{} := {}",
85 alias,
86 expr.format_display()
87 )));
88 project_node.add_field(
89 PlanField::new(alias.clone(), format!("{data_type:?}")).with_nullability(true),
90 );
91 }
92 }
93 }
94
95 builder.add_node(project_node)?;
96 builder.add_edge(PlanEdge::new(parent, project_node_id))?;
97 parent = project_node_id;
98
99 let output_node_id = PlanNodeId::new(next_node);
100 let mut output_node = PlanNode::new(output_node_id, PlanOperator::Output);
101 output_node
102 .metadata
103 .insert("include_nulls", include_nulls.to_string());
104 builder.add_node(output_node)?;
105 builder.add_edge(PlanEdge::new(parent, output_node_id))?;
106
107 let annotations = builder.annotations_mut();
108 annotations.description = Some("table.scan_stream".to_string());
109 annotations
110 .properties
111 .insert("table_id".to_string(), table_id.to_string());
112
113 builder.finish()
114}