1use arrow_array::RecordBatch;
5use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
6use datafusion::{logical_expr::Expr, physical_plan::projection::ProjectionExec};
7use datafusion_common::{Column, DFSchema};
8use datafusion_physical_expr::PhysicalExpr;
9use futures::TryStreamExt;
10use std::{
11 collections::{HashMap, HashSet},
12 sync::Arc,
13};
14use tracing::instrument;
15
16use lance_core::{
17 Error, ROW_ADDR, ROW_CREATED_AT_VERSION, ROW_ID, ROW_LAST_UPDATED_AT_VERSION, ROW_OFFSET,
18 Result, WILDCARD,
19 datatypes::{OnMissing, Projectable, Projection, Schema},
20};
21
22use crate::{
23 exec::{LanceExecutionOptions, OneShotExec, execute_plan},
24 planner::Planner,
25};
26
27struct ProjectionBuilder {
28 base: Arc<dyn Projectable>,
29 planner: Planner,
30 output: HashMap<String, Expr>,
31 output_cols: Vec<OutputColumn>,
32 physical_cols_set: HashSet<String>,
33 physical_cols: Vec<String>,
34 needs_row_id: bool,
35 needs_row_addr: bool,
36 needs_row_last_updated_at: bool,
37 needs_row_created_at: bool,
38 must_add_row_offset: bool,
39 has_wildcard: bool,
40}
41
42impl ProjectionBuilder {
43 fn new(base: Arc<dyn Projectable>) -> Self {
44 let full_schema = Arc::new(Projection::full(base.clone()).to_arrow_schema());
45 let full_schema = Arc::new(ProjectionPlan::add_system_columns(&full_schema));
46 let planner = Planner::new(full_schema);
47
48 Self {
49 base,
50 planner,
51 output: HashMap::default(),
52 output_cols: Vec::default(),
53 physical_cols_set: HashSet::default(),
54 physical_cols: Vec::default(),
55 needs_row_id: false,
56 needs_row_addr: false,
57 needs_row_created_at: false,
58 needs_row_last_updated_at: false,
59 must_add_row_offset: false,
60 has_wildcard: false,
61 }
62 }
63
64 fn check_duplicate_column(&self, name: &str) -> Result<()> {
65 if self.output.contains_key(name) {
66 return Err(Error::invalid_input(format!(
67 "Duplicate column name: {}",
68 name
69 )));
70 }
71 Ok(())
72 }
73
74 fn add_column(&mut self, output_name: &str, raw_expr: &str) -> Result<()> {
75 self.check_duplicate_column(output_name)?;
76
77 let expr = self.planner.parse_expr(raw_expr)?;
78
79 if let Expr::Column(Column {
81 name,
82 relation: None,
83 ..
84 }) = &expr
85 {
86 if name == ROW_ID {
87 self.needs_row_id = true;
88 } else if name == ROW_ADDR {
89 self.needs_row_addr = true;
90 } else if name == ROW_OFFSET {
91 self.must_add_row_offset = true;
92 } else if name == ROW_LAST_UPDATED_AT_VERSION {
93 self.needs_row_last_updated_at = true;
94 } else if name == ROW_CREATED_AT_VERSION {
95 self.needs_row_created_at = true;
96 }
97 }
98
99 for col in Planner::column_names_in_expr(&expr) {
100 if self.physical_cols_set.contains(&col) {
101 continue;
102 }
103 self.physical_cols.push(col.clone());
104 self.physical_cols_set.insert(col);
105 }
106 self.output.insert(output_name.to_string(), expr.clone());
107
108 self.output_cols.push(OutputColumn {
109 expr,
110 name: output_name.to_string(),
111 });
112
113 Ok(())
114 }
115
116 fn add_columns(&mut self, columns: &[(impl AsRef<str>, impl AsRef<str>)]) -> Result<()> {
117 for (output_name, raw_expr) in columns {
118 if raw_expr.as_ref() == WILDCARD {
119 self.has_wildcard = true;
120 for col in self.base.schema().fields.iter().map(|f| f.name.as_str()) {
121 self.check_duplicate_column(col)?;
122 self.output_cols.push(OutputColumn {
123 expr: Expr::Column(Column::from_name(col)),
124 name: col.to_string(),
125 });
126 self.output.insert(col.to_string(), Expr::default());
128 }
129 } else {
130 self.add_column(output_name.as_ref(), raw_expr.as_ref())?;
131 }
132 }
133 Ok(())
134 }
135
136 fn build(self) -> Result<ProjectionPlan> {
137 let mut physical_projection = if self.has_wildcard {
143 Projection::full(self.base.clone())
144 } else {
145 Projection::empty(self.base.clone())
146 .union_columns(&self.physical_cols, OnMissing::Ignore)?
147 };
148
149 physical_projection.with_row_id = self.needs_row_id;
150 physical_projection.with_row_addr = self.needs_row_addr || self.must_add_row_offset;
151 physical_projection.with_row_last_updated_at_version = self.needs_row_last_updated_at;
152 physical_projection.with_row_created_at_version = self.needs_row_created_at;
153
154 Ok(ProjectionPlan {
155 physical_projection,
156 must_add_row_offset: self.must_add_row_offset,
157 requested_output_expr: self.output_cols,
158 })
159 }
160}
161
/// One column of a projection's output: the expression that computes it and the name it is
/// emitted under.
#[derive(Clone, Debug)]
pub struct OutputColumn {
    /// Logical expression producing this column's values. It is converted to a physical
    /// expression against the schema of the loaded data.
    pub expr: Expr,
    /// Name of this column in the output schema.
    pub name: String,
}
169
/// A resolved projection: what has to be loaded, and how to compute the requested output
/// columns from the loaded data.
#[derive(Clone, Debug)]
pub struct ProjectionPlan {
    /// Data columns plus system-column flags (`with_row_id`, `with_row_addr`, ...) that must be
    /// loaded physically.
    pub physical_projection: Projection,

