typed_arrow_dyn/view/
projection.rs

1use std::sync::Arc;
2
3use arrow_array::RecordBatch;
4use arrow_schema::{DataType, Field, FieldRef, Fields, Schema};
5use parquet::arrow::{ArrowSchemaConverter, ProjectionMask as ParquetProjectionMask};
6
7use super::{
8    path::Path,
9    rows::{DynRowRaw, DynRowView},
10};
11use crate::{DynViewError, schema::DynSchema};
12
13/// Column projection descriptor used to derive projected dynamic views.
14#[derive(Debug, Clone)]
15pub struct DynProjection(Arc<DynProjectionData>);
16
17#[derive(Debug, Clone)]
18pub(super) enum FieldProjector {
19    Identity,
20    Struct(Arc<StructProjection>),
21    List(Box<FieldProjector>),
22    LargeList(Box<FieldProjector>),
23    FixedSizeList(Box<FieldProjector>),
24    Map(Arc<StructProjection>),
25}
26
27impl FieldProjector {
28    fn is_identity(&self) -> bool {
29        matches!(self, FieldProjector::Identity)
30    }
31}
32
33#[derive(Debug, Clone)]
34pub(super) struct StructProjection {
35    pub(super) children: Arc<[StructChildProjection]>,
36}
37
38#[derive(Debug, Clone)]
39pub(super) struct StructChildProjection {
40    pub(super) source_index: usize,
41    pub(super) projector: FieldProjector,
42}
43
44#[derive(Debug)]
45struct DynProjectionData {
46    source_width: usize,
47    mapping: Arc<[usize]>,
48    fields: Fields,
49    projectors: Arc<[FieldProjector]>,
50    parquet_mask: ParquetProjectionMask,
51}
52
53impl DynProjection {
54    fn new_internal(
55        schema: &Schema,
56        source_width: usize,
57        mapping: Vec<usize>,
58        fields: Fields,
59        projectors: Vec<FieldProjector>,
60        selected_paths: Vec<Vec<usize>>,
61    ) -> Result<Self, DynViewError> {
62        debug_assert_eq!(
63            mapping.len(),
64            projectors.len(),
65            "projection mapping and projector width mismatch"
66        );
67        let parquet_mask = build_parquet_mask(schema, selected_paths)?;
68        Ok(Self(Arc::new(DynProjectionData {
69            source_width,
70            mapping: Arc::from(mapping),
71            fields,
72            projectors: Arc::from(projectors),
73            parquet_mask,
74        })))
75    }
76
77    /// Create a projection from explicit column indices.
78    ///
79    /// # Errors
80    /// Returns `DynViewError::ColumnOutOfBounds` if any index exceeds the schema width.
81    pub fn from_indices<I>(schema: &Schema, indices: I) -> Result<Self, DynViewError>
82    where
83        I: IntoIterator<Item = usize>,
84    {
85        let schema_fields = schema.fields();
86        let width = schema_fields.len();
87        let mut mapping = Vec::new();
88        let mut projected = Vec::new();
89        let mut projectors = Vec::new();
90        let mut selected_paths = Vec::new();
91        for idx in indices.into_iter() {
92            if idx >= width {
93                return Err(DynViewError::ColumnOutOfBounds { column: idx, width });
94            }
95            mapping.push(idx);
96            projected.push(schema_fields[idx].clone());
97            projectors.push(FieldProjector::Identity);
98            let mut index_path = vec![idx];
99            collect_all_leaf_paths_for_field(
100                schema_fields[idx].as_ref(),
101                &mut index_path,
102                &mut selected_paths,
103            );
104        }
105        Self::new_internal(
106            schema,
107            width,
108            mapping,
109            Fields::from(projected),
110            projectors,
111            selected_paths,
112        )
113    }
114
115    /// Create a projection by matching a projected schema against the source schema.
116    ///
117    /// Fields are matched by name; data type and nullability must also align.
118    ///
119    /// # Errors
120    /// Returns `DynViewError` when a projected field is missing from the source schema or when its
121    /// metadata disagrees.
122    pub fn from_schema(source: &Schema, projection: &Schema) -> Result<Self, DynViewError> {
123        let source_fields = source.fields();
124        let width = source_fields.len();
125        let mut mapping = Vec::with_capacity(projection.fields().len());
126        let mut projected = Vec::with_capacity(projection.fields().len());
127        let mut projectors = Vec::with_capacity(projection.fields().len());
128        let mut selected_paths = Vec::new();
129        for (pos, field) in projection.fields().iter().enumerate() {
130            let source_idx = match source.index_of(field.name()) {
131                Ok(idx) => idx,
132                Err(_) => {
133                    return Err(DynViewError::Invalid {
134                        column: pos,
135                        path: field.name().to_string(),
136                        message: "field not found in source schema".to_string(),
137                    });
138                }
139            };
140            let source_field = source_fields[source_idx].as_ref();
141            let path = Path::new(source_idx, field.name());
142            let mut index_path = vec![source_idx];
143            let projector = build_field_projector(
144                &path,
145                source_field,
146                field.as_ref(),
147                &mut index_path,
148                &mut selected_paths,
149            )?;
150            mapping.push(source_idx);
151            projected.push(field.clone());
152            projectors.push(projector);
153        }
154        Self::new_internal(
155            source,
156            width,
157            mapping,
158            Fields::from(projected),
159            projectors,
160            selected_paths,
161        )
162    }
163
164    /// Width of the source schema this projection was derived from.
165    pub(super) fn source_width(&self) -> usize {
166        self.0.source_width
167    }
168
169    pub(super) fn mapping_arc(&self) -> Arc<[usize]> {
170        Arc::clone(&self.0.mapping)
171    }
172
173    pub(super) fn projectors_arc(&self) -> Arc<[FieldProjector]> {
174        Arc::clone(&self.0.projectors)
175    }
176
177    /// Projected schema fields in order.
178    pub fn fields(&self) -> &Fields {
179        &self.0.fields
180    }
181
182    /// Number of projected columns.
183    pub fn len(&self) -> usize {
184        self.0.mapping.len()
185    }
186
187    /// Returns `true` when the projection contains zero columns.
188    pub fn is_empty(&self) -> bool {
189        self.len() == 0
190    }
191
192    /// Returns the Parquet projection mask corresponding to this projection.
193    pub fn to_parquet_mask(&self) -> ParquetProjectionMask {
194        self.0.parquet_mask.clone()
195    }
196
197    /// Project a single row from `batch` using this projection, returning a borrowed view.
198    ///
199    /// # Errors
200    /// Returns `DynViewError` when schema validation fails, the row index is out of bounds,
201    /// or the projection width mismatches the batch.
202    pub fn project_row_view<'a>(
203        &self,
204        schema: &'a DynSchema,
205        batch: &'a RecordBatch,
206        row: usize,
207    ) -> Result<DynRowView<'a>, DynViewError> {
208        let view = schema.view_at(batch, row)?;
209        view.project(self)
210    }
211
212    /// Project a single row from `batch` and capture it as lifetime-erased raw cells.
213    pub fn project_row_raw(
214        &self,
215        schema: &DynSchema,
216        batch: &RecordBatch,
217        row: usize,
218    ) -> Result<DynRowRaw, DynViewError> {
219        let view = self.project_row_view(schema, batch, row)?;
220        view.into_raw()
221    }
222}
223
224fn build_parquet_mask(
225    schema: &Schema,
226    mut selected_paths: Vec<Vec<usize>>,
227) -> Result<ParquetProjectionMask, DynViewError> {
228    let converter = ArrowSchemaConverter::new();
229    let descriptor = converter
230        .convert(schema)
231        .map_err(|err| DynViewError::Invalid {
232            column: 0,
233            path: "<projection>".to_string(),
234            message: format!("failed to convert schema to Parquet: {err}"),
235        })?;
236
237    if selected_paths.is_empty() {
238        return Ok(ParquetProjectionMask::none(descriptor.num_columns()));
239    }
240
241    selected_paths.sort();
242    selected_paths.dedup();
243
244    let mut leaf_paths = Vec::new();
245    collect_schema_leaf_paths(schema.fields(), &mut Vec::new(), &mut leaf_paths);
246    if selected_paths.len() == leaf_paths.len() {
247        return Ok(ParquetProjectionMask::all());
248    }
249    let leaf_indices = map_paths_to_leaf_indices(&selected_paths, &leaf_paths);
250    if leaf_indices.is_empty() {
251        return Ok(ParquetProjectionMask::none(descriptor.num_columns()));
252    }
253    Ok(ParquetProjectionMask::leaves(&descriptor, leaf_indices))
254}
255
256fn collect_all_leaf_paths_for_field(
257    field: &Field,
258    path: &mut Vec<usize>,
259    acc: &mut Vec<Vec<usize>>,
260) {
261    match field.data_type() {
262        DataType::Struct(children) => {
263            for (idx, child) in children.iter().enumerate() {
264                path.push(idx);
265                collect_all_leaf_paths_for_field(child.as_ref(), path, acc);
266                path.pop();
267            }
268        }
269        DataType::List(child) | DataType::LargeList(child) => {
270            path.push(0);
271            collect_all_leaf_paths_for_field(child.as_ref(), path, acc);
272            path.pop();
273        }
274        DataType::FixedSizeList(child, _) => {
275            path.push(0);
276            collect_all_leaf_paths_for_field(child.as_ref(), path, acc);
277            path.pop();
278        }
279        DataType::Map(entry, _) => {
280            path.push(0);
281            collect_all_leaf_paths_for_field(entry.as_ref(), path, acc);
282            path.pop();
283        }
284        _ => acc.push(path.clone()),
285    }
286}
287
288fn collect_schema_leaf_paths(
289    fields: &Fields,
290    prefix: &mut Vec<usize>,
291    leaves: &mut Vec<Vec<usize>>,
292) {
293    for (idx, field) in fields.iter().enumerate() {
294        prefix.push(idx);
295        collect_all_leaf_paths_for_field(field.as_ref(), prefix, leaves);
296        prefix.pop();
297    }
298}
299
300fn map_paths_to_leaf_indices(
301    selected_paths: &[Vec<usize>],
302    leaf_paths: &[Vec<usize>],
303) -> Vec<usize> {
304    let mut indices = Vec::new();
305    'outer: for (idx, leaf_path) in leaf_paths.iter().enumerate() {
306        for selected in selected_paths {
307            if is_prefix(selected, leaf_path) {
308                indices.push(idx);
309                continue 'outer;
310            }
311        }
312    }
313    indices
314}
315
/// Returns `true` when `whole` begins with all elements of `prefix`, in order.
///
/// An empty `prefix` matches every slice, and a slice is a prefix of itself.
fn is_prefix(prefix: &[usize], whole: &[usize]) -> bool {
    whole.starts_with(prefix)
}
322
323/// Validate that the batch schema matches the runtime schema exactly.
324fn build_field_projector(
325    path: &Path,
326    source: &Field,
327    projected: &Field,
328    index_path: &mut Vec<usize>,
329    selected_paths: &mut Vec<Vec<usize>>,
330) -> Result<FieldProjector, DynViewError> {
331    if source.is_nullable() != projected.is_nullable() {
332        return Err(DynViewError::Invalid {
333            column: path.column,
334            path: path.path.clone(),
335            message: "nullability mismatch between source and projection".to_string(),
336        });
337    }
338    if source.data_type() == projected.data_type() {
339        collect_all_leaf_paths_for_field(source, index_path, selected_paths);
340        return Ok(FieldProjector::Identity);
341    }
342    match (source.data_type(), projected.data_type()) {
343        (DataType::Struct(source_children), DataType::Struct(projected_children)) => {
344            build_struct_projector(
345                path,
346                source_children,
347                projected_children,
348                index_path,
349                selected_paths,
350            )
351        }
352        (DataType::List(source_child), DataType::List(projected_child)) => {
353            build_list_like_projector(
354                path,
355                source_child,
356                projected_child,
357                source.data_type(),
358                index_path,
359                selected_paths,
360            )
361        }
362        (DataType::LargeList(source_child), DataType::LargeList(projected_child)) => {
363            build_list_like_projector(
364                path,
365                source_child,
366                projected_child,
367                source.data_type(),
368                index_path,
369                selected_paths,
370            )
371        }
372        (
373            DataType::FixedSizeList(source_child, source_len),
374            DataType::FixedSizeList(projected_child, projected_len),
375        ) => {
376            if source_len != projected_len {
377                return Err(DynViewError::Invalid {
378                    column: path.column,
379                    path: path.path.clone(),
380                    message: "fixed-size list length mismatch between source and projection"
381                        .to_string(),
382                });
383            }
384            build_list_like_projector(
385                path,
386                source_child,
387                projected_child,
388                source.data_type(),
389                index_path,
390                selected_paths,
391            )
392        }
393        (
394            DataType::Map(source_entry, keys_sorted),
395            DataType::Map(projected_entry, projected_sorted),
396        ) => {
397            if keys_sorted != projected_sorted {
398                return Err(DynViewError::Invalid {
399                    column: path.column,
400                    path: path.path.clone(),
401                    message: "map key ordering mismatch between source and projection".to_string(),
402                });
403            }
404            build_map_projector(
405                path,
406                source_entry,
407                projected_entry,
408                index_path,
409                selected_paths,
410            )
411        }
412        _ => Err(DynViewError::SchemaMismatch {
413            column: path.column,
414            field: path.path.clone(),
415            expected: source.data_type().clone(),
416            actual: projected.data_type().clone(),
417        }),
418    }
419}
420
421fn build_struct_projector(
422    path: &Path,
423    source_children: &Fields,
424    projected_children: &Fields,
425    index_path: &mut Vec<usize>,
426    selected_paths: &mut Vec<Vec<usize>>,
427) -> Result<FieldProjector, DynViewError> {
428    let mut children = Vec::with_capacity(projected_children.len());
429    for projected_child in projected_children.iter() {
430        let Some(source_index) = source_children
431            .iter()
432            .position(|f| f.name() == projected_child.name())
433        else {
434            return Err(DynViewError::Invalid {
435                column: path.column,
436                path: path.push_field(projected_child.name()).path,
437                message: "field not found in source schema".to_string(),
438            });
439        };
440        let child_path = path.push_field(projected_child.name());
441        index_path.push(source_index);
442        let child_projector = build_field_projector(
443            &child_path,
444            source_children[source_index].as_ref(),
445            projected_child.as_ref(),
446            index_path,
447            selected_paths,
448        )?;
449        index_path.pop();
450        children.push(StructChildProjection {
451            source_index,
452            projector: child_projector,
453        });
454    }
455    let is_identity = projected_children.len() == source_children.len()
456        && children
457            .iter()
458            .enumerate()
459            .all(|(idx, child)| child.source_index == idx && child.projector.is_identity());
460    if is_identity {
461        Ok(FieldProjector::Identity)
462    } else {
463        Ok(FieldProjector::Struct(Arc::new(StructProjection {
464            children: children.into(),
465        })))
466    }
467}
468
469fn build_list_like_projector(
470    path: &Path,
471    source_child: &FieldRef,
472    projected_child: &FieldRef,
473    parent_type: &DataType,
474    index_path: &mut Vec<usize>,
475    selected_paths: &mut Vec<Vec<usize>>,
476) -> Result<FieldProjector, DynViewError> {
477    let child_path = path.push_index(0);
478    index_path.push(0);
479    let child_projector = build_field_projector(
480        &child_path,
481        source_child.as_ref(),
482        projected_child.as_ref(),
483        index_path,
484        selected_paths,
485    )?;
486    index_path.pop();
487    if child_projector.is_identity() {
488        Ok(FieldProjector::Identity)
489    } else {
490        match parent_type {
491            DataType::List(_) => Ok(FieldProjector::List(Box::new(child_projector))),
492            DataType::LargeList(_) => Ok(FieldProjector::LargeList(Box::new(child_projector))),
493            DataType::FixedSizeList(_, _) => {
494                Ok(FieldProjector::FixedSizeList(Box::new(child_projector)))
495            }
496            _ => unreachable!("list-like projector invoked for non list type"),
497        }
498    }
499}
500
501fn build_map_projector(
502    path: &Path,
503    source_entry: &FieldRef,
504    projected_entry: &FieldRef,
505    index_path: &mut Vec<usize>,
506    selected_paths: &mut Vec<Vec<usize>>,
507) -> Result<FieldProjector, DynViewError> {
508    let DataType::Struct(source_children) = source_entry.data_type() else {
509        return Err(DynViewError::Invalid {
510            column: path.column,
511            path: path.path.clone(),
512            message: "map entry must be a struct field".to_string(),
513        });
514    };
515    let DataType::Struct(projected_children) = projected_entry.data_type() else {
516        return Err(DynViewError::Invalid {
517            column: path.column,
518            path: path.path.clone(),
519            message: "projected map entry must be a struct field".to_string(),
520        });
521    };
522    if projected_children.len() != 2 {
523        return Err(DynViewError::Invalid {
524            column: path.column,
525            path: path.path.clone(),
526            message: "map projection must contain exactly two fields (key then value)".to_string(),
527        });
528    }
529    let entry_path = path.push_index(0);
530    index_path.push(0);
531    let projector = build_struct_projector(
532        &entry_path,
533        source_children,
534        projected_children,
535        index_path,
536        selected_paths,
537    )?;
538    index_path.pop();
539    match projector {
540        FieldProjector::Struct(proj) => {
541            let children = proj.children.as_ref();
542            if children.len() != 2 {
543                return Err(DynViewError::Invalid {
544                    column: path.column,
545                    path: path.path.clone(),
546                    message: "map projection must preserve exactly two children".to_string(),
547                });
548            }
549            if children[0].source_index != 0 || children[1].source_index != 1 {
550                return Err(DynViewError::Invalid {
551                    column: path.column,
552                    path: path.path.clone(),
553                    message: "map projection must keep the key field before the value field"
554                        .to_string(),
555                });
556            }
557            Ok(FieldProjector::Map(proj))
558        }
559        FieldProjector::Identity => Ok(FieldProjector::Identity),
560        _ => Err(DynViewError::Invalid {
561            column: path.column,
562            path: path.path.clone(),
563            message: "unsupported map projection".to_string(),
564        }),
565    }
566}