lance_datafusion/
projection.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_array::RecordBatch;
5use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
6use datafusion::{logical_expr::Expr, physical_plan::projection::ProjectionExec};
7use datafusion_common::{Column, DFSchema};
8use datafusion_physical_expr::PhysicalExpr;
9use futures::TryStreamExt;
10use snafu::location;
11use std::{
12    collections::{HashMap, HashSet},
13    sync::Arc,
14};
15
16use lance_core::{
17    datatypes::{OnMissing, Projectable, Projection, Schema},
18    Error, Result, ROW_ADDR, ROW_CREATED_AT_VERSION, ROW_ID, ROW_LAST_UPDATED_AT_VERSION,
19    ROW_OFFSET, WILDCARD,
20};
21
22use crate::{
23    exec::{execute_plan, LanceExecutionOptions, OneShotExec},
24    planner::Planner,
25};
26
/// Accumulates requested output columns and derives a [`ProjectionPlan`] from them.
///
/// Columns are registered through `add_column` / `add_columns`; `build` then turns the
/// collected state into a physical projection plus the ordered list of output expressions.
struct ProjectionBuilder {
    /// Source whose schema the physical projection is built against
    base: Arc<dyn Projectable>,
    /// Parses raw expression strings; created over the full schema of `base` plus system columns
    planner: Planner,
    /// Output name -> expression, consulted to reject duplicate output names
    output: HashMap<String, Expr>,
    /// Output columns in the order they were added
    output_cols: Vec<OutputColumn>,
    /// Names already present in `physical_cols`, used to keep that list free of duplicates
    physical_cols_set: HashSet<String>,
    /// Column names referenced by the parsed expressions, in first-seen order
    physical_cols: Vec<String>,
    /// A bare reference to `ROW_ID` was requested
    needs_row_id: bool,
    /// A bare reference to `ROW_ADDR` was requested
    needs_row_addr: bool,
    /// A bare reference to `ROW_LAST_UPDATED_AT_VERSION` was requested
    needs_row_last_updated_at: bool,
    /// A bare reference to `ROW_CREATED_AT_VERSION` was requested
    needs_row_created_at: bool,
    /// A bare reference to `ROW_OFFSET` was requested; `build` also enables the row address for it
    must_add_row_offset: bool,
    /// A wildcard entry was seen, so `build` starts from the full projection of `base`
    has_wildcard: bool,
}
41
42impl ProjectionBuilder {
43    fn new(base: Arc<dyn Projectable>) -> Self {
44        let full_schema = Arc::new(Projection::full(base.clone()).to_arrow_schema());
45        let full_schema = Arc::new(ProjectionPlan::add_system_columns(&full_schema));
46        let planner = Planner::new(full_schema);
47
48        Self {
49            base,
50            planner,
51            output: HashMap::default(),
52            output_cols: Vec::default(),
53            physical_cols_set: HashSet::default(),
54            physical_cols: Vec::default(),
55            needs_row_id: false,
56            needs_row_addr: false,
57            needs_row_created_at: false,
58            needs_row_last_updated_at: false,
59            must_add_row_offset: false,
60            has_wildcard: false,
61        }
62    }
63
64    fn check_duplicate_column(&self, name: &str) -> Result<()> {
65        if self.output.contains_key(name) {
66            return Err(Error::io(
67                format!("Duplicate column name: {}", name),
68                location!(),
69            ));
70        }
71        Ok(())
72    }
73
74    fn add_column(&mut self, output_name: &str, raw_expr: &str) -> Result<()> {
75        self.check_duplicate_column(output_name)?;
76
77        let expr = self.planner.parse_expr(raw_expr)?;
78
79        // If the expression is a bare column reference to a system column, mark that we need it
80        if let Expr::Column(Column {
81            name,
82            relation: None,
83            ..
84        }) = &expr
85        {
86            if name == ROW_ID {
87                self.needs_row_id = true;
88            } else if name == ROW_ADDR {
89                self.needs_row_addr = true;
90            } else if name == ROW_OFFSET {
91                self.must_add_row_offset = true;
92            } else if name == ROW_LAST_UPDATED_AT_VERSION {
93                self.needs_row_last_updated_at = true;
94            } else if name == ROW_CREATED_AT_VERSION {
95                self.needs_row_created_at = true;
96            }
97        }
98
99        for col in Planner::column_names_in_expr(&expr) {
100            if self.physical_cols_set.contains(&col) {
101                continue;
102            }
103            self.physical_cols.push(col.clone());
104            self.physical_cols_set.insert(col);
105        }
106        self.output.insert(output_name.to_string(), expr.clone());
107
108        self.output_cols.push(OutputColumn {
109            expr,
110            name: output_name.to_string(),
111        });
112
113        Ok(())
114    }
115
116    fn add_columns(&mut self, columns: &[(impl AsRef<str>, impl AsRef<str>)]) -> Result<()> {
117        for (output_name, raw_expr) in columns {
118            if raw_expr.as_ref() == WILDCARD {
119                self.has_wildcard = true;
120                for col in self.base.schema().fields.iter().map(|f| f.name.as_str()) {
121                    self.check_duplicate_column(col)?;
122                    self.output_cols.push(OutputColumn {
123                        expr: Expr::Column(Column::from_name(col)),
124                        name: col.to_string(),
125                    });
126                    // Throw placeholder expr in self.output, this will trigger error on duplicates
127                    self.output.insert(col.to_string(), Expr::default());
128                }
129            } else {
130                self.add_column(output_name.as_ref(), raw_expr.as_ref())?;
131            }
132        }
133        Ok(())
134    }
135
136    fn build(self) -> Result<ProjectionPlan> {
137        // Now, calculate the physical projection from the columns referenced by the expressions
138        //
139        // If a column is missing it might be a system column (_rowid, _distance, etc.) and so
140        // we ignore it.  We don't need to load that column from disk at least, which is all we are
141        // trying to calculate here.
142        let mut physical_projection = if self.has_wildcard {
143            Projection::full(self.base.clone())
144        } else {
145            Projection::empty(self.base.clone())
146                .union_columns(&self.physical_cols, OnMissing::Ignore)?
147        };
148
149        physical_projection.with_row_id = self.needs_row_id;
150        physical_projection.with_row_addr = self.needs_row_addr || self.must_add_row_offset;
151        physical_projection.with_row_last_updated_at_version = self.needs_row_last_updated_at;
152        physical_projection.with_row_created_at_version = self.needs_row_created_at;
153
154        Ok(ProjectionPlan {
155            physical_projection,
156            must_add_row_offset: self.must_add_row_offset,
157            requested_output_expr: self.output_cols,
158        })
159    }
160}
161
/// One column of a projection's output: an expression and the name it is emitted under.
#[derive(Clone, Debug)]
pub struct OutputColumn {
    /// The expression that represents the output column
    pub expr: Expr,
    /// The name of the output column
    pub name: String,
}
169
/// Describes how to produce a requested projection: which columns must be loaded and which
/// expressions are evaluated over them to form the output.
#[derive(Clone, Debug)]
pub struct ProjectionPlan {
    /// The physical schema that must be loaded from the dataset
    pub physical_projection: Projection,

