vortex_layout/layouts/struct_/
writer.rs

1use itertools::Itertools;
2use vortex_array::aliases::hash_set::HashSet;
3use vortex_array::iter::ArrayIteratorArrayExt;
4use vortex_array::{ArrayContext, ArrayRef};
5use vortex_dtype::DType;
6use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err};
7
8use crate::LayoutVTableRef;
9use crate::data::Layout;
10use crate::layouts::struct_::StructLayout;
11use crate::segments::SegmentWriter;
12use crate::strategy::LayoutStrategy;
13use crate::writer::LayoutWriter;
14
/// A [`LayoutWriter`] that splits a StructArray batch into child layout writers.
///
/// Every pushed chunk is expected to be a struct array whose number of fields equals the
/// number of child writers; field `i` of the chunk is forwarded to child writer `i`.
pub struct StructLayoutWriter {
    /// One child writer per struct field, in field order. Despite the name, these are
    /// [`LayoutWriter`] instances rather than layout strategies.
    column_strategies: Vec<Box<dyn LayoutWriter>>,
    /// The struct dtype this writer was created for; it is cloned into the [`Layout`]
    /// returned by `finish`.
    dtype: DType,
    /// Running total of rows across all chunks accepted by `push_chunk`.
    row_count: u64,
}
21
22impl StructLayoutWriter {
23    pub fn try_new(
24        dtype: DType,
25        column_layout_writers: Vec<Box<dyn LayoutWriter>>,
26    ) -> VortexResult<Self> {
27        let struct_dtype = dtype
28            .as_struct()
29            .ok_or_else(|| vortex_err!("expected StructDType"))?;
30        if HashSet::from_iter(struct_dtype.names().iter()).len() != struct_dtype.names().len() {
31            vortex_bail!("StructLayout must have unique field names")
32        }
33        if struct_dtype.fields().len() != column_layout_writers.len() {
34            vortex_bail!(
35                "number of fields in struct dtype does not match number of column layout writers"
36            );
37        }
38        Ok(Self {
39            column_strategies: column_layout_writers,
40            dtype,
41            row_count: 0,
42        })
43    }
44
45    pub fn try_new_with_strategy<S: LayoutStrategy>(
46        ctx: &ArrayContext,
47        dtype: &DType,
48        factory: S,
49    ) -> VortexResult<Self> {
50        let struct_dtype = dtype
51            .as_struct()
52            .ok_or_else(|| vortex_err!("expected StructDType"))?;
53        Self::try_new(
54            dtype.clone(),
55            struct_dtype
56                .fields()
57                .map(|dtype| factory.new_writer(ctx, &dtype))
58                .try_collect()?,
59        )
60    }
61}
62
63impl LayoutWriter for StructLayoutWriter {
64    fn push_chunk(
65        &mut self,
66        segment_writer: &mut dyn SegmentWriter,
67        chunk: ArrayRef,
68    ) -> VortexResult<()> {
69        let struct_array = chunk
70            .as_struct_typed()
71            .ok_or_else(|| vortex_err!("batch is not a struct array"))?;
72
73        if struct_array.nfields() != self.column_strategies.len() {
74            vortex_bail!(
75                "number of fields in struct array does not match number of column layout writers"
76            );
77        }
78        self.row_count += struct_array.len() as u64;
79
80        for i in 0..struct_array.nfields() {
81            // TODO(joe): handle struct validity
82            let column = chunk
83                .as_struct_typed()
84                .vortex_expect("batch is a struct array")
85                .maybe_null_field_by_idx(i)
86                .vortex_expect("bounds already checked");
87
88            for column_chunk in column.to_array_iterator() {
89                self.column_strategies[i].push_chunk(segment_writer, column_chunk?)?;
90            }
91        }
92
93        Ok(())
94    }
95
96    fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
97        for writer in self.column_strategies.iter_mut() {
98            writer.flush(segment_writer)?;
99        }
100        Ok(())
101    }
102
103    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
104        let mut column_layouts = vec![];
105        for writer in self.column_strategies.iter_mut() {
106            column_layouts.push(writer.finish(segment_writer)?);
107        }
108        Ok(Layout::new_owned(
109            "struct".into(),
110            LayoutVTableRef::new_ref(&StructLayout),
111            self.dtype.clone(),
112            self.row_count,
113            vec![],
114            column_layouts,
115            None,
116        ))
117    }
118}
119
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_dtype::{DType, Nullability, PType};

    use crate::LayoutWriterExt;
    use crate::layouts::flat::writer::{FlatLayoutOptions, FlatLayoutWriter};
    use crate::layouts::struct_::writer::StructLayoutWriter;

    /// `try_new` must reject a struct dtype in which two fields share a name.
    ///
    /// The result is inspected directly instead of relying on `#[should_panic]`, so the test
    /// cannot pass because of an unrelated panic while the fixtures are being built.
    #[test]
    fn fails_on_duplicate_field() {
        let i32_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);

        // Two fields, both named "a".
        let struct_dtype = DType::Struct(
            Arc::new(
                [("a", i32_dtype.clone()), ("a", i32_dtype.clone())]
                    .into_iter()
                    .collect(),
            ),
            Nullability::NonNullable,
        );

        // One flat writer per field, so the field-count check cannot be what fails.
        let new_column_writer = || {
            FlatLayoutWriter::new(
                ArrayContext::empty(),
                i32_dtype.clone(),
                FlatLayoutOptions::default(),
            )
            .boxed()
        };

        let result = StructLayoutWriter::try_new(
            struct_dtype,
            vec![new_column_writer(), new_column_writer()],
        );

        let Err(err) = result else {
            panic!("expected duplicate field names to be rejected");
        };
        assert!(
            err.to_string().contains("unique field names"),
            "unexpected error: {err}"
        );
    }
}