vortex_layout/layouts/struct_/
writer.rs

1use itertools::Itertools;
2use vortex_array::aliases::hash_set::HashSet;
3use vortex_array::{Array, ArrayContext, ArrayRef, ToCanonical};
4use vortex_dtype::DType;
5use vortex_error::{VortexResult, vortex_bail, vortex_err};
6
7use crate::layouts::struct_::StructLayout;
8use crate::segments::SegmentWriter;
9use crate::strategy::LayoutStrategy;
10use crate::writer::LayoutWriter;
11use crate::{IntoLayout, LayoutRef};
12
13/// A [`LayoutWriter`] that splits a StructArray batch into child layout writers
14pub struct StructLayoutWriter {
15    column_strategies: Vec<Box<dyn LayoutWriter>>,
16    dtype: DType,
17    row_count: u64,
18}
19
20impl StructLayoutWriter {
21    pub fn try_new(
22        dtype: DType,
23        column_layout_writers: Vec<Box<dyn LayoutWriter>>,
24    ) -> VortexResult<Self> {
25        let struct_dtype = dtype
26            .as_struct()
27            .ok_or_else(|| vortex_err!("expected StructDType"))?;
28        if HashSet::from_iter(struct_dtype.names().iter()).len() != struct_dtype.names().len() {
29            vortex_bail!("StructLayout must have unique field names")
30        }
31        if struct_dtype.fields().len() != column_layout_writers.len() {
32            vortex_bail!(
33                "number of fields in struct dtype does not match number of column layout writers"
34            );
35        }
36        Ok(Self {
37            column_strategies: column_layout_writers,
38            dtype,
39            row_count: 0,
40        })
41    }
42
43    pub fn try_new_with_strategy<S: LayoutStrategy>(
44        ctx: &ArrayContext,
45        dtype: &DType,
46        factory: &S,
47    ) -> VortexResult<Self> {
48        let struct_dtype = dtype
49            .as_struct()
50            .ok_or_else(|| vortex_err!("expected StructDType"))?;
51        Self::try_new(
52            dtype.clone(),
53            struct_dtype
54                .fields()
55                .map(|field_dtype| factory.new_writer(ctx, &field_dtype))
56                .try_collect()?,
57        )
58    }
59}
60
61impl LayoutWriter for StructLayoutWriter {
62    fn push_chunk(
63        &mut self,
64        segment_writer: &mut dyn SegmentWriter,
65        chunk: ArrayRef,
66    ) -> VortexResult<()> {
67        assert_eq!(
68            chunk.dtype(),
69            &self.dtype,
70            "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
71            chunk.dtype(),
72            self.dtype
73        );
74        let struct_array = chunk.to_struct()?;
75        if struct_array.struct_dtype().nfields() != self.column_strategies.len() {
76            vortex_bail!(
77                "number of fields in struct array does not match number of column layout writers"
78            );
79        }
80        self.row_count += struct_array.len() as u64;
81
82        for i in 0..struct_array.struct_dtype().nfields() {
83            // TODO(joe): handle struct validity
84            for column_chunk in struct_array.fields()[i].to_array_iterator() {
85                let column_chunk = column_chunk?;
86                self.column_strategies[i].push_chunk(segment_writer, column_chunk)?;
87            }
88        }
89
90        Ok(())
91    }
92
93    fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
94        for writer in self.column_strategies.iter_mut() {
95            writer.flush(segment_writer)?;
96        }
97        Ok(())
98    }
99
100    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<LayoutRef> {
101        let mut column_layouts = vec![];
102        for writer in self.column_strategies.iter_mut() {
103            column_layouts.push(writer.finish(segment_writer)?);
104        }
105        Ok(StructLayout::new(self.row_count, self.dtype.clone(), column_layouts).into_layout())
106    }
107}
108
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_dtype::{DType, Nullability, PType};

    use crate::LayoutWriterExt;
    use crate::layouts::flat::writer::{FlatLayoutStrategy, FlatLayoutWriter};
    use crate::layouts::struct_::writer::StructLayoutWriter;
    use crate::writer::LayoutWriter;

    /// Column dtype used by every field in these tests.
    fn i32_dtype() -> DType {
        DType::Primitive(PType::I32, Nullability::NonNullable)
    }

    /// Builds a non-nullable struct dtype from `(name, dtype)` pairs, keeping their order.
    fn struct_of(fields: &[(&'static str, DType)]) -> DType {
        DType::Struct(
            Arc::new(fields.iter().cloned().collect()),
            Nullability::NonNullable,
        )
    }

    /// A flat writer for one `i32` column.
    fn flat_writer() -> Box<dyn LayoutWriter> {
        FlatLayoutWriter::new(
            ArrayContext::empty(),
            i32_dtype(),
            FlatLayoutStrategy::default(),
        )
        .boxed()
    }

    #[test]
    fn fails_on_duplicate_field() {
        let result = StructLayoutWriter::try_new(
            struct_of(&[("a", i32_dtype()), ("a", i32_dtype())]),
            vec![flat_writer(), flat_writer()],
        );
        // Match the error explicitly: `#[should_panic]` would also pass on an unrelated panic.
        let Err(err) = result else {
            panic!("duplicate field names must be rejected");
        };
        assert!(
            err.to_string().contains("unique field names"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn fails_on_writer_count_mismatch() {
        let result = StructLayoutWriter::try_new(
            struct_of(&[("a", i32_dtype()), ("b", i32_dtype())]),
            vec![flat_writer()],
        );
        let Err(err) = result else {
            panic!("a writer count different from the field count must be rejected");
        };
        assert!(
            err.to_string()
                .contains("number of column layout writers"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn accepts_matching_fields_and_writers() {
        let result = StructLayoutWriter::try_new(
            struct_of(&[("a", i32_dtype()), ("b", i32_dtype())]),
            vec![flat_writer(), flat_writer()],
        );
        assert!(result.is_ok());
    }
}
152}