// vortex_layout/layouts/struct_/writer.rs

1use itertools::Itertools;
2use vortex_array::aliases::hash_set::HashSet;
3use vortex_array::{Array, ArrayContext, ArrayRef, ToCanonical};
4use vortex_dtype::DType;
5use vortex_error::{VortexResult, vortex_bail, vortex_err};
6
7use crate::LayoutVTableRef;
8use crate::data::Layout;
9use crate::layouts::struct_::StructLayout;
10use crate::segments::SegmentWriter;
11use crate::strategy::LayoutStrategy;
12use crate::writer::LayoutWriter;
13
14/// A [`LayoutWriter`] that splits a StructArray batch into child layout writers
15pub struct StructLayoutWriter {
16    column_strategies: Vec<Box<dyn LayoutWriter>>,
17    dtype: DType,
18    row_count: u64,
19}
20
21impl StructLayoutWriter {
22    pub fn try_new(
23        dtype: DType,
24        column_layout_writers: Vec<Box<dyn LayoutWriter>>,
25    ) -> VortexResult<Self> {
26        let struct_dtype = dtype
27            .as_struct()
28            .ok_or_else(|| vortex_err!("expected StructDType"))?;
29        if HashSet::from_iter(struct_dtype.names().iter()).len() != struct_dtype.names().len() {
30            vortex_bail!("StructLayout must have unique field names")
31        }
32        if struct_dtype.fields().len() != column_layout_writers.len() {
33            vortex_bail!(
34                "number of fields in struct dtype does not match number of column layout writers"
35            );
36        }
37        Ok(Self {
38            column_strategies: column_layout_writers,
39            dtype,
40            row_count: 0,
41        })
42    }
43
44    pub fn try_new_with_strategy<S: LayoutStrategy>(
45        ctx: &ArrayContext,
46        dtype: &DType,
47        factory: S,
48    ) -> VortexResult<Self> {
49        let struct_dtype = dtype
50            .as_struct()
51            .ok_or_else(|| vortex_err!("expected StructDType"))?;
52        Self::try_new(
53            dtype.clone(),
54            struct_dtype
55                .fields()
56                .map(|field_dtype| factory.new_writer(ctx, &field_dtype))
57                .try_collect()?,
58        )
59    }
60}
61
62impl LayoutWriter for StructLayoutWriter {
63    fn push_chunk(
64        &mut self,
65        segment_writer: &mut dyn SegmentWriter,
66        chunk: ArrayRef,
67    ) -> VortexResult<()> {
68        assert_eq!(
69            chunk.dtype(),
70            &self.dtype,
71            "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
72            chunk.dtype(),
73            self.dtype
74        );
75        let struct_array = chunk.to_struct()?;
76        if struct_array.struct_dtype().nfields() != self.column_strategies.len() {
77            vortex_bail!(
78                "number of fields in struct array does not match number of column layout writers"
79            );
80        }
81        self.row_count += struct_array.len() as u64;
82
83        for i in 0..struct_array.struct_dtype().nfields() {
84            // TODO(joe): handle struct validity
85            for column_chunk in struct_array.fields()[i].to_array_iterator() {
86                let column_chunk = column_chunk?;
87                self.column_strategies[i].push_chunk(segment_writer, column_chunk)?;
88            }
89        }
90
91        Ok(())
92    }
93
94    fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
95        for writer in self.column_strategies.iter_mut() {
96            writer.flush(segment_writer)?;
97        }
98        Ok(())
99    }
100
101    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
102        let mut column_layouts = vec![];
103        for writer in self.column_strategies.iter_mut() {
104            column_layouts.push(writer.finish(segment_writer)?);
105        }
106        Ok(Layout::new_owned(
107            "struct".into(),
108            LayoutVTableRef::new_ref(&StructLayout),
109            self.dtype.clone(),
110            self.row_count,
111            vec![],
112            column_layouts,
113            None,
114        ))
115    }
116}
117
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_dtype::{DType, Nullability, PType};

    use crate::LayoutWriterExt;
    use crate::layouts::flat::writer::{FlatLayoutStrategy, FlatLayoutWriter};
    use crate::layouts::struct_::writer::StructLayoutWriter;
    use crate::writer::LayoutWriter;

    /// Non-nullable `i32`, the field type used by every struct in these tests.
    fn i32_dtype() -> DType {
        DType::Primitive(PType::I32, Nullability::NonNullable)
    }

    /// Builds a non-nullable struct dtype with one `i32` field per entry of `names`.
    fn struct_of_i32(names: &[&'static str]) -> DType {
        DType::Struct(
            Arc::new(names.iter().map(|&name| (name, i32_dtype())).collect()),
            Nullability::NonNullable,
        )
    }

    /// Builds `count` flat `i32` writers to serve as struct column writers.
    fn flat_writers(count: usize) -> Vec<Box<dyn LayoutWriter>> {
        (0..count)
            .map(|_| {
                FlatLayoutWriter::new(
                    ArrayContext::empty(),
                    i32_dtype(),
                    FlatLayoutStrategy::default(),
                )
                .boxed()
            })
            .collect()
    }

    // Check the specific error rather than using a bare `#[should_panic]`, which would also
    // pass on unrelated panics (e.g. while constructing the child writers).
    #[test]
    fn fails_on_duplicate_field() {
        let Err(err) = StructLayoutWriter::try_new(struct_of_i32(&["a", "a"]), flat_writers(2))
        else {
            panic!("duplicate field names must be rejected");
        };
        assert!(
            err.to_string().contains("unique field names"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn fails_on_writer_count_mismatch() {
        let Err(err) = StructLayoutWriter::try_new(struct_of_i32(&["a", "b"]), flat_writers(1))
        else {
            panic!("a writer count that differs from the field count must be rejected");
        };
        assert!(
            err.to_string()
                .contains("does not match number of column layout writers"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn fails_on_non_struct_dtype() {
        let Err(err) = StructLayoutWriter::try_new(i32_dtype(), flat_writers(0)) else {
            panic!("a non-struct dtype must be rejected");
        };
        assert!(
            err.to_string().contains("expected StructDType"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn accepts_unique_fields_with_matching_writers() {
        assert!(StructLayoutWriter::try_new(struct_of_i32(&["a", "b"]), flat_writers(2)).is_ok());
    }
}
161}