    /// Needs the row address converted into a row offset
    pub must_add_row_offset: bool,

    /// The desired output columns, in the order they appear in the output
    pub requested_output_expr: Vec<OutputColumn>,
}
181
182impl ProjectionPlan {
183    fn add_system_columns(schema: &ArrowSchema) -> ArrowSchema {
184        let mut fields = Vec::from_iter(schema.fields.iter().cloned());
185        fields.push(Arc::new(ArrowField::new(ROW_ID, DataType::UInt64, true)));
186        fields.push(Arc::new(ArrowField::new(ROW_ADDR, DataType::UInt64, true)));
187        fields.push(Arc::new(ArrowField::new(
188            ROW_OFFSET,
189            DataType::UInt64,
190            true,
191        )));
192        fields.push(Arc::new(
193            (*lance_core::ROW_LAST_UPDATED_AT_VERSION_FIELD).clone(),
194        ));
195        fields.push(Arc::new(
196            (*lance_core::ROW_CREATED_AT_VERSION_FIELD).clone(),
197        ));
198        ArrowSchema::new(fields)
199    }
200
201    /// Set the projection from SQL expressions
202    pub fn from_expressions(
203        base: Arc<dyn Projectable>,
204        columns: &[(impl AsRef<str>, impl AsRef<str>)],
205    ) -> Result<Self> {
206        let mut builder = ProjectionBuilder::new(base);
207        builder.add_columns(columns)?;
208        builder.build()
209    }
210
211    /// Set the projection from a schema
212    ///
213    /// This plan will have no complex expressions, the schema must be a subset of the dataset schema.
214    ///
215    /// With this approach it is possible to refer to portions of nested fields.
216    ///
217    /// For example, if the schema is:
218    ///
219    /// ```ignore
220    /// {
221    ///   "metadata": {
222    ///     "location": {
223    ///       "x": f32,
224    ///       "y": f32,
225    ///     },
226    ///     "age": i32,
227    ///   }
228    /// }
229    /// ```
230    ///
231    /// It is possible to project a partial schema that drops `y` like:
232    ///
233    /// ```ignore
234    /// {
235    ///   "metadata": {
236    ///     "location": {
237    ///       "x": f32,
238    ///     },
239    ///     "age": i32,
240    ///   }
241    /// }
242    /// ```
243    ///
244    /// This is something that cannot be done easily using expressions.
245    pub fn from_schema(base: Arc<dyn Projectable>, projection: &Schema) -> Result<Self> {
246        // Separate data columns from system columns
247        // System columns (_rowid, _rowaddr, etc.) are handled via flags in Projection,
248        // not as fields in the Schema
249        let mut data_fields = Vec::new();
250        let mut with_row_id = false;
251        let mut with_row_addr = false;
252        let mut must_add_row_offset = false;
253
254        for field in projection.fields.iter() {
255            if lance_core::is_system_column(&field.name) {
256                // Handle known system columns that can be included in projections
257                if field.name == ROW_ID {
258                    with_row_id = true;
259                    must_add_row_offset = true;
260                } else if field.name == ROW_ADDR {
261                    with_row_addr = true;
262                    must_add_row_offset = true;
263                }
264                // Note: Other system columns like _rowoffset are computed differently
265                // and shouldn't appear in the schema at this point
266            } else {
267                // Regular data column - validate it exists in base schema
268                if base.schema().field(&field.name).is_none() {
269                    return Err(Error::io(
270                        format!("Column '{}' not found in schema", field.name),
271                        location!(),
272                    ));
273                }
274                data_fields.push(field.clone());
275            }
276        }
277
278        // Create a schema with only data columns for the physical projection
279        let data_schema = Schema {
280            fields: data_fields,
281            metadata: projection.metadata.clone(),
282        };
283
284        // Calculate the physical projection from data columns only
285        let mut physical_projection = Projection::empty(base).union_schema(&data_schema);
286        physical_projection.with_row_id = with_row_id;
287        physical_projection.with_row_addr = with_row_addr;
288
289        // Build output expressions preserving the original order (including system columns)
290        let exprs = projection
291            .fields
292            .iter()
293            .map(|f| OutputColumn {
294                expr: Expr::Column(Column::from_name(&f.name)),
295                name: f.name.clone(),
296            })
297            .collect::<Vec<_>>();
298
299        Ok(Self {
300            physical_projection,
301            requested_output_expr: exprs,
302            must_add_row_offset,
303        })
304    }
305
306    pub fn full(base: Arc<dyn Projectable>) -> Result<Self> {
307        let physical_cols: Vec<&str> = base
308            .schema()
309            .fields
310            .iter()
311            .map(|f| f.name.as_ref())
312            .collect::<Vec<_>>();
313
314        let physical_projection =
315            Projection::empty(base.clone()).union_columns(&physical_cols, OnMissing::Ignore)?;
316
317        let requested_output_expr = physical_cols
318            .into_iter()
319            .map(|col_name| OutputColumn {
320                expr: Expr::Column(Column::from_name(col_name)),
321                name: col_name.to_string(),
322            })
323            .collect();
324
325        Ok(Self {
326            physical_projection,
327            must_add_row_offset: false,
328            requested_output_expr,
329        })
330    }
331
332    /// Convert the projection to a list of physical expressions
333    ///
334    /// This is used to apply the final projection (including dynamic expressions) to the data.
335    pub fn to_physical_exprs(
336        &self,
337        current_schema: &ArrowSchema,
338    ) -> Result<Vec<(Arc<dyn PhysicalExpr>, String)>> {
339        let physical_df_schema = Arc::new(DFSchema::try_from(current_schema.clone())?);
340        self.requested_output_expr
341            .iter()
342            .map(|output_column| {
343                Ok((
344                    datafusion::physical_expr::create_physical_expr(
345                        &output_column.expr,
346                        physical_df_schema.as_ref(),
347                        &Default::default(),
348                    )?,
349                    output_column.name.clone(),
350                ))
351            })
352            .collect::<Result<Vec<_>>>()
353    }
354
355    /// Include the row id in the output
356    pub fn include_row_id(&mut self) {
357        self.physical_projection.with_row_id = true;
358        if !self
359            .requested_output_expr
360            .iter()
361            .any(|OutputColumn { name, .. }| name == ROW_ID)
362        {
363            self.requested_output_expr.push(OutputColumn {
364                expr: Expr::Column(Column::from_name(ROW_ID)),
365                name: ROW_ID.to_string(),
366            });
367        }
368    }
369
370    /// Include the row address in the output
371    pub fn include_row_addr(&mut self) {
372        self.physical_projection.with_row_addr = true;
373        if !self
374            .requested_output_expr
375            .iter()
376            .any(|OutputColumn { name, .. }| name == ROW_ADDR)
377        {
378            self.requested_output_expr.push(OutputColumn {
379                expr: Expr::Column(Column::from_name(ROW_ADDR)),
380                name: ROW_ADDR.to_string(),
381            });
382        }
383    }
384
    /// Returns `true` if the plan has at least one output column.
    ///
    /// Having output columns does not imply that anything is physically projected. For
    /// example, we may someday support something like `SELECT 1 AS foo`, which would have an
    /// output column (`foo`) but no physical projection.
    pub fn has_output_cols(&self) -> bool {
        !self.requested_output_expr.is_empty()
    }
392
393    pub fn output_schema(&self) -> Result<ArrowSchema> {
394        let exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?;
395        let physical_schema = self.physical_projection.to_arrow_schema();
396        let fields = exprs
397            .iter()
398            .map(|(expr, name)| {
399                Ok(ArrowField::new(
400                    name,
401                    expr.data_type(&physical_schema)?,
402                    expr.nullable(&physical_schema)?,
403                ))
404            })
405            .collect::<Result<Vec<_>>>()?;
406        Ok(ArrowSchema::new(fields))
407    }
408
409    pub async fn project_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
410        let src = Arc::new(OneShotExec::from_batch(batch));
411        let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?;
412        let projection = Arc::new(ProjectionExec::try_new(physical_exprs, src)?);
413        let stream = execute_plan(projection, LanceExecutionOptions::default())?;
414        let batches = stream.try_collect::<Vec<_>>().await?;
415        if batches.len() != 1 {
416            Err(Error::Internal {
417                message: "Expected exactly one batch".to_string(),
418                location: location!(),
419            })
420        } else {
421            Ok(batches.into_iter().next().unwrap())
422        }
423    }
424}