// vortex_layout/layouts/struct_/writer.rs
use itertools::Itertools;
use vortex_array::Array;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect, VortexResult};

use crate::data::Layout;
use crate::layouts::struct_::StructLayout;
use crate::segments::SegmentWriter;
use crate::strategies::{LayoutStrategy, LayoutWriter};
use crate::LayoutVTableRef;

/// A [`LayoutWriter`] for struct-typed data.
///
/// Every StructArray chunk pushed into this writer is split into its fields, and each field is
/// forwarded to its own per-column [`LayoutWriter`]. The resulting column layouts are handed to
/// a single struct layout when writing finishes.
pub struct StructLayoutWriter {
    /// The struct dtype of the data being written.
    dtype: DType,
    /// One child writer per struct field, indexed in field order.
    column_strategies: Vec<Box<dyn LayoutWriter>>,
    /// Number of rows pushed so far.
    row_count: u64,
}

impl StructLayoutWriter {
    pub fn new(dtype: DType, column_layout_writers: Vec<Box<dyn LayoutWriter>>) -> Self {
        let struct_dtype = dtype.as_struct().vortex_expect("dtype is not a struct");
        if struct_dtype.dtypes().len() != column_layout_writers.len() {
            vortex_panic!(
                "number of fields in struct dtype does not match number of column layout writers"
            );
        }
        Self {
            column_strategies: column_layout_writers,
            dtype,
            row_count: 0,
        }
    }

    pub fn try_new_with_factory<F: LayoutStrategy>(
        dtype: &DType,
        factory: F,
    ) -> VortexResult<Self> {
        let struct_dtype = dtype.as_struct().vortex_expect("dtype is not a struct");
        Ok(Self::new(
            dtype.clone(),
            struct_dtype
                .dtypes()
                .map(|dtype| factory.new_writer(&dtype))
                .try_collect()?,
        ))
    }
}

impl LayoutWriter for StructLayoutWriter {
    /// Splits `chunk` into its struct fields and pushes each field's chunks into the
    /// corresponding column writer.
    ///
    /// # Errors
    ///
    /// Returns an error if `chunk` is not a struct array, if its field count differs from the
    /// number of column writers, or if any column writer fails.
    fn push_chunk(&mut self, segments: &mut dyn SegmentWriter, chunk: Array) -> VortexResult<()> {
        let struct_array = chunk
            .as_struct_array()
            .ok_or_else(|| vortex_err!("batch is not a struct array"))?;

        if struct_array.nfields() != self.column_strategies.len() {
            vortex_bail!(
                "number of fields in struct array does not match number of column layout writers"
            );
        }

        for (i, writer) in self.column_strategies.iter_mut().enumerate() {
            // TODO(joe): handle struct validity
            let column = struct_array
                .maybe_null_field_by_idx(i)
                .vortex_expect("bounds already checked");

            for column_chunk in column.into_array_iterator() {
                writer.push_chunk(segments, column_chunk?)?;
            }
        }

        // Only count the rows once every column has accepted its data.
        self.row_count += struct_array.len() as u64;
        Ok(())
    }

    /// Finishes every column writer (in field order) and returns a struct layout built from the
    /// resulting column layouts, the struct dtype and the total row count.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a column writer's `finish`.
    fn finish(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<Layout> {
        let column_layouts: Vec<Layout> = self
            .column_strategies
            .iter_mut()
            .map(|writer| writer.finish(segments))
            .try_collect()?;
        Ok(Layout::new_owned(
            LayoutVTableRef::from_static(&StructLayout),
            self.dtype.clone(),
            self.row_count,
            None,
            Some(column_layouts),
            None,
        ))
    }
}