1use std::sync::Arc;
2
3use arrow_array::RecordBatch;
4use arrow_schema::{DataType, Field, Fields, Schema};
5
6use super::{
7 cell::{DynCellRaw, DynCellRef, view_cell_with_projector},
8 path::Path,
9 projection::{DynProjection, FieldProjector},
10};
11use crate::{DynViewError, cell::DynCell, rows::DynRow, schema::DynSchema};
12
/// Iterator over the rows of a borrowed [`RecordBatch`], yielding one
/// [`DynRowView`] per row.
#[derive(Debug)]
pub struct DynRowViews<'a> {
    /// Batch whose rows are iterated.
    batch: &'a RecordBatch,
    /// Fields describing the columns exposed by every yielded view.
    fields: Fields,
    /// Optional translation from view column index to batch column index;
    /// `None` means view columns address batch columns directly.
    mapping: Option<Arc<[usize]>>,
    /// Optional per-column projectors, set when a projection has been applied.
    projectors: Option<Arc<[FieldProjector]>>,
    /// Index of the next row to yield.
    row: usize,
    /// Row count of the batch, captured when the iterator was created.
    len: usize,
}
23
24impl<'a> DynRowViews<'a> {
25 pub fn new(batch: &'a RecordBatch, schema: &'a Schema) -> Result<Self, DynViewError> {
27 validate_schema_matches(batch, schema)?;
28 Ok(Self {
29 batch,
30 fields: schema.fields().clone(),
31 mapping: None,
32 projectors: None,
33 row: 0,
34 len: batch.num_rows(),
35 })
36 }
37
38 #[inline]
40 pub fn fields(&self) -> &Fields {
41 &self.fields
42 }
43
44 pub fn project(self, projection: DynProjection) -> Result<Self, DynViewError> {
54 let DynRowViews {
55 batch,
56 fields,
57 mapping,
58 projectors,
59 row,
60 len,
61 } = self;
62
63 let base_view = DynRowView {
64 batch,
65 fields,
66 mapping,
67 projectors,
68 row,
69 };
70
71 let projected_view = base_view.project(&projection)?;
72 let DynRowView {
73 batch,
74 fields,
75 mapping,
76 projectors,
77 row,
78 } = projected_view;
79
80 Ok(Self {
81 batch,
82 fields,
83 mapping,
84 projectors,
85 row,
86 len,
87 })
88 }
89}
90
91impl<'a> Iterator for DynRowViews<'a> {
92 type Item = Result<DynRowView<'a>, DynViewError>;
93
94 fn next(&mut self) -> Option<Self::Item> {
95 if self.row >= self.len {
96 return None;
97 }
98 let view = DynRowView {
99 batch: self.batch,
100 fields: self.fields.clone(),
101 mapping: self.mapping.clone(),
102 projectors: self.projectors.clone(),
103 row: self.row,
104 };
105 self.row += 1;
106 Some(Ok(view))
107 }
108
109 fn size_hint(&self) -> (usize, Option<usize>) {
110 let remaining = self.len.saturating_sub(self.row);
111 (remaining, Some(remaining))
112 }
113}
114
115impl<'a> ExactSizeIterator for DynRowViews<'a> {}
116
/// View of a single row of a borrowed [`RecordBatch`].
#[derive(Debug)]
pub struct DynRowView<'a> {
    /// Batch the row belongs to.
    batch: &'a RecordBatch,
    /// Fields describing the columns exposed by this view.
    fields: Fields,
    /// Optional translation from view column index to batch column index;
    /// `None` means view columns address batch columns directly.
    mapping: Option<Arc<[usize]>>,
    /// Optional per-column projectors, set when a projection has been applied.
    projectors: Option<Arc<[FieldProjector]>>,
    /// Index of the viewed row within the batch.
    row: usize,
}
126
127impl<'a> DynRowView<'a> {
128 #[cfg(test)]
129 pub(super) fn new_for_testing(
130 batch: &'a RecordBatch,
131 fields: Fields,
132 mapping: Option<Arc<[usize]>>,
133 projectors: Option<Arc<[FieldProjector]>>,
134 row: usize,
135 ) -> Self {
136 Self {
137 batch,
138 fields,
139 mapping,
140 projectors,
141 row,
142 }
143 }
144
145 #[inline]
147 pub fn len(&self) -> usize {
148 self.fields.len()
149 }
150
151 #[inline]
153 pub fn is_empty(&self) -> bool {
154 self.len() == 0
155 }
156
157 #[inline]
159 pub fn fields(&self) -> &Fields {
160 &self.fields
161 }
162
163 pub fn get(&self, column: usize) -> Result<Option<DynCellRef<'_>>, DynViewError> {
165 if self.row >= self.batch.num_rows() {
166 return Err(DynViewError::RowOutOfBounds {
167 row: self.row,
168 len: self.batch.num_rows(),
169 });
170 }
171 let width = self.fields.len();
172 let field = self
173 .fields
174 .get(column)
175 .ok_or(DynViewError::ColumnOutOfBounds { column, width })?;
176 let source_index = match &self.mapping {
177 Some(mapping) => mapping[column],
178 None => column,
179 };
180 if source_index >= self.batch.num_columns() {
181 return Err(DynViewError::Invalid {
182 column,
183 path: self
184 .fields
185 .get(column)
186 .map(|f| f.name().to_string())
187 .unwrap_or_else(|| "<unknown>".to_string()),
188 message: format!(
189 "projection index {source_index} exceeds batch width {}",
190 self.batch.num_columns()
191 ),
192 });
193 }
194 let array = self.batch.column(source_index);
195 let path = Path::new(column, field.name());
196 let projector = match self.projectors.as_ref() {
197 Some(projectors) => {
198 Some(
199 projectors
200 .get(column)
201 .ok_or_else(|| DynViewError::Invalid {
202 column,
203 path: field.name().to_string(),
204 message: "projection width mismatch".to_string(),
205 })?,
206 )
207 }
208 None => None,
209 };
210 view_cell_with_projector(&path, field.as_ref(), projector, array.as_ref(), self.row)
211 }
212
213 pub fn get_by_name(&self, name: &str) -> Option<Result<Option<DynCellRef<'_>>, DynViewError>> {
215 self.fields
216 .iter()
217 .position(|f| f.name() == name)
218 .map(move |idx| self.get(idx))
219 }
220
221 pub fn to_owned(&self) -> Result<DynRow, DynViewError> {
223 let width = self.len();
224 let mut cells = Vec::with_capacity(width);
225 for idx in 0..width {
226 let value = self.get(idx)?;
227 let owned = match value {
228 None => None,
229 Some(cell) => Some(cell.into_owned()?),
230 };
231 cells.push(owned);
232 }
233 Ok(DynRow(cells))
234 }
235
236 pub fn into_raw(self) -> Result<DynRowRaw, DynViewError> {
238 let fields = self.fields.clone();
239 let mut cells = Vec::with_capacity(fields.len());
240 for idx in 0..fields.len() {
241 let value = self.get(idx)?;
242 cells.push(value.map(DynCellRef::into_raw));
243 }
244 Ok(DynRowRaw { fields, cells })
245 }
246
247 pub fn project(self, projection: &DynProjection) -> Result<DynRowView<'a>, DynViewError> {
256 if projection.source_width() != self.batch.num_columns() {
257 return Err(DynViewError::Invalid {
258 column: 0,
259 path: "<projection>".to_string(),
260 message: format!(
261 "projection source width {} does not match batch width {}",
262 projection.source_width(),
263 self.batch.num_columns()
264 ),
265 });
266 }
267 Ok(DynRowView {
268 batch: self.batch,
269 fields: projection.fields().clone(),
270 mapping: Some(projection.mapping_arc()),
271 projectors: Some(projection.projectors_arc()),
272 row: self.row,
273 })
274 }
275
276 pub fn row_index(&self) -> usize {
278 self.row
279 }
280}
281
/// A row of raw cells together with the fields that describe them.
///
/// NOTE(review): `DynCellRaw` is defined in the `cell` module and is not
/// visible here; any validity requirements of the raw cells (for example,
/// how long the data they refer to must stay alive) should be confirmed there.
#[derive(Clone, Debug)]
pub struct DynRowRaw {
    /// Fields describing each cell, in column order.
    fields: Fields,
    /// One optional raw cell per column.
    cells: Vec<Option<DynCellRaw>>,
}
288
// SAFETY: NOTE(review) — these impls are only sound if `DynCellRaw` (defined in
// the `cell` module, not visible here) can be moved to and shared between
// threads. `Fields` and `Vec` add no restriction of their own beyond their
// contents. Confirm against the invariants documented on `DynCellRaw`.
unsafe impl Send for DynRowRaw {}
unsafe impl Sync for DynRowRaw {}
294
295fn validate_row_width(fields: &Fields, cells_len: usize) -> Result<(), DynViewError> {
296 if fields.len() != cells_len {
297 let column = fields.len().min(cells_len);
298 return Err(DynViewError::Invalid {
299 column,
300 path: "<row>".to_string(),
301 message: format!(
302 "field count {} does not match cell count {}",
303 fields.len(),
304 cells_len
305 ),
306 });
307 }
308 Ok(())
309}
310
311fn validate_field_shape(
312 column: usize,
313 field_name: &str,
314 expected_type: &DataType,
315 expected_nullable: bool,
316 actual: &Field,
317) -> Result<(), DynViewError> {
318 if actual.data_type() != expected_type {
319 return Err(DynViewError::SchemaMismatch {
320 column,
321 field: field_name.to_string(),
322 expected: expected_type.clone(),
323 actual: actual.data_type().clone(),
324 });
325 }
326 if actual.is_nullable() != expected_nullable {
327 return Err(DynViewError::Invalid {
328 column,
329 path: field_name.to_string(),
330 message: format!(
331 "nullability mismatch: expected {}, got {}",
332 expected_nullable,
333 actual.is_nullable()
334 ),
335 });
336 }
337 Ok(())
338}
339
340impl DynRowRaw {
341 pub fn try_new(fields: Fields, cells: Vec<Option<DynCellRaw>>) -> Result<Self, DynViewError> {
347 validate_row_width(&fields, cells.len())?;
348 Ok(Self { fields, cells })
349 }
350
351 pub fn from_cells(fields: Fields, cells: Vec<DynCellRaw>) -> Result<Self, DynViewError> {
356 let wrapped = cells.into_iter().map(Some).collect();
357 Self::try_new(fields, wrapped)
358 }
359
360 #[inline]
362 pub fn len(&self) -> usize {
363 self.cells.len()
364 }
365
366 #[inline]
368 pub fn is_empty(&self) -> bool {
369 self.cells.is_empty()
370 }
371
372 #[inline]
374 pub fn fields(&self) -> &Fields {
375 &self.fields
376 }
377
378 #[inline]
380 pub fn cells(&self) -> &[Option<DynCellRaw>] {
381 &self.cells
382 }
383
384 #[inline]
386 pub fn into_cells(self) -> Vec<Option<DynCellRaw>> {
387 self.cells
388 }
389
390 pub fn into_owned(self) -> Result<DynRow, DynViewError> {
392 let mut cells = Vec::with_capacity(self.cells.len());
393 for cell in self.cells {
394 let owned = match cell {
395 None => None,
396 Some(raw) => Some(raw.into_owned()?),
397 };
398 cells.push(owned);
399 }
400 Ok(DynRow(cells))
401 }
402
403 pub fn to_owned(&self) -> Result<DynRow, DynViewError> {
405 self.clone().into_owned()
406 }
407}
408
/// A row of owned cells together with the fields that describe them.
#[derive(Clone, Debug)]
pub struct DynRowOwned {
    /// Fields describing each cell, in column order.
    fields: Fields,
    /// One optional owned cell per column.
    cells: Vec<Option<DynCell>>,
}
415
416impl DynRowOwned {
417 pub fn try_new(fields: Fields, cells: Vec<Option<DynCell>>) -> Result<Self, DynViewError> {
422 validate_row_width(&fields, cells.len())?;
423 Ok(Self { fields, cells })
424 }
425
426 pub fn from_dyn_row(fields: Fields, row: DynRow) -> Result<Self, DynViewError> {
428 Self::try_new(fields, row.0)
429 }
430
431 pub fn from_raw(raw: &DynRowRaw) -> Result<Self, DynViewError> {
433 let owned = raw.to_owned()?;
434 Self::from_dyn_row(raw.fields().clone(), owned)
435 }
436
437 #[inline]
439 pub fn fields(&self) -> &Fields {
440 &self.fields
441 }
442
443 #[inline]
445 pub fn cells(&self) -> &[Option<DynCell>] {
446 &self.cells
447 }
448
449 #[inline]
451 pub fn len(&self) -> usize {
452 self.cells.len()
453 }
454
455 #[inline]
457 pub fn is_empty(&self) -> bool {
458 self.cells.is_empty()
459 }
460
461 pub fn as_raw(&self) -> Result<DynRowRaw, DynViewError> {
463 let mut raw_cells = Vec::with_capacity(self.cells.len());
464 for (idx, cell) in self.cells.iter().enumerate() {
465 match cell {
466 None => raw_cells.push(None),
467 Some(value) => {
468 let raw =
469 owned_cell_to_raw(value).map_err(|message| DynViewError::Invalid {
470 column: idx,
471 path: self
472 .fields
473 .get(idx)
474 .map(|f| f.name().to_string())
475 .unwrap_or_else(|| format!("col{idx}")),
476 message,
477 })?;
478 raw_cells.push(Some(raw));
479 }
480 }
481 }
482 DynRowRaw::try_new(self.fields.clone(), raw_cells)
483 }
484
485 pub fn into_dyn_row(self) -> DynRow {
487 DynRow(self.cells)
488 }
489
490 pub fn to_dyn_row(&self) -> DynRow {
492 DynRow(self.cells.clone())
493 }
494
495 pub fn into_parts(self) -> (Fields, Vec<Option<DynCell>>) {
497 (self.fields, self.cells)
498 }
499}
500
501fn owned_cell_to_raw(cell: &DynCell) -> Result<DynCellRaw, String> {
502 use DynCell::*;
503 match cell {
504 Null => Ok(DynCellRaw::Null),
505 Bool(v) => Ok(DynCellRaw::Bool(*v)),
506 I8(v) => Ok(DynCellRaw::I8(*v)),
507 I16(v) => Ok(DynCellRaw::I16(*v)),
508 I32(v) => Ok(DynCellRaw::I32(*v)),
509 I64(v) => Ok(DynCellRaw::I64(*v)),
510 U8(v) => Ok(DynCellRaw::U8(*v)),
511 U16(v) => Ok(DynCellRaw::U16(*v)),
512 U32(v) => Ok(DynCellRaw::U32(*v)),
513 U64(v) => Ok(DynCellRaw::U64(*v)),
514 F32(v) => Ok(DynCellRaw::F32(*v)),
515 F64(v) => Ok(DynCellRaw::F64(*v)),
516 Str(value) => Ok(DynCellRaw::from_str(value)),
517 Bin(value) => Ok(DynCellRaw::from_bin(value)),
518 Struct(_) => Err("struct key component not supported".to_string()),
519 List(_) => Err("list key component not supported".to_string()),
520 FixedSizeList(_) => Err("fixed-size list key component not supported".to_string()),
521 Map(_) => Err("map key component not supported".to_string()),
522 Union { .. } => Err("union key component not supported".to_string()),
523 }
524}
525
526fn validate_schema_matches(batch: &RecordBatch, schema: &Schema) -> Result<(), DynViewError> {
527 let batch_schema = batch.schema();
528 let batch_fields = batch_schema.fields();
529 let expected = schema.fields();
530 if batch_fields.len() != expected.len() {
531 return Err(DynViewError::Invalid {
532 column: expected.len().min(batch_fields.len()),
533 path: "<schema>".to_string(),
534 message: format!(
535 "column count mismatch: schema has {}, batch has {}",
536 expected.len(),
537 batch_fields.len()
538 ),
539 });
540 }
541
542 for (idx, (expected_field, actual_field)) in
543 expected.iter().zip(batch_fields.iter()).enumerate()
544 {
545 if expected_field.name() != actual_field.name() {
546 return Err(DynViewError::Invalid {
547 column: idx,
548 path: expected_field.name().to_string(),
549 message: format!(
550 "field name mismatch: expected '{}', got '{}'",
551 expected_field.name(),
552 actual_field.name()
553 ),
554 });
555 }
556 validate_field_shape(
557 idx,
558 expected_field.name(),
559 expected_field.data_type(),
560 expected_field.is_nullable(),
561 actual_field.as_ref(),
562 )?;
563 }
564
565 Ok(())
566}
567
568pub fn iter_batch_views<'a>(
570 schema: &'a DynSchema,
571 batch: &'a RecordBatch,
572) -> Result<DynRowViews<'a>, DynViewError> {
573 DynRowViews::new(batch, schema.schema.as_ref())
574}
575
576pub fn view_batch_row<'a>(
578 schema: &'a DynSchema,
579 batch: &'a RecordBatch,
580 row: usize,
581) -> Result<DynRowView<'a>, DynViewError> {
582 validate_schema_matches(batch, schema.schema.as_ref())?;
583 let len = batch.num_rows();
584 if row >= len {
585 return Err(DynViewError::RowOutOfBounds { row, len });
586 }
587 Ok(DynRowView {
588 batch,
589 fields: schema.schema.fields().clone(),
590 mapping: None,
591 projectors: None,
592 row,
593 })
594}