Skip to main content

lance_datafusion/
projection.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_array::RecordBatch;
5use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
6use datafusion::{logical_expr::Expr, physical_plan::projection::ProjectionExec};
7use datafusion_common::{Column, DFSchema};
8use datafusion_physical_expr::PhysicalExpr;
9use futures::TryStreamExt;
10use std::{
11    collections::{HashMap, HashSet},
12    sync::Arc,
13};
14use tracing::instrument;
15
16use lance_core::{
17    Error, ROW_ADDR, ROW_CREATED_AT_VERSION, ROW_ID, ROW_LAST_UPDATED_AT_VERSION, ROW_OFFSET,
18    Result, WILDCARD,
19    datatypes::{OnMissing, Projectable, Projection, Schema},
20};
21
22use crate::{
23    exec::{LanceExecutionOptions, OneShotExec, execute_plan},
24    planner::Planner,
25};
26
27struct ProjectionBuilder {
28    base: Arc<dyn Projectable>,
29    planner: Planner,
30    output: HashMap<String, Expr>,
31    output_cols: Vec<OutputColumn>,
32    physical_cols_set: HashSet<String>,
33    physical_cols: Vec<String>,
34    needs_row_id: bool,
35    needs_row_addr: bool,
36    needs_row_last_updated_at: bool,
37    needs_row_created_at: bool,
38    must_add_row_offset: bool,
39    has_wildcard: bool,
40}
41
42impl ProjectionBuilder {
43    fn new(base: Arc<dyn Projectable>) -> Self {
44        let full_schema = Arc::new(Projection::full(base.clone()).to_arrow_schema());
45        let full_schema = Arc::new(ProjectionPlan::add_system_columns(&full_schema));
46        let planner = Planner::new(full_schema);
47
48        Self {
49            base,
50            planner,
51            output: HashMap::default(),
52            output_cols: Vec::default(),
53            physical_cols_set: HashSet::default(),
54            physical_cols: Vec::default(),
55            needs_row_id: false,
56            needs_row_addr: false,
57            needs_row_created_at: false,
58            needs_row_last_updated_at: false,
59            must_add_row_offset: false,
60            has_wildcard: false,
61        }
62    }
63
64    fn check_duplicate_column(&self, name: &str) -> Result<()> {
65        if self.output.contains_key(name) {
66            return Err(Error::invalid_input(format!(
67                "Duplicate column name: {}",
68                name
69            )));
70        }
71        Ok(())
72    }
73
74    fn add_column(&mut self, output_name: &str, raw_expr: &str) -> Result<()> {
75        self.check_duplicate_column(output_name)?;
76
77        let expr = self.planner.parse_expr(raw_expr)?;
78
79        // If the expression is a bare column reference to a system column, mark that we need it
80        if let Expr::Column(Column {
81            name,
82            relation: None,
83            ..
84        }) = &expr
85        {
86            if name == ROW_ID {
87                self.needs_row_id = true;
88            } else if name == ROW_ADDR {
89                self.needs_row_addr = true;
90            } else if name == ROW_OFFSET {
91                self.must_add_row_offset = true;
92            } else if name == ROW_LAST_UPDATED_AT_VERSION {
93                self.needs_row_last_updated_at = true;
94            } else if name == ROW_CREATED_AT_VERSION {
95                self.needs_row_created_at = true;
96            }
97        }
98
99        for col in Planner::column_names_in_expr(&expr) {
100            if self.physical_cols_set.contains(&col) {
101                continue;
102            }
103            self.physical_cols.push(col.clone());
104            self.physical_cols_set.insert(col);
105        }
106        self.output.insert(output_name.to_string(), expr.clone());
107
108        self.output_cols.push(OutputColumn {
109            expr,
110            name: output_name.to_string(),
111        });
112
113        Ok(())
114    }
115
116    fn add_columns(&mut self, columns: &[(impl AsRef<str>, impl AsRef<str>)]) -> Result<()> {
117        for (output_name, raw_expr) in columns {
118            if raw_expr.as_ref() == WILDCARD {
119                self.has_wildcard = true;
120                for col in self.base.schema().fields.iter().map(|f| f.name.as_str()) {
121                    self.check_duplicate_column(col)?;
122                    self.output_cols.push(OutputColumn {
123                        expr: Expr::Column(Column::from_name(col)),
124                        name: col.to_string(),
125                    });
126                    // Throw placeholder expr in self.output, this will trigger error on duplicates
127                    self.output.insert(col.to_string(), Expr::default());
128                }
129            } else {
130                self.add_column(output_name.as_ref(), raw_expr.as_ref())?;
131            }
132        }
133        Ok(())
134    }
135
136    fn build(self) -> Result<ProjectionPlan> {
137        // Now, calculate the physical projection from the columns referenced by the expressions
138        //
139        // If a column is missing it might be a system column (_rowid, _distance, etc.) and so
140        // we ignore it.  We don't need to load that column from disk at least, which is all we are
141        // trying to calculate here.
142        let mut physical_projection = if self.has_wildcard {
143            Projection::full(self.base.clone())
144        } else {
145            Projection::empty(self.base.clone())
146                .union_columns(&self.physical_cols, OnMissing::Ignore)?
147        };
148
149        physical_projection.with_row_id = self.needs_row_id;
150        physical_projection.with_row_addr = self.needs_row_addr || self.must_add_row_offset;
151        physical_projection.with_row_last_updated_at_version = self.needs_row_last_updated_at;
152        physical_projection.with_row_created_at_version = self.needs_row_created_at;
153
154        Ok(ProjectionPlan {
155            physical_projection,
156            must_add_row_offset: self.must_add_row_offset,
157            requested_output_expr: self.output_cols,
158        })
159    }
160}
161
162#[derive(Clone, Debug)]
163pub struct OutputColumn {
164    /// The expression that represents the output column
165    pub expr: Expr,
166    /// The name of the output column
167    pub name: String,
168}
169
170#[derive(Clone, Debug)]
171pub struct ProjectionPlan {
172    /// The physical schema that must be loaded from the dataset
173    pub physical_projection: Projection,
174
175    /// Needs the row address converted into a row offset
176    pub must_add_row_offset: bool,
177
178    /// The desired output columns
179    pub requested_output_expr: Vec<OutputColumn>,
180}
181
182impl ProjectionPlan {
183    fn add_system_columns(schema: &ArrowSchema) -> ArrowSchema {
184        let mut fields = Vec::from_iter(schema.fields.iter().cloned());
185        fields.push(Arc::new(ArrowField::new(ROW_ID, DataType::UInt64, true)));
186        fields.push(Arc::new(ArrowField::new(ROW_ADDR, DataType::UInt64, true)));
187        fields.push(Arc::new(ArrowField::new(
188            ROW_OFFSET,
189            DataType::UInt64,
190            true,
191        )));
192        fields.push(Arc::new(
193            (*lance_core::ROW_LAST_UPDATED_AT_VERSION_FIELD).clone(),
194        ));
195        fields.push(Arc::new(
196            (*lance_core::ROW_CREATED_AT_VERSION_FIELD).clone(),
197        ));
198        ArrowSchema::new(fields)
199    }
200
201    /// Set the projection from SQL expressions
202    pub fn from_expressions(
203        base: Arc<dyn Projectable>,
204        columns: &[(impl AsRef<str>, impl AsRef<str>)],
205    ) -> Result<Self> {
206        let mut builder = ProjectionBuilder::new(base);
207        builder.add_columns(columns)?;
208        builder.build()
209    }
210
211    /// Set the projection from a schema
212    ///
213    /// This plan will have no complex expressions, the schema must be a subset of the dataset schema.
214    ///
215    /// With this approach it is possible to refer to portions of nested fields.
216    ///
217    /// For example, if the schema is:
218    ///
219    /// ```ignore
220    /// {
221    ///   "metadata": {
222    ///     "location": {
223    ///       "x": f32,
224    ///       "y": f32,
225    ///     },
226    ///     "age": i32,
227    ///   }
228    /// }
229    /// ```
230    ///
231    /// It is possible to project a partial schema that drops `y` like:
232    ///
233    /// ```ignore
234    /// {
235    ///   "metadata": {
236    ///     "location": {
237    ///       "x": f32,
238    ///     },
239    ///     "age": i32,
240    ///   }
241    /// }
242    /// ```
243    ///
244    /// This is something that cannot be done easily using expressions.
245    pub fn from_schema(base: Arc<dyn Projectable>, projection: &Schema) -> Result<Self> {
246        // Separate data columns from system columns
247        // System columns (_rowid, _rowaddr, etc.) are handled via flags in Projection,
248        // not as fields in the Schema
249        let mut data_fields = Vec::new();
250        let mut with_row_id = false;
251        let mut with_row_addr = false;
252        let mut must_add_row_offset = false;
253        let mut with_row_last_updated_at_version = false;
254        let mut with_row_created_at_version = false;
255
256        for field in projection.fields.iter() {
257            if lance_core::is_system_column(&field.name) {
258                // Handle known system columns that can be included in projections
259                if field.name == ROW_ID {
260                    with_row_id = true;
261                    must_add_row_offset = true;
262                } else if field.name == ROW_ADDR {
263                    with_row_addr = true;
264                } else if field.name == ROW_OFFSET {
265                    with_row_addr = true;
266                    must_add_row_offset = true;
267                } else if field.name == ROW_LAST_UPDATED_AT_VERSION {
268                    with_row_last_updated_at_version = true;
269                } else if field.name == ROW_CREATED_AT_VERSION {
270                    with_row_created_at_version = true;
271                }
272            } else {
273                // Regular data column - validate it exists in base schema
274                if base.schema().field(&field.name).is_none() {
275                    return Err(Error::invalid_input(format!(
276                        "Column '{}' not found in schema",
277                        field.name
278                    )));
279                }
280                data_fields.push(field.clone());
281            }
282        }
283
284        // Create a schema with only data columns for the physical projection
285        let data_schema = Schema {
286            fields: data_fields,
287            metadata: projection.metadata.clone(),
288        };
289
290        // Calculate the physical projection from data columns only
291        let mut physical_projection = Projection::empty(base).union_schema(&data_schema);
292        physical_projection.with_row_id = with_row_id;
293        physical_projection.with_row_addr = with_row_addr;
294        physical_projection.with_row_last_updated_at_version = with_row_last_updated_at_version;
295        physical_projection.with_row_created_at_version = with_row_created_at_version;
296
297        // Build output expressions preserving the original order (including system columns)
298        let exprs = projection
299            .fields
300            .iter()
301            .map(|f| OutputColumn {
302                expr: Expr::Column(Column::from_name(&f.name)),
303                name: f.name.clone(),
304            })
305            .collect::<Vec<_>>();
306
307        Ok(Self {
308            physical_projection,
309            requested_output_expr: exprs,
310            must_add_row_offset,
311        })
312    }
313
314    pub fn full(base: Arc<dyn Projectable>) -> Result<Self> {
315        let physical_cols: Vec<&str> = base
316            .schema()
317            .fields
318            .iter()
319            .map(|f| f.name.as_ref())
320            .collect::<Vec<_>>();
321
322        let physical_projection =
323            Projection::empty(base.clone()).union_columns(&physical_cols, OnMissing::Ignore)?;
324
325        let requested_output_expr = physical_cols
326            .into_iter()
327            .map(|col_name| OutputColumn {
328                expr: Expr::Column(Column::from_name(col_name)),
329                name: col_name.to_string(),
330            })
331            .collect();
332
333        Ok(Self {
334            physical_projection,
335            must_add_row_offset: false,
336            requested_output_expr,
337        })
338    }
339
340    /// Convert the projection to a list of physical expressions
341    ///
342    /// This is used to apply the final projection (including dynamic expressions) to the data.
343    pub fn to_physical_exprs(
344        &self,
345        current_schema: &ArrowSchema,
346    ) -> Result<Vec<(Arc<dyn PhysicalExpr>, String)>> {
347        let physical_df_schema = Arc::new(DFSchema::try_from(current_schema.clone())?);
348        self.requested_output_expr
349            .iter()
350            .map(|output_column| {
351                Ok((
352                    datafusion::physical_expr::create_physical_expr(
353                        &output_column.expr,
354                        physical_df_schema.as_ref(),
355                        &Default::default(),
356                    )?,
357                    output_column.name.clone(),
358                ))
359            })
360            .collect::<Result<Vec<_>>>()
361    }
362
363    /// Include the row id in the output
364    pub fn include_row_id(&mut self) {
365        self.physical_projection.with_row_id = true;
366        if !self
367            .requested_output_expr
368            .iter()
369            .any(|OutputColumn { name, .. }| name == ROW_ID)
370        {
371            self.requested_output_expr.push(OutputColumn {
372                expr: Expr::Column(Column::from_name(ROW_ID)),
373                name: ROW_ID.to_string(),
374            });
375        }
376    }
377
378    /// Include the row address in the output
379    pub fn include_row_addr(&mut self) {
380        self.physical_projection.with_row_addr = true;
381        if !self
382            .requested_output_expr
383            .iter()
384            .any(|OutputColumn { name, .. }| name == ROW_ADDR)
385        {
386            self.requested_output_expr.push(OutputColumn {
387                expr: Expr::Column(Column::from_name(ROW_ADDR)),
388                name: ROW_ADDR.to_string(),
389            });
390        }
391    }
392
393    /// Check if the projection has any output columns
394    ///
395    /// This doesn't mean there is a physical projection.  For example, we may someday support
396    /// something like `SELECT 1 AS foo` which would have an output column (foo) but no physical projection
397    pub fn has_output_cols(&self) -> bool {
398        !self.requested_output_expr.is_empty()
399    }
400
401    pub fn output_schema(&self) -> Result<ArrowSchema> {
402        let physical_schema = self.physical_projection.to_arrow_schema();
403        let exprs = self.to_physical_exprs(&physical_schema)?;
404        let fields = exprs
405            .iter()
406            .map(|(expr, name)| {
407                let metadata = expr.return_field(&physical_schema)?.metadata().clone();
408                Ok(ArrowField::new(
409                    name,
410                    expr.data_type(&physical_schema)?,
411                    expr.nullable(&physical_schema)?,
412                )
413                .with_metadata(metadata))
414            })
415            .collect::<Result<Vec<_>>>()?;
416        Ok(ArrowSchema::new_with_metadata(
417            fields,
418            physical_schema.metadata().clone(),
419        ))
420    }
421
422    #[instrument(skip_all, level = "debug")]
423    pub async fn project_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
424        let src = Arc::new(OneShotExec::from_batch(batch));
425
426        // Need to add ROW_OFFSET to get filterable schema
427        let extra_columns = vec![
428            ArrowField::new(ROW_ADDR, DataType::UInt64, true),
429            ArrowField::new(ROW_OFFSET, DataType::UInt64, true),
430        ];
431        let mut filterable_schema = self.physical_projection.to_schema();
432        filterable_schema = filterable_schema.merge(&ArrowSchema::new(extra_columns))?;
433
434        let physical_exprs = self.to_physical_exprs(&(&filterable_schema).into())?;
435        let projection = Arc::new(ProjectionExec::try_new(physical_exprs, src)?);
436
437        // Run dummy plan to execute projection, do not log the plan run
438        let stream = execute_plan(
439            projection,
440            LanceExecutionOptions {
441                skip_logging: true,
442                ..Default::default()
443            },
444        )?;
445        let batches = stream.try_collect::<Vec<_>>().await?;
446        if batches.len() != 1 {
447            Err(Error::internal("Expected exactly one batch".to_string()))
448        } else {
449            Ok(batches.into_iter().next().unwrap())
450        }
451    }
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    use lance_arrow::json::{is_json_field, json_field};

    /// A JSON field must still be recognised as JSON both in the physical projection schema
    /// and in the plan's output schema.
    #[test]
    fn test_output_schema_preserves_json_extension_metadata() {
        let lance_schema = Schema::try_from(&ArrowSchema::new(vec![
            ArrowField::new("id", DataType::Int32, false),
            json_field("meta", true),
        ]))
        .unwrap();

        let plan =
            ProjectionPlan::from_schema(Arc::new(lance_schema.clone()), &lance_schema).unwrap();

        let physical_schema = plan.physical_projection.to_arrow_schema();
        let physical_meta = physical_schema.field_with_name("meta").unwrap();
        assert!(is_json_field(physical_meta));

        let output_schema = plan.output_schema().unwrap();
        let output_meta = output_schema.field_with_name("meta").unwrap();
        assert!(is_json_field(output_meta));
    }
}