1use std::sync::Arc;
2
3use arrow_array::RecordBatch;
4use arrow_schema::{DataType, Field, FieldRef, Fields, Schema};
5use parquet::arrow::{ArrowSchemaConverter, ProjectionMask as ParquetProjectionMask};
6
7use super::{
8 path::Path,
9 rows::{DynRowRaw, DynRowView},
10};
11use crate::{DynViewError, schema::DynSchema};
12
/// A reusable column projection over a source Arrow schema.
///
/// Wraps the projection state (source width, projected-to-source column
/// mapping, projected [`Fields`], per-column [`FieldProjector`]s and a
/// precomputed [`ParquetProjectionMask`]) in an [`Arc`], so clones share the
/// same underlying data.
#[derive(Debug, Clone)]
pub struct DynProjection(Arc<DynProjectionData>);
16
/// Describes how a single source field is reshaped into its projected form.
///
/// `Identity` means the source value is used unchanged; the remaining variants
/// describe a projection applied to the children of a nested type.
#[derive(Debug, Clone)]
pub(super) enum FieldProjector {
    /// The source value is used as-is (source and projected types agree).
    Identity,
    /// Select and/or reshape children of a struct.
    Struct(Arc<StructProjection>),
    /// Project the element field of a `List`.
    List(Box<FieldProjector>),
    /// Project the element field of a `LargeList`.
    LargeList(Box<FieldProjector>),
    /// Project the element field of a `FixedSizeList`.
    FixedSizeList(Box<FieldProjector>),
    /// Project the key/value entries struct of a `Map`.
    Map(Arc<StructProjection>),
}
26
27impl FieldProjector {
28 fn is_identity(&self) -> bool {
29 matches!(self, FieldProjector::Identity)
30 }
31}
32
/// Projection applied to the children of a struct; also used for the entries
/// struct of a map.
#[derive(Debug, Clone)]
pub(super) struct StructProjection {
    /// One entry per projected child, in the projected field order.
    pub(super) children: Arc<[StructChildProjection]>,
}
37
/// How one projected struct child is obtained from the source struct.
#[derive(Debug, Clone)]
pub(super) struct StructChildProjection {
    /// Position, within the source struct's fields, of the child matched by
    /// name.
    pub(super) source_index: usize,
    /// Projector applied to that source child.
    pub(super) projector: FieldProjector,
}
43
/// Shared state behind a [`DynProjection`].
#[derive(Debug)]
struct DynProjectionData {
    /// Number of top-level fields in the source schema.
    source_width: usize,
    /// For each projected column, the index of its source column.
    mapping: Arc<[usize]>,
    /// The projected fields, in projected order.
    fields: Fields,
    /// For each projected column, how its source value is reshaped.
    projectors: Arc<[FieldProjector]>,
    /// Parquet leaf mask covering the source leaves this projection reads.
    parquet_mask: ParquetProjectionMask,
}
52
53impl DynProjection {
54 fn new_internal(
55 schema: &Schema,
56 source_width: usize,
57 mapping: Vec<usize>,
58 fields: Fields,
59 projectors: Vec<FieldProjector>,
60 selected_paths: Vec<Vec<usize>>,
61 ) -> Result<Self, DynViewError> {
62 debug_assert_eq!(
63 mapping.len(),
64 projectors.len(),
65 "projection mapping and projector width mismatch"
66 );
67 let parquet_mask = build_parquet_mask(schema, selected_paths)?;
68 Ok(Self(Arc::new(DynProjectionData {
69 source_width,
70 mapping: Arc::from(mapping),
71 fields,
72 projectors: Arc::from(projectors),
73 parquet_mask,
74 })))
75 }
76
77 pub fn from_indices<I>(schema: &Schema, indices: I) -> Result<Self, DynViewError>
82 where
83 I: IntoIterator<Item = usize>,
84 {
85 let schema_fields = schema.fields();
86 let width = schema_fields.len();
87 let mut mapping = Vec::new();
88 let mut projected = Vec::new();
89 let mut projectors = Vec::new();
90 let mut selected_paths = Vec::new();
91 for idx in indices.into_iter() {
92 if idx >= width {
93 return Err(DynViewError::ColumnOutOfBounds { column: idx, width });
94 }
95 mapping.push(idx);
96 projected.push(schema_fields[idx].clone());
97 projectors.push(FieldProjector::Identity);
98 let mut index_path = vec![idx];
99 collect_all_leaf_paths_for_field(
100 schema_fields[idx].as_ref(),
101 &mut index_path,
102 &mut selected_paths,
103 );
104 }
105 Self::new_internal(
106 schema,
107 width,
108 mapping,
109 Fields::from(projected),
110 projectors,
111 selected_paths,
112 )
113 }
114
115 pub fn from_schema(source: &Schema, projection: &Schema) -> Result<Self, DynViewError> {
123 let source_fields = source.fields();
124 let width = source_fields.len();
125 let mut mapping = Vec::with_capacity(projection.fields().len());
126 let mut projected = Vec::with_capacity(projection.fields().len());
127 let mut projectors = Vec::with_capacity(projection.fields().len());
128 let mut selected_paths = Vec::new();
129 for (pos, field) in projection.fields().iter().enumerate() {
130 let source_idx = match source.index_of(field.name()) {
131 Ok(idx) => idx,
132 Err(_) => {
133 return Err(DynViewError::Invalid {
134 column: pos,
135 path: field.name().to_string(),
136 message: "field not found in source schema".to_string(),
137 });
138 }
139 };
140 let source_field = source_fields[source_idx].as_ref();
141 let path = Path::new(source_idx, field.name());
142 let mut index_path = vec![source_idx];
143 let projector = build_field_projector(
144 &path,
145 source_field,
146 field.as_ref(),
147 &mut index_path,
148 &mut selected_paths,
149 )?;
150 mapping.push(source_idx);
151 projected.push(field.clone());
152 projectors.push(projector);
153 }
154 Self::new_internal(
155 source,
156 width,
157 mapping,
158 Fields::from(projected),
159 projectors,
160 selected_paths,
161 )
162 }
163
164 pub(super) fn source_width(&self) -> usize {
166 self.0.source_width
167 }
168
169 pub(super) fn mapping_arc(&self) -> Arc<[usize]> {
170 Arc::clone(&self.0.mapping)
171 }
172
173 pub(super) fn projectors_arc(&self) -> Arc<[FieldProjector]> {
174 Arc::clone(&self.0.projectors)
175 }
176
177 pub fn fields(&self) -> &Fields {
179 &self.0.fields
180 }
181
182 pub fn len(&self) -> usize {
184 self.0.mapping.len()
185 }
186
187 pub fn is_empty(&self) -> bool {
189 self.len() == 0
190 }
191
192 pub fn to_parquet_mask(&self) -> ParquetProjectionMask {
194 self.0.parquet_mask.clone()
195 }
196
197 pub fn project_row_view<'a>(
203 &self,
204 schema: &'a DynSchema,
205 batch: &'a RecordBatch,
206 row: usize,
207 ) -> Result<DynRowView<'a>, DynViewError> {
208 let view = schema.view_at(batch, row)?;
209 view.project(self)
210 }
211
212 pub fn project_row_raw(
214 &self,
215 schema: &DynSchema,
216 batch: &RecordBatch,
217 row: usize,
218 ) -> Result<DynRowRaw, DynViewError> {
219 let view = self.project_row_view(schema, batch, row)?;
220 view.into_raw()
221 }
222}
223
224fn build_parquet_mask(
225 schema: &Schema,
226 mut selected_paths: Vec<Vec<usize>>,
227) -> Result<ParquetProjectionMask, DynViewError> {
228 let converter = ArrowSchemaConverter::new();
229 let descriptor = converter
230 .convert(schema)
231 .map_err(|err| DynViewError::Invalid {
232 column: 0,
233 path: "<projection>".to_string(),
234 message: format!("failed to convert schema to Parquet: {err}"),
235 })?;
236
237 if selected_paths.is_empty() {
238 return Ok(ParquetProjectionMask::none(descriptor.num_columns()));
239 }
240
241 selected_paths.sort();
242 selected_paths.dedup();
243
244 let mut leaf_paths = Vec::new();
245 collect_schema_leaf_paths(schema.fields(), &mut Vec::new(), &mut leaf_paths);
246 if selected_paths.len() == leaf_paths.len() {
247 return Ok(ParquetProjectionMask::all());
248 }
249 let leaf_indices = map_paths_to_leaf_indices(&selected_paths, &leaf_paths);
250 if leaf_indices.is_empty() {
251 return Ok(ParquetProjectionMask::none(descriptor.num_columns()));
252 }
253 Ok(ParquetProjectionMask::leaves(&descriptor, leaf_indices))
254}
255
256fn collect_all_leaf_paths_for_field(
257 field: &Field,
258 path: &mut Vec<usize>,
259 acc: &mut Vec<Vec<usize>>,
260) {
261 match field.data_type() {
262 DataType::Struct(children) => {
263 for (idx, child) in children.iter().enumerate() {
264 path.push(idx);
265 collect_all_leaf_paths_for_field(child.as_ref(), path, acc);
266 path.pop();
267 }
268 }
269 DataType::List(child) | DataType::LargeList(child) => {
270 path.push(0);
271 collect_all_leaf_paths_for_field(child.as_ref(), path, acc);
272 path.pop();
273 }
274 DataType::FixedSizeList(child, _) => {
275 path.push(0);
276 collect_all_leaf_paths_for_field(child.as_ref(), path, acc);
277 path.pop();
278 }
279 DataType::Map(entry, _) => {
280 path.push(0);
281 collect_all_leaf_paths_for_field(entry.as_ref(), path, acc);
282 path.pop();
283 }
284 _ => acc.push(path.clone()),
285 }
286}
287
288fn collect_schema_leaf_paths(
289 fields: &Fields,
290 prefix: &mut Vec<usize>,
291 leaves: &mut Vec<Vec<usize>>,
292) {
293 for (idx, field) in fields.iter().enumerate() {
294 prefix.push(idx);
295 collect_all_leaf_paths_for_field(field.as_ref(), prefix, leaves);
296 prefix.pop();
297 }
298}
299
/// Returns, in ascending order, the positions in `leaf_paths` of every leaf
/// covered by at least one entry of `selected_paths`.
///
/// A leaf is covered when some selected path is a prefix of (or equal to) the
/// leaf's path. Selecting a container path therefore selects every leaf
/// beneath it, and an empty selected path selects all leaves.
///
/// The selected paths are indexed in a hash set once. Each leaf then costs
/// one lookup per prefix length rather than a scan over every selected path,
/// which avoids an O(leaves × selected) blow-up on wide schemas.
fn map_paths_to_leaf_indices(
    selected_paths: &[Vec<usize>],
    leaf_paths: &[Vec<usize>],
) -> Vec<usize> {
    let selected: std::collections::HashSet<&[usize]> =
        selected_paths.iter().map(Vec::as_slice).collect();
    if selected.is_empty() {
        return Vec::new();
    }
    leaf_paths
        .iter()
        .enumerate()
        .filter(|(_, leaf)| (0..=leaf.len()).any(|len| selected.contains(&leaf[..len])))
        .map(|(idx, _)| idx)
        .collect()
}
315
/// Returns `true` when `whole` begins with `prefix`.
///
/// An empty `prefix` matches any slice, equal slices match, and a `prefix`
/// longer than `whole` never matches.
fn is_prefix(prefix: &[usize], whole: &[usize]) -> bool {
    whole.starts_with(prefix)
}
322
323fn build_field_projector(
325 path: &Path,
326 source: &Field,
327 projected: &Field,
328 index_path: &mut Vec<usize>,
329 selected_paths: &mut Vec<Vec<usize>>,
330) -> Result<FieldProjector, DynViewError> {
331 if source.is_nullable() != projected.is_nullable() {
332 return Err(DynViewError::Invalid {
333 column: path.column,
334 path: path.path.clone(),
335 message: "nullability mismatch between source and projection".to_string(),
336 });
337 }
338 if source.data_type() == projected.data_type() {
339 collect_all_leaf_paths_for_field(source, index_path, selected_paths);
340 return Ok(FieldProjector::Identity);
341 }
342 match (source.data_type(), projected.data_type()) {
343 (DataType::Struct(source_children), DataType::Struct(projected_children)) => {
344 build_struct_projector(
345 path,
346 source_children,
347 projected_children,
348 index_path,
349 selected_paths,
350 )
351 }
352 (DataType::List(source_child), DataType::List(projected_child)) => {
353 build_list_like_projector(
354 path,
355 source_child,
356 projected_child,
357 source.data_type(),
358 index_path,
359 selected_paths,
360 )
361 }
362 (DataType::LargeList(source_child), DataType::LargeList(projected_child)) => {
363 build_list_like_projector(
364 path,
365 source_child,
366 projected_child,
367 source.data_type(),
368 index_path,
369 selected_paths,
370 )
371 }
372 (
373 DataType::FixedSizeList(source_child, source_len),
374 DataType::FixedSizeList(projected_child, projected_len),
375 ) => {
376 if source_len != projected_len {
377 return Err(DynViewError::Invalid {
378 column: path.column,
379 path: path.path.clone(),
380 message: "fixed-size list length mismatch between source and projection"
381 .to_string(),
382 });
383 }
384 build_list_like_projector(
385 path,
386 source_child,
387 projected_child,
388 source.data_type(),
389 index_path,
390 selected_paths,
391 )
392 }
393 (
394 DataType::Map(source_entry, keys_sorted),
395 DataType::Map(projected_entry, projected_sorted),
396 ) => {
397 if keys_sorted != projected_sorted {
398 return Err(DynViewError::Invalid {
399 column: path.column,
400 path: path.path.clone(),
401 message: "map key ordering mismatch between source and projection".to_string(),
402 });
403 }
404 build_map_projector(
405 path,
406 source_entry,
407 projected_entry,
408 index_path,
409 selected_paths,
410 )
411 }
412 _ => Err(DynViewError::SchemaMismatch {
413 column: path.column,
414 field: path.path.clone(),
415 expected: source.data_type().clone(),
416 actual: projected.data_type().clone(),
417 }),
418 }
419}
420
421fn build_struct_projector(
422 path: &Path,
423 source_children: &Fields,
424 projected_children: &Fields,
425 index_path: &mut Vec<usize>,
426 selected_paths: &mut Vec<Vec<usize>>,
427) -> Result<FieldProjector, DynViewError> {
428 let mut children = Vec::with_capacity(projected_children.len());
429 for projected_child in projected_children.iter() {
430 let Some(source_index) = source_children
431 .iter()
432 .position(|f| f.name() == projected_child.name())
433 else {
434 return Err(DynViewError::Invalid {
435 column: path.column,
436 path: path.push_field(projected_child.name()).path,
437 message: "field not found in source schema".to_string(),
438 });
439 };
440 let child_path = path.push_field(projected_child.name());
441 index_path.push(source_index);
442 let child_projector = build_field_projector(
443 &child_path,
444 source_children[source_index].as_ref(),
445 projected_child.as_ref(),
446 index_path,
447 selected_paths,
448 )?;
449 index_path.pop();
450 children.push(StructChildProjection {
451 source_index,
452 projector: child_projector,
453 });
454 }
455 let is_identity = projected_children.len() == source_children.len()
456 && children
457 .iter()
458 .enumerate()
459 .all(|(idx, child)| child.source_index == idx && child.projector.is_identity());
460 if is_identity {
461 Ok(FieldProjector::Identity)
462 } else {
463 Ok(FieldProjector::Struct(Arc::new(StructProjection {
464 children: children.into(),
465 })))
466 }
467}
468
469fn build_list_like_projector(
470 path: &Path,
471 source_child: &FieldRef,
472 projected_child: &FieldRef,
473 parent_type: &DataType,
474 index_path: &mut Vec<usize>,
475 selected_paths: &mut Vec<Vec<usize>>,
476) -> Result<FieldProjector, DynViewError> {
477 let child_path = path.push_index(0);
478 index_path.push(0);
479 let child_projector = build_field_projector(
480 &child_path,
481 source_child.as_ref(),
482 projected_child.as_ref(),
483 index_path,
484 selected_paths,
485 )?;
486 index_path.pop();
487 if child_projector.is_identity() {
488 Ok(FieldProjector::Identity)
489 } else {
490 match parent_type {
491 DataType::List(_) => Ok(FieldProjector::List(Box::new(child_projector))),
492 DataType::LargeList(_) => Ok(FieldProjector::LargeList(Box::new(child_projector))),
493 DataType::FixedSizeList(_, _) => {
494 Ok(FieldProjector::FixedSizeList(Box::new(child_projector)))
495 }
496 _ => unreachable!("list-like projector invoked for non list type"),
497 }
498 }
499}
500
501fn build_map_projector(
502 path: &Path,
503 source_entry: &FieldRef,
504 projected_entry: &FieldRef,
505 index_path: &mut Vec<usize>,
506 selected_paths: &mut Vec<Vec<usize>>,
507) -> Result<FieldProjector, DynViewError> {
508 let DataType::Struct(source_children) = source_entry.data_type() else {
509 return Err(DynViewError::Invalid {
510 column: path.column,
511 path: path.path.clone(),
512 message: "map entry must be a struct field".to_string(),
513 });
514 };
515 let DataType::Struct(projected_children) = projected_entry.data_type() else {
516 return Err(DynViewError::Invalid {
517 column: path.column,
518 path: path.path.clone(),
519 message: "projected map entry must be a struct field".to_string(),
520 });
521 };
522 if projected_children.len() != 2 {
523 return Err(DynViewError::Invalid {
524 column: path.column,
525 path: path.path.clone(),
526 message: "map projection must contain exactly two fields (key then value)".to_string(),
527 });
528 }
529 let entry_path = path.push_index(0);
530 index_path.push(0);
531 let projector = build_struct_projector(
532 &entry_path,
533 source_children,
534 projected_children,
535 index_path,
536 selected_paths,
537 )?;
538 index_path.pop();
539 match projector {
540 FieldProjector::Struct(proj) => {
541 let children = proj.children.as_ref();
542 if children.len() != 2 {
543 return Err(DynViewError::Invalid {
544 column: path.column,
545 path: path.path.clone(),
546 message: "map projection must preserve exactly two children".to_string(),
547 });
548 }
549 if children[0].source_index != 0 || children[1].source_index != 1 {
550 return Err(DynViewError::Invalid {
551 column: path.column,
552 path: path.path.clone(),
553 message: "map projection must keep the key field before the value field"
554 .to_string(),
555 });
556 }
557 Ok(FieldProjector::Map(proj))
558 }
559 FieldProjector::Identity => Ok(FieldProjector::Identity),
560 _ => Err(DynViewError::Invalid {
561 column: path.column,
562 path: path.path.clone(),
563 message: "unsupported map projection".to_string(),
564 }),
565 }
566}