vortex_layout/layouts/struct_/
writer.rs1use itertools::Itertools;
2use vortex_array::aliases::hash_set::HashSet;
3use vortex_array::iter::ArrayIteratorArrayExt;
4use vortex_array::{ArrayContext, ArrayRef};
5use vortex_dtype::DType;
6use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err};
7
8use crate::LayoutVTableRef;
9use crate::data::Layout;
10use crate::layouts::struct_::StructLayout;
11use crate::segments::SegmentWriter;
12use crate::strategy::LayoutStrategy;
13use crate::writer::LayoutWriter;
14
15pub struct StructLayoutWriter {
17 column_strategies: Vec<Box<dyn LayoutWriter>>,
18 dtype: DType,
19 row_count: u64,
20}
21
22impl StructLayoutWriter {
23 pub fn try_new(
24 dtype: DType,
25 column_layout_writers: Vec<Box<dyn LayoutWriter>>,
26 ) -> VortexResult<Self> {
27 let struct_dtype = dtype
28 .as_struct()
29 .ok_or_else(|| vortex_err!("expected StructDType"))?;
30 if HashSet::from_iter(struct_dtype.names().iter()).len() != struct_dtype.names().len() {
31 vortex_bail!("StructLayout must have unique field names")
32 }
33 if struct_dtype.fields().len() != column_layout_writers.len() {
34 vortex_bail!(
35 "number of fields in struct dtype does not match number of column layout writers"
36 );
37 }
38 Ok(Self {
39 column_strategies: column_layout_writers,
40 dtype,
41 row_count: 0,
42 })
43 }
44
45 pub fn try_new_with_strategy<S: LayoutStrategy>(
46 ctx: &ArrayContext,
47 dtype: &DType,
48 factory: S,
49 ) -> VortexResult<Self> {
50 let struct_dtype = dtype
51 .as_struct()
52 .ok_or_else(|| vortex_err!("expected StructDType"))?;
53 Self::try_new(
54 dtype.clone(),
55 struct_dtype
56 .fields()
57 .map(|dtype| factory.new_writer(ctx, &dtype))
58 .try_collect()?,
59 )
60 }
61}
62
63impl LayoutWriter for StructLayoutWriter {
64 fn push_chunk(
65 &mut self,
66 segment_writer: &mut dyn SegmentWriter,
67 chunk: ArrayRef,
68 ) -> VortexResult<()> {
69 let struct_array = chunk
70 .as_struct_typed()
71 .ok_or_else(|| vortex_err!("batch is not a struct array"))?;
72
73 if struct_array.nfields() != self.column_strategies.len() {
74 vortex_bail!(
75 "number of fields in struct array does not match number of column layout writers"
76 );
77 }
78 self.row_count += struct_array.len() as u64;
79
80 for i in 0..struct_array.nfields() {
81 let column = chunk
83 .as_struct_typed()
84 .vortex_expect("batch is a struct array")
85 .maybe_null_field_by_idx(i)
86 .vortex_expect("bounds already checked");
87
88 for column_chunk in column.to_array_iterator() {
89 self.column_strategies[i].push_chunk(segment_writer, column_chunk?)?;
90 }
91 }
92
93 Ok(())
94 }
95
96 fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
97 for writer in self.column_strategies.iter_mut() {
98 writer.flush(segment_writer)?;
99 }
100 Ok(())
101 }
102
103 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
104 let mut column_layouts = vec![];
105 for writer in self.column_strategies.iter_mut() {
106 column_layouts.push(writer.finish(segment_writer)?);
107 }
108 Ok(Layout::new_owned(
109 "struct".into(),
110 LayoutVTableRef::new_ref(&StructLayout),
111 self.dtype.clone(),
112 self.row_count,
113 vec![],
114 column_layouts,
115 None,
116 ))
117 }
118}
119
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_dtype::{DType, Nullability, PType};

    use crate::LayoutWriterExt;
    use crate::layouts::flat::writer::{FlatLayoutOptions, FlatLayoutWriter};
    use crate::layouts::struct_::writer::StructLayoutWriter;

    /// Building a struct writer for a dtype with two fields both named `"a"`
    /// must not succeed; unwrapping the result is expected to panic.
    #[test]
    #[should_panic]
    fn fails_on_duplicate_field() {
        let i32_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);

        // Each call produces a fresh boxed flat writer for a non-nullable i32 column.
        let flat_writer = || {
            FlatLayoutWriter::new(
                ArrayContext::empty(),
                i32_dtype.clone(),
                FlatLayoutOptions::default(),
            )
            .boxed()
        };

        let duplicate_struct = DType::Struct(
            Arc::new(
                [("a", i32_dtype.clone()), ("a", i32_dtype.clone())]
                    .into_iter()
                    .collect(),
            ),
            Nullability::NonNullable,
        );

        StructLayoutWriter::try_new(duplicate_struct, vec![flat_writer(), flat_writer()]).unwrap();
    }
}