typed_arrow_dyn/view/
rows.rs

1use std::sync::Arc;
2
3use arrow_array::RecordBatch;
4use arrow_schema::{DataType, Field, Fields, Schema};
5
6use super::{
7    cell::{DynCellRaw, DynCellRef, view_cell_with_projector},
8    path::Path,
9    projection::{DynProjection, FieldProjector},
10};
11use crate::{DynViewError, cell::DynCell, rows::DynRow, schema::DynSchema};
12
/// Iterator over borrowed dynamic rows.
///
/// Yields one [`DynRowView`] for each row index from the current position up to (but not
/// including) `len`. Every yielded view borrows the same batch and receives clones of this
/// iterator's field list, optional projection mapping, and optional projectors.
#[derive(Debug)]
pub struct DynRowViews<'a> {
    // Batch that every yielded view reads from.
    batch: &'a RecordBatch,
    // Fields exposed by each yielded view (the projected fields once `project` has been applied).
    fields: Fields,
    // View-column -> batch-column indices; `None` means columns map one-to-one.
    mapping: Option<Arc<[usize]>>,
    // Per-column projectors forwarded to cell views; `None` when no projection is applied.
    projectors: Option<Arc<[FieldProjector]>>,
    // Index of the next row to yield.
    row: usize,
    // One past the last row to yield (the batch row count captured at construction).
    len: usize,
}
23
24impl<'a> DynRowViews<'a> {
25    /// Create a dynamic view iterator from a record batch after validating schema compatibility.
26    pub fn new(batch: &'a RecordBatch, schema: &'a Schema) -> Result<Self, DynViewError> {
27        validate_schema_matches(batch, schema)?;
28        Ok(Self {
29            batch,
30            fields: schema.fields().clone(),
31            mapping: None,
32            projectors: None,
33            row: 0,
34            len: batch.num_rows(),
35        })
36    }
37
38    /// Borrow the underlying schema fields.
39    #[inline]
40    pub fn fields(&self) -> &Fields {
41        &self.fields
42    }
43
44    /// Apply a top-level projection to this iterator, yielding views that expose only the mapped
45    /// columns.
46    ///
47    /// The projection is lazy: rows are fetched on demand from the underlying iterator, and only
48    /// the referenced columns are materialized.
49    ///
50    /// # Errors
51    /// Returns `DynViewError::Invalid` if the projection was derived from a schema with a different
52    /// width than this iterator.
53    pub fn project(self, projection: DynProjection) -> Result<Self, DynViewError> {
54        let DynRowViews {
55            batch,
56            fields,
57            mapping,
58            projectors,
59            row,
60            len,
61        } = self;
62
63        let base_view = DynRowView {
64            batch,
65            fields,
66            mapping,
67            projectors,
68            row,
69        };
70
71        let projected_view = base_view.project(&projection)?;
72        let DynRowView {
73            batch,
74            fields,
75            mapping,
76            projectors,
77            row,
78        } = projected_view;
79
80        Ok(Self {
81            batch,
82            fields,
83            mapping,
84            projectors,
85            row,
86            len,
87        })
88    }
89}
90
91impl<'a> Iterator for DynRowViews<'a> {
92    type Item = Result<DynRowView<'a>, DynViewError>;
93
94    fn next(&mut self) -> Option<Self::Item> {
95        if self.row >= self.len {
96            return None;
97        }
98        let view = DynRowView {
99            batch: self.batch,
100            fields: self.fields.clone(),
101            mapping: self.mapping.clone(),
102            projectors: self.projectors.clone(),
103            row: self.row,
104        };
105        self.row += 1;
106        Some(Ok(view))
107    }
108
109    fn size_hint(&self) -> (usize, Option<usize>) {
110        let remaining = self.len.saturating_sub(self.row);
111        (remaining, Some(remaining))
112    }
113}
114
115impl<'a> ExactSizeIterator for DynRowViews<'a> {}
116
/// Borrowed dynamic row backed by an `arrow_array::RecordBatch`.
///
/// The view stores only a row index plus column metadata; cells are resolved from the batch on
/// each [`DynRowView::get`] call.
#[derive(Debug)]
pub struct DynRowView<'a> {
    // Batch the row lives in.
    batch: &'a RecordBatch,
    // Fields exposed by this view (projected fields when a projection is applied).
    fields: Fields,
    // View-column -> batch-column indices; `None` means columns map one-to-one.
    mapping: Option<Arc<[usize]>>,
    // Per-column projectors passed to `view_cell_with_projector`; `None` when unprojected.
    projectors: Option<Arc<[FieldProjector]>>,
    // Row index within `batch`; bounds-checked lazily in `get`.
    row: usize,
}
126
127impl<'a> DynRowView<'a> {
128    #[cfg(test)]
129    pub(super) fn new_for_testing(
130        batch: &'a RecordBatch,
131        fields: Fields,
132        mapping: Option<Arc<[usize]>>,
133        projectors: Option<Arc<[FieldProjector]>>,
134        row: usize,
135    ) -> Self {
136        Self {
137            batch,
138            fields,
139            mapping,
140            projectors,
141            row,
142        }
143    }
144
145    /// Number of columns in this row.
146    #[inline]
147    pub fn len(&self) -> usize {
148        self.fields.len()
149    }
150
151    /// Returns true when the row has zero columns.
152    #[inline]
153    pub fn is_empty(&self) -> bool {
154        self.len() == 0
155    }
156
157    /// Borrow the schema fields.
158    #[inline]
159    pub fn fields(&self) -> &Fields {
160        &self.fields
161    }
162
163    /// Retrieve the cell at `column` as a borrowed [`DynCellRef`].
164    pub fn get(&self, column: usize) -> Result<Option<DynCellRef<'_>>, DynViewError> {
165        if self.row >= self.batch.num_rows() {
166            return Err(DynViewError::RowOutOfBounds {
167                row: self.row,
168                len: self.batch.num_rows(),
169            });
170        }
171        let width = self.fields.len();
172        let field = self
173            .fields
174            .get(column)
175            .ok_or(DynViewError::ColumnOutOfBounds { column, width })?;
176        let source_index = match &self.mapping {
177            Some(mapping) => mapping[column],
178            None => column,
179        };
180        if source_index >= self.batch.num_columns() {
181            return Err(DynViewError::Invalid {
182                column,
183                path: self
184                    .fields
185                    .get(column)
186                    .map(|f| f.name().to_string())
187                    .unwrap_or_else(|| "<unknown>".to_string()),
188                message: format!(
189                    "projection index {source_index} exceeds batch width {}",
190                    self.batch.num_columns()
191                ),
192            });
193        }
194        let array = self.batch.column(source_index);
195        let path = Path::new(column, field.name());
196        let projector = match self.projectors.as_ref() {
197            Some(projectors) => {
198                Some(
199                    projectors
200                        .get(column)
201                        .ok_or_else(|| DynViewError::Invalid {
202                            column,
203                            path: field.name().to_string(),
204                            message: "projection width mismatch".to_string(),
205                        })?,
206                )
207            }
208            None => None,
209        };
210        view_cell_with_projector(&path, field.as_ref(), projector, array.as_ref(), self.row)
211    }
212
213    /// Retrieve a column by name, returning `None` if the field does not exist.
214    pub fn get_by_name(&self, name: &str) -> Option<Result<Option<DynCellRef<'_>>, DynViewError>> {
215        self.fields
216            .iter()
217            .position(|f| f.name() == name)
218            .map(move |idx| self.get(idx))
219    }
220
221    /// Clone this row into an owned [`DynRow`], allocating owned dynamic cells for each column.
222    pub fn to_owned(&self) -> Result<DynRow, DynViewError> {
223        let width = self.len();
224        let mut cells = Vec::with_capacity(width);
225        for idx in 0..width {
226            let value = self.get(idx)?;
227            let owned = match value {
228                None => None,
229                Some(cell) => Some(cell.into_owned()?),
230            };
231            cells.push(owned);
232        }
233        Ok(DynRow(cells))
234    }
235
236    /// Consume this row view and capture its values as lifetime-erased [`DynCellRaw`] entries.
237    pub fn into_raw(self) -> Result<DynRowRaw, DynViewError> {
238        let fields = self.fields.clone();
239        let mut cells = Vec::with_capacity(fields.len());
240        for idx in 0..fields.len() {
241            let value = self.get(idx)?;
242            cells.push(value.map(DynCellRef::into_raw));
243        }
244        Ok(DynRowRaw { fields, cells })
245    }
246
247    /// Apply a projection to this view, yielding a new view that references only the mapped
248    /// columns.
249    ///
250    /// The projection is lazy and reuses the underlying batch buffers.
251    ///
252    /// # Errors
253    /// Returns `DynViewError::Invalid` if the projection was derived from a schema whose width
254    /// differs from the underlying batch.
255    pub fn project(self, projection: &DynProjection) -> Result<DynRowView<'a>, DynViewError> {
256        if projection.source_width() != self.batch.num_columns() {
257            return Err(DynViewError::Invalid {
258                column: 0,
259                path: "<projection>".to_string(),
260                message: format!(
261                    "projection source width {} does not match batch width {}",
262                    projection.source_width(),
263                    self.batch.num_columns()
264                ),
265            });
266        }
267        Ok(DynRowView {
268            batch: self.batch,
269            fields: projection.fields().clone(),
270            mapping: Some(projection.mapping_arc()),
271            projectors: Some(projection.projectors_arc()),
272            row: self.row,
273        })
274    }
275
276    /// Access the underlying row index.
277    pub fn row_index(&self) -> usize {
278        self.row
279    }
280}
281
/// Lifetime-erased dynamic row produced by [`DynRowView::into_raw`].
///
/// Pairs the schema fields with one optional raw cell per column. A `None` entry mirrors
/// `DynRowView::get` returning `Ok(None)` for that column.
#[derive(Clone, Debug)]
pub struct DynRowRaw {
    // Schema fields describing each cell position.
    fields: Fields,
    // Raw cell payloads, one per field (enforced by `try_new`).
    cells: Vec<Option<DynCellRaw>>,
}
288
// SAFETY: this type is a lightweight handle over raw cells and schema metadata. The same lifetime
// caveats as `DynCellRaw` apply: callers must ensure the backing Arrow data outlives any moved
// `DynRowRaw` instances. Rows built by `DynRowOwned::as_raw` reference that owned row's buffers,
// so the owned row must outlive them as well.
// NOTE(review): soundness also assumes the data behind `DynCellRaw` is never mutated while a raw
// row is shared across threads — confirm against the `DynCellRaw` definition.
unsafe impl Send for DynRowRaw {}
unsafe impl Sync for DynRowRaw {}
294
295fn validate_row_width(fields: &Fields, cells_len: usize) -> Result<(), DynViewError> {
296    if fields.len() != cells_len {
297        let column = fields.len().min(cells_len);
298        return Err(DynViewError::Invalid {
299            column,
300            path: "<row>".to_string(),
301            message: format!(
302                "field count {} does not match cell count {}",
303                fields.len(),
304                cells_len
305            ),
306        });
307    }
308    Ok(())
309}
310
311fn validate_field_shape(
312    column: usize,
313    field_name: &str,
314    expected_type: &DataType,
315    expected_nullable: bool,
316    actual: &Field,
317) -> Result<(), DynViewError> {
318    if actual.data_type() != expected_type {
319        return Err(DynViewError::SchemaMismatch {
320            column,
321            field: field_name.to_string(),
322            expected: expected_type.clone(),
323            actual: actual.data_type().clone(),
324        });
325    }
326    if actual.is_nullable() != expected_nullable {
327        return Err(DynViewError::Invalid {
328            column,
329            path: field_name.to_string(),
330            message: format!(
331                "nullability mismatch: expected {}, got {}",
332                expected_nullable,
333                actual.is_nullable()
334            ),
335        });
336    }
337    Ok(())
338}
339
340impl DynRowRaw {
341    /// Construct a raw row from explicit schema fields and raw cells.
342    ///
343    /// # Errors
344    /// Returns [`DynViewError::Invalid`] when the number of cells does not match
345    /// the number of fields in the provided schema slice.
346    pub fn try_new(fields: Fields, cells: Vec<Option<DynCellRaw>>) -> Result<Self, DynViewError> {
347        validate_row_width(&fields, cells.len())?;
348        Ok(Self { fields, cells })
349    }
350
351    /// Construct a raw row from non-null cells.
352    ///
353    /// # Errors
354    /// Returns [`DynViewError::Invalid`] when the number of cells does not match the schema.
355    pub fn from_cells(fields: Fields, cells: Vec<DynCellRaw>) -> Result<Self, DynViewError> {
356        let wrapped = cells.into_iter().map(Some).collect();
357        Self::try_new(fields, wrapped)
358    }
359
360    /// Number of columns carried by this raw row.
361    #[inline]
362    pub fn len(&self) -> usize {
363        self.cells.len()
364    }
365
366    /// Returns true when the row has zero columns.
367    #[inline]
368    pub fn is_empty(&self) -> bool {
369        self.cells.is_empty()
370    }
371
372    /// Borrow the schema fields associated with this row.
373    #[inline]
374    pub fn fields(&self) -> &Fields {
375        &self.fields
376    }
377
378    /// Borrow the raw cell payloads.
379    #[inline]
380    pub fn cells(&self) -> &[Option<DynCellRaw>] {
381        &self.cells
382    }
383
384    /// Consume the raw row, yielding the underlying raw cells.
385    #[inline]
386    pub fn into_cells(self) -> Vec<Option<DynCellRaw>> {
387        self.cells
388    }
389
390    /// Convert this raw row into an owned [`DynRow`], cloning nested data as needed.
391    pub fn into_owned(self) -> Result<DynRow, DynViewError> {
392        let mut cells = Vec::with_capacity(self.cells.len());
393        for cell in self.cells {
394            let owned = match cell {
395                None => None,
396                Some(raw) => Some(raw.into_owned()?),
397            };
398            cells.push(owned);
399        }
400        Ok(DynRow(cells))
401    }
402
403    /// Clone this raw row into an owned [`DynRow`] without consuming the raw payloads.
404    pub fn to_owned(&self) -> Result<DynRow, DynViewError> {
405        self.clone().into_owned()
406    }
407}
408
/// Owned dynamic row that retains schema metadata alongside owned cell payloads.
///
/// Unlike [`DynRowRaw`], the cells own their data, so the row does not depend on any batch.
#[derive(Clone, Debug)]
pub struct DynRowOwned {
    // Schema fields describing each cell position.
    fields: Fields,
    // Owned cell payloads, one per field (enforced by `try_new`).
    cells: Vec<Option<DynCell>>,
}
415
416impl DynRowOwned {
417    /// Construct an owned row from explicit schema fields and owned cells.
418    ///
419    /// # Errors
420    /// Returns [`DynViewError::Invalid`] when the number of cells does not match the schema.
421    pub fn try_new(fields: Fields, cells: Vec<Option<DynCell>>) -> Result<Self, DynViewError> {
422        validate_row_width(&fields, cells.len())?;
423        Ok(Self { fields, cells })
424    }
425
426    /// Construct an owned row from a [`DynRow`].
427    pub fn from_dyn_row(fields: Fields, row: DynRow) -> Result<Self, DynViewError> {
428        Self::try_new(fields, row.0)
429    }
430
431    /// Clone the lifetime-erased raw row into an owned representation.
432    pub fn from_raw(raw: &DynRowRaw) -> Result<Self, DynViewError> {
433        let owned = raw.to_owned()?;
434        Self::from_dyn_row(raw.fields().clone(), owned)
435    }
436
437    /// Borrow the schema fields associated with this row.
438    #[inline]
439    pub fn fields(&self) -> &Fields {
440        &self.fields
441    }
442
443    /// Borrow the owned cell payloads.
444    #[inline]
445    pub fn cells(&self) -> &[Option<DynCell>] {
446        &self.cells
447    }
448
449    /// Number of columns carried by this row.
450    #[inline]
451    pub fn len(&self) -> usize {
452        self.cells.len()
453    }
454
455    /// Returns true when the row has zero columns.
456    #[inline]
457    pub fn is_empty(&self) -> bool {
458        self.cells.is_empty()
459    }
460
461    /// Borrow this owned row as a lifetime-erased raw row referencing the owned buffers.
462    pub fn as_raw(&self) -> Result<DynRowRaw, DynViewError> {
463        let mut raw_cells = Vec::with_capacity(self.cells.len());
464        for (idx, cell) in self.cells.iter().enumerate() {
465            match cell {
466                None => raw_cells.push(None),
467                Some(value) => {
468                    let raw =
469                        owned_cell_to_raw(value).map_err(|message| DynViewError::Invalid {
470                            column: idx,
471                            path: self
472                                .fields
473                                .get(idx)
474                                .map(|f| f.name().to_string())
475                                .unwrap_or_else(|| format!("col{idx}")),
476                            message,
477                        })?;
478                    raw_cells.push(Some(raw));
479                }
480            }
481        }
482        DynRowRaw::try_new(self.fields.clone(), raw_cells)
483    }
484
485    /// Consume this owned row, yielding the underlying dynamic row cells.
486    pub fn into_dyn_row(self) -> DynRow {
487        DynRow(self.cells)
488    }
489
490    /// Clone this owned row into a [`DynRow`].
491    pub fn to_dyn_row(&self) -> DynRow {
492        DynRow(self.cells.clone())
493    }
494
495    /// Decompose the owned row into its schema fields and owned cells.
496    pub fn into_parts(self) -> (Fields, Vec<Option<DynCell>>) {
497        (self.fields, self.cells)
498    }
499}
500
501fn owned_cell_to_raw(cell: &DynCell) -> Result<DynCellRaw, String> {
502    use DynCell::*;
503    match cell {
504        Null => Ok(DynCellRaw::Null),
505        Bool(v) => Ok(DynCellRaw::Bool(*v)),
506        I8(v) => Ok(DynCellRaw::I8(*v)),
507        I16(v) => Ok(DynCellRaw::I16(*v)),
508        I32(v) => Ok(DynCellRaw::I32(*v)),
509        I64(v) => Ok(DynCellRaw::I64(*v)),
510        U8(v) => Ok(DynCellRaw::U8(*v)),
511        U16(v) => Ok(DynCellRaw::U16(*v)),
512        U32(v) => Ok(DynCellRaw::U32(*v)),
513        U64(v) => Ok(DynCellRaw::U64(*v)),
514        F32(v) => Ok(DynCellRaw::F32(*v)),
515        F64(v) => Ok(DynCellRaw::F64(*v)),
516        Str(value) => Ok(DynCellRaw::from_str(value)),
517        Bin(value) => Ok(DynCellRaw::from_bin(value)),
518        Struct(_) => Err("struct key component not supported".to_string()),
519        List(_) => Err("list key component not supported".to_string()),
520        FixedSizeList(_) => Err("fixed-size list key component not supported".to_string()),
521        Map(_) => Err("map key component not supported".to_string()),
522        Union { .. } => Err("union key component not supported".to_string()),
523    }
524}
525
526fn validate_schema_matches(batch: &RecordBatch, schema: &Schema) -> Result<(), DynViewError> {
527    let batch_schema = batch.schema();
528    let batch_fields = batch_schema.fields();
529    let expected = schema.fields();
530    if batch_fields.len() != expected.len() {
531        return Err(DynViewError::Invalid {
532            column: expected.len().min(batch_fields.len()),
533            path: "<schema>".to_string(),
534            message: format!(
535                "column count mismatch: schema has {}, batch has {}",
536                expected.len(),
537                batch_fields.len()
538            ),
539        });
540    }
541
542    for (idx, (expected_field, actual_field)) in
543        expected.iter().zip(batch_fields.iter()).enumerate()
544    {
545        if expected_field.name() != actual_field.name() {
546            return Err(DynViewError::Invalid {
547                column: idx,
548                path: expected_field.name().to_string(),
549                message: format!(
550                    "field name mismatch: expected '{}', got '{}'",
551                    expected_field.name(),
552                    actual_field.name()
553                ),
554            });
555        }
556        validate_field_shape(
557            idx,
558            expected_field.name(),
559            expected_field.data_type(),
560            expected_field.is_nullable(),
561            actual_field.as_ref(),
562        )?;
563    }
564
565    Ok(())
566}
567
568/// Create dynamic views for a batch using the provided schema reference.
569pub fn iter_batch_views<'a>(
570    schema: &'a DynSchema,
571    batch: &'a RecordBatch,
572) -> Result<DynRowViews<'a>, DynViewError> {
573    DynRowViews::new(batch, schema.schema.as_ref())
574}
575
576/// Borrow a single row from `batch` as a dynamic view after schema validation.
577pub fn view_batch_row<'a>(
578    schema: &'a DynSchema,
579    batch: &'a RecordBatch,
580    row: usize,
581) -> Result<DynRowView<'a>, DynViewError> {
582    validate_schema_matches(batch, schema.schema.as_ref())?;
583    let len = batch.num_rows();
584    if row >= len {
585        return Err(DynViewError::RowOutOfBounds { row, len });
586    }
587    Ok(DynRowView {
588        batch,
589        fields: schema.schema.fields().clone(),
590        mapping: None,
591        projectors: None,
592        row,
593    })
594}