1use arrow_array::RecordBatch;
5use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
6use datafusion::{logical_expr::Expr, physical_plan::projection::ProjectionExec};
7use datafusion_common::{Column, DFSchema};
8use datafusion_physical_expr::PhysicalExpr;
9use futures::TryStreamExt;
10use snafu::location;
11use std::{
12 collections::{HashMap, HashSet},
13 sync::Arc,
14};
15
16use lance_core::{
17 datatypes::{OnMissing, Projectable, Projection, Schema},
18 Error, Result, ROW_ADDR, ROW_CREATED_AT_VERSION, ROW_ID, ROW_LAST_UPDATED_AT_VERSION,
19 ROW_OFFSET, WILDCARD,
20};
21
22use crate::{
23 exec::{execute_plan, LanceExecutionOptions, OneShotExec},
24 planner::Planner,
25};
26
27struct ProjectionBuilder {
28 base: Arc<dyn Projectable>,
29 planner: Planner,
30 output: HashMap<String, Expr>,
31 output_cols: Vec<OutputColumn>,
32 physical_cols_set: HashSet<String>,
33 physical_cols: Vec<String>,
34 needs_row_id: bool,
35 needs_row_addr: bool,
36 needs_row_last_updated_at: bool,
37 needs_row_created_at: bool,
38 must_add_row_offset: bool,
39 has_wildcard: bool,
40}
41
42impl ProjectionBuilder {
43 fn new(base: Arc<dyn Projectable>) -> Self {
44 let full_schema = Arc::new(Projection::full(base.clone()).to_arrow_schema());
45 let full_schema = Arc::new(ProjectionPlan::add_system_columns(&full_schema));
46 let planner = Planner::new(full_schema);
47
48 Self {
49 base,
50 planner,
51 output: HashMap::default(),
52 output_cols: Vec::default(),
53 physical_cols_set: HashSet::default(),
54 physical_cols: Vec::default(),
55 needs_row_id: false,
56 needs_row_addr: false,
57 needs_row_created_at: false,
58 needs_row_last_updated_at: false,
59 must_add_row_offset: false,
60 has_wildcard: false,
61 }
62 }
63
64 fn check_duplicate_column(&self, name: &str) -> Result<()> {
65 if self.output.contains_key(name) {
66 return Err(Error::io(
67 format!("Duplicate column name: {}", name),
68 location!(),
69 ));
70 }
71 Ok(())
72 }
73
74 fn add_column(&mut self, output_name: &str, raw_expr: &str) -> Result<()> {
75 self.check_duplicate_column(output_name)?;
76
77 let expr = self.planner.parse_expr(raw_expr)?;
78
79 if let Expr::Column(Column {
81 name,
82 relation: None,
83 ..
84 }) = &expr
85 {
86 if name == ROW_ID {
87 self.needs_row_id = true;
88 } else if name == ROW_ADDR {
89 self.needs_row_addr = true;
90 } else if name == ROW_OFFSET {
91 self.must_add_row_offset = true;
92 } else if name == ROW_LAST_UPDATED_AT_VERSION {
93 self.needs_row_last_updated_at = true;
94 } else if name == ROW_CREATED_AT_VERSION {
95 self.needs_row_created_at = true;
96 }
97 }
98
99 for col in Planner::column_names_in_expr(&expr) {
100 if self.physical_cols_set.contains(&col) {
101 continue;
102 }
103 self.physical_cols.push(col.clone());
104 self.physical_cols_set.insert(col);
105 }
106 self.output.insert(output_name.to_string(), expr.clone());
107
108 self.output_cols.push(OutputColumn {
109 expr,
110 name: output_name.to_string(),
111 });
112
113 Ok(())
114 }
115
116 fn add_columns(&mut self, columns: &[(impl AsRef<str>, impl AsRef<str>)]) -> Result<()> {
117 for (output_name, raw_expr) in columns {
118 if raw_expr.as_ref() == WILDCARD {
119 self.has_wildcard = true;
120 for col in self.base.schema().fields.iter().map(|f| f.name.as_str()) {
121 self.check_duplicate_column(col)?;
122 self.output_cols.push(OutputColumn {
123 expr: Expr::Column(Column::from_name(col)),
124 name: col.to_string(),
125 });
126 self.output.insert(col.to_string(), Expr::default());
128 }
129 } else {
130 self.add_column(output_name.as_ref(), raw_expr.as_ref())?;
131 }
132 }
133 Ok(())
134 }
135
136 fn build(self) -> Result<ProjectionPlan> {
137 let mut physical_projection = if self.has_wildcard {
143 Projection::full(self.base.clone())
144 } else {
145 Projection::empty(self.base.clone())
146 .union_columns(&self.physical_cols, OnMissing::Ignore)?
147 };
148
149 physical_projection.with_row_id = self.needs_row_id;
150 physical_projection.with_row_addr = self.needs_row_addr || self.must_add_row_offset;
151 physical_projection.with_row_last_updated_at_version = self.needs_row_last_updated_at;
152 physical_projection.with_row_created_at_version = self.needs_row_created_at;
153
154 Ok(ProjectionPlan {
155 physical_projection,
156 must_add_row_offset: self.must_add_row_offset,
157 requested_output_expr: self.output_cols,
158 })
159 }
160}
161
162#[derive(Clone, Debug)]
163pub struct OutputColumn {
164 pub expr: Expr,
166 pub name: String,
168}
169
170#[derive(Clone, Debug)]
171pub struct ProjectionPlan {
172 pub physical_projection: Projection,
174
175 pub must_add_row_offset: bool,
177
178 pub requested_output_expr: Vec<OutputColumn>,
180}
181
182impl ProjectionPlan {
183 fn add_system_columns(schema: &ArrowSchema) -> ArrowSchema {
184 let mut fields = Vec::from_iter(schema.fields.iter().cloned());
185 fields.push(Arc::new(ArrowField::new(ROW_ID, DataType::UInt64, true)));
186 fields.push(Arc::new(ArrowField::new(ROW_ADDR, DataType::UInt64, true)));
187 fields.push(Arc::new(ArrowField::new(
188 ROW_OFFSET,
189 DataType::UInt64,
190 true,
191 )));
192 fields.push(Arc::new(
193 (*lance_core::ROW_LAST_UPDATED_AT_VERSION_FIELD).clone(),
194 ));
195 fields.push(Arc::new(
196 (*lance_core::ROW_CREATED_AT_VERSION_FIELD).clone(),
197 ));
198 ArrowSchema::new(fields)
199 }
200
201 pub fn from_expressions(
203 base: Arc<dyn Projectable>,
204 columns: &[(impl AsRef<str>, impl AsRef<str>)],
205 ) -> Result<Self> {
206 let mut builder = ProjectionBuilder::new(base);
207 builder.add_columns(columns)?;
208 builder.build()
209 }
210
211 pub fn from_schema(base: Arc<dyn Projectable>, projection: &Schema) -> Result<Self> {
246 let mut data_fields = Vec::new();
250 let mut with_row_id = false;
251 let mut with_row_addr = false;
252 let mut must_add_row_offset = false;
253
254 for field in projection.fields.iter() {
255 if lance_core::is_system_column(&field.name) {
256 if field.name == ROW_ID {
258 with_row_id = true;
259 must_add_row_offset = true;
260 } else if field.name == ROW_ADDR {
261 with_row_addr = true;
262 must_add_row_offset = true;
263 }
264 } else {
267 if base.schema().field(&field.name).is_none() {
269 return Err(Error::io(
270 format!("Column '{}' not found in schema", field.name),
271 location!(),
272 ));
273 }
274 data_fields.push(field.clone());
275 }
276 }
277
278 let data_schema = Schema {
280 fields: data_fields,
281 metadata: projection.metadata.clone(),
282 };
283
284 let mut physical_projection = Projection::empty(base).union_schema(&data_schema);
286 physical_projection.with_row_id = with_row_id;
287 physical_projection.with_row_addr = with_row_addr;
288
289 let exprs = projection
291 .fields
292 .iter()
293 .map(|f| OutputColumn {
294 expr: Expr::Column(Column::from_name(&f.name)),
295 name: f.name.clone(),
296 })
297 .collect::<Vec<_>>();
298
299 Ok(Self {
300 physical_projection,
301 requested_output_expr: exprs,
302 must_add_row_offset,
303 })
304 }
305
306 pub fn full(base: Arc<dyn Projectable>) -> Result<Self> {
307 let physical_cols: Vec<&str> = base
308 .schema()
309 .fields
310 .iter()
311 .map(|f| f.name.as_ref())
312 .collect::<Vec<_>>();
313
314 let physical_projection =
315 Projection::empty(base.clone()).union_columns(&physical_cols, OnMissing::Ignore)?;
316
317 let requested_output_expr = physical_cols
318 .into_iter()
319 .map(|col_name| OutputColumn {
320 expr: Expr::Column(Column::from_name(col_name)),
321 name: col_name.to_string(),
322 })
323 .collect();
324
325 Ok(Self {
326 physical_projection,
327 must_add_row_offset: false,
328 requested_output_expr,
329 })
330 }
331
332 pub fn to_physical_exprs(
336 &self,
337 current_schema: &ArrowSchema,
338 ) -> Result<Vec<(Arc<dyn PhysicalExpr>, String)>> {
339 let physical_df_schema = Arc::new(DFSchema::try_from(current_schema.clone())?);
340 self.requested_output_expr
341 .iter()
342 .map(|output_column| {
343 Ok((
344 datafusion::physical_expr::create_physical_expr(
345 &output_column.expr,
346 physical_df_schema.as_ref(),
347 &Default::default(),
348 )?,
349 output_column.name.clone(),
350 ))
351 })
352 .collect::<Result<Vec<_>>>()
353 }
354
355 pub fn include_row_id(&mut self) {
357 self.physical_projection.with_row_id = true;
358 if !self
359 .requested_output_expr
360 .iter()
361 .any(|OutputColumn { name, .. }| name == ROW_ID)
362 {
363 self.requested_output_expr.push(OutputColumn {
364 expr: Expr::Column(Column::from_name(ROW_ID)),
365 name: ROW_ID.to_string(),
366 });
367 }
368 }
369
370 pub fn include_row_addr(&mut self) {
372 self.physical_projection.with_row_addr = true;
373 if !self
374 .requested_output_expr
375 .iter()
376 .any(|OutputColumn { name, .. }| name == ROW_ADDR)
377 {
378 self.requested_output_expr.push(OutputColumn {
379 expr: Expr::Column(Column::from_name(ROW_ADDR)),
380 name: ROW_ADDR.to_string(),
381 });
382 }
383 }
384
385 pub fn has_output_cols(&self) -> bool {
390 !self.requested_output_expr.is_empty()
391 }
392
393 pub fn output_schema(&self) -> Result<ArrowSchema> {
394 let exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?;
395 let physical_schema = self.physical_projection.to_arrow_schema();
396 let fields = exprs
397 .iter()
398 .map(|(expr, name)| {
399 Ok(ArrowField::new(
400 name,
401 expr.data_type(&physical_schema)?,
402 expr.nullable(&physical_schema)?,
403 ))
404 })
405 .collect::<Result<Vec<_>>>()?;
406 Ok(ArrowSchema::new(fields))
407 }
408
409 pub async fn project_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
410 let src = Arc::new(OneShotExec::from_batch(batch));
411 let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?;
412 let projection = Arc::new(ProjectionExec::try_new(physical_exprs, src)?);
413 let stream = execute_plan(projection, LanceExecutionOptions::default())?;
414 let batches = stream.try_collect::<Vec<_>>().await?;
415 if batches.len() != 1 {
416 Err(Error::Internal {
417 message: "Expected exactly one batch".to_string(),
418 location: location!(),
419 })
420 } else {
421 Ok(batches.into_iter().next().unwrap())
422 }
423 }
424}