    /// Whether the row-offset system column has to be added by the consumer. `Projection` has
    /// no flag of its own for it.
    /// NOTE(review): presumably derived from row addresses downstream; confirm against the
    /// consumer.
    pub must_add_row_offset: bool,

    /// Output columns, in output order.
    pub requested_output_expr: Vec<OutputColumn>,
}
181
182impl ProjectionPlan {
183 fn add_system_columns(schema: &ArrowSchema) -> ArrowSchema {
184 let mut fields = Vec::from_iter(schema.fields.iter().cloned());
185 fields.push(Arc::new(ArrowField::new(ROW_ID, DataType::UInt64, true)));
186 fields.push(Arc::new(ArrowField::new(ROW_ADDR, DataType::UInt64, true)));
187 fields.push(Arc::new(ArrowField::new(
188 ROW_OFFSET,
189 DataType::UInt64,
190 true,
191 )));
192 fields.push(Arc::new(
193 (*lance_core::ROW_LAST_UPDATED_AT_VERSION_FIELD).clone(),
194 ));
195 fields.push(Arc::new(
196 (*lance_core::ROW_CREATED_AT_VERSION_FIELD).clone(),
197 ));
198 ArrowSchema::new(fields)
199 }
200
201 pub fn from_expressions(
203 base: Arc<dyn Projectable>,
204 columns: &[(impl AsRef<str>, impl AsRef<str>)],
205 ) -> Result<Self> {
206 let mut builder = ProjectionBuilder::new(base);
207 builder.add_columns(columns)?;
208 builder.build()
209 }
210
211 pub fn from_schema(base: Arc<dyn Projectable>, projection: &Schema) -> Result<Self> {
246 let mut data_fields = Vec::new();
250 let mut with_row_id = false;
251 let mut with_row_addr = false;
252 let mut must_add_row_offset = false;
253 let mut with_row_last_updated_at_version = false;
254 let mut with_row_created_at_version = false;
255
256 for field in projection.fields.iter() {
257 if lance_core::is_system_column(&field.name) {
258 if field.name == ROW_ID {
260 with_row_id = true;
261 must_add_row_offset = true;
262 } else if field.name == ROW_ADDR {
263 with_row_addr = true;
264 } else if field.name == ROW_OFFSET {
265 with_row_addr = true;
266 must_add_row_offset = true;
267 } else if field.name == ROW_LAST_UPDATED_AT_VERSION {
268 with_row_last_updated_at_version = true;
269 } else if field.name == ROW_CREATED_AT_VERSION {
270 with_row_created_at_version = true;
271 }
272 } else {
273 if base.schema().field(&field.name).is_none() {
275 return Err(Error::invalid_input(format!(
276 "Column '{}' not found in schema",
277 field.name
278 )));
279 }
280 data_fields.push(field.clone());
281 }
282 }
283
284 let data_schema = Schema {
286 fields: data_fields,
287 metadata: projection.metadata.clone(),
288 };
289
290 let mut physical_projection = Projection::empty(base).union_schema(&data_schema);
292 physical_projection.with_row_id = with_row_id;
293 physical_projection.with_row_addr = with_row_addr;
294 physical_projection.with_row_last_updated_at_version = with_row_last_updated_at_version;
295 physical_projection.with_row_created_at_version = with_row_created_at_version;
296
297 let exprs = projection
299 .fields
300 .iter()
301 .map(|f| OutputColumn {
302 expr: Expr::Column(Column::from_name(&f.name)),
303 name: f.name.clone(),
304 })
305 .collect::<Vec<_>>();
306
307 Ok(Self {
308 physical_projection,
309 requested_output_expr: exprs,
310 must_add_row_offset,
311 })
312 }
313
314 pub fn full(base: Arc<dyn Projectable>) -> Result<Self> {
315 let physical_cols: Vec<&str> = base
316 .schema()
317 .fields
318 .iter()
319 .map(|f| f.name.as_ref())
320 .collect::<Vec<_>>();
321
322 let physical_projection =
323 Projection::empty(base.clone()).union_columns(&physical_cols, OnMissing::Ignore)?;
324
325 let requested_output_expr = physical_cols
326 .into_iter()
327 .map(|col_name| OutputColumn {
328 expr: Expr::Column(Column::from_name(col_name)),
329 name: col_name.to_string(),
330 })
331 .collect();
332
333 Ok(Self {
334 physical_projection,
335 must_add_row_offset: false,
336 requested_output_expr,
337 })
338 }
339
340 pub fn to_physical_exprs(
344 &self,
345 current_schema: &ArrowSchema,
346 ) -> Result<Vec<(Arc<dyn PhysicalExpr>, String)>> {
347 let physical_df_schema = Arc::new(DFSchema::try_from(current_schema.clone())?);
348 self.requested_output_expr
349 .iter()
350 .map(|output_column| {
351 Ok((
352 datafusion::physical_expr::create_physical_expr(
353 &output_column.expr,
354 physical_df_schema.as_ref(),
355 &Default::default(),
356 )?,
357 output_column.name.clone(),
358 ))
359 })
360 .collect::<Result<Vec<_>>>()
361 }
362
363 pub fn include_row_id(&mut self) {
365 self.physical_projection.with_row_id = true;
366 if !self
367 .requested_output_expr
368 .iter()
369 .any(|OutputColumn { name, .. }| name == ROW_ID)
370 {
371 self.requested_output_expr.push(OutputColumn {
372 expr: Expr::Column(Column::from_name(ROW_ID)),
373 name: ROW_ID.to_string(),
374 });
375 }
376 }
377
378 pub fn include_row_addr(&mut self) {
380 self.physical_projection.with_row_addr = true;
381 if !self
382 .requested_output_expr
383 .iter()
384 .any(|OutputColumn { name, .. }| name == ROW_ADDR)
385 {
386 self.requested_output_expr.push(OutputColumn {
387 expr: Expr::Column(Column::from_name(ROW_ADDR)),
388 name: ROW_ADDR.to_string(),
389 });
390 }
391 }
392
393 pub fn has_output_cols(&self) -> bool {
398 !self.requested_output_expr.is_empty()
399 }
400
401 pub fn output_schema(&self) -> Result<ArrowSchema> {
402 let physical_schema = self.physical_projection.to_arrow_schema();
403 let exprs = self.to_physical_exprs(&physical_schema)?;
404 let fields = exprs
405 .iter()
406 .map(|(expr, name)| {
407 let metadata = expr.return_field(&physical_schema)?.metadata().clone();
408 Ok(ArrowField::new(
409 name,
410 expr.data_type(&physical_schema)?,
411 expr.nullable(&physical_schema)?,
412 )
413 .with_metadata(metadata))
414 })
415 .collect::<Result<Vec<_>>>()?;
416 Ok(ArrowSchema::new_with_metadata(
417 fields,
418 physical_schema.metadata().clone(),
419 ))
420 }
421
422 #[instrument(skip_all, level = "debug")]
423 pub async fn project_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
424 let src = Arc::new(OneShotExec::from_batch(batch));
425
426 let extra_columns = vec![
428 ArrowField::new(ROW_ADDR, DataType::UInt64, true),
429 ArrowField::new(ROW_OFFSET, DataType::UInt64, true),
430 ];
431 let mut filterable_schema = self.physical_projection.to_schema();
432 filterable_schema = filterable_schema.merge(&ArrowSchema::new(extra_columns))?;
433
434 let physical_exprs = self.to_physical_exprs(&(&filterable_schema).into())?;
435 let projection = Arc::new(ProjectionExec::try_new(physical_exprs, src)?);
436
437 let stream = execute_plan(
439 projection,
440 LanceExecutionOptions {
441 skip_logging: true,
442 ..Default::default()
443 },
444 )?;
445 let batches = stream.try_collect::<Vec<_>>().await?;
446 if batches.len() != 1 {
447 Err(Error::internal("Expected exactly one batch".to_string()))
448 } else {
449 Ok(batches.into_iter().next().unwrap())
450 }
451 }
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    use lance_arrow::json::{is_json_field, json_field};

    /// JSON extension metadata on an input field must survive into both the physical
    /// projection and the computed output schema.
    #[test]
    fn test_output_schema_preserves_json_extension_metadata() {
        let source_arrow = ArrowSchema::new(vec![
            ArrowField::new("id", DataType::Int32, false),
            json_field("meta", true),
        ]);
        let lance_schema = Schema::try_from(&source_arrow).unwrap();
        let plan =
            ProjectionPlan::from_schema(Arc::new(lance_schema.clone()), &lance_schema).unwrap();

        let physical_schema = plan.physical_projection.to_arrow_schema();
        let physical_meta = physical_schema.field_with_name("meta").unwrap();
        assert!(is_json_field(physical_meta));

        let output_schema = plan.output_schema().unwrap();
        let output_meta = output_schema.field_with_name("meta").unwrap();
        assert!(is_json_field(output_meta));
    }
}