1use arrow_array::RecordBatch;
5use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
6use datafusion::{logical_expr::Expr, physical_plan::projection::ProjectionExec};
7use datafusion_common::{Column, DFSchema};
8use datafusion_physical_expr::PhysicalExpr;
9use futures::TryStreamExt;
10use snafu::location;
11use std::{
12 collections::{HashMap, HashSet},
13 sync::Arc,
14};
15
16use lance_core::{
17 datatypes::{OnMissing, Projectable, Projection, Schema},
18 Error, Result, ROW_ADDR, ROW_ID, ROW_OFFSET,
19};
20
21use crate::{
22 exec::{execute_plan, LanceExecutionOptions, OneShotExec},
23 planner::Planner,
24};
25
/// A single column of a projection's output: an expression and the name the
/// result is published under.
#[derive(Clone, Debug)]
pub struct OutputColumn {
    /// Logical expression evaluated to produce the column's values.
    pub expr: Expr,
    /// Name given to the column in the output.
    pub name: String,
}
33
/// Describes how to turn stored columns into the requested output columns.
///
/// The plan pairs the physical projection with the list of expressions that
/// are evaluated on top of it.
#[derive(Clone, Debug)]
pub struct ProjectionPlan {
    /// The physical projection the output expressions are evaluated against.
    pub physical_projection: Projection,

    /// Whether a row-offset column must be added.
    ///
    /// `from_expressions` and `include_row_offset` set this when `ROW_OFFSET`
    /// is requested, and on those paths `with_row_addr` is also enabled on the
    /// physical projection.
    pub must_add_row_offset: bool,

    /// The output columns, in the order they were requested.
    pub requested_output_expr: Vec<OutputColumn>,
}
45
46impl ProjectionPlan {
47 fn add_system_columns(schema: &ArrowSchema) -> ArrowSchema {
48 let mut fields = Vec::from_iter(schema.fields.iter().cloned());
49 fields.push(Arc::new(ArrowField::new(ROW_ID, DataType::UInt64, true)));
50 fields.push(Arc::new(ArrowField::new(ROW_ADDR, DataType::UInt64, true)));
51 fields.push(Arc::new(ArrowField::new(
52 ROW_OFFSET,
53 DataType::UInt64,
54 true,
55 )));
56 ArrowSchema::new(fields)
57 }
58
59 pub fn from_expressions(
61 base: Arc<dyn Projectable>,
62 columns: &[(impl AsRef<str>, impl AsRef<str>)],
63 ) -> Result<Self> {
64 let full_schema = Arc::new(Projection::full(base.clone()).to_arrow_schema());
66 let full_schema = Arc::new(Self::add_system_columns(&full_schema));
67 let planner = Planner::new(full_schema);
68 let mut output = HashMap::new();
69 let mut physical_cols_set = HashSet::new();
70 let mut physical_cols = vec![];
71 let mut needs_row_id = false;
72 let mut needs_row_addr = false;
73 let mut must_add_row_offset = false;
74 for (output_name, raw_expr) in columns {
75 if output.contains_key(output_name.as_ref()) {
76 return Err(Error::io(
77 format!("Duplicate column name: {}", output_name.as_ref()),
78 location!(),
79 ));
80 }
81
82 let expr = planner.parse_expr(raw_expr.as_ref())?;
83
84 if let Expr::Column(Column {
86 name,
87 relation: None,
88 ..
89 }) = &expr
90 {
91 if name == ROW_ID {
92 needs_row_id = true;
93 } else if name == ROW_ADDR {
94 needs_row_addr = true;
95 } else if name == ROW_OFFSET {
96 must_add_row_offset = true;
97 }
98 }
99
100 for col in Planner::column_names_in_expr(&expr) {
101 if physical_cols_set.contains(&col) {
102 continue;
103 }
104 physical_cols.push(col.clone());
105 physical_cols_set.insert(col);
106 }
107 output.insert(output_name.as_ref().to_string(), expr);
108 }
109
110 let mut physical_projection =
116 Projection::empty(base.clone()).union_columns(&physical_cols, OnMissing::Ignore)?;
117
118 physical_projection.with_row_id = needs_row_id;
119 physical_projection.with_row_addr = needs_row_addr || must_add_row_offset;
120
121 let mut output_cols = vec![];
123 for (name, _) in columns {
124 output_cols.push(OutputColumn {
125 expr: output[name.as_ref()].clone(),
126 name: name.as_ref().to_string(),
127 });
128 }
129
130 Ok(Self {
131 physical_projection,
132 must_add_row_offset,
133 requested_output_expr: output_cols,
134 })
135 }
136
137 pub fn from_schema(base: Arc<dyn Projectable>, projection: &Schema) -> Result<Self> {
172 let physical_projection = Projection::empty(base).union_schema(projection);
178 let mut must_add_row_offset = false;
179 let exprs = projection
182 .fields
183 .iter()
184 .map(|f| {
185 if f.name == ROW_ADDR {
186 must_add_row_offset = true;
187 }
188 OutputColumn {
189 expr: Expr::Column(Column::from_name(&f.name)),
190 name: f.name.clone(),
191 }
192 })
193 .collect::<Vec<_>>();
194 Ok(Self {
195 physical_projection,
196 requested_output_expr: exprs,
197 must_add_row_offset,
198 })
199 }
200
201 pub fn full(base: Arc<dyn Projectable>) -> Result<Self> {
202 let physical_cols: Vec<&str> = base
203 .schema()
204 .fields
205 .iter()
206 .map(|f| f.name.as_ref())
207 .collect::<Vec<_>>();
208
209 let physical_projection =
210 Projection::empty(base.clone()).union_columns(&physical_cols, OnMissing::Ignore)?;
211
212 let requested_output_expr = physical_cols
213 .into_iter()
214 .map(|col_name| OutputColumn {
215 expr: Expr::Column(Column::from_name(col_name)),
216 name: col_name.to_string(),
217 })
218 .collect();
219
220 Ok(Self {
221 physical_projection,
222 must_add_row_offset: false,
223 requested_output_expr,
224 })
225 }
226
227 pub fn to_physical_exprs(
231 &self,
232 current_schema: &ArrowSchema,
233 ) -> Result<Vec<(Arc<dyn PhysicalExpr>, String)>> {
234 let physical_df_schema = Arc::new(DFSchema::try_from(current_schema.clone())?);
235 self.requested_output_expr
236 .iter()
237 .map(|output_column| {
238 Ok((
239 datafusion::physical_expr::create_physical_expr(
240 &output_column.expr,
241 physical_df_schema.as_ref(),
242 &Default::default(),
243 )?,
244 output_column.name.clone(),
245 ))
246 })
247 .collect::<Result<Vec<_>>>()
248 }
249
250 pub fn include_row_id(&mut self) {
252 self.physical_projection.with_row_id = true;
253 if !self
254 .requested_output_expr
255 .iter()
256 .any(|OutputColumn { name, .. }| name == ROW_ID)
257 {
258 self.requested_output_expr.push(OutputColumn {
259 expr: Expr::Column(Column::from_name(ROW_ID)),
260 name: ROW_ID.to_string(),
261 });
262 }
263 }
264
265 pub fn include_row_addr(&mut self) {
267 self.physical_projection.with_row_addr = true;
268 if !self
269 .requested_output_expr
270 .iter()
271 .any(|OutputColumn { name, .. }| name == ROW_ADDR)
272 {
273 self.requested_output_expr.push(OutputColumn {
274 expr: Expr::Column(Column::from_name(ROW_ADDR)),
275 name: ROW_ADDR.to_string(),
276 });
277 }
278 }
279
280 pub fn include_row_offset(&mut self) {
281 self.physical_projection.with_row_addr = true;
283 self.must_add_row_offset = true;
284 if !self
285 .requested_output_expr
286 .iter()
287 .any(|OutputColumn { name, .. }| name == ROW_OFFSET)
288 {
289 self.requested_output_expr.push(OutputColumn {
290 expr: Expr::Column(Column::from_name(ROW_OFFSET)),
291 name: ROW_OFFSET.to_string(),
292 });
293 }
294 }
295
    /// Returns `true` when at least one output column has been requested.
    ///
    /// Only `requested_output_expr` is inspected; the physical projection is
    /// not consulted.
    pub fn has_output_cols(&self) -> bool {
        !self.requested_output_expr.is_empty()
    }
303
304 pub fn output_schema(&self) -> Result<ArrowSchema> {
305 let exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?;
306 let physical_schema = self.physical_projection.to_arrow_schema();
307 let fields = exprs
308 .iter()
309 .map(|(expr, name)| {
310 Ok(ArrowField::new(
311 name,
312 expr.data_type(&physical_schema)?,
313 expr.nullable(&physical_schema)?,
314 ))
315 })
316 .collect::<Result<Vec<_>>>()?;
317 Ok(ArrowSchema::new(fields))
318 }
319
320 pub async fn project_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
321 let src = Arc::new(OneShotExec::from_batch(batch));
322 let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?;
323 let projection = Arc::new(ProjectionExec::try_new(physical_exprs, src)?);
324 let stream = execute_plan(projection, LanceExecutionOptions::default())?;
325 let batches = stream.try_collect::<Vec<_>>().await?;
326 if batches.len() != 1 {
327 Err(Error::Internal {
328 message: "Expected exactly one batch".to_string(),
329 location: location!(),
330 })
331 } else {
332 Ok(batches.into_iter().next().unwrap())
333 }
334 }
335}